Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-25 12:26:39 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-11-25 12:26:39 -0500
commite781a841b177cdb330c3ae4ec453ec4010ce8470 (patch)
tree6f7a1f062b4b5f32cf04d1eab184749ccb2a2288 /src
parentfaa7e15760d1136ef44b15a914c5dd582f86a903 (diff)
downloadliza-e781a841b177cdb330c3ae4ec453ec4010ce8470.tar.gz
liza-e781a841b177cdb330c3ae4ec453ec4010ce8470.tar.bz2
liza-e781a841b177cdb330c3ae4ec453ec4010ce8470.zip
[DEV-5312] Reconnect AMQP when connection drops
Diffstat (limited to 'src')
-rw-r--r--src/system/AmqpPublisher.ts33
-rw-r--r--src/system/DeltaLogger.ts13
-rw-r--r--src/system/DeltaPublisher.ts72
-rw-r--r--src/system/MetricsCollector.ts29
4 files changed, 100 insertions, 47 deletions
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
index 622d659..26b4949 100644
--- a/src/system/AmqpPublisher.ts
+++ b/src/system/AmqpPublisher.ts
@@ -26,8 +26,41 @@ import { Options } from 'amqplib';
export interface AmqpConfig extends Options.Connect {
+ /** The protocol to connect with (should always be "amqp") */
+ protocol: string;
+
+ /** The hostname to connect to */
+ hostname: string;
+
+ /** The port to connect to */
+ port: number;
+
+ /** A username if one if required */
+ username?: string;
+
+ /** A password if one if required */
+ password?: string;
+
+ /** Locale (should always be "en_US") */
+ locale: string;
+
+ /** The size in bytes of the maximum frame allowed */
+ frameMax: number;
+
+ /** How often to check for a live connection */
+ heartBeat: number;
+
+ /** The virtual host we are on (e.g. live, demo, test) */
+ vhost?: string;
+
/** The name of a queue or exchange to publish to */
exchange: string;
+
+ /** The number of times to retry connecting */
+ retries: number;
+
+ /** The time to wait in between retries */
+ retry_wait: number;
}
diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts
index f286387..3d06157 100644
--- a/src/system/DeltaLogger.ts
+++ b/src/system/DeltaLogger.ts
@@ -65,11 +65,14 @@ export class DeltaLogger
*/
init(): void
{
- this._registerEvent( 'document-processed', LogLevel.NOTICE );
- this._registerEvent( 'delta-publish', LogLevel.NOTICE );
- this._registerEvent( 'avro-err', LogLevel.ERROR );
- this._registerEvent( 'mongodb-err', LogLevel.ERROR );
- this._registerEvent( 'publish-err', LogLevel.ERROR );
+ this._registerEvent( 'document-processed', LogLevel.NOTICE );
+ this._registerEvent( 'delta-publish', LogLevel.NOTICE );
+ this._registerEvent( 'amqp-conn-error', LogLevel.WARNING );
+ this._registerEvent( 'amqp-reconnect', LogLevel.WARNING );
+ this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR );
+ this._registerEvent( 'avro-err', LogLevel.ERROR );
+ this._registerEvent( 'mongodb-err', LogLevel.ERROR );
+ this._registerEvent( 'publish-err', LogLevel.ERROR );
}
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index c718756..1fe9e3c 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -21,23 +21,17 @@
* Publish delta message to a queue
*/
-import { AmqpPublisher } from './AmqpPublisher';
+import { AmqpPublisher, AmqpConfig } from './AmqpPublisher';
import { DeltaResult } from '../bucket/delta';
import { EventDispatcher } from './event/EventDispatcher';
import {
connect as amqpConnect,
- Options,
Channel,
Connection,
} from 'amqplib';
const avro = require( 'avro-js' );
-export interface AmqpConfig extends Options.Connect {
- /** The name of a queue or exchange to publish to */
- exchange: string;
-}
-
export interface AvroSchema {
/** Write data to a buffer */
@@ -94,6 +88,56 @@ export class DeltaPublisher implements AmqpPublisher
{
this._conn = conn;
+ // If there is an error, attemp to reconnect
+ this._conn.on( 'error', e =>
+ {
+ this._dispatcher.dispatch( 'amqp-conn-error', e );
+
+ let reconnect_interval: NodeJS.Timer;
+
+ let retry_count = 0;
+
+ const reconnect = () =>
+ {
+ if ( ++retry_count >= this._conf.retries )
+ {
+ clearInterval( reconnect_interval );
+
+ this._dispatcher.dispatch(
+ 'amqp-reconnect-fail',
+ 'Could not re-establish AMQP connection.'
+ );
+
+ return;
+ }
+
+ this._dispatcher.dispatch(
+ 'amqp-reconnect',
+ '...attempting to re-establish AMQP connection'
+ );
+
+ this.connect()
+ .then( _ =>
+ {
+ clearInterval( reconnect_interval );
+
+ this._dispatcher.dispatch(
+ 'amqp-reconnect',
+ 'AMQP re-connected'
+ );
+ } )
+ .catch( e =>
+ {
+ this._dispatcher.dispatch( 'amqp-conn-error', e );
+ } );
+ }
+
+ reconnect_interval = setInterval(
+ reconnect,
+ ( this._conf.retry_wait * 1000 )
+ );
+ } );
+
return this._conn.createChannel();
} )
.then( ( ch: Channel ) =>
@@ -141,8 +185,6 @@ export class DeltaPublisher implements AmqpPublisher
{
return new Promise<NullableError>( ( resolve, reject ) =>
{
- const startTime = process.hrtime();
-
this.sendMessage( delta )
.then( _ =>
{
@@ -153,8 +195,6 @@ export class DeltaPublisher implements AmqpPublisher
+ '" exchange',
);
- console.log('#publish: '
- + process.hrtime( startTime )[0] / 10000 );
resolve();
return;
} )
@@ -184,13 +224,9 @@ export class DeltaPublisher implements AmqpPublisher
{
return new Promise<NullableError>( ( resolve, reject ) =>
{
- const startTime = process.hrtime();
-
const ts = this._ts_ctr();
const headers = { version: 1, created: ts };
const delta_data = this.avroFormat( delta.data );
- console.log('#sendmessage 1: '
- + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
const event_id = this.DELTA_MAP[ delta.type ];
const avro_buffer = this.avroEncode( {
event: {
@@ -223,8 +259,6 @@ export class DeltaPublisher implements AmqpPublisher
},
},
} );
- console.log('#sendmessage 2: '
- + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
if ( !this._conn )
{
@@ -241,8 +275,6 @@ export class DeltaPublisher implements AmqpPublisher
reject( 'Error sending message: No avro buffer' );
return;
}
- console.log('#sendmessage 3: '
- + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
// we don't use a routing key; fanout exchange
const published_successfully = this._channel.publish(
@@ -254,8 +286,6 @@ export class DeltaPublisher implements AmqpPublisher
if ( published_successfully )
{
- console.log('#sendmessage 4: '
- + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
resolve();
return;
}
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
index 2d7ca24..4f38c46 100644
--- a/src/system/MetricsCollector.ts
+++ b/src/system/MetricsCollector.ts
@@ -130,7 +130,14 @@ export class MetricsCollector
} );
// Push metrics on a specific intervals
- setInterval( () => { this.pushMetrics(); }, this._push_interval_ms );
+ setInterval(
+ () =>
+ {
+ this._gateway.pushAdd(
+ { jobName: 'liza_delta_metrics' }, this.pushCallback
+ );
+ }, this._push_interval_ms
+ );
// Subsribe metrics to events
this.subscribeMetrics();
@@ -146,7 +153,6 @@ export class MetricsCollector
'delta-process-complete',
( val ) =>
{
- console.log( 'Got time: ' + val + 'ms' );
this._process_time_hist.observe( val );
this._process_delta_count.inc();
}
@@ -160,25 +166,6 @@ export class MetricsCollector
/**
- * Push metrics to Prometheus PushGateway
- */
- private pushMetrics(): void
- {
- // this._gateway.pushAdd( this._process_time_params, this.pushCallback );
- // this._gateway.pushAdd( this._process_error_params, this.pushCallback );
- // this._gateway.pushAdd( this._current_error_params, this.pushCallback );
- // this._gateway.pushAdd( this._process_delta_params, this.pushCallback );
-
- this._gateway.pushAdd(
- {
- jobName: 'liza_delta_metrics'
- },
- this.pushCallback
- );
- }
-
-
- /**
* Handle push error
*
* @param error - Any errors that occurred