Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-12 16:41:31 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-11-12 17:04:54 -0500
commitc5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead (patch)
tree65d696c6a0c6a339f5d43ae05a5e0b5b4b67f513 /src
parentde94f69e8f34553a58de23ab6b16fda62838e44d (diff)
downloadliza-c5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead.tar.gz
liza-c5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead.tar.bz2
liza-c5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead.zip
[DEV-5312] Add interface for amqp publisher and implement a delta publisher
Diffstat (limited to 'src')
-rw-r--r--src/server/db/MongoServerDao.js482
-rw-r--r--src/system/AmqpPublisher.ts42
-rw-r--r--src/system/DeltaProcessor.ts14
-rw-r--r--src/system/DeltaPublisher.ts133
4 files changed, 179 insertions, 492 deletions
diff --git a/src/server/db/MongoServerDao.js b/src/server/db/MongoServerDao.js
deleted file mode 100644
index 34878c4..0000000
--- a/src/server/db/MongoServerDao.js
+++ /dev/null
@@ -1,482 +0,0 @@
-"use strict";
-/**
- * Mongo DB DAO for program server
- *
- * Copyright (C) 2010-2019 R-T Specialty, LLC.
- *
- * This file is part of the Liza Data Collection Framework.
- *
- * liza is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-var __extends = (this && this.__extends) || (function () {
- var extendStatics = function (d, b) {
- extendStatics = Object.setPrototypeOf ||
- ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
- function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; };
- return extendStatics(d, b);
- };
- return function (d, b) {
- extendStatics(d, b);
- function __() { this.constructor = d; }
- d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
- };
-})();
-Object.defineProperty(exports, "__esModule", { value: true });
-var EventEmitter = require('events').EventEmitter;
-/**
- * Uses MongoDB as a data store
- */
-var MongoServerDao = /** @class */ (function (_super) {
- __extends(MongoServerDao, _super);
- /**
- * Initializes DAO
- *
- * @param {Mongo.Db} db mongo database connection
- */
- function MongoServerDao(_db) {
- var _this = _super.call(this) || this;
- _this._db = _db;
- /** Collection used to store quotes */
- _this.COLLECTION = 'quotes';
- /** Sequence (auto-increment) collection */
- _this.COLLECTION_SEQ = 'seq';
- /** Sequence key for quote ids */
- _this.SEQ_QUOTE_ID = 'quoteId';
- /** Sequence quoteId default */
- _this.SEQ_QUOTE_ID_DEFAULT = 200000;
- /** Whether the DAO is initialized and ready to be used */
- _this._ready = false;
- return _this;
- }
- /**
- * Initializes error events and attempts to connect to the database
- *
- * connectError event will be emitted on failure.
- *
- * @param Function callback function to call when connection is complete
- * (will not be called if connection fails)
- *
- * @return MongoServerDao self to allow for method chaining
- */
- MongoServerDao.prototype.init = function (callback) {
- var dao = this;
- // map db error event (on connection error) to our connectError event
- this._db.on('error', function (err) {
- dao._ready = false;
- dao._collection = null;
- dao.emit('connectError', err);
- });
- this.connect(callback);
- return this;
- };
- /**
- * Attempts to connect to the database
- *
- * connectError event will be emitted on failure.
- *
- * @param Function callback function to call when connection is complete
- * (will not be called if connection fails)
- *
- * @return MongoServerDao self to allow for method chaining
- */
- MongoServerDao.prototype.connect = function (callback) {
- var dao = this;
- // attempt to connect to the database
- this._db.open(function (err, db) {
- // if there was an error, don't bother with anything else
- if (err) {
- // in some circumstances, it may just be telling us that we're
- // already connected (even though the connection may have been
- // broken)
- if (err.errno !== undefined) {
- dao.emit('connectError', err);
- return;
- }
- }
- var ready_count = 0;
- var check_ready = function () {
- if (++ready_count < 2) {
- return;
- }
- // we're ready to roll!
- dao._ready = true;
- dao.emit('ready');
- // connection was successful; call the callback
- if (callback instanceof Function) {
- callback.call(dao);
- }
- };
- // quotes collection
- db.collection(dao.COLLECTION, function (_err, collection) {
- // for some reason this gets called more than once
- if (collection == null) {
- return;
- }
- // initialize indexes
- collection.createIndex([['id', 1]], true, function (_err, _index) {
- // mark the DAO as ready to be used
- dao._collection = collection;
- check_ready();
- });
- });
- // seq collection
- db.collection(dao.COLLECTION_SEQ, function (err, collection) {
- if (err) {
- dao.emit('seqError', err);
- return;
- }
- if (collection == null) {
- return;
- }
- dao._seqCollection = collection;
- // has the sequence we'll be referencing been initialized?
- collection.find({ _id: dao.SEQ_QUOTE_ID }, { limit: 1 }, function (err, cursor) {
- if (err) {
- dao._initQuoteIdSeq(check_ready);
- return;
- }
- cursor.toArray(function (_err, data) {
- if (data.length == 0) {
- dao._initQuoteIdSeq(check_ready);
- return;
- }
- check_ready();
- });
- });
- });
- });
- return this;
- };
- MongoServerDao.prototype._initQuoteIdSeq = function (callback) {
- var dao = this;
- this._seqCollection.insert({
- _id: this.SEQ_QUOTE_ID,
- val: this.SEQ_QUOTE_ID_DEFAULT,
- }, function (err, _docs) {
- if (err) {
- dao.emit('seqError', err);
- return;
- }
- dao.emit('seqInit', dao.SEQ_QUOTE_ID);
- callback.call(dao);
- });
- };
- /**
- * Saves a quote to the database
- *
- * A full save will include all metadata. This should not cause any
- * problems with race conditions for pending Data API calls on meta
- * fields because those results write to individual indexes and do not
- * rely on existing data.
- *
- * @param Quote quote the quote to save
- * @param Function success_callback function to call on success
- * @param Function failure_callback function to call if save fails
- * @param Object save_data quote data to save (optional)
- * @param Object push_data quote data to push (optional)
- */
- MongoServerDao.prototype.saveQuote = function (quote, success_callback, failure_callback, save_data, push_data) {
- var dao = this;
- var meta = {};
- // if we're not ready, then we can't save the quote!
- if (this._ready === false) {
- this.emit('saveQuoteError', { message: 'Database server not ready' }, Error('Database not ready'), quote);
- failure_callback.call(this, quote);
- return dao;
- }
- if (save_data === undefined) {
- save_data = {
- data: quote.getBucket().getData(),
- };
- // full save will include all metadata
- meta = quote.getMetabucket().getData();
- }
- var id = quote.getId();
- // some data should always be saved because the quote will be created if
- // it does not yet exist
- save_data.id = id;
- save_data.pver = quote.getProgramVersion();
- save_data.importDirty = 1;
- save_data.lastPremDate = quote.getLastPremiumDate();
- save_data.initialRatedDate = quote.getRatedDate();
- save_data.explicitLock = quote.getExplicitLockReason();
- save_data.explicitLockStepId = quote.getExplicitLockStep();
- save_data.importedInd = +quote.isImported();
- save_data.boundInd = +quote.isBound();
- save_data.lastUpdate = Math.round((new Date()).getTime() / 1000);
- // meta will eventually take over for much of the above data
- meta.liza_timestamp_initial_rated = [quote.getRatedDate()];
- // save the stack so we can track this call via the oplog
- save_data._stack = (new Error()).stack;
- // avoid wiping out other metadata (since this may not be a full set)
- Object.keys(meta).forEach(function (key) { return save_data['meta.' + key] = meta[key]; });
- // do not push empty objects
- var document = (!push_data || !Object.keys(push_data).length)
- ? { '$set': save_data }
- : { '$set': save_data, '$push': push_data };
- // update the quote data if it already exists (same id), otherwise
- // insert it
- this._collection.update({ id: id }, document,
- // create record if it does not yet exist
- { upsert: true },
- // on complete
- function (err, _docs) {
- // if an error occurred, then we cannot continue
- if (err) {
- dao.emit('saveQuoteError', err, quote);
- // let the caller handle the error
- if (failure_callback instanceof Function) {
- failure_callback.call(dao, quote);
- }
- return;
- }
- // successful
- if (success_callback instanceof Function) {
- success_callback.call(dao, quote);
- }
- });
- return this;
- };
- /**
- * Merges quote data with the existing (rather than overwriting)
- *
- * @param {Quote} quote quote to save
- * @param {Object} data quote data
- * @param {Function} scallback successful callback
- * @param {Function} fcallback failure callback
- */
- MongoServerDao.prototype.mergeData = function (quote, data, scallback, fcallback) {
- // we do not want to alter the original data; use it as a prototype
- var update = data;
- // save the stack so we can track this call via the oplog
- var _self = this;
- this._collection.update({ id: quote.getId() }, { '$set': update }, {}, function (err, _docs) {
- if (err) {
- _self.emit('saveQuoteError', err, quote);
- if (typeof fcallback === 'function') {
- fcallback(quote);
- }
- return;
- }
- if (typeof scallback === 'function') {
- scallback(quote);
- }
- });
- return this;
- };
- /**
- * Merges bucket data with the existing bucket (rather than overwriting the
- * entire bucket)
- *
- * @param {Quote} quote quote to save
- * @param {Object} data bucket data
- * @param {Function} scallback successful callback
- * @param {Function} fcallback failure callback
- *
- * @return {MongoServerDao} self
- */
- MongoServerDao.prototype.mergeBucket = function (quote, data, success, failure) {
- var update = {};
- for (var field in data) {
- if (!field) {
- continue;
- }
- update['data.' + field] = data[field];
- }
- return this.mergeData(quote, update, success, failure);
- };
- /**
- * Saves the quote state to the database
- *
- * The quote state includes the current step, the top visited step and the
- * explicit lock message.
- *
- * @param Quote quote the quote to save
- * @param Function success_callback function to call on success
- * @param Function failure_callback function to call if save fails
- *
- * @return MongoServerDao self
- */
- MongoServerDao.prototype.saveQuoteState = function (quote, success_callback, failure_callback) {
- var update = {
- currentStepId: quote.getCurrentStepId(),
- topVisitedStepId: quote.getTopVisitedStepId(),
- topSavedStepId: quote.getTopSavedStepId(),
- };
- return this.mergeData(quote, update, success_callback, failure_callback);
- };
- MongoServerDao.prototype.saveQuoteClasses = function (quote, classes, success, failure) {
- return this.mergeData(quote, { classData: classes }, success, failure);
- };
- /**
- * Save document metadata (meta field on document)
- *
- * Only the provided indexes will be modified (that is---data will be
- * merged with what is already in the database).
- *
- * @param {Quote} quote destination quote
- * @param {Object} new_meta bucket-formatted data to write
- * @param {Function} success callback on success
- * @param {Function} failure callback on error
- *
- * @return {undefined}
- */
- MongoServerDao.prototype.saveQuoteMeta = function (quote, new_meta, success, failure) {
- var update = {};
- for (var key in new_meta) {
- var meta = new_meta[key];
- for (var i in meta) {
- update['meta.' + key + '.' + i] = new_meta[key][i];
- }
- }
- this.mergeData(quote, update, success, failure);
- };
- /**
- * Saves the quote lock state to the database
- *
- * @param Quote quote the quote to save
- * @param Function success_callback function to call on success
- * @param Function failure_callback function to call if save fails
- *
- * @return MongoServerDao self
- */
- MongoServerDao.prototype.saveQuoteLockState = function (quote, success_callback, failure_callback) {
- // lock state is saved by default
- return this.saveQuote(quote, success_callback, failure_callback, {});
- };
- /**
- * Pulls quote data from the database
- *
- * @param Integer quote_id id of quote
- * @param Function( data ) callback function to call when data is available
- *
- * @return MongoServerDao self to allow for method chaining
- */
- MongoServerDao.prototype.pullQuote = function (quote_id, callback) {
- var dao = this;
- // XXX: TODO: Do not read whole of record into memory; filter out
- // revisions!
- this._collection.find({ id: quote_id }, { limit: 1 }, function (_err, cursor) {
- cursor.toArray(function (_err, data) {
- // was the quote found?
- if (data.length == 0) {
- callback.call(dao, null);
- return;
- }
- // return the quote data
- callback.call(dao, data[0]);
- });
- });
- return this;
- };
- MongoServerDao.prototype.getMinQuoteId = function (callback) {
- // just in case it's asynchronous later on
- callback.call(this, this.SEQ_QUOTE_ID_DEFAULT);
- return this;
- };
- MongoServerDao.prototype.getMaxQuoteId = function (callback) {
- var dao = this;
- this._seqCollection.find({ _id: this.SEQ_QUOTE_ID }, { limit: 1 }, function (_err, cursor) {
- cursor.toArray(function (_err, data) {
- if (data.length == 0) {
- callback.call(dao, 0);
- return;
- }
- // return the max quote id
- callback.call(dao, data[0].val);
- });
- });
- };
- MongoServerDao.prototype.getNextQuoteId = function (callback) {
- var dao = this;
- this._seqCollection.findAndModify({ _id: this.SEQ_QUOTE_ID }, [['val', 'descending']], { $inc: { val: 1 } }, { 'new': true }, function (err, doc) {
- if (err) {
- dao.emit('seqError', err);
- callback.call(dao, 0);
- return;
- }
- // return the new id
- callback.call(dao, doc.val);
- });
- return this;
- };
- /**
- * Create a new revision with the provided quote data
- *
- * The revision will contain the whole the quote. If space is a concern, we
- * can (in the future) calculate a delta instead (Mike recommends the Git
- * model of storing the deltas in previous revisions and the whole of the
- * bucket in the most recently created revision).
- */
- MongoServerDao.prototype.createRevision = function (quote, callback) {
- var _self = this, qid = quote.getId(), data = quote.getBucket().getData();
- this._collection.update({ id: qid }, { '$push': { revisions: { data: data } } },
- // create record if it does not yet exist
- { upsert: true },
- // on complete
- function (err) {
- if (err) {
- _self.emit('mkrevError', err);
- }
- callback(err);
- return;
- });
- };
- MongoServerDao.prototype.getRevision = function (quote, revid, callback) {
- revid = +revid;
- // XXX: TODO: Filter out all but the revision we want
- this._collection.find({ id: quote.getId() }, { limit: 1 }, function (_err, cursor) {
- cursor.toArray(function (_err, data) {
- // was the quote found?
- if ((data.length === 0)
- || (data[0].revisions.length < (revid + 1))) {
- callback(null);
- return;
- }
- // return the quote data
- callback(data[0].revisions[revid]);
- });
- });
- };
- MongoServerDao.prototype.setWorksheets = function (qid, data, callback) {
- this._collection.update({ id: qid }, { '$set': { worksheets: { data: data } } },
- // create record if it does not yet exist
- { upsert: true },
- // on complete
- function (err) {
- callback(err);
- return;
- });
- };
- MongoServerDao.prototype.getWorksheet = function (qid, supplier, index, callback) {
- this._collection.find({ id: qid }, { limit: 1 }, function (_err, cursor) {
- cursor.toArray(function (_err, data) {
- // was the quote found?
- if ((data.length === 0)
- || (!data[0].worksheets)
- || (!data[0].worksheets.data)
- || (!data[0].worksheets.data[supplier])) {
- callback(null);
- return;
- }
- // return the quote data
- callback(data[0].worksheets.data[supplier][index]);
- });
- });
- };
- return MongoServerDao;
-}(EventEmitter));
-exports.MongoServerDao = MongoServerDao;
-;
-//# sourceMappingURL=data:application/json;base64, \ No newline at end of file
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
new file mode 100644
index 0000000..bfd6dc3
--- /dev/null
+++ b/src/system/AmqpPublisher.ts
@@ -0,0 +1,42 @@
+/**
+ * Amqp Publisher
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Publish Amqp message to a queue
+ */
+
+import { DeltaResult } from "../bucket/delta";
+import { Options } from 'amqplib';
+
+
+export interface AmqpConfig extends Options.Connect {
+ /** The name of a queue or exchange to publish to */
+ exchange: string;
+}
+
+
+export interface AmqpPublisher
+{
+ /**
+ * Publish quote message to exchange post-rating
+ *
+ * @param delta - The delta to publish
+ */
+ publish( delta: DeltaResult<any> ): void;
+}
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index 6103f20..67e372d 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -23,6 +23,7 @@ import { DeltaDao } from "../system/db/DeltaDao";
import { MongoDeltaType } from "../system/db/MongoDeltaDao";
import { DeltaResult } from "../bucket/delta";
import { DocumentId } from "../document/Document";
+import { AmqpPublisher } from "./AmqpPublisher";
/**
@@ -36,12 +37,6 @@ export class DeltaProcessor
/** The data delta type */
readonly DELTA_DATA: MongoDeltaType = 'data';
- /** A mapping of which delta type translated to which avro event */
- readonly DELTA_MAP: Record<string, string> = {
- DELTA_RATEDATA: 'rate',
- DELTA_DATA: 'update',
- };
-
/**
* Initialize processor
@@ -49,7 +44,8 @@ export class DeltaProcessor
* @param _collection Mongo collection
*/
constructor(
- private readonly _dao: DeltaDao,
+ private readonly _dao: DeltaDao,
+ private readonly _publisher: AmqpPublisher,
) {}
@@ -68,9 +64,7 @@ export class DeltaProcessor
deltas.forEach( delta => {
- // TODO: publish delta
- // publisher.publish( delta, self.DELTA_MAP[ delta.type ] )
- console.log( delta, self.DELTA_MAP[ delta.type ] );
+ self._publisher.publish( delta );
});
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
new file mode 100644
index 0000000..2606c56
--- /dev/null
+++ b/src/system/DeltaPublisher.ts
@@ -0,0 +1,133 @@
+/**
+ * Delta Publisher
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Publish delta message to a queue
+ */
+
+import { AmqpPublisher } from "./AmqpPublisher";
+import { DeltaResult } from "../bucket/delta";
+import {
+ connect as amqpConnect,
+ Options,
+ Channel
+} from 'amqplib';
+
+
+export interface AmqpConfig extends Options.Connect {
+ /** The name of a queue or exchange to publish to */
+ exchange: string;
+}
+
+
+export class DeltaPublisher implements AmqpPublisher
+{
+ /** A mapping of which delta type translated to which avro event */
+ readonly DELTA_MAP: Record<string, string> = {
+ data: 'rate',
+ ratedata: 'update',
+ };
+
+
+ /**
+ * Initialize trait
+ *
+ * @param {Object} conf AMQP configuration
+ * @param {DebugLog} logger logger instance
+ */
+ constructor(
+ private readonly _conf: AmqpConfig,
+ private readonly _logger: any
+ ) {}
+
+
+ /**
+ * Publish quote message to exchange post-rating
+ *
+ * @param delta - The delta to publish
+ */
+ publish( delta: DeltaResult<any> ): void
+ {
+ // check both as we transition from one to the other
+ const exchange = this._conf.exchange;
+
+ amqpConnect( this._conf )
+ .then( conn =>
+ {
+ setTimeout( () => conn.close(), 10000 );
+ return conn.createChannel();
+ } )
+ .then( ch => {
+ ch.assertExchange( exchange, 'fanout', { durable: true } );
+
+ return this._sendMessage( ch, exchange, delta );
+ } )
+ .then( () => this._logger.log(
+ this._logger.PRIORITY_INFO,
+ "Published " + delta.type + " delta with timestamp '" +
+ delta.timestamp + "' to quote-update exchange '"+
+ exchange + "'"
+ ) )
+ .catch( e => this._logger.log(
+ this._logger.PRIORITY_ERROR,
+ "Error publishing " + delta.type + " delta with timestamp '" +
+ delta.timestamp + "' to quote-update exchange '"+
+ exchange + "'" + ": " + e
+ ) );
+ }
+
+
+ /**
+ * Send message to exchange
+ *
+ * @param channel - AMQP channel
+ * @param exchange - exchange name
+ * @param delta - The delta to publish
+ *
+ * @return whether publish was successful
+ */
+ _sendMessage(
+ channel: Channel,
+ exchange: string,
+ delta: DeltaResult<any>,
+ ): boolean
+ {
+ const headers = {
+ version: 1,
+ created: Date.now(),
+ };
+
+ const event_id = this.DELTA_MAP[ delta.type ];
+
+ const data = new Buffer( JSON.stringify( {
+ delta: delta,
+ event: event_id,
+ } ) );
+
+ // we don't use a routing key; fanout exchange
+ const routing_key = '';
+
+ return channel.publish(
+ exchange,
+ routing_key,
+ data,
+ { headers: headers },
+ );
+ }
+}