Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-25 12:42:03 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-11-25 12:42:03 -0500
commit5ee9a5d3409c07e59644a5680a818c2c301eba77 (patch)
tree45aa911b4231ab5185e9ab3e8168af932a8da99d /src
parente781a841b177cdb330c3ae4ec453ec4010ce8470 (diff)
downloadliza-5ee9a5d3409c07e59644a5680a818c2c301eba77.tar.gz
liza-5ee9a5d3409c07e59644a5680a818c2c301eba77.tar.bz2
liza-5ee9a5d3409c07e59644a5680a818c2c301eba77.zip
[DEV-5312] Generalize event subscribers and dispatchers
Diffstat (limited to 'src')
-rw-r--r--src/system/DeltaLogger.ts12
-rw-r--r--src/system/DeltaProcessor.ts24
-rw-r--r--src/system/DeltaPublisher.ts26
-rw-r--r--src/system/MetricsCollector.ts54
-rw-r--r--src/system/db/DeltaDao.ts8
-rw-r--r--src/system/db/MongoDeltaDao.ts10
-rw-r--r--src/system/event/EventDispatcher.ts44
-rw-r--r--src/system/event/EventSubscriber.ts44
8 files changed, 53 insertions, 169 deletions
diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts
index 3d06157..53161b1 100644
--- a/src/system/DeltaLogger.ts
+++ b/src/system/DeltaLogger.ts
@@ -21,7 +21,7 @@
* Logger for delta events
*/
-import { EventSubscriber } from "./event/EventSubscriber";
+import { EventEmitter } from "events";
enum LogLevel {
DEBUG,
@@ -47,13 +47,13 @@ export class DeltaLogger
/**
* Initialize delta logger
*
- * @param _env - The environment ( dev, test, demo, live )
- * @param _subscriber - An event subscriber
- * @param _ts_ctr - a timestamp constructor
+ * @param _env - The environment ( dev, test, demo, live )
+ * @param _emitter - An event emitter
+ * @param _ts_ctr - a timestamp constructor
*/
constructor(
private readonly _env: string,
- private readonly _subscriber: EventSubscriber,
+ private readonly _emitter: EventEmitter,
private readonly _ts_ctr : () => UnixTimestamp,
) {
this.init();
@@ -86,7 +86,7 @@ export class DeltaLogger
{
const logF = this._getLogLevelFunction( level )
- this._subscriber.subscribe( event_id, logF );
+ this._emitter.on( event_id, logF );
}
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index db5be5b..d7137bd 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -24,7 +24,7 @@ import { MongoDeltaType } from "../system/db/MongoDeltaDao";
import { DeltaResult } from "../bucket/delta";
import { DocumentId } from "../document/Document";
import { AmqpPublisher } from "./AmqpPublisher";
-import { EventDispatcher } from "./event/EventDispatcher";
+import { EventEmitter } from "events";
/**
* Process deltas for a quote and publish to a queue
@@ -41,14 +41,14 @@ export class DeltaProcessor
/**
* Initialize processor
*
- * @param _dao - Mongo collection
- * @param _publisher - Amqp Publisher
- * @param _dispatcher - Event dispatcher instance
+ * @param _dao - Mongo collection
+ * @param _publisher - Amqp Publisher
+ * @param _emitter - Event emiter instance
*/
constructor(
- private readonly _dao: DeltaDao,
- private readonly _publisher: AmqpPublisher,
- private readonly _dispatcher: EventDispatcher
+ private readonly _dao: DeltaDao,
+ private readonly _publisher: AmqpPublisher,
+ private readonly _emitter: EventEmitter
) {}
@@ -90,7 +90,7 @@ export class DeltaProcessor
// this document if there was an error
if ( error )
{
- self._dispatcher.dispatch(
+ self._emitter.emit(
'delta-process-error',
error
);
@@ -101,7 +101,7 @@ export class DeltaProcessor
{
const elapsedTime = process.hrtime( startTime );
- self._dispatcher.dispatch(
+ self._emitter.emit(
'delta-process-complete',
elapsedTime[ 1 ] / 10000
);
@@ -111,7 +111,7 @@ export class DeltaProcessor
self._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
.then( _ =>
{
- self._dispatcher.dispatch(
+ self._emitter.emit(
'document-processed',
'Deltas on document ' + doc_id + ' processed '
+ 'successfully. Document has been marked as '
@@ -120,13 +120,13 @@ export class DeltaProcessor
} )
.catch( err =>
{
- self._dispatcher.dispatch( 'mongodb-err', err );
+ self._emitter.emit( 'mongodb-err', err );
} );
} );
} )
.catch( err =>
{
- self._dispatcher.dispatch( 'mongodb-err', err );
+ self._emitter.emit( 'mongodb-err', err );
} );
}
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index 1fe9e3c..0a99ac9 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -23,7 +23,7 @@
import { AmqpPublisher, AmqpConfig } from './AmqpPublisher';
import { DeltaResult } from '../bucket/delta';
-import { EventDispatcher } from './event/EventDispatcher';
+import { EventEmitter } from "events";
import {
connect as amqpConnect,
Channel,
@@ -68,9 +68,9 @@ export class DeltaPublisher implements AmqpPublisher
* @param _ts_ctr - a timestamp constructor
*/
constructor(
- private readonly _conf: AmqpConfig,
- private readonly _dispatcher: EventDispatcher,
- private readonly _ts_ctr : () => UnixTimestamp,
+ private readonly _conf: AmqpConfig,
+ private readonly _emitter: EventEmitter,
+ private readonly _ts_ctr: () => UnixTimestamp,
) {
this._type = avro.parse( this.SCHEMA_PATH );
}
@@ -91,7 +91,7 @@ export class DeltaPublisher implements AmqpPublisher
// If there is an error, attemp to reconnect
this._conn.on( 'error', e =>
{
- this._dispatcher.dispatch( 'amqp-conn-error', e );
+ this._emitter.emit( 'amqp-conn-error', e );
let reconnect_interval: NodeJS.Timer;
@@ -103,7 +103,7 @@ export class DeltaPublisher implements AmqpPublisher
{
clearInterval( reconnect_interval );
- this._dispatcher.dispatch(
+ this._emitter.emit(
'amqp-reconnect-fail',
'Could not re-establish AMQP connection.'
);
@@ -111,7 +111,7 @@ export class DeltaPublisher implements AmqpPublisher
return;
}
- this._dispatcher.dispatch(
+ this._emitter.emit(
'amqp-reconnect',
'...attempting to re-establish AMQP connection'
);
@@ -121,14 +121,14 @@ export class DeltaPublisher implements AmqpPublisher
{
clearInterval( reconnect_interval );
- this._dispatcher.dispatch(
+ this._emitter.emit(
'amqp-reconnect',
'AMQP re-connected'
);
} )
.catch( e =>
{
- this._dispatcher.dispatch( 'amqp-conn-error', e );
+ this._emitter.emit( 'amqp-conn-error', e );
} );
}
@@ -188,7 +188,7 @@ export class DeltaPublisher implements AmqpPublisher
this.sendMessage( delta )
.then( _ =>
{
- this._dispatcher.dispatch(
+ this._emitter.emit(
'delta-publish',
"Published " + delta.type + " delta with ts '"
+ delta.timestamp + "' to '" + this._conf.exchange
@@ -200,7 +200,7 @@ export class DeltaPublisher implements AmqpPublisher
} )
.catch( e =>
{
- this._dispatcher.dispatch(
+ this._emitter.emit(
'publish-err',
"Error publishing " + delta.type + " delta with ts '"
+ delta.timestamp + '" to "' + this._conf.exchange
@@ -310,7 +310,7 @@ export class DeltaPublisher implements AmqpPublisher
{
if ( !this._type )
{
- this._dispatcher.dispatch(
+ this._emitter.emit(
'avro-err',
'No avro scheama found',
);
@@ -322,7 +322,7 @@ export class DeltaPublisher implements AmqpPublisher
}
catch( e )
{
- this._dispatcher.dispatch(
+ this._emitter.emit(
'avro-err',
'Error encoding data to avro: ' + e,
);
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
index 4f38c46..2976858 100644
--- a/src/system/MetricsCollector.ts
+++ b/src/system/MetricsCollector.ts
@@ -21,22 +21,14 @@
* Collect Metrics for Prometheus
*/
-import { EventSubscriber } from "./event/EventSubscriber";
import { DeltaDao } from "./db/DeltaDao";
import { PositiveInteger } from "../numeric";
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
+import { EventEmitter } from "events";
const client = require( 'prom-client' );
-// declare type MetricStructure = {
-// path: string;
-// code: number;
-// service: string;
-// env: string;
-// }
-
-
export declare type PrometheusConfig = {
/** The hostname to connect to */
hostname: string;
@@ -85,19 +77,17 @@ export class MetricsCollector
* Initialize delta logger
*
* @param _conf - the prometheus configuration
- * @param _subscriber - the event subscriber
+ * @param _emitter - the event emitr
*/
constructor(
private readonly _conf: PrometheusConfig,
- private readonly _subscriber: EventSubscriber,
+ private readonly _emitter: EventEmitter,
) {
// Set labels
- const default_labels = {
+ client.register.setDefaultLabels( {
env: this._conf.env,
service: 'delta_processor',
- };
-
- client.register.setDefaultLabels( default_labels );
+ } );
// Create gateway
const url = 'http://' + this._conf.hostname + ':' + this._conf.port;
@@ -140,25 +130,25 @@ export class MetricsCollector
);
// Subsribe metrics to events
- this.subscribeMetrics();
+ this.emitMetrics();
}
/**
- * Subscribe metrics
+ * emit metrics
*/
- private subscribeMetrics()
+ private emitMetrics()
{
- this._subscriber.subscribe(
+ this._emitter.on(
'delta-process-complete',
- ( val ) =>
+ ( val: any ) =>
{
this._process_time_hist.observe( val );
this._process_delta_count.inc();
}
);
- this._subscriber.subscribe(
+ this._emitter.on(
'delta-process-error',
( _ ) => this._process_error_count.inc()
);
@@ -178,29 +168,11 @@ export class MetricsCollector
_body?: any
): void
{
- // console.log( 'Push callback' );
- // console.error( error, response, body );
+ console.log( 'Push callback' );
+ console.error( _error );
}
- /**
- * Get structured metric object
- *
- * @param path - the endpoint being hit
- * @param code - the response code
- *
- * @returns a structured logging object
- */
- // private _formatMetricVal( label: string, val: any ): MetricStructure
- // {
- // return <MetricStructure>{
- // path: path,
- // code: code,
- // service: 'quote-server',
- // env: this._conf.env,
- // };
- // }
-
/**
* Look for mongodb delta errors and update metrics if found
diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts
index 173fcee..5ad09fc 100644
--- a/src/system/db/DeltaDao.ts
+++ b/src/system/db/DeltaDao.ts
@@ -52,7 +52,7 @@ export interface DeltaDao
advanceDeltaIndex(
doc_id: DocumentId,
type: string,
- ): Promise<NullableError>
+ ): Promise<null>
/**
@@ -67,7 +67,7 @@ export interface DeltaDao
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
- ): Promise<NullableError>
+ ): Promise<null>
/**
@@ -77,7 +77,7 @@ export interface DeltaDao
*
* @return any errors that occurred
*/
- setErrorFlag( doc_id: DocumentId ): Promise<NullableError>
+ setErrorFlag( doc_id: DocumentId ): Promise<null>
/**
@@ -85,6 +85,6 @@ export interface DeltaDao
*
* @return a count of the documents in an error state
*/
- getErrorCount(): Promise<number | Error>
+ getErrorCount(): Promise<number>
}
diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
index 81816e0..81640c7 100644
--- a/src/system/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -61,7 +61,7 @@ export class MongoDeltaDao implements DeltaDao
*
* @return any errors that occurred
*/
- init(): Promise<NullableError>
+ init(): Promise<null>
{
var dao = this;
@@ -177,7 +177,7 @@ export class MongoDeltaDao implements DeltaDao
advanceDeltaIndex(
doc_id: DocumentId,
type: MongoDeltaType,
- ): Promise<NullableError>
+ ): Promise<null>
{
return new Promise( ( resolve, reject ) =>
{
@@ -217,7 +217,7 @@ export class MongoDeltaDao implements DeltaDao
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
- ): Promise<NullableError>
+ ): Promise<null>
{
return new Promise( ( resolve, reject ) =>
{
@@ -248,7 +248,7 @@ export class MongoDeltaDao implements DeltaDao
*
* @return any errors that occurred
*/
- setErrorFlag( doc_id: DocumentId ): Promise<NullableError>
+ setErrorFlag( doc_id: DocumentId ): Promise<null>
{
return new Promise( ( resolve, reject ) =>
{
@@ -277,7 +277,7 @@ export class MongoDeltaDao implements DeltaDao
*
* @return a count of the documents in an error state
*/
- getErrorCount(): Promise<number | Error>
+ getErrorCount(): Promise<number>
{
return new Promise( ( resolve, reject ) =>
{
diff --git a/src/system/event/EventDispatcher.ts b/src/system/event/EventDispatcher.ts
deleted file mode 100644
index 45a15b8..0000000
--- a/src/system/event/EventDispatcher.ts
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Event Dispatcher
- *
- * Copyright (C) 2010-2019 R-T Specialty, LLC.
- *
- * This file is part of liza.
- *
- * liza is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * Dispatch events
- */
-
-import { EventEmitter } from "events";
-
-export class EventDispatcher extends EventEmitter
-{
- /**
- * Initialize dispatcher
- *
- * @param _emitter - the event emitter
- */
- constructor(
- private readonly _emitter: EventEmitter
- ) {
- super();
- }
-
-
- dispatch( event_id: string, arg: any ): void
- {
- this._emitter.emit( event_id, arg );
- }
-}
diff --git a/src/system/event/EventSubscriber.ts b/src/system/event/EventSubscriber.ts
deleted file mode 100644
index 2950460..0000000
--- a/src/system/event/EventSubscriber.ts
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Event Subscriber
- *
- * Copyright (C) 2010-2019 R-T Specialty, LLC.
- *
- * This file is part of liza.
- *
- * liza is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * Subscribe to events
- */
-
-import { EventEmitter } from "events";
-
-export class EventSubscriber extends EventEmitter
-{
- /**
- * Initialize subscriber
- *
- * @param _emitter - the event emitter
- */
- constructor(
- private readonly _emitter: EventEmitter
- ) {
- super();
- }
-
-
- subscribe( event_id: string, callback: ( arg: any ) => void ): void
- {
- this._emitter.on( event_id, callback );
- }
-}