Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--conf/vanilla-server.json14
-rw-r--r--src/server/daemon/Daemon.js39
-rw-r--r--src/server/daemon/controller.js15
-rw-r--r--src/server/service/RatingServicePublish.js143
4 files changed, 199 insertions, 12 deletions
diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json
index 452dc4c..53d14e4 100644
--- a/conf/vanilla-server.json
+++ b/conf/vanilla-server.json
@@ -50,7 +50,19 @@
"host": "localhost",
"domain": ""
},
- "noResultsUrl": ""
+ "noResultsUrl": "",
+ "postRatePublish": {
+ "protocol": "amqp",
+ "hostname": "localhost",
+ "port": 5672,
+ "username": "",
+ "password": "",
+ "locale": "en_US",
+ "frameMax": 0,
+ "heartbeat": 0,
+ "vhost": "/",
+ "queueName": "postrate"
+ }
},
"c1export": {
"host": "localhost",
diff --git a/src/server/daemon/Daemon.js b/src/server/daemon/Daemon.js
index b9e2895..306fe58 100644
--- a/src/server/daemon/Daemon.js
+++ b/src/server/daemon/Daemon.js
@@ -114,7 +114,8 @@ module.exports = AbstractClass( 'Daemon',
this._createAccessLog(),
this._conf.get( 'skey' ),
this._conf.get( 'services.rating.noResultsUrl' ),
- ] ).then( ([ debug_log, access_log, skey, no_results_url ]) =>
+ this._conf.get( 'services.rating.postRatePublish' ),
+ ] ).then( ([ debug_log, access_log, skey, no_results_url, post_rate ]) =>
{
this._debugLog = debug_log;
this._accessLog = access_log;
@@ -123,7 +124,21 @@ module.exports = AbstractClass( 'Daemon',
this._rater = liza.server.rater.ProcessManager();
this._encService = this.getEncryptionService();
this._memcache = this.getMemcacheClient();
- this._routers = this.getRouters( skey, no_results_url );
+
+ post_rate.reduce(
+ ( accum, value, key ) =>
+ {
+ accum[ key ] = value;
+ return accum;
+ },
+ {}
+ ).then( post_rate_publish =>
+ this._routers = this.getRouters(
+ skey,
+ no_results_url,
+ post_rate_publish
+ )
+ );
} )
.then( () => this._startDaemon() );
},
@@ -190,18 +205,24 @@ module.exports = AbstractClass( 'Daemon',
* all-submit notification URL NO_RESULTS_URL if they are provided,
* respectively.
*
- * @param {string=} skey session key
- * @param {no_results_url=} no_results_url URL for all-submit notification
+ * @param {string=} skey session key
+ * @param {string=} no_results_url URL for all-submit notification
+ * @param {Object=} post_rate_publish configuration for post-rate messages
*
* @return {Object} controller
*/
- 'protected getProgramController': function( skey, no_results_url )
+ 'protected getProgramController': function(
+ skey, no_results_url, post_rate_publish
+ )
{
var controller = require( './controller' );
controller.rater = this._rater;
controller.no_results_url = no_results_url || controller.no_results_url;
+ controller.post_rate_publish =
+ post_rate_publish || controller.post_rate_publish;
+
if ( skey )
{
controller.skey = skey;
@@ -291,10 +312,14 @@ module.exports = AbstractClass( 'Daemon',
'abstract protected getEncryptionService': [],
- 'protected getRouters': function( skey, no_results_url )
+ 'protected getRouters': function(
+ skey, no_results_url, post_rate_publish
+ )
{
return [
- this.getProgramController( skey, no_results_url ),
+ this.getProgramController(
+ skey, no_results_url, post_rate_publish
+ ),
this.getScriptsController(),
this.getClientErrorController(),
];
diff --git a/src/server/daemon/controller.js b/src/server/daemon/controller.js
index 4ec8b49..52f10b8 100644
--- a/src/server/daemon/controller.js
+++ b/src/server/daemon/controller.js
@@ -83,6 +83,7 @@ const {
},
RatingService,
+ RatingServicePublish,
RatingServiceSubmitNotify,
TokenedService,
TokenDao,
@@ -98,6 +99,8 @@ const {
store,
} = require( '../..' );
+const amqplib = require( 'amqplib' );
+
// read and write locks, as separate semaphores
var rlock = Semaphore(),
@@ -108,9 +111,10 @@ var sflag = {};
// TODO: kluge to get liza somewhat decoupled from lovullo (rating module)
-exports.rater = {};
-exports.skey = "";
-exports.no_results_url = "";
+exports.rater = {};
+exports.skey = "";
+exports.no_results_url = "";
+exports.post_rate_publish = {};
exports.init = function( logger, enc_service, conf )
@@ -161,7 +165,10 @@ exports.init = function( logger, enc_service, conf )
)
: RatingService;
- rating_service = RatingServiceBase(
+ // TODO: temporary proof-of-concept
+ rating_service = RatingServiceBase.use(
+ RatingServicePublish( amqplib, exports.post_rate_publish )
+ )(
logger, dao, server, exports.rater
);
diff --git a/src/server/service/RatingServicePublish.js b/src/server/service/RatingServicePublish.js
new file mode 100644
index 0000000..fa0ac14
--- /dev/null
+++ b/src/server/service/RatingServicePublish.js
@@ -0,0 +1,143 @@
+/**
+ * Publishes message to queue after rating
+ *
+ * Copyright (C) 2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+'use strict';
+
+const { Trait } = require( 'easejs' );
+const RatingService = require( './RatingService' );
+
+
+/**
+ * Publish message to a queue after rating
+ *
+ * This is an initial proof-of-concept implementation. In particular, we
+ * have the following considerations:
+ *
+ * - A fresh connection is made for each message until we can ensure that
+ * we can auto-reconnect on failure;
+ * - This trait is not yet tested;
+ * - It does not use an exchange;
+ * - It does a poor job checking for and reporting errors.
+ *
+ * The message consists of a `version' header that is set to 1. Future
+ * changes to the message format will bump this version. There is also a
+ * `created' header holding a Unix timestamp of the moment that the message
+ * was created.
+ *
+ * Version 1 of the body consists of four fields:
+ * - quote_id
+ * - agent_id
+ * - entity_id
+ * - entity_name
+ *
+ * See the body of `#_sendMessage' for their values.
+ */
+module.exports = Trait( 'RatingServicePublish' )
+ .extend( RatingService,
+{
+ /**
+ * AMQP library (amqplib API)
+ *
+ * @type {amqplib}
+ */
+ 'private _amqp': null,
+
+ /**
+ * AMQP configuration
+ *
+ * This should be the configuration expected by amqplib's #connect. It
+ * should additionally contain a `queueName' field.
+ *
+ * @type {Object}
+ */
+ 'private _conf': {},
+
+
+ __mixin( amqp, conf )
+ {
+ this._amqp = amqp;
+ this._conf = conf;
+ },
+
+
+ /**
+ * Queue message post rating
+ *
+ * @param {UserRequest} request user request
+ * @param {Object} data rating data returned
+ * @param {Array} actions actions to send to client
+ * @param {Program} program program used to perform rating
+ * @param {Quote} quote quote used for rating
+ *
+ * @return {undefined}
+ */
+ 'override protected postProcessRaterData'(
+ request, data, actions, program, quote
+ )
+ {
+ const queue = this._conf.queueName;
+
+ let connection = null;
+
+ this._amqp.connect( this._conf )
+ .then( conn => connection = conn.createChannel() )
+ .then( ch => {
+ ch.assertQueue( queue, { durable: true } );
+
+ return this._sendMessage(
+ ch,
+ queue,
+ request.getSession(),
+ quote
+ );
+ } );
+ },
+
+
+ /**
+ * Send message to queue
+ *
+ * @param {Channel} channel AMQP channel
+ * @param {string} queue queue name
+ * @param {UserSession} session user session
+ * @param {Quote} quote rated quote
+ *
+ * @return {Promise} whether sendToQueue was successful
+ */
+ 'private _sendMessage'( channel, queue, session, quote )
+ {
+ const headers = {
+ version: 1,
+ created: Date.now(),
+ };
+
+ const data = new Buffer( JSON.stringify( {
+ quote_id: quote.getId(),
+ agent_id: session.agentId(),
+ entity_id: session.agentEntityId(),
+ entity_name: session.agentName(),
+ } ) );
+
+ return Promise.resolve(
+ channel.sendToQueue( queue, data, { headers: headers } )
+ );
+ },
+} );