diff options
author | Austin Schaffer <austin.schaffer@ryansg.com> | 2019-11-25 12:26:39 -0500 |
---|---|---|
committer | Austin Schaffer <austin.schaffer@ryansg.com> | 2019-11-25 12:26:39 -0500 |
commit | e781a841b177cdb330c3ae4ec453ec4010ce8470 (patch) | |
tree | 6f7a1f062b4b5f32cf04d1eab184749ccb2a2288 /src | |
parent | faa7e15760d1136ef44b15a914c5dd582f86a903 (diff) | |
download | liza-e781a841b177cdb330c3ae4ec453ec4010ce8470.tar.gz liza-e781a841b177cdb330c3ae4ec453ec4010ce8470.tar.bz2 liza-e781a841b177cdb330c3ae4ec453ec4010ce8470.zip |
[DEV-5312] Reconnect AMQP when connection drops
Diffstat (limited to 'src')
-rw-r--r-- | src/system/AmqpPublisher.ts | 33 | ||||
-rw-r--r-- | src/system/DeltaLogger.ts | 13 | ||||
-rw-r--r-- | src/system/DeltaPublisher.ts | 72 | ||||
-rw-r--r-- | src/system/MetricsCollector.ts | 29 |
4 files changed, 100 insertions, 47 deletions
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts index 622d659..26b4949 100644 --- a/src/system/AmqpPublisher.ts +++ b/src/system/AmqpPublisher.ts @@ -26,8 +26,41 @@ import { Options } from 'amqplib'; export interface AmqpConfig extends Options.Connect { + /** The protocol to connect with (should always be "amqp") */ + protocol: string; + + /** The hostname to connect to */ + hostname: string; + + /** The port to connect to */ + port: number; + + /** A username if one if required */ + username?: string; + + /** A password if one if required */ + password?: string; + + /** Locale (should always be "en_US") */ + locale: string; + + /** The size in bytes of the maximum frame allowed */ + frameMax: number; + + /** How often to check for a live connection */ + heartBeat: number; + + /** The virtual host we are on (e.g. live, demo, test) */ + vhost?: string; + /** The name of a queue or exchange to publish to */ exchange: string; + + /** The number of times to retry connecting */ + retries: number; + + /** The time to wait in between retries */ + retry_wait: number; } diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts index f286387..3d06157 100644 --- a/src/system/DeltaLogger.ts +++ b/src/system/DeltaLogger.ts @@ -65,11 +65,14 @@ export class DeltaLogger */ init(): void { - this._registerEvent( 'document-processed', LogLevel.NOTICE ); - this._registerEvent( 'delta-publish', LogLevel.NOTICE ); - this._registerEvent( 'avro-err', LogLevel.ERROR ); - this._registerEvent( 'mongodb-err', LogLevel.ERROR ); - this._registerEvent( 'publish-err', LogLevel.ERROR ); + this._registerEvent( 'document-processed', LogLevel.NOTICE ); + this._registerEvent( 'delta-publish', LogLevel.NOTICE ); + this._registerEvent( 'amqp-conn-error', LogLevel.WARNING ); + this._registerEvent( 'amqp-reconnect', LogLevel.WARNING ); + this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR ); + this._registerEvent( 'avro-err', LogLevel.ERROR ); + this._registerEvent( 'mongodb-err', LogLevel.ERROR ); + this._registerEvent( 'publish-err', LogLevel.ERROR ); } diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index c718756..1fe9e3c 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -21,23 +21,17 @@ * Publish delta message to a queue */ -import { AmqpPublisher } from './AmqpPublisher'; +import { AmqpPublisher, AmqpConfig } from './AmqpPublisher'; import { DeltaResult } from '../bucket/delta'; import { EventDispatcher } from './event/EventDispatcher'; import { connect as amqpConnect, - Options, Channel, Connection, } from 'amqplib'; const avro = require( 'avro-js' ); -export interface AmqpConfig extends Options.Connect { - /** The name of a queue or exchange to publish to */ - exchange: string; -} - export interface AvroSchema { /** Write data to a buffer */ @@ -94,6 +88,56 @@ export class DeltaPublisher implements AmqpPublisher { this._conn = conn; + // If there is an error, attemp to reconnect + this._conn.on( 'error', e => + { + this._dispatcher.dispatch( 'amqp-conn-error', e ); + + let reconnect_interval: NodeJS.Timer; + + let retry_count = 0; + + const reconnect = () => + { + if ( ++retry_count >= this._conf.retries ) + { + clearInterval( reconnect_interval ); + + this._dispatcher.dispatch( + 'amqp-reconnect-fail', + 'Could not re-establish AMQP connection.' + ); + + return; + } + + this._dispatcher.dispatch( + 'amqp-reconnect', + '...attempting to re-establish AMQP connection' + ); + + this.connect() + .then( _ => + { + clearInterval( reconnect_interval ); + + this._dispatcher.dispatch( + 'amqp-reconnect', + 'AMQP re-connected' + ); + } ) + .catch( e => + { + this._dispatcher.dispatch( 'amqp-conn-error', e ); + } ); + } + + reconnect_interval = setInterval( + reconnect, + ( this._conf.retry_wait * 1000 ) + ); + } ); + return this._conn.createChannel(); } ) .then( ( ch: Channel ) => @@ -141,8 +185,6 @@ export class DeltaPublisher implements AmqpPublisher { return new Promise<NullableError>( ( resolve, reject ) => { - const startTime = process.hrtime(); - this.sendMessage( delta ) .then( _ => { @@ -153,8 +195,6 @@ export class DeltaPublisher implements AmqpPublisher + '" exchange', ); - console.log('#publish: ' - + process.hrtime( startTime )[0] / 10000 ); resolve(); return; } ) @@ -184,13 +224,9 @@ export class DeltaPublisher implements AmqpPublisher { return new Promise<NullableError>( ( resolve, reject ) => { - const startTime = process.hrtime(); - const ts = this._ts_ctr(); const headers = { version: 1, created: ts }; const delta_data = this.avroFormat( delta.data ); - console.log('#sendmessage 1: ' - + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms'); const event_id = this.DELTA_MAP[ delta.type ]; const avro_buffer = this.avroEncode( { event: { @@ -223,8 +259,6 @@ export class DeltaPublisher implements AmqpPublisher }, }, } ); - console.log('#sendmessage 2: ' - + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms'); if ( !this._conn ) { @@ -241,8 +275,6 @@ export class DeltaPublisher implements AmqpPublisher reject( 'Error sending message: No avro buffer' ); return; } - console.log('#sendmessage 3: ' - + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms'); // we don't use a routing key; fanout exchange const published_successfully = this._channel.publish( @@ -254,8 +286,6 @@ export class DeltaPublisher implements AmqpPublisher if ( published_successfully ) { - console.log('#sendmessage 4: ' - + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms'); resolve(); return; } diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts index 2d7ca24..4f38c46 100644 --- a/src/system/MetricsCollector.ts +++ b/src/system/MetricsCollector.ts @@ -130,7 +130,14 @@ export class MetricsCollector } ); // Push metrics on a specific intervals - setInterval( () => { this.pushMetrics(); }, this._push_interval_ms ); + setInterval( + () => + { + this._gateway.pushAdd( + { jobName: 'liza_delta_metrics' }, this.pushCallback + ); + }, this._push_interval_ms + ); // Subsribe metrics to events this.subscribeMetrics(); @@ -146,7 +153,6 @@ export class MetricsCollector 'delta-process-complete', ( val ) => { - console.log( 'Got time: ' + val + 'ms' ); this._process_time_hist.observe( val ); this._process_delta_count.inc(); } @@ -160,25 +166,6 @@ export class MetricsCollector /** - * Push metrics to Prometheus PushGateway - */ - private pushMetrics(): void - { - // this._gateway.pushAdd( this._process_time_params, this.pushCallback ); - // this._gateway.pushAdd( this._process_error_params, this.pushCallback ); - // this._gateway.pushAdd( this._current_error_params, this.pushCallback ); - // this._gateway.pushAdd( this._process_delta_params, this.pushCallback ); - - this._gateway.pushAdd( - { - jobName: 'liza_delta_metrics' - }, - this.pushCallback - ); - } - - - /** * Handle push error * * @param error - Any errors that occurred |