Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-12 16:41:31 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-11-12 17:04:54 -0500
commitc5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead (patch)
tree65d696c6a0c6a339f5d43ae05a5e0b5b4b67f513 /src
parentde94f69e8f34553a58de23ab6b16fda62838e44d (diff)
downloadliza-c5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead.tar.gz
liza-c5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead.tar.bz2
liza-c5733d1dfff5bd71ecfc344dc5c7021c5bbc7ead.zip
[DEV-5312] Add interface for amqp publisher and implement a delta publisher
Diffstat (limited to 'src')
-rw-r--r--src/server/db/MongoServerDao.js482
-rw-r--r--src/system/AmqpPublisher.ts42
-rw-r--r--src/system/DeltaProcessor.ts14
-rw-r--r--src/system/DeltaPublisher.ts133
4 files changed, 179 insertions, 492 deletions
diff --git a/src/server/db/MongoServerDao.js b/src/server/db/MongoServerDao.js
deleted file mode 100644
index 34878c4..0000000
--- a/src/server/db/MongoServerDao.js
+++ /dev/null
@@ -1,482 +0,0 @@
-"use strict";
-/**
- * Mongo DB DAO for program server
- *
- * Copyright (C) 2010-2019 R-T Specialty, LLC.
- *
- * This file is part of the Liza Data Collection Framework.
- *
- * liza is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-var __extends = (this && this.__extends) || (function () {
- var extendStatics = function (d, b) {
- extendStatics = Object.setPrototypeOf ||
- ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
- function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; };
- return extendStatics(d, b);
- };
- return function (d, b) {
- extendStatics(d, b);
- function __() { this.constructor = d; }
- d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
- };
-})();
-Object.defineProperty(exports, "__esModule", { value: true });
-var EventEmitter = require('events').EventEmitter;
-/**
- * Uses MongoDB as a data store
- */
-var MongoServerDao = /** @class */ (function (_super) {
- __extends(MongoServerDao, _super);
- /**
- * Initializes DAO
- *
- * @param {Mongo.Db} db mongo database connection
- */
- function MongoServerDao(_db) {
- var _this = _super.call(this) || this;
- _this._db = _db;
- /** Collection used to store quotes */
- _this.COLLECTION = 'quotes';
- /** Sequence (auto-increment) collection */
- _this.COLLECTION_SEQ = 'seq';
- /** Sequence key for quote ids */
- _this.SEQ_QUOTE_ID = 'quoteId';
- /** Sequence quoteId default */
- _this.SEQ_QUOTE_ID_DEFAULT = 200000;
- /** Whether the DAO is initialized and ready to be used */
- _this._ready = false;
- return _this;
- }
- /**
- * Initializes error events and attempts to connect to the database
- *
- * connectError event will be emitted on failure.
- *
- * @param Function callback function to call when connection is complete
- * (will not be called if connection fails)
- *
- * @return MongoServerDao self to allow for method chaining
- */
- MongoServerDao.prototype.init = function (callback) {
- var dao = this;
- // map db error event (on connection error) to our connectError event
- this._db.on('error', function (err) {
- dao._ready = false;
- dao._collection = null;
- dao.emit('connectError', err);
- });
- this.connect(callback);
- return this;
- };
- /**
- * Attempts to connect to the database
- *
- * connectError event will be emitted on failure.
- *
- * @param Function callback function to call when connection is complete
- * (will not be called if connection fails)
- *
- * @return MongoServerDao self to allow for method chaining
- */
- MongoServerDao.prototype.connect = function (callback) {
- var dao = this;
- // attempt to connect to the database
- this._db.open(function (err, db) {
- // if there was an error, don't bother with anything else
- if (err) {
- // in some circumstances, it may just be telling us that we're
- // already connected (even though the connection may have been
- // broken)
- if (err.errno !== undefined) {
- dao.emit('connectError', err);
- return;
- }
- }
- var ready_count = 0;
- var check_ready = function () {
- if (++ready_count < 2) {
- return;
- }
- // we're ready to roll!
- dao._ready = true;
- dao.emit('ready');
- // connection was successful; call the callback
- if (callback instanceof Function) {
- callback.call(dao);
- }
- };
- // quotes collection
- db.collection(dao.COLLECTION, function (_err, collection) {
- // for some reason this gets called more than once
- if (collection == null) {
- return;
- }
- // initialize indexes
- collection.createIndex([['id', 1]], true, function (_err, _index) {
- // mark the DAO as ready to be used
- dao._collection = collection;
- check_ready();
- });
- });
- // seq collection
- db.collection(dao.COLLECTION_SEQ, function (err, collection) {
- if (err) {
- dao.emit('seqError', err);
- return;
- }
- if (collection == null) {
- return;
- }
- dao._seqCollection = collection;
- // has the sequence we'll be referencing been initialized?
- collection.find({ _id: dao.SEQ_QUOTE_ID }, { limit: 1 }, function (err, cursor) {
- if (err) {
- dao._initQuoteIdSeq(check_ready);
- return;
- }
- cursor.toArray(function (_err, data) {
- if (data.length == 0) {
- dao._initQuoteIdSeq(check_ready);
- return;
- }
- check_ready();
- });
- });
- });
- });
- return this;
- };
- MongoServerDao.prototype._initQuoteIdSeq = function (callback) {
- var dao = this;
- this._seqCollection.insert({
- _id: this.SEQ_QUOTE_ID,
- val: this.SEQ_QUOTE_ID_DEFAULT,
- }, function (err, _docs) {
- if (err) {
- dao.emit('seqError', err);
- return;
- }
- dao.emit('seqInit', dao.SEQ_QUOTE_ID);
- callback.call(dao);
- });
- };
- /**
- * Saves a quote to the database
- *
- * A full save will include all metadata. This should not cause any
- * problems with race conditions for pending Data API calls on meta
- * fields because those results write to individual indexes and do not
- * rely on existing data.
- *
- * @param Quote quote the quote to save
- * @param Function success_callback function to call on success
- * @param Function failure_callback function to call if save fails
- * @param Object save_data quote data to save (optional)
- * @param Object push_data quote data to push (optional)
- */
- MongoServerDao.prototype.saveQuote = function (quote, success_callback, failure_callback, save_data, push_data) {
- var dao = this;
- var meta = {};
- // if we're not ready, then we can't save the quote!
- if (this._ready === false) {
- this.emit('saveQuoteError', { message: 'Database server not ready' }, Error('Database not ready'), quote);
- failure_callback.call(this, quote);
- return dao;
- }
- if (save_data === undefined) {
- save_data = {
- data: quote.getBucket().getData(),
- };
- // full save will include all metadata
- meta = quote.getMetabucket().getData();
- }
- var id = quote.getId();
- // some data should always be saved because the quote will be created if
- // it does not yet exist
- save_data.id = id;
- save_data.pver = quote.getProgramVersion();
- save_data.importDirty = 1;
- save_data.lastPremDate = quote.getLastPremiumDate();
- save_data.initialRatedDate = quote.getRatedDate();
- save_data.explicitLock = quote.getExplicitLockReason();
- save_data.explicitLockStepId = quote.getExplicitLockStep();
- save_data.importedInd = +quote.isImported();
- save_data.boundInd = +quote.isBound();
- save_data.lastUpdate = Math.round((new Date()).getTime() / 1000);
- // meta will eventually take over for much of the above data
- meta.liza_timestamp_initial_rated = [quote.getRatedDate()];
- // save the stack so we can track this call via the oplog
- save_data._stack = (new Error()).stack;
- // avoid wiping out other metadata (since this may not be a full set)
- Object.keys(meta).forEach(function (key) { return save_data['meta.' + key] = meta[key]; });
- // do not push empty objects
- var document = (!push_data || !Object.keys(push_data).length)
- ? { '$set': save_data }
- : { '$set': save_data, '$push': push_data };
- // update the quote data if it already exists (same id), otherwise
- // insert it
- this._collection.update({ id: id }, document,
- // create record if it does not yet exist
- { upsert: true },
- // on complete
- function (err, _docs) {
- // if an error occurred, then we cannot continue
- if (err) {
- dao.emit('saveQuoteError', err, quote);
- // let the caller handle the error
- if (failure_callback instanceof Function) {
- failure_callback.call(dao, quote);
- }
- return;
- }
- // successful
- if (success_callback instanceof Function) {
- success_callback.call(dao, quote);
- }
- });
- return this;
- };
- /**
- * Merges quote data with the existing (rather than overwriting)
- *
- * @param {Quote} quote quote to save
- * @param {Object} data quote data
- * @param {Function} scallback successful callback
- * @param {Function} fcallback failure callback
- */
- MongoServerDao.prototype.mergeData = function (quote, data, scallback, fcallback) {
- // we do not want to alter the original data; use it as a prototype
- var update = data;
- // save the stack so we can track this call via the oplog
- var _self = this;
- this._collection.update({ id: quote.getId() }, { '$set': update }, {}, function (err, _docs) {
- if (err) {
- _self.emit('saveQuoteError', err, quote);
- if (typeof fcallback === 'function') {
- fcallback(quote);
- }
- return;
- }
- if (typeof scallback === 'function') {
- scallback(quote);
- }
- });
- return this;
- };
- /**
- * Merges bucket data with the existing bucket (rather than overwriting the
- * entire bucket)
- *
- * @param {Quote} quote quote to save
- * @param {Object} data bucket data
- * @param {Function} scallback successful callback
- * @param {Function} fcallback failure callback
- *
- * @return {MongoServerDao} self
- */
- MongoServerDao.prototype.mergeBucket = function (quote, data, success, failure) {
- var update = {};
- for (var field in data) {
- if (!field) {
- continue;
- }
- update['data.' + field] = data[field];
- }
- return this.mergeData(quote, update, success, failure);
- };
- /**
- * Saves the quote state to the database
- *
- * The quote state includes the current step, the top visited step and the
- * explicit lock message.
- *
- * @param Quote quote the quote to save
- * @param Function success_callback function to call on success
- * @param Function failure_callback function to call if save fails
- *
- * @return MongoServerDao self
- */
- MongoServerDao.prototype.saveQuoteState = function (quote, success_callback, failure_callback) {
- var update = {
- currentStepId: quote.getCurrentStepId(),
- topVisitedStepId: quote.getTopVisitedStepId(),
- topSavedStepId: quote.getTopSavedStepId(),
- };
- return this.mergeData(quote, update, success_callback, failure_callback);
- };
- MongoServerDao.prototype.saveQuoteClasses = function (quote, classes, success, failure) {
- return this.mergeData(quote, { classData: classes }, success, failure);
- };
- /**
- * Save document metadata (meta field on document)
- *
- * Only the provided indexes will be modified (that is---data will be
- * merged with what is already in the database).
- *
- * @param {Quote} quote destination quote
- * @param {Object} new_meta bucket-formatted data to write
- * @param {Function} success callback on success
- * @param {Function} failure callback on error
- *
- * @return {undefined}
- */
- MongoServerDao.prototype.saveQuoteMeta = function (quote, new_meta, success, failure) {
- var update = {};
- for (var key in new_meta) {
- var meta = new_meta[key];
- for (var i in meta) {
- update['meta.' + key + '.' + i] = new_meta[key][i];
- }
- }
- this.mergeData(quote, update, success, failure);
- };
- /**
- * Saves the quote lock state to the database
- *
- * @param Quote quote the quote to save
- * @param Function success_callback function to call on success
- * @param Function failure_callback function to call if save fails
- *
- * @return MongoServerDao self
- */
- MongoServerDao.prototype.saveQuoteLockState = function (quote, success_callback, failure_callback) {
- // lock state is saved by default
- return this.saveQuote(quote, success_callback, failure_callback, {});
- };
- /**
- * Pulls quote data from the database
- *
- * @param Integer quote_id id of quote
- * @param Function( data ) callback function to call when data is available
- *
- * @return MongoServerDao self to allow for method chaining
- */
- MongoServerDao.prototype.pullQuote = function (quote_id, callback) {
- var dao = this;
- // XXX: TODO: Do not read whole of record into memory; filter out
- // revisions!
- this._collection.find({ id: quote_id }, { limit: 1 }, function (_err, cursor) {
- cursor.toArray(function (_err, data) {
- // was the quote found?
- if (data.length == 0) {
- callback.call(dao, null);
- return;
- }
- // return the quote data
- callback.call(dao, data[0]);
- });
- });
- return this;
- };
- MongoServerDao.prototype.getMinQuoteId = function (callback) {
- // just in case it's asynchronous later on
- callback.call(this, this.SEQ_QUOTE_ID_DEFAULT);
- return this;
- };
- MongoServerDao.prototype.getMaxQuoteId = function (callback) {
- var dao = this;
- this._seqCollection.find({ _id: this.SEQ_QUOTE_ID }, { limit: 1 }, function (_err, cursor) {
- cursor.toArray(function (_err, data) {
- if (data.length == 0) {
- callback.call(dao, 0);
- return;
- }
- // return the max quote id
- callback.call(dao, data[0].val);
- });
- });
- };
- MongoServerDao.prototype.getNextQuoteId = function (callback) {
- var dao = this;
- this._seqCollection.findAndModify({ _id: this.SEQ_QUOTE_ID }, [['val', 'descending']], { $inc: { val: 1 } }, { 'new': true }, function (err, doc) {
- if (err) {
- dao.emit('seqError', err);
- callback.call(dao, 0);
- return;
- }
- // return the new id
- callback.call(dao, doc.val);
- });
- return this;
- };
- /**
- * Create a new revision with the provided quote data
- *
- * The revision will contain the whole the quote. If space is a concern, we
- * can (in the future) calculate a delta instead (Mike recommends the Git
- * model of storing the deltas in previous revisions and the whole of the
- * bucket in the most recently created revision).
- */
- MongoServerDao.prototype.createRevision = function (quote, callback) {
- var _self = this, qid = quote.getId(), data = quote.getBucket().getData();
- this._collection.update({ id: qid }, { '$push': { revisions: { data: data } } },
- // create record if it does not yet exist
- { upsert: true },
- // on complete
- function (err) {
- if (err) {
- _self.emit('mkrevError', err);
- }
- callback(err);
- return;
- });
- };
- MongoServerDao.prototype.getRevision = function (quote, revid, callback) {
- revid = +revid;
- // XXX: TODO: Filter out all but the revision we want
- this._collection.find({ id: quote.getId() }, { limit: 1 }, function (_err, cursor) {
- cursor.toArray(function (_err, data) {
- // was the quote found?
- if ((data.length === 0)
- || (data[0].revisions.length < (revid + 1))) {
- callback(null);
- return;
- }
- // return the quote data
- callback(data[0].revisions[revid]);
- });
- });
- };
- MongoServerDao.prototype.setWorksheets = function (qid, data, callback) {
- this._collection.update({ id: qid }, { '$set': { worksheets: { data: data } } },
- // create record if it does not yet exist
- { upsert: true },
- // on complete
- function (err) {
- callback(err);
- return;
- });
- };
- MongoServerDao.prototype.getWorksheet = function (qid, supplier, index, callback) {
- this._collection.find({ id: qid }, { limit: 1 }, function (_err, cursor) {
- cursor.toArray(function (_err, data) {
- // was the quote found?
- if ((data.length === 0)
- || (!data[0].worksheets)
- || (!data[0].worksheets.data)
- || (!data[0].worksheets.data[supplier])) {
- callback(null);
- return;
- }
- // return the quote data
- callback(data[0].worksheets.data[supplier][index]);
- });
- });
- };
- return MongoServerDao;
-}(EventEmitter));
-exports.MongoServerDao = MongoServerDao;
-;
-//# sourceMappingURL=data:application/json;base64,{"version":3,"file":"MongoServerDao.js","sourceRoot":"","sources":["MongoServerDao.ts"],"names":[],"mappings":";AAAA;;;;;;;;;;;;;;;;;;;GAmBG;;;;;;;;;;;;;;;AASH,IAAM,YAAY,GAAG,OAAO,CAAE,QAAQ,CAAE,CAAC,YAAY,CAAC;AAItD;;GAEG;AACH;IAAoC,kCAAY;IAyB5C;;;;OAIG;IACH,wBACqB,GAAQ;QAD7B,YAII,iBAAO,SACV;QAJoB,SAAG,GAAH,GAAG,CAAK;QA7B7B,sCAAsC;QAC7B,gBAAU,GAAW,QAAQ,CAAC;QAEvC,2CAA2C;QAClC,oBAAc,GAAW,KAAK,CAAC;QAExC,iCAAiC;QACxB,kBAAY,GAAW,SAAS,CAAC;QAE1C,+BAA+B;QACtB,0BAAoB,GAAW,MAAM,CAAC;QAG/C,0DAA0D;QAClD,YAAM,GAAY,KAAK,CAAC;;IAmBhC,CAAC;IAGD;;;;;;;;;OASG;IACH,6BAAI,GAAJ,UAAM,QAAkB;QAEpB,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,qEAAqE;QACrE,IAAI,CAAC,GAAG,CAAC,EAAE,CAAE,OAAO,EAAE,UAAU,GAAQ;YAEpC,GAAG,CAAC,MAAM,GAAQ,KAAK,CAAC;YACxB,GAAG,CAAC,WAAW,GAAG,IAAI,CAAC;YAEvB,GAAG,CAAC,IAAI,CAAE,cAAc,EAAE,GAAG,CAAE,CAAC;QACpC,CAAC,CAAC,CAAC;QAEH,IAAI,CAAC,OAAO,CAAE,QAAQ,CAAE,CAAC;QACzB,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD;;;;;;;;;OASG;IACH,gCAAO,GAAP,UAAS,QAAkB;QAEvB,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,qCAAqC;QACrC,IAAI,CAAC,GAAG,CAAC,IAAI,CAAE,UAAU,GAAQ,EAAE,EAAO;YAEtC,yDAAyD;YACzD,IAAK,GAAG,EACR;gBACI,8DAA8D;gBAC9D,8DAA8D;gBAC9D,UAAU;gBACV,IAAK,GAAG,CAAC,KAAK,KAAK,SAAS,EAC5B;oBACI,GAAG,CAAC,IAAI,CAAE,cAAc,EAAE,GAAG,CAAE,CAAC;oBAChC,OAAO;iBACV;aACJ;YAED,IAAI,WAAW,GAAG,CAAC,CAAC;YACpB,IAAI,WAAW,GAAG;gBAEd,IAAK,EAAE,WAAW,GAAG,CAAC,EACtB;oBACI,OAAO;iBACV;gBAED,uBAAuB;gBACvB,GAAG,CAAC,MAAM,GAAG,IAAI,CAAC;gBAClB,GAAG,CAAC,IAAI,CAAE,OAAO,CAAE,CAAC;gBAEpB,+CAA+C;gBAC/C,IAAK,QAAQ,YAAY,QAAQ,EACjC;oBACI,QAAQ,CAAC,IAAI,CAAE,GAAG,CAAE,CAAC;iBACxB;YACL,CAAC,CAAA;YAED,oBAAoB;YACpB,EAAE,CAAC,UAAU,CACT,GAAG,CAAC,UAAU,EACd,UACI,IAAe,EACf,UAA2B;gBAE3B,kDAAkD;gBAClD,IAAK,UAAU,IAAI,IAAI,EACvB;oBACI,OAAO;iBACV;gBAED,qBAAqB;gBACrB,UAAU,CAAC,WAAW,CAClB,CAAE,CAAC,IAAI,EAAE,CAAC,CAAC,CAAE,EACb,IAAI,EACJ,UAAU,IAAS,EAAE,MAA4B;oBAE7C,mCAAmC;oBACnC,GAAG,CAAC,WAAW,GAAG,UAAU,CAAC;oBAC7B,WAAW,EAAE,CAAC;gBAClB,CAAC,CACJ,CAAC;YACN,CAAC,CACJ,CAAC;YAEF,iBAAiB;YACjB,EAAE,CAAC,UAAU,CACT,GAAG,CAAC,cAAc,EAClB,UACI,GAAe,EACf,UAA2B;gBAE3B,IAAK,GAAG,EACR;oBACI,GAAG,CAAC,IAAI,CAAE,UAAU,EAAE,GAAG,CAAE,CAAC;oBAC5B,OAAO;iBACV;gBAED,IAAK,UAAU,IAAI,IAAI,EACvB;oBACI,OAAO;iBACV;gBAED,GAAG,CAAC,cAAc,GAAG,UAAU,CAAC;gBAEhC,0DAA0D;gBAC1D,UAAU,CAAC,IAAI,CACX,EAAE,GAAG,EAAE,GAAG,CAAC,YAAY,EAAE,EACzB,EAAE,KAAK,EAAmB,CAAC,EAAE,EAC7B,UAAU,GAAQ,EAAE,MAAM;oBAEtB,IAAK,GAAG,EACR;wBACI,GAAG,CAAC,eAAe,CAAE,WAAW,CAAE,CAAA;wBAClC,OAAO;qBACV;oBAED,MAAM,CAAC,OAAO,CAAE,UAAU,IAAS,EAAE,IAAW;wBAE5C,IAAK,IAAI,CAAC,MAAM,IAAI,CAAC,EACrB;4BACI,GAAG,CAAC,eAAe,CAAE,WAAW,CAAE,CAAC;4BACnC,OAAO;yBACV;wBAED,WAAW,EAAE,CAAC;oBAClB,CAAC,CAAC,CAAC;gBACP,CAAC,CACJ,CAAC;YACN,CAAC,CACJ,CAAC;QACN,CAAC,CAAC,CAAC;QAEH,OAAO,IAAI,CAAC;IAChB,CAAC;IAGO,wCAAe,GAAvB,UAAyB,QAAoB;QAEzC,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,IAAI,CAAC,cAAe,CAAC,MAAM,CACvB;YACI,GAAG,EAAE,IAAI,CAAC,YAAY;YACtB,GAAG,EAAE,IAAI,CAAC,oBAAoB;SACjC,EACD,UAAU,GAAQ,EAAE,KAAU;YAE1B,IAAK,GAAG,EACR;gBACI,GAAG,CAAC,IAAI,CAAE,UAAU,EAAE,GAAG,CAAE,CAAC;gBAC5B,OAAO;aACV;YAED,GAAG,CAAC,IAAI,CAAE,SAAS,EAAE,GAAG,CAAC,YAAY,CAAE,CAAC;YACxC,QAAQ,CAAC,IAAI,CAAE,GAAG,CAAE,CAAC;QACzB,CAAC,CACJ,CAAC;IACN,CAAC;IAGD;;;;;;;;;;;;;OAaG;IACH,kCAAS,GAAT,UACI,KAAiC,EACjC,gBAA0B,EAC1B,gBAA0B,EAC1B,SAAqB,EACrB,SAAqB;QAGrB,IAAI,GAAG,GAAyB,IAAI,CAAC;QACrC,IAAI,IAAI,GAAwB,EAAE,CAAC;QAEnC,oDAAoD;QACpD,IAAK,IAAI,CAAC,MAAM,KAAK,KAAK,EAC1B;YACI,IAAI,CAAC,IAAI,CAAE,gBAAgB,EACvB,EAAE,OAAO,EAAE,2BAA2B,EAAE,EACxC,KAAK,CAAE,oBAAoB,CAAE,EAC7B,KAAK,CACR,CAAC;YAEF,gBAAgB,CAAC,IAAI,CAAE,IAAI,EAAE,KAAK,CAAE,CAAC;YACrC,OAAO,GAAG,CAAC;SACd;QAED,IAAK,SAAS,KAAK,SAAS,EAC5B;YACI,SAAS,GAAG;gBACR,IAAI,EAAE,KAAK,CAAC,SAAS,EAAE,CAAC,OAAO,EAAE;aACpC,CAAC;YAEF,sCAAsC;YACtC,IAAI,GAAG,KAAK,CAAC,aAAa,EAAE,CAAC,OAAO,EAAE,CAAC;SAC1C;QAED,IAAI,EAAE,GAAG,KAAK,CAAC,KAAK,EAAE,CAAC;QAEvB,wEAAwE;QACxE,wBAAwB;QACxB,SAAS,CAAC,EAAE,GAAmB,EAAE,CAAC;QAClC,SAAS,CAAC,IAAI,GAAiB,KAAK,CAAC,iBAAiB,EAAE,CAAC;QACzD,SAAS,CAAC,WAAW,GAAU,CAAC,CAAC;QACjC,SAAS,CAAC,YAAY,GAAS,KAAK,CAAC,kBAAkB,EAAE,CAAC;QAC1D,SAAS,CAAC,gBAAgB,GAAK,KAAK,CAAC,YAAY,EAAE,CAAC;QACpD,SAAS,CAAC,YAAY,GAAS,KAAK,CAAC,qBAAqB,EAAE,CAAC;QAC7D,SAAS,CAAC,kBAAkB,GAAG,KAAK,CAAC,mBAAmB,EAAE,CAAC;QAC3D,SAAS,CAAC,WAAW,GAAU,CAAC,KAAK,CAAC,UAAU,EAAE,CAAC;QACnD,SAAS,CAAC,QAAQ,GAAa,CAAC,KAAK,CAAC,OAAO,EAAE,CAAC;QAChD,SAAS,CAAC,UAAU,GAAW,IAAI,CAAC,KAAK,CACrC,CAAE,IAAI,IAAI,EAAE,CAAE,CAAC,OAAO,EAAE,GAAG,IAAI,CAClC,CAAC;QAEF,4DAA4D;QAC5D,IAAI,CAAC,4BAA4B,GAAG,CAAE,KAAK,CAAC,YAAY,EAAE,CAAE,CAAC;QAE7D,yDAAyD;QACzD,SAAS,CAAC,MAAM,GAAG,CAAE,IAAI,KAAK,EAAE,CAAE,CAAC,KAAK,CAAC;QAEzC,qEAAqE;QACrE,MAAM,CAAC,IAAI,CAAE,IAAI,CAAE,CAAC,OAAO,CACvB,UAAA,GAAG,IAAI,OAAA,SAAS,CAAE,OAAO,GAAG,GAAG,CAAE,GAAG,IAAI,CAAE,GAAG,CAAE,EAAxC,CAAwC,CAClD,CAAC;QAEF,4BAA4B;QAC5B,IAAM,QAAQ,GAAG,CAAE,CAAC,SAAS,IAAI,CAAC,MAAM,CAAC,IAAI,CAAE,SAAS,CAAE,CAAC,MAAM,CAAE;YAC/D,CAAC,CAAC,EAAE,MAAM,EAAE,SAAS,EAAE;YACvB,CAAC,CAAC,EAAE,MAAM,EAAE,SAAS,EAAE,OAAO,EAAE,SAAS,EAAE,CAAC;QAEhD,kEAAkE;QAClE,YAAY;QACZ,IAAI,CAAC,WAAY,CAAC,MAAM,CAAE,EAAE,EAAE,EAAE,EAAE,EAAE,EAChC,QAAQ;QAER,yCAAyC;QACzC,EAAE,MAAM,EAAE,IAAI,EAAE;QAEhB,cAAc;QACd,UAAU,GAAG,EAAE,KAAK;YAEhB,gDAAgD;YAChD,IAAK,GAAG,EACR;gBACI,GAAG,CAAC,IAAI,CAAE,gBAAgB,EAAE,GAAG,EAAE,KAAK,CAAE,CAAC;gBAEzC,kCAAkC;gBAClC,IAAK,gBAAgB,YAAY,QAAQ,EACzC;oBACI,gBAAgB,CAAC,IAAI,CAAE,GAAG,EAAE,KAAK,CAAE,CAAC;iBACvC;gBAED,OAAO;aACV;YAED,aAAa;YACb,IAAK,gBAAgB,YAAY,QAAQ,EACzC;gBACI,gBAAgB,CAAC,IAAI,CAAE,GAAG,EAAE,KAAK,CAAE,CAAC;aACvC;QACL,CAAC,CACJ,CAAC;QAEF,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD;;;;;;;OAOG;IACH,kCAAS,GAAT,UACI,KAA0B,EAC1B,IAAsB,EACtB,SAAmB,EACnB,SAAmB;QAGnB,mEAAmE;QACnE,IAAI,MAAM,GAAG,IAAI,CAAC;QAElB,yDAAyD;QACzD,IAAI,KAAK,GAAG,IAAI,CAAC;QACjB,IAAI,CAAC,WAAY,CAAC,MAAM,CAAE,EAAE,EAAE,EAAE,KAAK,CAAC,KAAK,EAAE,EAAE,EAC3C,EAAE,MAAM,EAAE,MAAM,EAAE,EAClB,EAAE,EAEF,UAAU,GAAG,EAAE,KAAK;YAEhB,IAAK,GAAG,EACR;gBACI,KAAK,CAAC,IAAI,CAAE,gBAAgB,EAAE,GAAG,EAAE,KAAK,CAAE,CAAC;gBAE3C,IAAK,OAAO,SAAS,KAAK,UAAU,EACpC;oBACI,SAAS,CAAE,KAAK,CAAE,CAAC;iBACtB;gBAED,OAAO;aACV;YAED,IAAK,OAAO,SAAS,KAAK,UAAU,EACpC;gBACI,SAAS,CAAE,KAAK,CAAE,CAAC;aACtB;QACL,CAAC,CACJ,CAAC;QAEF,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD;;;;;;;;;;OAUG;IACH,oCAAW,GAAX,UACI,KAAwB,EACxB,IAAoB,EACpB,OAAiB,EACjB,OAAiB;QAGjB,IAAI,MAAM,GAAgB,EAAE,CAAC;QAE7B,KAAM,IAAI,KAAK,IAAI,IAAI,EACvB;YACI,IAAK,CAAC,KAAK,EACX;gBACI,SAAS;aACZ;YAED,MAAM,CAAE,OAAO,GAAG,KAAK,CAAE,GAAG,IAAI,CAAE,KAAK,CAAE,CAAC;SAC7C;QAED,OAAO,IAAI,CAAC,SAAS,CAAE,KAAK,EAAE,MAAM,EAAE,OAAO,EAAE,OAAO,CAAE,CAAC;IAC7D,CAAC;IAGD;;;;;;;;;;;OAWG;IACH,uCAAc,GAAd,UACI,KAAiC,EACjC,gBAAqB,EACrB,gBAAqB;QAGrB,IAAI,MAAM,GAAG;YACT,aAAa,EAAK,KAAK,CAAC,gBAAgB,EAAE;YAC1C,gBAAgB,EAAE,KAAK,CAAC,mBAAmB,EAAE;YAC7C,cAAc,EAAI,KAAK,CAAC,iBAAiB,EAAE;SAC9C,CAAC;QAEF,OAAO,IAAI,CAAC,SAAS,CACjB,KAAK,EAAE,MAAM,EAAE,gBAAgB,EAAE,gBAAgB,CACpD,CAAC;IACN,CAAC;IAGD,yCAAgB,GAAhB,UACI,KAAwB,EACxB,OAAY,EACZ,OAAY,EACZ,OAAY;QAGZ,OAAO,IAAI,CAAC,SAAS,CACjB,KAAK,EACL,EAAE,SAAS,EAAE,OAAO,EAAE,EACtB,OAAO,EACP,OAAO,CACV,CAAC;IACN,CAAC;IAGD;;;;;;;;;;;;OAYG;IACH,sCAAa,GAAb,UACI,KAAyB,EACzB,QAAa,EACb,OAAkB,EAClB,OAAkB;QAGlB,IAAM,MAAM,GAAgB,EAAE,CAAC;QAE/B,KAAM,IAAI,GAAG,IAAI,QAAQ,EACzB;YACI,IAAI,IAAI,GAAG,QAAQ,CAAE,GAAG,CAAE,CAAC;YAE3B,KAAM,IAAI,CAAC,IAAI,IAAI,EACnB;gBACI,MAAM,CAAE,OAAO,GAAG,GAAG,GAAG,GAAG,GAAG,CAAC,CAAE,GAAG,QAAQ,CAAE,GAAG,CAAE,CAAE,CAAC,CAAE,CAAC;aAC5D;SACJ;QAED,IAAI,CAAC,SAAS,CAAE,KAAK,EAAE,MAAM,EAAE,OAAO,EAAE,OAAO,CAAE,CAAC;IACtD,CAAC;IAGD;;;;;;;;OAQG;IACH,2CAAkB,GAAlB,UACI,KAAiC,EACjC,gBAA0B,EAC1B,gBAA0B;QAG1B,iCAAiC;QACjC,OAAO,IAAI,CAAC,SAAS,CACjB,KAAK,EACL,gBAAgB,EAChB,gBAAgB,EAChB,EAAE,CACL,CAAC;IACN,CAAC;IAGD;;;;;;;OAOG;IACH,kCAAS,GAAT,UACI,QAAyB,EACzB,QAAsD;QAGtD,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,iEAAiE;QACjE,aAAa;QACb,IAAI,CAAC,WAAY,CAAC,IAAI,CAAE,EAAE,EAAE,EAAE,QAAQ,EAAE,EAAE,EAAE,KAAK,EAAmB,CAAC,EAAE,EACnE,UAAU,IAAI,EAAE,MAAM;YAElB,MAAM,CAAC,OAAO,CAAE,UAAU,IAAmB,EAAE,IAAW;gBAEtD,uBAAuB;gBACvB,IAAK,IAAI,CAAC,MAAM,IAAI,CAAC,EACrB;oBACI,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,IAAI,CAAE,CAAC;oBAC3B,OAAO;iBACV;gBAED,wBAAwB;gBACxB,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,IAAI,CAAE,CAAC,CAAE,CAAE,CAAC;YACpC,CAAC,CAAC,CAAC;QACP,CAAC,CACJ,CAAC;QAEF,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD,sCAAa,GAAb,UAAe,QAAoC;QAE/C,0CAA0C;QAC1C,QAAQ,CAAC,IAAI,CAAE,IAAI,EAAE,IAAI,CAAC,oBAAoB,CAAE,CAAC;QAEjD,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD,sCAAa,GAAb,UAAe,QAAoC;QAE/C,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,IAAI,CAAC,cAAe,CAAC,IAAI,CACrB,EAAE,GAAG,EAAE,IAAI,CAAC,YAAY,EAAE,EAC1B,EAAE,KAAK,EAAmB,CAAC,EAAE,EAC7B,UAAU,IAAI,EAAE,MAAM;YAElB,MAAM,CAAC,OAAO,CAAE,UAAU,IAAmB,EAAE,IAAW;gBAEtD,IAAK,IAAI,CAAC,MAAM,IAAI,CAAC,EACrB;oBACI,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,CAAC,CAAE,CAAC;oBACxB,OAAO;iBACV;gBAED,0BAA0B;gBAC1B,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,IAAI,CAAE,CAAC,CAAE,CAAC,GAAG,CAAE,CAAC;YACxC,CAAC,CAAC,CAAC;QACP,CAAC,CACJ,CAAC;IACN,CAAC;IAGD,uCAAc,GAAd,UAAgB,QAAsC;QAElD,IAAI,GAAG,GAAG,IAAI,CAAC;QAEf,IAAI,CAAC,cAAe,CAAC,aAAa,CAC9B,EAAE,GAAG,EAAE,IAAI,CAAC,YAAY,EAAE,EAC1B,CAAE,CAAE,KAAK,EAAE,YAAY,CAAE,CAAE,EAC3B,EAAE,IAAI,EAAE,EAAE,GAAG,EAAE,CAAC,EAAE,EAAE,EACpB,EAAE,KAAK,EAAE,IAAI,EAAE,EAEf,UAAU,GAAG,EAAE,GAAG;YAEd,IAAK,GAAG,EACR;gBACI,GAAG,CAAC,IAAI,CAAE,UAAU,EAAE,GAAG,CAAE,CAAC;gBAE5B,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,CAAC,CAAE,CAAC;gBACxB,OAAO;aACV;YAED,oBAAoB;YACpB,QAAQ,CAAC,IAAI,CAAE,GAAG,EAAE,GAAG,CAAC,GAAG,CAAE,CAAC;QAClC,CAAC,CACJ,CAAC;QAEF,OAAO,IAAI,CAAC;IAChB,CAAC;IAGD;;;;;;;OAOG;IACH,uCAAc,GAAd,UACI,KAAyB,EACzB,QAAuB;QAGvB,IAAI,KAAK,GAAG,IAAI,EACZ,GAAG,GAAK,KAAK,CAAC,KAAK,EAAE,EACrB,IAAI,GAAI,KAAK,CAAC,SAAS,EAAE,CAAC,OAAO,EAAE,CAAC;QAExC,IAAI,CAAC,WAAY,CAAC,MAAM,CAAE,EAAE,EAAE,EAAE,GAAG,EAAE,EACjC,EAAE,OAAO,EAAE,EAAE,SAAS,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,EAAE,EAAE;QAE1C,yCAAyC;QACzC,EAAE,MAAM,EAAE,IAAI,EAAE;QAEhB,cAAc;QACd,UAAU,GAAG;YAET,IAAK,GAAG,EACR;gBACI,KAAK,CAAC,IAAI,CAAE,YAAY,EAAE,GAAG,CAAE,CAAC;aACnC;YAED,QAAQ,CAAE,GAAG,CAAE,CAAC;YAChB,OAAO;QACX,CAAC,CACJ,CAAC;IACN,CAAC;IAGD,oCAAW,GAAX,UACI,KAAyB,EACzB,KAAyB,EACzB,QAAuB;QAGvB,KAAK,GAAoB,CAAC,KAAK,CAAC;QAEhC,qDAAqD;QACrD,IAAI,CAAC,WAAY,CAAC,IAAI,CAClB,EAAE,EAAE,EAAE,KAAK,CAAC,KAAK,EAAE,EAAE,EACrB,EAAE,KAAK,EAAmB,CAAC,EAAE,EAC7B,UAAU,IAAI,EAAE,MAAM;YAElB,MAAM,CAAC,OAAO,CAAE,UAAU,IAAmB,EAAE,IAAW;gBAEtD,uBAAuB;gBACvB,IAAK,CAAE,IAAI,CAAC,MAAM,KAAK,CAAC,CAAE;uBACnB,CAAE,IAAI,CAAE,CAAC,CAAE,CAAC,SAAS,CAAC,MAAM,GAAG,CAAE,KAAK,GAAG,CAAC,CAAE,CAAE,EAErD;oBACI,QAAQ,CAAE,IAAI,CAAE,CAAC;oBACjB,OAAO;iBACV;gBAED,wBAAwB;gBACxB,QAAQ,CAAE,IAAI,CAAE,CAAC,CAAE,CAAC,SAAS,CAAE,KAAK,CAAE,CAAE,CAAC;YAC7C,CAAC,CAAC,CAAC;QACP,CAAC,CACJ,CAAC;IACN,CAAC;IAGD,sCAAa,GAAb,UACI,GAAiB,EACjB,IAAqB,EACrB,QAA4B;QAG5B,IAAI,CAAC,WAAY,CAAC,MAAM,CAAE,EAAE,EAAE,EAAE,GAAG,EAAE,EACjC,EAAE,MAAM,EAAE,EAAE,UAAU,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,EAAE,EAAE;QAE1C,yCAAyC;QACzC,EAAE,MAAM,EAAE,IAAI,EAAE;QAEhB,cAAc;QACd,UAAU,GAAG;YAET,QAAQ,CAAE,GAAG,CAAE,CAAC;YAChB,OAAO;QACX,CAAC,CACJ,CAAC;IACN,CAAC;IAGD,qCAAY,GAAZ,UACI,GAAiB,EACjB,QAAgB,EAChB,KAAyB,EACzB,QAAgD;QAGhD,IAAI,CAAC,WAAY,CAAC,IAAI,CAClB,EAAE,EAAE,EAAE,GAAG,EAAE,EACX,EAAE,KAAK,EAAmB,CAAC,EAAE,EAC7B,UAAU,IAAI,EAAE,MAAM;YAElB,MAAM,CAAC,OAAO,CAAE,UAAU,IAAmB,EAAE,IAAW;gBAEtD,uBAAuB;gBACvB,IAAK,CAAE,IAAI,CAAC,MAAM,KAAK,CAAC,CAAE;uBACnB,CAAE,CAAC,IAAI,CAAE,CAAC,CAAE,CAAC,UAAU,CAAE;uBACzB,CAAE,CAAC,IAAI,CAAE,CAAC,CAAE,CAAC,UAAU,CAAC,IAAI,CAAE;uBAC9B,CAAE,CAAC,IAAI,CAAE,CAAC,CAAE,CAAC,UAAU,CAAC,IAAI,CAAE,QAAQ,CAAE,CAAE,EAEjD;oBACI,QAAQ,CAAE,IAAI,CAAE,CAAC;oBACjB,OAAO;iBACV;gBAED,wBAAwB;gBACxB,QAAQ,CAAE,IAAI,CAAE,CAAC,CAAE,CAAC,UAAU,CAAC,IAAI,CAAE,QAAQ,CAAE,CAAE,KAAK,CAAE,CAAE,CAAC;YAC/D,CAAC,CAAE,CAAC;QACR,CAAC,CACJ,CAAC;IACN,CAAC;IACL,qBAAC;AAAD,CAAC,AAhvBD,CAAoC,YAAY,GAgvB/C;AAhvBY,wCAAc;AAgvB1B,CAAC"} \ No newline at end of file
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
new file mode 100644
index 0000000..bfd6dc3
--- /dev/null
+++ b/src/system/AmqpPublisher.ts
@@ -0,0 +1,42 @@
+/**
+ * Amqp Publisher
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Publish Amqp message to a queue
+ */
+
+import { DeltaResult } from "../bucket/delta";
+import { Options } from 'amqplib';
+
+
+export interface AmqpConfig extends Options.Connect {
+ /** The name of a queue or exchange to publish to */
+ exchange: string;
+}
+
+
+export interface AmqpPublisher
+{
+ /**
+ * Publish quote message to exchange post-rating
+ *
+ * @param delta - The delta to publish
+ */
+ publish( delta: DeltaResult<any> ): void;
+}
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index 6103f20..67e372d 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -23,6 +23,7 @@ import { DeltaDao } from "../system/db/DeltaDao";
import { MongoDeltaType } from "../system/db/MongoDeltaDao";
import { DeltaResult } from "../bucket/delta";
import { DocumentId } from "../document/Document";
+import { AmqpPublisher } from "./AmqpPublisher";
/**
@@ -36,12 +37,6 @@ export class DeltaProcessor
/** The data delta type */
readonly DELTA_DATA: MongoDeltaType = 'data';
- /** A mapping of which delta type translated to which avro event */
- readonly DELTA_MAP: Record<string, string> = {
- DELTA_RATEDATA: 'rate',
- DELTA_DATA: 'update',
- };
-
/**
* Initialize processor
@@ -49,7 +44,8 @@ export class DeltaProcessor
* @param _collection Mongo collection
*/
constructor(
- private readonly _dao: DeltaDao,
+ private readonly _dao: DeltaDao,
+ private readonly _publisher: AmqpPublisher,
) {}
@@ -68,9 +64,7 @@ export class DeltaProcessor
deltas.forEach( delta => {
- // TODO: publish delta
- // publisher.publish( delta, self.DELTA_MAP[ delta.type ] )
- console.log( delta, self.DELTA_MAP[ delta.type ] );
+ self._publisher.publish( delta );
});
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
new file mode 100644
index 0000000..2606c56
--- /dev/null
+++ b/src/system/DeltaPublisher.ts
@@ -0,0 +1,133 @@
+/**
+ * Delta Publisher
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Publish delta message to a queue
+ */
+
+import { AmqpPublisher } from "./AmqpPublisher";
+import { DeltaResult } from "../bucket/delta";
+import {
+ connect as amqpConnect,
+ Options,
+ Channel
+} from 'amqplib';
+
+
+export interface AmqpConfig extends Options.Connect {
+ /** The name of a queue or exchange to publish to */
+ exchange: string;
+}
+
+
+export class DeltaPublisher implements AmqpPublisher
+{
+ /** A mapping of which delta type translated to which avro event */
+ readonly DELTA_MAP: Record<string, string> = {
+ data: 'rate',
+ ratedata: 'update',
+ };
+
+
+ /**
+ * Initialize trait
+ *
+ * @param {Object} conf AMQP configuration
+ * @param {DebugLog} logger logger instance
+ */
+ constructor(
+ private readonly _conf: AmqpConfig,
+ private readonly _logger: any
+ ) {}
+
+
+ /**
+ * Publish quote message to exchange post-rating
+ *
+ * @param delta - The delta to publish
+ */
+ publish( delta: DeltaResult<any> ): void
+ {
+ // check both as we transition from one to the other
+ const exchange = this._conf.exchange;
+
+ amqpConnect( this._conf )
+ .then( conn =>
+ {
+ setTimeout( () => conn.close(), 10000 );
+ return conn.createChannel();
+ } )
+ .then( ch => {
+ ch.assertExchange( exchange, 'fanout', { durable: true } );
+
+ return this._sendMessage( ch, exchange, delta );
+ } )
+ .then( () => this._logger.log(
+ this._logger.PRIORITY_INFO,
+ "Published " + delta.type + " delta with timestamp '" +
+ delta.timestamp + "' to quote-update exchange '"+
+ exchange + "'"
+ ) )
+ .catch( e => this._logger.log(
+ this._logger.PRIORITY_ERROR,
+ "Error publishing " + delta.type + " delta with timestamp '" +
+ delta.timestamp + "' to quote-update exchange '"+
+ exchange + "'" + ": " + e
+ ) );
+ }
+
+
+ /**
+ * Send message to exchange
+ *
+ * @param channel - AMQP channel
+ * @param exchange - exchange name
+ * @param delta - The delta to publish
+ *
+ * @return whether publish was successful
+ */
+ _sendMessage(
+ channel: Channel,
+ exchange: string,
+ delta: DeltaResult<any>,
+ ): boolean
+ {
+ const headers = {
+ version: 1,
+ created: Date.now(),
+ };
+
+ const event_id = this.DELTA_MAP[ delta.type ];
+
+ const data = new Buffer( JSON.stringify( {
+ delta: delta,
+ event: event_id,
+ } ) );
+
+ // we don't use a routing key; fanout exchange
+ const routing_key = '';
+
+ return channel.publish(
+ exchange,
+ routing_key,
+ data,
+ { headers: headers },
+ );
+ }
+}