Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-25 12:26:39 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-11-25 12:26:39 -0500
commite781a841b177cdb330c3ae4ec453ec4010ce8470 (patch)
tree6f7a1f062b4b5f32cf04d1eab184749ccb2a2288
parentfaa7e15760d1136ef44b15a914c5dd582f86a903 (diff)
downloadliza-e781a841b177cdb330c3ae4ec453ec4010ce8470.tar.gz
liza-e781a841b177cdb330c3ae4ec453ec4010ce8470.tar.bz2
liza-e781a841b177cdb330c3ae4ec453ec4010ce8470.zip
[DEV-5312] Reconnect AMQP when connection drops
-rw-r--r--bin/delta-processor.ts66
-rw-r--r--src/system/AmqpPublisher.ts33
-rw-r--r--src/system/DeltaLogger.ts13
-rw-r--r--src/system/DeltaPublisher.ts72
-rw-r--r--src/system/MetricsCollector.ts29
-rw-r--r--test/system/DeltaPublisherTest.ts6
6 files changed, 140 insertions, 79 deletions
diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts
index 1dde6df..92dd7a3 100644
--- a/bin/delta-processor.ts
+++ b/bin/delta-processor.ts
@@ -41,15 +41,19 @@ const {
} = require( 'mongodb' );
// TODO: fix this
-process.env.NODE_ENV = 'dev';
-process.env.amqp_hostname = 'localhost';
-process.env.amqp_port = '5672';
-process.env.amqp_username = 'quote_referral';
-process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
-process.env.amqp_vhost = 'quote';
-process.env.amqp_exchange = 'quoteupdate';
-process.env.prom_hostname = 'dmz2docker01.rsgcorp.local';
-process.env.prom_port = '9091';
+process.env.NODE_ENV = 'dev';
+process.env.amqp_hostname = 'localhost';
+process.env.amqp_port = '5672';
+process.env.amqp_username = 'quote_referral';
+process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
+process.env.amqp_frameMax = '0';
+process.env.amqp_heartbeat = '2';
+process.env.amqp_vhost = 'quote';
+process.env.amqp_exchange = 'quoteupdate';
+process.env.amqp_retries = '30';
+process.env.amqp_retry_wait = '1';
+process.env.prom_hostname = 'dmz2docker01.rsgcorp.local';
+process.env.prom_port = '9091';
// Environment variables
const amqp_conf = _getAmqpConfig( process.env );
@@ -58,19 +62,19 @@ const prom_conf = _getPrometheusConfig( process.env );
const env = process.env.NODE_ENV || 'Unknown Environment';
// Event handling
-const event_emitter = new EventEmitter();
-const event_dispatcher = new EventDispatcher( event_emitter );
-const event_subscriber = new EventSubscriber( event_emitter );
+const emitter = new EventEmitter();
+const dispatcher = new EventDispatcher( emitter );
+const subscriber = new EventSubscriber( emitter );
// Event subscribers
-new DeltaLogger( env, event_subscriber, ts_ctr );
-const metrics = new MetricsCollector( prom_conf, event_subscriber );
+new DeltaLogger( env, subscriber, ts_ctr );
+const metrics = new MetricsCollector( prom_conf, subscriber );
// Instantiate classes for processor
const db = _createDB( db_conf );
const dao = new MongoDeltaDao( db );
-const publisher = new DeltaPublisher( amqp_conf, event_dispatcher, ts_ctr );
-const processor = new DeltaProcessor( dao, publisher, event_dispatcher );
+const publisher = new DeltaPublisher( amqp_conf, dispatcher, ts_ctr );
+const processor = new DeltaProcessor( dao, publisher, dispatcher );
// If the dao intializes successfully then process on a two second interval
const interval_ms = 2000;
@@ -80,7 +84,11 @@ let process_interval: NodeJS.Timer;
dao.init()
.then( _ =>
{
- publisher.connect();
+ publisher.connect()
+ .catch( e =>
+ {
+ console.error( 'AMQP connection error: ' + e );
+ } );
} )
.then( _ =>
{
@@ -98,7 +106,7 @@ dao.init()
interval_ms,
);
} )
-.catch( err => { console.error( 'Mongo Error: ' + err ); } );
+.catch( err => { console.error( 'Error: ' + err ); } );
/**
@@ -253,16 +261,18 @@ function _getMongoConfig( env: any ): MongoDbConfig
function _getAmqpConfig( env: any ): AmqpConfig
{
return <AmqpConfig>{
- "protocol": "amqp",
- "hostname": env.amqp_hostname,
- "port": +( env.amqp_port || 0 ),
- "username": env.amqp_username,
- "password": env.amqp_password,
- "locale": "en_US",
- "frameMax": 0,
- "heartbeat": 0,
- "vhost": env.amqp_vhost,
- "exchange": env.amqp_exchange,
+ "protocol": "amqp",
+ "hostname": env.amqp_hostname,
+ "port": +( env.amqp_port || 0 ),
+ "username": env.amqp_username,
+ "password": env.amqp_password,
+ "locale": "en_US",
+ "frameMax": env.amqp_frameMax,
+ "heartbeat": env.amqp_heartbeat,
+ "vhost": env.amqp_vhost,
+ "exchange": env.amqp_exchange,
+ "retries": env.amqp_retries || 30,
+ "retry_wait": env.amqp_retry_wait || 1,
};
}
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
index 622d659..26b4949 100644
--- a/src/system/AmqpPublisher.ts
+++ b/src/system/AmqpPublisher.ts
@@ -26,8 +26,41 @@ import { Options } from 'amqplib';
export interface AmqpConfig extends Options.Connect {
+ /** The protocol to connect with (should always be "amqp") */
+ protocol: string;
+
+ /** The hostname to connect to */
+ hostname: string;
+
+ /** The port to connect to */
+ port: number;
+
+ /** A username if one if required */
+ username?: string;
+
+ /** A password if one if required */
+ password?: string;
+
+ /** Locale (should always be "en_US") */
+ locale: string;
+
+ /** The size in bytes of the maximum frame allowed */
+ frameMax: number;
+
+ /** How often to check for a live connection */
+ heartBeat: number;
+
+ /** The virtual host we are on (e.g. live, demo, test) */
+ vhost?: string;
+
/** The name of a queue or exchange to publish to */
exchange: string;
+
+ /** The number of times to retry connecting */
+ retries: number;
+
+ /** The time to wait in between retries */
+ retry_wait: number;
}
diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts
index f286387..3d06157 100644
--- a/src/system/DeltaLogger.ts
+++ b/src/system/DeltaLogger.ts
@@ -65,11 +65,14 @@ export class DeltaLogger
*/
init(): void
{
- this._registerEvent( 'document-processed', LogLevel.NOTICE );
- this._registerEvent( 'delta-publish', LogLevel.NOTICE );
- this._registerEvent( 'avro-err', LogLevel.ERROR );
- this._registerEvent( 'mongodb-err', LogLevel.ERROR );
- this._registerEvent( 'publish-err', LogLevel.ERROR );
+ this._registerEvent( 'document-processed', LogLevel.NOTICE );
+ this._registerEvent( 'delta-publish', LogLevel.NOTICE );
+ this._registerEvent( 'amqp-conn-error', LogLevel.WARNING );
+ this._registerEvent( 'amqp-reconnect', LogLevel.WARNING );
+ this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR );
+ this._registerEvent( 'avro-err', LogLevel.ERROR );
+ this._registerEvent( 'mongodb-err', LogLevel.ERROR );
+ this._registerEvent( 'publish-err', LogLevel.ERROR );
}
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index c718756..1fe9e3c 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -21,23 +21,17 @@
* Publish delta message to a queue
*/
-import { AmqpPublisher } from './AmqpPublisher';
+import { AmqpPublisher, AmqpConfig } from './AmqpPublisher';
import { DeltaResult } from '../bucket/delta';
import { EventDispatcher } from './event/EventDispatcher';
import {
connect as amqpConnect,
- Options,
Channel,
Connection,
} from 'amqplib';
const avro = require( 'avro-js' );
-export interface AmqpConfig extends Options.Connect {
- /** The name of a queue or exchange to publish to */
- exchange: string;
-}
-
export interface AvroSchema {
/** Write data to a buffer */
@@ -94,6 +88,56 @@ export class DeltaPublisher implements AmqpPublisher
{
this._conn = conn;
+ // If there is an error, attemp to reconnect
+ this._conn.on( 'error', e =>
+ {
+ this._dispatcher.dispatch( 'amqp-conn-error', e );
+
+ let reconnect_interval: NodeJS.Timer;
+
+ let retry_count = 0;
+
+ const reconnect = () =>
+ {
+ if ( ++retry_count >= this._conf.retries )
+ {
+ clearInterval( reconnect_interval );
+
+ this._dispatcher.dispatch(
+ 'amqp-reconnect-fail',
+ 'Could not re-establish AMQP connection.'
+ );
+
+ return;
+ }
+
+ this._dispatcher.dispatch(
+ 'amqp-reconnect',
+ '...attempting to re-establish AMQP connection'
+ );
+
+ this.connect()
+ .then( _ =>
+ {
+ clearInterval( reconnect_interval );
+
+ this._dispatcher.dispatch(
+ 'amqp-reconnect',
+ 'AMQP re-connected'
+ );
+ } )
+ .catch( e =>
+ {
+ this._dispatcher.dispatch( 'amqp-conn-error', e );
+ } );
+ }
+
+ reconnect_interval = setInterval(
+ reconnect,
+ ( this._conf.retry_wait * 1000 )
+ );
+ } );
+
return this._conn.createChannel();
} )
.then( ( ch: Channel ) =>
@@ -141,8 +185,6 @@ export class DeltaPublisher implements AmqpPublisher
{
return new Promise<NullableError>( ( resolve, reject ) =>
{
- const startTime = process.hrtime();
-
this.sendMessage( delta )
.then( _ =>
{
@@ -153,8 +195,6 @@ export class DeltaPublisher implements AmqpPublisher
+ '" exchange',
);
- console.log('#publish: '
- + process.hrtime( startTime )[0] / 10000 );
resolve();
return;
} )
@@ -184,13 +224,9 @@ export class DeltaPublisher implements AmqpPublisher
{
return new Promise<NullableError>( ( resolve, reject ) =>
{
- const startTime = process.hrtime();
-
const ts = this._ts_ctr();
const headers = { version: 1, created: ts };
const delta_data = this.avroFormat( delta.data );
- console.log('#sendmessage 1: '
- + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
const event_id = this.DELTA_MAP[ delta.type ];
const avro_buffer = this.avroEncode( {
event: {
@@ -223,8 +259,6 @@ export class DeltaPublisher implements AmqpPublisher
},
},
} );
- console.log('#sendmessage 2: '
- + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
if ( !this._conn )
{
@@ -241,8 +275,6 @@ export class DeltaPublisher implements AmqpPublisher
reject( 'Error sending message: No avro buffer' );
return;
}
- console.log('#sendmessage 3: '
- + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
// we don't use a routing key; fanout exchange
const published_successfully = this._channel.publish(
@@ -254,8 +286,6 @@ export class DeltaPublisher implements AmqpPublisher
if ( published_successfully )
{
- console.log('#sendmessage 4: '
- + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
resolve();
return;
}
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
index 2d7ca24..4f38c46 100644
--- a/src/system/MetricsCollector.ts
+++ b/src/system/MetricsCollector.ts
@@ -130,7 +130,14 @@ export class MetricsCollector
} );
// Push metrics on a specific intervals
- setInterval( () => { this.pushMetrics(); }, this._push_interval_ms );
+ setInterval(
+ () =>
+ {
+ this._gateway.pushAdd(
+ { jobName: 'liza_delta_metrics' }, this.pushCallback
+ );
+ }, this._push_interval_ms
+ );
// Subsribe metrics to events
this.subscribeMetrics();
@@ -146,7 +153,6 @@ export class MetricsCollector
'delta-process-complete',
( val ) =>
{
- console.log( 'Got time: ' + val + 'ms' );
this._process_time_hist.observe( val );
this._process_delta_count.inc();
}
@@ -160,25 +166,6 @@ export class MetricsCollector
/**
- * Push metrics to Prometheus PushGateway
- */
- private pushMetrics(): void
- {
- // this._gateway.pushAdd( this._process_time_params, this.pushCallback );
- // this._gateway.pushAdd( this._process_error_params, this.pushCallback );
- // this._gateway.pushAdd( this._current_error_params, this.pushCallback );
- // this._gateway.pushAdd( this._process_delta_params, this.pushCallback );
-
- this._gateway.pushAdd(
- {
- jobName: 'liza_delta_metrics'
- },
- this.pushCallback
- );
- }
-
-
- /**
* Handle push error
*
* @param error - Any errors that occurred
diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts
index fecef2d..da9a553 100644
--- a/test/system/DeltaPublisherTest.ts
+++ b/test/system/DeltaPublisherTest.ts
@@ -20,10 +20,8 @@
*/
import { EventDispatcher } from '../../src/system/event/EventDispatcher';
-import {
- DeltaPublisher as Sut,
- AmqpConfig
-} from "../../src/system/DeltaPublisher";
+import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher';
+import { AmqpConfig } from '../../src/system/AmqpPublisher';
import { expect, use as chai_use } from 'chai';
import { EventEmitter } from "events";