Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.env8
-rw-r--r--bin/delta-processor.ts301
-rw-r--r--src/bucket/delta.ts56
-rw-r--r--src/server/Server.js2
-rw-r--r--src/server/db/MongoServerDao.ts27
-rw-r--r--src/system/AmqpPublisher.ts42
-rw-r--r--src/system/DeltaProcessor.ts87
-rw-r--r--src/system/DeltaPublisher.ts385
-rw-r--r--src/system/EventLogger.ts151
-rw-r--r--src/system/EventMediator.ts88
-rw-r--r--src/system/MetricsCollector.ts58
-rw-r--r--src/system/PrometheusFactory.ts36
-rw-r--r--src/system/PsrLogger.ts117
-rw-r--r--src/system/StandardLogger.ts198
-rw-r--r--src/system/amqp/AmqpConnection.ts154
-rw-r--r--src/system/avro/AvroFactory.ts90
-rw-r--r--src/system/avro/schema.avsc25
-rw-r--r--src/system/db/DeltaDao.ts7
-rw-r--r--src/system/db/MongoDeltaDao.ts33
-rw-r--r--src/system/db/MongoFactory.ts176
-rw-r--r--src/types/mongodb.d.ts19
-rw-r--r--test/server/db/MongoServerDaoTest.js157
-rw-r--r--test/server/db/MongoServerDaoTest.ts7
-rw-r--r--test/system/DeltaProcessorTest.ts312
-rw-r--r--test/system/DeltaPublisherTest.ts221
-rw-r--r--test/system/EventLoggerTest.ts103
-rw-r--r--test/system/EventMediatorTest.ts139
-rw-r--r--test/system/MetricsCollectorTest.ts26
-rw-r--r--test/system/StandardLoggerTest.ts178
29 files changed, 2074 insertions, 1129 deletions
diff --git a/.env b/.env
index acb0b79..0bf93e2 100644
--- a/.env
+++ b/.env
@@ -1,15 +1,15 @@
NODE_ENV=dev
amqp_hostname=localhost
amqp_port=5672
-amqp_username=quote_referral
+amqp_username=
amqp_password=
amqp_frameMax=0
amqp_heartbeat=2
-amqp_vhost=quote
-amqp_exchange=quoteupdate
+amqp_vhost=
+amqp_exchange=
amqp_retries=30
amqp_retry_wait=1
-prom_hostname=dmz2docker01.rsgcorp.local
+prom_hostname=
prom_port=9091
prom_push_interval_ms=5000
process_interval_ms=2000
diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts
index 40842de..522b98b 100644
--- a/bin/delta-processor.ts
+++ b/bin/delta-processor.ts
@@ -18,34 +18,43 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-import fs = require( 'fs' );
-
-import { AmqpConfig } from '../src/system/AmqpPublisher';
+import { createAmqpConfig } from '../src/system/AmqpPublisher';
import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao';
import { DeltaProcessor } from '../src/system/DeltaProcessor';
import { DeltaPublisher } from '../src/system/DeltaPublisher';
-import { MongoDb, MongoDbConfig, MongoCollection } from '../src/types/mongodb';
-import { EventLogger } from '../src/system/EventLogger';
+import { MongoCollection } from '../src/types/mongodb';
+import { createAvroEncoder } from '../src/system/avro/AvroFactory';
+import {
+ createMongoConfig,
+ createMongoDB,
+ getMongoCollection,
+} from '../src/system/db/MongoFactory';
+import { EventMediator } from '../src/system/EventMediator';
import { EventEmitter } from 'events';
-import { PrometheusFactory } from '../src/system/PrometheusFactory';
+import { StandardLogger } from '../src/system/StandardLogger';
+import { MetricsCollector } from '../src/system/MetricsCollector';
import {
- MetricsCollector,
- PrometheusConfig,
-} from '../src/system/MetricsCollector';
+ PrometheusFactory,
+ createPrometheusConfig,
+} from '../src/system/PrometheusFactory';
+import { AmqpConnection } from '../src/system/amqp/AmqpConnection';
-const {
- Db: MongoDb,
- Server: MongoServer,
- ReplServers: ReplSetServers,
-} = require( 'mongodb' );
-const amqp_conf = _getAmqpConfig( process.env );
-const db_conf = _getMongoConfig( process.env );
-const prom_conf = _getPrometheusConfig( process.env );
+const amqp_conf = createAmqpConfig( process.env );
+const prom_conf = createPrometheusConfig( process.env );
+const db_conf = createMongoConfig( process.env );
+const db = createMongoDB( db_conf );
+const process_interval_ms = +( process.env.process_interval_ms || 2000 );
const env = process.env.NODE_ENV || 'Unknown Environment';
-const process_interval_ms = +(process.env.process_interval_ms || 2000);
const emitter = new EventEmitter();
-const db = _createDB( db_conf );
+const log = new StandardLogger( console, ts_ctr, env );
+const amqp_connection = new AmqpConnection( amqp_conf, emitter );
+const publisher = new DeltaPublisher(
+ emitter,
+ ts_ctr,
+ createAvroEncoder,
+ amqp_connection,
+);
// Prometheus Metrics
const prom_factory = new PrometheusFactory();
@@ -57,67 +66,47 @@ const metrics = new MetricsCollector(
);
// Structured logging
-new EventLogger( console, env, emitter, ts_ctr );
+new EventMediator( log, emitter );
let process_interval: NodeJS.Timer;
let dao: MongoDeltaDao;
-let publisher: DeltaPublisher;
-let processor: DeltaProcessor;
-_getMongoCollection( db, db_conf )
+getMongoCollection( db, db_conf )
.then( ( conn: MongoCollection ) => { return new MongoDeltaDao( conn ); } )
- .then( ( mongoDao: MongoDeltaDao ) =>
- {
- dao = mongoDao;
- publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr );
- processor = new DeltaProcessor( mongoDao, publisher, emitter );
- } )
- .then( _ => publisher.connect() )
+ .then( ( mongoDao: MongoDeltaDao ) => { dao = mongoDao; } )
+ .then( _ => amqp_connection.connect() )
.then( _ =>
- {
- const pidPath = __dirname + '/../conf/.delta_processor.pid';
+ {
+ log.info( 'Liza Delta Processor' );
- writePidFile(pidPath );
- greet( 'Liza Delta Processor', pidPath );
+ handleShutdown();
- process_interval = setInterval( () =>
+ const processor = new DeltaProcessor( dao, publisher, emitter );
+
+ process_interval = setInterval( () =>
{
processor.process();
- metrics.checkForErrors( dao );
+
+ dao.getErrorCount()
+ .then( count => { metrics.updateErrorCount( count ) } );
},
process_interval_ms,
);
} )
- .catch( e => { console.error( 'Error: ' + e ); } );
-
-
-/**
- * Output greeting
- *
- * The greeting contains the program name and PID file path.
- *
- * @param name - program name
- * @param pid_path - path to PID file
- */
-function greet( name: string, pid_path: string ): void
-{
- console.log( `${name}`);
- console.log( `PID file: ${pid_path}` );
-}
+ .catch( e =>
+ {
+ log.error( e );
+ process.exit( 1 );
+ } );
/**
- * Write process id (PID) file
- *
- * @param pid_path - path to pid file
+ * Hook shutdown events
*/
-function writePidFile( pid_path: string ): void
+function handleShutdown(): void
{
- fs.writeFileSync( pid_path, process.pid );
-
process.on( 'SIGINT', () => { shutdown( 'SIGINT' ); } )
- .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } )
- .on( 'exit', () => { fs.unlink( pid_path, () => {} ); } );
+ .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } );
}
@@ -128,12 +117,12 @@ function writePidFile( pid_path: string ): void
*/
function shutdown( signal: string ): void
{
- console.log( 'Received ' + signal + '. Beginning graceful shutdown:' );
- console.log( '...Stopping processing interval' );
+ log.info( 'Received ' + signal + '. Beginning graceful shutdown:' );
+ log.info( '...Stopping processing interval' );
clearInterval( process_interval );
- console.log( '...Closing MongoDb connection' );
+ log.info( '...Closing MongoDb connection' );
db.close( ( err, _data ) =>
{
@@ -143,11 +132,15 @@ function shutdown( signal: string ): void
}
} );
- console.log( '...Closing AMQP connection...' );
+ log.info( '...Closing AMQP connection...' );
+
+ amqp_connection.close();
+
+ log.info( '...Stopping the metrics collector...' );
- publisher.close();
+ metrics.stop();
- console.log( 'Shutdown complete. Exiting.' );
+ log.info( 'Shutdown complete. Exiting.' );
process.exit();
}
@@ -161,179 +154,3 @@ function ts_ctr(): UnixTimestamp
{
return <UnixTimestamp>Math.floor( new Date().getTime() / 1000 );
}
-
-
-/**
- * Create the database connection
- *
- * @param conf - the configuration from the environment
- *
- * @return the mongodb connection
- */
-function _createDB( conf: MongoDbConfig ): MongoDb
-{
- if( conf.ha )
- {
- var mongodbPort = conf.port || 27017;
- var mongodbReplSet = conf.replset || 'rs0';
- var dbServers = new ReplSetServers(
- [
- new MongoServer( conf.host_a, conf.port_a || mongodbPort),
- new MongoServer( conf.host_b, conf.port_b || mongodbPort),
- ],
- {rs_name: mongodbReplSet, auto_reconnect: true}
- );
- }
- else
- {
- var dbServers = new MongoServer(
- conf.host || '127.0.0.1',
- conf.port || 27017,
- {auto_reconnect: true}
- );
- }
- var db = new MongoDb(
- 'program',
- dbServers,
- {native_parser: false, safe: false}
- );
- return db;
-}
-
-
-/**
- * Attempts to connect to the database
- *
- * connectError event will be emitted on failure.
- *
- * @return any errors that occurred
- */
-function _getMongoCollection(
- db: MongoDb,
- conf: MongoDbConfig
-): Promise<MongoCollection>
-{
- return new Promise( ( resolve, reject ) =>
- {
- // attempt to connect to the database
- db.open( ( e: any, db: any ) =>
- {
- // if there was an error, don't bother with anything else
- if ( e )
- {
- // in some circumstances, it may just be telling us that
- // we're already connected (even though the connection may
- // have been broken)
- if ( e.errno !== undefined )
- {
- reject( 'Error opening mongo connection: ' + e );
- return;
- }
- } else if ( db == null )
- {
- reject( 'No database connection' );
- return;
- }
-
- // quotes collection
- db.collection(
- conf.collection,
- ( e: any, collection: MongoCollection ) =>
- {
- if ( e )
- {
- reject( 'Error creating collection: ' + e );
- return;
- }
-
- // initialize indexes
- collection.createIndex(
- [
- ['published', 1],
- ['deltaError', 1],
- ],
- true,
- ( e: any, _index: { [P: string]: any } ) =>
- {
- if ( e )
- {
- reject( 'Error creating index: ' + e );
- return;
- }
-
- resolve( collection );
- return;
- }
- );
- }
- );
- } );
- } );
-}
-
-
-/**
- * Create a mongodb configuration from the environment
- *
- * @param env - the environment variables
- *
- * @return the mongo configuration
- */
-function _getMongoConfig( env: any ): MongoDbConfig
-{
- return <MongoDbConfig>{
- 'port': +( env.MONGODB_PORT || 0 ),
- 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1,
- 'replset': env.LIZA_MONGODB_REPLSET,
- 'host': env.MONGODB_HOST,
- 'host_a': env.LIZA_MONGODB_HOST_A,
- 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ),
- 'host_b': env.LIZA_MONGODB_HOST_B,
- 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ),
- 'collection': 'quotes',
- };
-}
-
-
-/**
- * Create an amqp configuration from the environment
- *
- * @param env - the environment variables
- *
- * @return the amqp configuration
- */
-function _getAmqpConfig( env: any ): AmqpConfig
-{
- return <AmqpConfig>{
- 'protocol': 'amqp',
- 'hostname': env.amqp_hostname,
- 'port': +( env.amqp_port || 0 ),
- 'username': env.amqp_username,
- 'password': env.amqp_password,
- 'locale': 'en_US',
- 'frameMax': env.amqp_frameMax,
- 'heartbeat': env.amqp_heartbeat,
- 'vhost': env.amqp_vhost,
- 'exchange': env.amqp_exchange,
- 'retries': env.amqp_retries || 30,
- 'retry_wait': env.amqp_retry_wait || 1000,
- };
-}
-
-
-/**
- * Create a prometheus configuration from the environment
- *
- * @param env - the environment variables
- *
- * @return the prometheus configuration
- */
-function _getPrometheusConfig( env: any ): PrometheusConfig
-{
- return <PrometheusConfig>{
- 'hostname': env.prom_hostname,
- 'port': +( env.prom_port || 0 ),
- 'env': process.env.NODE_ENV,
- 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ),
- };
-} \ No newline at end of file
diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts
index ef2ab77..b83a8e7 100644
--- a/src/bucket/delta.ts
+++ b/src/bucket/delta.ts
@@ -18,16 +18,21 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+import { DocumentId } from '../document/Document';
+
/** The data structure expected for a document's internal key/value store */
export type Kv<T = any> = Record<string, T[]>;
+
/** Possible delta values for Kv array indexes */
export type DeltaDatum<T> = T | null | undefined;
+
/** Possible delta types */
export type DeltaType = 'ratedata' | 'data';
+
/**
* The constructor type for a delta generating function
*
@@ -46,6 +51,53 @@ export type DeltaConstructor<T = any, U extends Kv<T> = Kv<T>, V extends Kv<T> =
export type DeltaResult<T> = { [K in keyof T]: DeltaDatum<T[K]> | null };
+/** Complete delta type */
+export type Delta<T> = {
+ type: DeltaType,
+ timestamp: UnixTimestamp,
+ data: DeltaResult<T>,
+}
+
+
+/** Reverse delta type */
+export type ReverseDelta<T> = {
+ data: Delta<T>[],
+ ratedata: Delta<T>[],
+}
+
+
+/** Structure for Published delta count */
+export type PublishDeltaCount = {
+ data?: number,
+ ratedata?: number,
+}
+
+
+/**
+ * Document structure
+ */
+export interface DeltaDocument
+{
+ /** The document id */
+ id: DocumentId,
+
+ /** The time the document was updated */
+ lastUpdate: UnixTimestamp,
+
+ /** The data bucket */
+ data: Record<string, any>,
+
+ /** The rate data bucket */
+ ratedata?: Record<string, any>,
+
+ /** The calculated reverse deltas */
+ rdelta?: ReverseDelta<any>,
+
+ /** A count of how many of each delta type have been processed */
+ totalPublishDelta?: PublishDeltaCount,
+};
+
+
/**
* Create delta to transform from src into dest
*
@@ -105,7 +157,7 @@ export function createDelta<T, U extends Kv<T>, V extends Kv<T>>(
* @param bucket - The bucket data
* @param delta - The delta to apply
*
- * @return the delta
+ * @return the bucket with the delta applied
*/
export function applyDelta<T, U extends Kv<T>, V extends Kv<T>>(
bucket: U = <U>{},
@@ -164,7 +216,7 @@ export function applyDelta<T, U extends Kv<T>, V extends Kv<T>>(
* @param bucket - The bucket data array
* @param delta - The delta data array
*
- * @return an object with an changed flag and a data array
+ * @return the applied delta
*/
function _applyDeltaKey<T>(
bucket: T[],
diff --git a/src/server/Server.js b/src/server/Server.js
index 8651f9d..d839d7b 100644
--- a/src/server/Server.js
+++ b/src/server/Server.js
@@ -340,7 +340,6 @@ module.exports = Class( 'Server' )
.setImported( quote_data.importedInd || false )
.setBound( quote_data.boundInd || false )
.needsImport( quote_data.importDirty || false )
- .needsDeltaProcessing( quote_data.processed || true )
.setCurrentStepId(
quote_data.currentStepId
|| quote_program.getFirstStepId()
@@ -393,7 +392,6 @@ module.exports = Class( 'Server' )
importedInd: ( quote.isImported() ) ? 1 : 0,
boundInd: ( quote.isBound() ) ? 1 : 0,
importDirty: 0,
- published: 1,
syncInd: 0,
boundInd: 0,
notifyInd: 0,
diff --git a/src/server/db/MongoServerDao.ts b/src/server/db/MongoServerDao.ts
index ef129b8..dd1df2d 100644
--- a/src/server/db/MongoServerDao.ts
+++ b/src/server/db/MongoServerDao.ts
@@ -20,7 +20,7 @@
*/
import { ServerDao, Callback } from "./ServerDao";
-import { MongoCollection, MongoUpdate } from "mongodb";
+import { MongoCollection, MongoUpdate, MongoDb } from "mongodb";
import { PositiveInteger } from "../../numeric";
import { ServerSideQuote } from "../quote/ServerSideQuote";
import { QuoteId } from "../../document/Document";
@@ -64,7 +64,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
* @param {Mongo.Db} db mongo database connection
*/
constructor(
- private readonly _db: any
+ private readonly _db: MongoDb
)
{
super();
@@ -86,7 +86,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
var dao = this;
// map db error event (on connection error) to our connectError event
- this._db.on( 'error', function( err: any )
+ this._db.on( 'error', function( err: Error )
{
dao._ready = false;
dao._collection = null;
@@ -165,7 +165,10 @@ export class MongoServerDao extends EventEmitter implements ServerDao
collection.createIndex(
[ ['id', 1] ],
true,
- function( _err: any, _index: { [P: string]: any } )
+ function(
+ _err: NullableError,
+ _index: { [P: string]: any,
+ } )
{
// mark the DAO as ready to be used
dao._collection = collection;
@@ -179,7 +182,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
db.collection(
dao.COLLECTION_SEQ,
function(
- err: any,
+ err: Error,
collection: MongoCollection,
) {
if ( err )
@@ -199,7 +202,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
collection.find(
{ _id: dao.SEQ_QUOTE_ID },
{ limit: <PositiveInteger>1 },
- function( err: any, cursor )
+ function( err: NullableError, cursor )
{
if ( err )
{
@@ -207,7 +210,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
return;
}
- cursor.toArray( function( _err: any, data: any[] )
+ cursor.toArray( function( _err: Error, data: any[] )
{
if ( data.length == 0 )
{
@@ -236,7 +239,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
_id: this.SEQ_QUOTE_ID,
val: this.SEQ_QUOTE_ID_DEFAULT,
},
- function( err: any, _docs: any )
+ function( err: NullableError, _docs: any )
{
if ( err )
{
@@ -467,8 +470,8 @@ export class MongoServerDao extends EventEmitter implements ServerDao
*/
saveQuoteState(
quote: ServerSideQuote,
- success_callback: any,
- failure_callback: any,
+ success_callback: Callback,
+ failure_callback: Callback,
)
{
var update = {
@@ -486,8 +489,8 @@ export class MongoServerDao extends EventEmitter implements ServerDao
saveQuoteClasses(
quote: ServerSideQuote,
classes: any,
- success: any,
- failure: any,
+ success: Callback,
+ failure: Callback,
)
{
return this.mergeData(
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
index 12f521c..1f728b3 100644
--- a/src/system/AmqpPublisher.ts
+++ b/src/system/AmqpPublisher.ts
@@ -26,6 +26,32 @@ import { DocumentId } from '../document/Document';
import { Options } from 'amqplib';
+/**
+ * Create an amqp configuration from the environment
+ *
+ * @param env - the environment variables
+ *
+ * @return the amqp configuration
+ */
+export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig
+{
+ return <AmqpConfig>{
+ protocol: 'amqp',
+ hostname: env.amqp_hostname,
+ port: +( env.amqp_port || 0 ),
+ username: env.amqp_username,
+ password: env.amqp_password,
+ locale: 'en_US',
+ frameMax: +( env.amqp_frameMax || 0 ),
+ heartbeat: +( env.amqp_heartbeat || 0 ),
+ vhost: env.amqp_vhost,
+ exchange: env.amqp_exchange,
+ retries: env.amqp_retries || 30,
+ retry_wait: env.amqp_retry_wait || 1000,
+ };
+}
+
+
export interface AmqpConfig extends Options.Connect {
/** The protocol to connect with (should always be "amqp") */
protocol: string;
@@ -49,7 +75,7 @@ export interface AmqpConfig extends Options.Connect {
frameMax: number;
/** How often to check for a live connection */
- heartBeat: number;
+ heartbeat: number;
/** The virtual host we are on (e.g. live, demo, test) */
vhost?: string;
@@ -70,13 +96,15 @@ export interface AmqpPublisher
/**
* Publish quote message to exchange post-rating
*
- * @param delta - The delta
- * @param bucket - The bucket
- * @param doc_id - The doc_id
+ * @param doc_id - The doc_id
+ * @param delta - The delta
+ * @param bucket - The bucket
+ * @param ratedata - The rate data bucket
*/
publish(
- delta: DeltaResult<any>,
- bucket: Record<string, any>,
- doc_id: DocumentId,
+ doc_id: DocumentId,
+ delta: DeltaResult<any>,
+ bucket: Record<string, any>,
+ ratedata?: Record<string, any>,
): Promise<void>
}
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index d8da5cb..b136f80 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -20,10 +20,16 @@
*/
import { DeltaDao } from "../system/db/DeltaDao";
-import { DeltaResult, DeltaType, applyDelta } from "../bucket/delta";
import { DocumentId } from "../document/Document";
import { AmqpPublisher } from "./AmqpPublisher";
import { EventEmitter } from "events";
+import {
+ DeltaType,
+ applyDelta,
+ DeltaDocument,
+ Delta,
+ ReverseDelta,
+} from "../bucket/delta";
/**
@@ -58,81 +64,84 @@ export class DeltaProcessor
process(): Promise<void>
{
return this._dao.getUnprocessedDocuments()
- .then( docs => this._processNext( docs ) )
- .catch( err => { this._emitter.emit( 'dao-err', err ) } );
+ .then( docs => this._processNext( docs ) );
}
- private _processNext( docs: any ): Promise<void>
+ private _processNext( docs: DeltaDocument[] ): Promise<void>
{
- if ( docs.length === 0 )
+ const doc = docs.shift();
+
+ if ( !doc )
{
return Promise.resolve();
}
- const doc = docs.shift();
-
return this._processDocument( doc )
- .then( _ => this._processNext( docs ) );
+ .then( _ => this._processNext( docs ) )
}
- private _processDocument( doc: Record<string, any> ): Promise<void>
+ private _processDocument( doc: DeltaDocument ): Promise<void>
{
- const deltas = this.getTimestampSortedDeltas( doc );
- const doc_id: DocumentId = doc.id;
- const bucket = doc.data;
- const last_updated_ts = doc.lastUpdate;
+ const deltas = this.getTimestampSortedDeltas( doc );
+ const doc_id = doc.id;
+ const bucket = doc.data;
+ const ratedata = doc.ratedata;
+ const last_updated_ts = doc.lastUpdate;
- return this._processNextDelta( deltas, bucket, doc_id )
+ return this._processNextDelta( doc_id, deltas, bucket, ratedata )
.then( _ =>
this._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
)
.then( _ =>
{
- this._emitter.emit(
- 'document-processed',
- 'Deltas on document ' + doc_id + ' processed '
- + 'successfully. Document has been marked as '
- + 'completely processed.'
- );
+ this._emitter.emit( 'document-processed', { doc_id: doc_id } );
} )
.catch( e =>
{
- this._emitter.emit( 'delta-err', e );
- this._dao.setErrorFlag( doc_id );
+ this._emitter.emit( 'error', e );
+ return this._dao.setErrorFlag( doc_id );
} );
}
private _processNextDelta(
- deltas: DeltaResult<any>[],
- bucket: Record<string, any>,
- doc_id: DocumentId,
+ doc_id: DocumentId,
+ deltas: Delta<any>[],
+ bucket: Record<string, any>,
+ ratedata?: Record<string, any>,
): Promise<void>
{
- if ( deltas.length === 0 )
- {
- return Promise.resolve();
- }
-
const delta = deltas.shift();
if ( !delta )
{
- return Promise.reject( new Error( 'Undefined delta' ) );
+ return Promise.resolve();
}
const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type;
this._emitter.emit( 'delta-process-start', delta_uid );
- const new_bucket = applyDelta( bucket, delta.data );
+ if ( delta.type == this.DELTA_DATA )
+ {
+ bucket = applyDelta( bucket, delta.data );
+ }
+ else
+ {
+ ratedata = applyDelta( ratedata, delta.data );
+ }
- return this._publisher.publish( delta, new_bucket, doc_id )
+ return this._publisher.publish( doc_id, delta, bucket, ratedata )
.then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) )
.then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) )
- .then( _ => this._processNextDelta( deltas, new_bucket, doc_id ) );
+ .then( _ => this._processNextDelta(
+ doc_id,
+ deltas,
+ bucket,
+ ratedata
+ ) );
}
@@ -144,7 +153,7 @@ export class DeltaProcessor
*
* @return a list of deltas sorted by timestamp
*/
- getTimestampSortedDeltas( doc: any ): DeltaResult<any>[]
+ getTimestampSortedDeltas( doc: DeltaDocument ): Delta<any>[]
{
const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA );
const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA );
@@ -164,10 +173,10 @@ export class DeltaProcessor
*
* @return a trimmed list of deltas
*/
- getDeltas( doc: any, type: DeltaType ): DeltaResult<any>[]
+ getDeltas( doc: DeltaDocument, type: DeltaType ): Delta<any>[]
{
- const deltas_obj = doc.rdelta || {};
- const deltas: DeltaResult<any>[] = deltas_obj[ type ] || [];
+ const deltas_obj = doc.rdelta || <ReverseDelta<any>>{};
+ const deltas: Delta<any>[] = deltas_obj[ type ] || [];
// Get type specific delta index
let published_count = 0;
@@ -197,7 +206,7 @@ export class DeltaProcessor
*
* @return a sort value
*/
- private _sortByTimestamp( a: DeltaResult<any>, b: DeltaResult<any> ): number
+ private _sortByTimestamp( a: Delta<any>, b: Delta<any> ): number
{
if ( a.timestamp < b.timestamp )
{
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index a5b68b8..94c925c 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -21,37 +21,22 @@
* Publish delta message to a queue
*/
-import { AmqpPublisher, AmqpConfig } from './AmqpPublisher';
-import { DeltaResult } from '../bucket/delta';
+import { AmqpPublisher } from './AmqpPublisher';
+import { Delta } from '../bucket/delta';
import { EventEmitter } from "events";
import { DocumentId } from '../document/Document';
import { context } from '../error/ContextError';
import { AmqpError } from '../error/AmqpError';
-import {
- connect as amqpConnect,
- Channel,
- Connection,
-} from 'amqplib';
+import { AvroSchema, AvroEncoderCtr } from './avro/AvroFactory';
+import { AmqpConnection } from './amqp/AmqpConnection';
-const avro = require( 'avro-js' );
-
-
-export interface AvroSchema {
- /** Write data to a buffer */
- toBuffer( data: Record<string, any> ): Buffer | null;
-}
+const avro = require( 'avro-js' );
export class DeltaPublisher implements AmqpPublisher
{
- /** The amqp connection */
- private _conn?: Connection;
-
- /** The amqp channel */
- private _channel?: Channel;
-
/** The avro schema */
- private _type?: AvroSchema;
+ private _schema: AvroSchema;
/** The path to the avro schema */
readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc';
@@ -66,122 +51,45 @@ export class DeltaPublisher implements AmqpPublisher
/**
* Delta publisher
*
- * @param _conf - amqp configuration
- * @param _emitter - event emitter instance
- * @param _ts_ctr - a timestamp constructor
+ * @param _emitter - event emitter instance
+ * @param _ts_ctr - a timestamp constructor
+ * @param _encoder_ctr - a factory function to create an avro encoder
+ * @param _conn - the amqp connection
*/
constructor(
- private readonly _conf: AmqpConfig,
- private readonly _emitter: EventEmitter,
- private readonly _ts_ctr: () => UnixTimestamp,
+ private readonly _emitter: EventEmitter,
+ private readonly _ts_ctr: () => UnixTimestamp,
+ private readonly _encoder_ctr: AvroEncoderCtr,
+ private readonly _conn: AmqpConnection,
) {
- this._type = avro.parse( this.SCHEMA_PATH );
- }
-
-
- /**
- * Initialize connection
- */
- connect(): Promise<void>
- {
- return amqpConnect( this._conf )
- .then( conn =>
- {
- this._conn = conn;
-
- // If there is an error, attempt to reconnect
- this._conn.on( 'error', e =>
- {
- this._emitter.emit( 'amqp-conn-error', e );
- this._reconnect();
- } );
-
- return this._conn.createChannel();
- } )
- .then( ( ch: Channel ) =>
- {
- this._channel = ch;
-
- this._channel.assertExchange(
- this._conf.exchange,
- 'fanout',
- { durable: true }
- );
- } );
- }
-
-
- /**
- * Attempt to re-establish the connection
- *
- * @return Whether reconnecting was successful
- */
- private _reconnect( retry_count: number = 0 ): void
- {
- if ( retry_count >= this._conf.retries )
- {
- this._emitter.emit(
- 'amqp-reconnect-fail',
- 'Could not re-establish AMQP connection.'
- );
-
- return;
- }
-
- this._emitter.emit(
- 'amqp-reconnect',
- '...attempting to re-establish AMQP connection'
- );
-
- this.connect()
- .then( _ =>
- {
- this._emitter.emit(
- 'amqp-reconnect',
- 'AMQP re-connected'
- );
- } )
- .catch( _ =>
- {
- const wait_ms = this._conf.retry_wait;
- setTimeout( () => this._reconnect( ++retry_count ), wait_ms );
- } );
- }
-
-
- /**
- * Close the amqp conenction
- */
- close(): void
- {
- if ( this._conn )
- {
- this._conn.close.bind(this._conn);
- }
+ this._schema = avro.parse( this.SCHEMA_PATH );
}
/**
* Publish quote message to exchange post-rating
*
- * @param delta - The delta
- * @param bucket - The bucket
- * @param doc_id - The doc_id
+ * @param doc_id - The doc_id
+ * @param delta - The delta
+ * @param bucket - The bucket
+ * @param ratedata - The ratedata bucket
*/
- publish(
- delta: DeltaResult<any>,
- bucket: Record<string, any>,
- doc_id: DocumentId,
+ publish(
+ doc_id: DocumentId,
+ delta: Delta<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any> = {},
): Promise<void>
{
- return this.sendMessage( delta, bucket, doc_id )
+ return this._sendMessage( doc_id, delta, bucket, ratedata )
.then( _ =>
{
this._emitter.emit(
'delta-publish',
- "Published " + delta.type + " delta with ts '"
- + delta.timestamp + "' to '" + this._conf.exchange
- + '" exchange',
+ {
+ delta: delta,
+ exchange: this._conn.getExchangeName(),
+ }
);
} );
}
@@ -190,131 +98,155 @@ export class DeltaPublisher implements AmqpPublisher
/**
* Send message to exchange
*
- * @param delta - The delta to publish
- * @param bucket - The bucket
- * @param doc_id - The doc_id
+ * @param doc_id - The doc_id
+ * @param delta - The delta to publish
+ * @param bucket - The bucket
+ * @param ratedata - The ratedata bucket
*
* @return whether publish was successful
*/
- sendMessage(
- delta: DeltaResult<any>,
- bucket: Record<string, any>,
- doc_id: DocumentId,
+ private _sendMessage(
+ doc_id: DocumentId,
+ delta: Delta<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
): Promise<void>
{
- return new Promise<void>( ( resolve, reject ) =>
- {
- const ts = this._ts_ctr();
- const headers = { version: 1, created: ts };
- const avro_object = this.avroFormat( delta, bucket, doc_id, ts );
- const avro_buffer = this.avroEncode( avro_object );
+ const ts = this._ts_ctr();
+ const headers = { version: 1, created: ts };
+ const avro_object = this._avroFormat(
+ ts,
+ doc_id,
+ delta,
+ bucket,
+ ratedata,
+ );
- if ( !this._conn )
- {
- reject( context (
- new AmqpError( 'Error sending message: No connection' ),
- {
- doc_id: doc_id,
- delta_type: delta.type,
- delta_ts: delta.ts,
- },
- ) );
- return;
- }
- else if ( !this._channel )
+ return this.avroEncode( avro_object )
+ .then( ( avro_buffer ) =>
{
- reject( context (
- new AmqpError( 'Error sending message: No channel' ),
- {
- doc_id: doc_id,
- delta_type: delta.type,
- delta_ts: delta.ts,
- },
- ) );
- return;
- }
- else if ( !avro_buffer )
- {
- reject( context (
- new Error( 'Error sending message: No avro buffer' ),
- {
- doc_id: doc_id,
- delta_type: delta.type,
- delta_ts: delta.ts,
- },
- ) );
- return;
- }
+ const channel = this._conn.getAmqpChannel();
- // we don't use a routing key; fanout exchange
- const published_successfully = this._channel.publish(
- this._conf.exchange,
- '',
- avro_buffer,
- { headers: headers },
- );
+ if ( !channel )
+ {
+ return Promise.reject( context (
+ new AmqpError( 'Error sending message: No channel' ),
+ {
+ doc_id: doc_id,
+ delta_type: delta.type,
+ delta_ts: delta.timestamp,
+ },
+ ) );
+ }
- if ( published_successfully )
- {
- resolve();
- return;
- }
+ // we don't use a routing key; fanout exchange
+ const published_successfully = channel.publish(
+ this._conn.getExchangeName(),
+ '',
+ avro_buffer,
+ { headers: headers },
+ );
- reject( context(
- new Error ( 'Error sending message: publishing failed' ),
+ if ( !published_successfully )
{
- doc_id: doc_id,
- delta_type: delta.type,
- delta_ts: delta.ts,
+ return Promise.reject( context(
+ new Error ( 'Delta publish failed' ),
+ {
+ doc_id: doc_id,
+ delta_type: delta.type,
+ delta_ts: delta.timestamp,
+ }
+ ) );
}
- ) );
- } );
+
+ return Promise.resolve();
+ } );
}
- avroFormat(
- delta: DeltaResult<any>,
- _bucket: Record<string, any>,
- doc_id: DocumentId,
- ts: UnixTimestamp,
+ /**
+ * Throw an error with specific information if the schema is invalid
+ *
+ * @param schema - Avro schema
+ * @param data - Data to encode
+ */
+ private _assertValidAvro(
+ schema: AvroSchema,
+ data: Record<string, any>,
+ ): void
+ {
+ schema.isValid( data, { errorHook: hook } );
+
+ function hook( keys: any, vals: any) {
+ throw context( new Error( 'Invalid Avro Schema' ),
+ {
+ invalid_paths: keys,
+ invalid_data: vals,
+ }
+ );
+ }
+ }
+
+
+ /**
+ * Format the avro data with data type labels
+ *
+ * @param ts - a timestamp
+ * @param doc_id - the document id
+ * @param delta - the current delta
+ * @param bucket - the data bucket
+ * @param ratedata - the ratedata bucket
+ *
+ * @return the formatted data
+ */
+ private _avroFormat(
+ ts: UnixTimestamp,
+ doc_id: DocumentId,
+ delta: Delta<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
): any
{
- const delta_data = this.setDataTypes( delta.data );
- const event_id = this.DELTA_MAP[ delta.type ];
+ const delta_formatted = this.setDataTypes( delta.data );
+ const bucket_formatted = this.setDataTypes( bucket );
+ const ratedata_formatted = this.setDataTypes( ratedata );
+ const event_id = this.DELTA_MAP[ delta.type ];
return {
event: {
- id: event_id,
- ts: ts,
+ id: event_id,
+ ts: ts,
actor: 'SERVER',
- step: null,
+ step: null,
},
document: {
- id: doc_id
- },
- session: {
- entity_name: 'Foobar', // Fix
- entity_id: 123123, // Fix
+ id: doc_id
},
data: {
Data: {
- bucket: _bucket,
+ bucket: bucket_formatted,
+ },
+ },
+ ratedata: {
+ Data: {
+ bucket: ratedata_formatted,
},
},
delta: {
Data: {
- bucket: delta_data,
+ bucket: delta_formatted,
},
},
program: {
Program: {
id: 'quote_server',
- version: 'dadaddwafdwa', // Fix
+ version: '',
},
},
}
}
+
/**
* Encode the data in an avro buffer
*
@@ -322,33 +254,28 @@ export class DeltaPublisher implements AmqpPublisher
*
* @return the avro buffer or null if there is an error
*/
- avroEncode( data: Record<string, any> ): Buffer | null
+ avroEncode( data: Record<string, any> ): Promise<Buffer>
{
- let buffer = null;
-
- try
+ return new Promise<Buffer>( ( resolve, reject ) =>
{
- if ( !this._type )
- {
- this._emitter.emit(
- 'avro-err',
- 'No avro scheama found',
- );
+ const bufs: Buffer[] = [];
- return null;
- }
+ try
+ {
+ this._assertValidAvro( this._schema, data )
- buffer = this._type.toBuffer( data );
- }
- catch( e )
- {
- this._emitter.emit(
- 'avro-err',
- 'Error encoding data to avro: ' + e,
- );
- }
+ const encoder = this._encoder_ctr( this._schema )
- return buffer;
+ encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } )
+ encoder.on('error', ( err: Error ) => { reject( err ); } )
+ encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } )
+ encoder.end( data );
+ }
+ catch ( e )
+ {
+ reject( e );
+ }
+ } );
}
@@ -365,7 +292,7 @@ export class DeltaPublisher implements AmqpPublisher
switch( typeof( data ) )
{
- case 'object': // Typescript treats arrays as objects
+ case 'object':
if ( data == null )
{
return null;
diff --git a/src/system/EventLogger.ts b/src/system/EventLogger.ts
deleted file mode 100644
index ef46aca..0000000
--- a/src/system/EventLogger.ts
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Event logger
- *
- * Copyright (C) 2010-2019 R-T Specialty, LLC.
- *
- * This file is part of liza.
- *
- * liza is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * PSR-12 style logger based on node events
- */
-
-import { EventEmitter } from "events";
-
-enum LogLevel {
- DEBUG,
- INFO,
- NOTICE,
- WARNING,
- ERROR,
- CRITICAL,
- ALERT,
- EMERGENCY,
-};
-
-declare type StructuredLog = {
- message: string;
- timestamp: UnixTimestamp;
- service: string;
- env: string;
- severity: string;
-}
-
-export class EventLogger
-{
- /**
- * Initialize logger
- *
- * @param _env - The environment ( dev, test, demo, live )
- * @param _emitter - An event emitter
- * @param _ts_ctr - a timestamp constructor
- */
- constructor(
- private readonly _console: Console,
- private readonly _env: string,
- private readonly _emitter: EventEmitter,
- private readonly _ts_ctr: () => UnixTimestamp,
- ) {
- this.init();
- }
-
-
- /**
- * Initialize the logger to look for specific events
- */
- init(): void
- {
- this._registerEvent( 'document-processed', LogLevel.NOTICE );
- this._registerEvent( 'delta-publish', LogLevel.NOTICE );
- this._registerEvent( 'amqp-conn-error', LogLevel.WARNING );
- this._registerEvent( 'amqp-reconnect', LogLevel.WARNING );
- this._registerEvent( 'amqp-reconnect-fail', LogLevel.ERROR );
- this._registerEvent( 'avro-err', LogLevel.ERROR );
- this._registerEvent( 'dao-err', LogLevel.ERROR );
- this._registerEvent( 'publish-err', LogLevel.ERROR );
-
- // this._registerEvent( 'log', LogLevel.INFO );
- // this._registerEvent( 'debug', LogLevel.DEBUG );
- // this._registerEvent( 'info', LogLevel.INFO );
- // this._registerEvent( 'notice', LogLevel.NOTICE );
- // this._registerEvent( 'warning', LogLevel.WARNING );
- // this._registerEvent( 'error', LogLevel.ERROR );
- // this._registerEvent( 'critical', LogLevel.CRITICAL );
- // this._registerEvent( 'alert', LogLevel.ALERT );
- // this._registerEvent( 'emergency', LogLevel.EMERGENCY );
- }
-
-
- /**
- * Register an event at a specific log level
- *
- * @param event_id - the event id
- * @param level - the log level
- */
- private _registerEvent( event_id: string, level: LogLevel ): void
- {
- const logF = this._getLogLevelFunction( level )
-
- this._emitter.on( event_id, logF );
- }
-
-
- /**
- * Get a logging function for the specified log level
- *
- * @param event_id - the event id
- *
- * @return the function to log with
- */
- private _getLogLevelFunction( level: LogLevel ): ( str: string ) => void
- {
- switch( level )
- {
- case LogLevel.DEBUG:
- case LogLevel.INFO:
- return ( str ) => this._console.info( this._format( str, level ) );
- case LogLevel.NOTICE:
- return ( str ) => this._console.log( this._format( str, level ) );
- case LogLevel.WARNING:
- return ( str ) => this._console.warn( this._format( str, level ) );
- case LogLevel.ERROR:
- case LogLevel.CRITICAL:
- case LogLevel.ALERT:
- case LogLevel.EMERGENCY:
- return ( str ) => this._console.error( this._format( str, level ) );
- default:
- return ( str ) => this._console.log( "UNKNOWN LOG LEVEL: " + str );
- }
- }
-
-
- /**
- * Get structured log object
- *
- * @param str - the string to log
- * @param level - the log level
- *
- * @returns a structured logging object
- */
- private _format( str: string, level: LogLevel ): StructuredLog
- {
- return <StructuredLog>{
- message: str,
- timestamp: this._ts_ctr(),
- service: 'quote-server',
- env: this._env,
- severity: LogLevel[level],
- };
- }
-}
diff --git a/src/system/EventMediator.ts b/src/system/EventMediator.ts
new file mode 100644
index 0000000..31b245e
--- /dev/null
+++ b/src/system/EventMediator.ts
@@ -0,0 +1,88 @@
+/**
+ * Event Meditator
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Hook events and log them
+ */
+
+import { EventEmitter } from 'events';
+import { PsrLogger } from './PsrLogger';
+import { hasContext } from '../error/ContextError';
+
+export class EventMediator
+{
+ /**
+ * Initialize mediator
+ *
+ * @param _log - A PSR-3 style logger
+ * @param _emitter - An event emitter
+ */
+ constructor(
+ private readonly _log: PsrLogger,
+ private readonly _emitter: EventEmitter,
+ ) {
+ this._emitter.on( 'delta-publish', ( msg ) => this._log.notice(
+ 'Published delta to exchange',
+ msg
+ ) );
+
+ this._emitter.on( 'document-processed', ( msg ) => this._log.notice(
+ 'Deltas on document processed successfully. Document has been '
+ + 'marked as completely processed.',
+ msg
+ ) );
+
+ this._emitter.on( 'amqp-conn-error', ( msg ) =>
+ this._log.warning( 'AMQP Connection Error', msg ) );
+
+ this._emitter.on( 'amqp-reconnect', () =>
+ this._log.warning(
+ '...attempting to re-establish AMQP connection'
+ )
+ );
+
+ this._emitter.on( 'amqp-reconnected', () =>
+ this._log.warning(
+ 'AMQP re-connected'
+ )
+ );
+
+ this._emitter.on( 'error', ( arg ) =>
+ this._handleError( arg ) );
+ }
+
+
+ private _handleError( e: any ): void
+ {
+ let msg: string = '';
+ let context = {};
+
+ if ( e instanceof( Error ) )
+ {
+ msg = e.message;
+
+ if ( hasContext( e ) )
+ {
+ context = e.context;
+ }
+ }
+
+ this._log.error( msg, context );
+ }
+}
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
index 30c4ea2..bc4a6cc 100644
--- a/src/system/MetricsCollector.ts
+++ b/src/system/MetricsCollector.ts
@@ -21,27 +21,12 @@
* Collect Metrics for Prometheus
*/
-import { DeltaDao } from "./db/DeltaDao";
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
import { EventEmitter } from "events";
-import { PrometheusFactory } from './PrometheusFactory';
+import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory';
const client = require( 'prom-client' )
-export declare type PrometheusConfig = {
- /** The hostname to connect to */
- hostname: string;
-
- /** The port to connect to */
- port: number;
-
- /** The environment ( dev, test, demo, live ) */
- env: string;
-
- /** The rate (in milliseconds) at which metrics are pushed */
- push_interval_ms: number;
-}
-
export type MetricTimer = (
_start_time?: [ number, number ]
@@ -78,6 +63,8 @@ export class MetricsCollector
/** Timing map */
private _timing_map: Record<string, [ number, number ]> = {};
+ private _push_interval: NodeJS.Timer;
+
/**
* Initialize delta logger
@@ -133,8 +120,7 @@ export class MetricsCollector
);
// Push metrics on a specific interval
- setInterval(
- () =>
+ this._push_interval = setInterval( () =>
{
this._gateway.pushAdd(
{ jobName: 'liza_delta_metrics' }, this.pushCallback
@@ -148,16 +134,22 @@ export class MetricsCollector
/**
+ * Stop the push interval
+ */
+ stop(): void
+ {
+ clearInterval( this._push_interval );
+ }
+
+
+ /**
* List to events to update metrics
*/
private hookMetrics()
{
this._emitter.on(
'delta-process-start',
- ( uid: string ) =>
- {
- this._timing_map[ uid ] = this._timer();
- }
+ ( uid: string ) => { this._timing_map[ uid ] = this._timer(); }
);
this._emitter.on(
@@ -166,7 +158,7 @@ export class MetricsCollector
{
const start_time_ms = this._timing_map[ uid ] || [ -1, -1 ];
const t = this._timer( start_time_ms );
- const total_time_ms = ( ( t[ 0 ] * 1000 ) + ( t[ 1 ] / 1000 ) );
+ const total_time_ms = t[ 0 ] * 1000 + t[ 1 ] / 1000000;
this._process_time.observe( total_time_ms );
this._total_processed.inc();
@@ -188,27 +180,23 @@ export class MetricsCollector
* @param body - The resposne body
*/
private pushCallback(
- _error?: Error | undefined,
+ error?: Error | undefined,
_response?: any,
_body?: any
): void
{
- console.log( 'Push callback' );
- console.error( _error );
+ if ( error )
+ {
+ this._emitter.emit( 'error', error );
+ }
}
/**
- * Look for mongodb delta errors and update metrics if found
- *
- * @return any errors the occurred
+ * Update metrics with current error count
*/
- checkForErrors( dao: DeltaDao ): NullableError
+ updateErrorCount( count: number ): void
{
- dao.getErrorCount()
- .then( count => { this._current_error.set( +count ); } )
- .catch( err => { return err; } );
-
- return null;
+ this._current_error.set( +count );
}
}
diff --git a/src/system/PrometheusFactory.ts b/src/system/PrometheusFactory.ts
index 088612e..0cf059f 100644
--- a/src/system/PrometheusFactory.ts
+++ b/src/system/PrometheusFactory.ts
@@ -22,6 +22,42 @@
*/
import { Pushgateway, Histogram, Counter, Gauge } from 'prom-client';
+
+export declare type PrometheusConfig = {
+ /** The hostname to connect to */
+ hostname: string;
+
+ /** The port to connect to */
+ port: number;
+
+ /** The environment ( dev, test, demo, live ) */
+ env: string;
+
+ /** The rate (in milliseconds) at which metrics are pushed */
+ push_interval_ms: number;
+}
+
+
+/**
+ * Create a prometheus configuration from the environment
+ *
+ * @param env - the environment variables
+ *
+ * @return the prometheus configuration
+ */
+export function createPrometheusConfig(
+ env: NodeJS.ProcessEnv
+): PrometheusConfig
+{
+ return <PrometheusConfig>{
+ 'hostname': env.prom_hostname,
+ 'port': +( env.prom_port || 0 ),
+ 'env': process.env.NODE_ENV,
+ 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ),
+ };
+}
+
+
export class PrometheusFactory
{
/**
diff --git a/src/system/PsrLogger.ts b/src/system/PsrLogger.ts
new file mode 100644
index 0000000..276c78e
--- /dev/null
+++ b/src/system/PsrLogger.ts
@@ -0,0 +1,117 @@
+/**
+ * PSR logger
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * PSR-3 style logger
+ */
+
+export enum LogLevel {
+ DEBUG,
+ INFO,
+ NOTICE,
+ WARNING,
+ ERROR,
+ CRITICAL,
+ ALERT,
+ EMERGENCY,
+};
+
+
+export interface PsrLogger
+{
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ debug( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ info( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ notice( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ warning( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ error( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ critical( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ alert( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ emergency( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log a message
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ log( level: LogLevel, msg: string | object, context?: object ): void
+}
diff --git a/src/system/StandardLogger.ts b/src/system/StandardLogger.ts
new file mode 100644
index 0000000..d69c3d4
--- /dev/null
+++ b/src/system/StandardLogger.ts
@@ -0,0 +1,198 @@
+/**
+ * Stdout logger
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Standard out logger implementing PSR-3 standards
+ */
+import { PsrLogger, LogLevel } from './PsrLogger';
+
+declare type StructuredLog = {
+ message: string;
+ timestamp: UnixTimestamp;
+ service: string;
+ env: string;
+ severity: string;
+ context?: Record<string, any>;
+}
+
+export class StandardLogger implements PsrLogger
+{
+ /**
+ * Initialize logger
+ *
+ * @param _console
+ * @param _ts_ctr - a timestamp constructor
+ * @param _env - The environment ( dev, test, demo, live )
+ */
+ constructor(
+ private readonly _console: Console,
+ private readonly _ts_ctr: () => UnixTimestamp,
+ private readonly _env: string,
+ ) {}
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ debug( msg: string | object, context?: object ): void
+ {
+ this._console.info( this._format( LogLevel.DEBUG, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ info( msg: string | object, context?: object ): void
+ {
+ this._console.info( this._format( LogLevel.INFO, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ notice( msg: string | object, context?: object ): void
+ {
+ this._console.log( this._format( LogLevel.NOTICE, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ warning( msg: string | object, context?: object ): void
+ {
+ this._console.warn( this._format( LogLevel.WARNING, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ error( msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( LogLevel.ERROR, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ critical( msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( LogLevel.CRITICAL, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ alert( msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( LogLevel.ALERT, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ emergency( msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( LogLevel.EMERGENCY, msg, context ) );
+ }
+
+
+ /**
+ * Log a message
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ log( level: LogLevel, msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( level, msg, context ) );
+ }
+
+
+ /**
+ * Get structured log object
+ *
+ * @param msg - the string or object to log
+ * @param level - the log level
+ *
+ * @returns a structured logging object
+ */
+ private _format(
+ level: LogLevel,
+ msg: string | object,
+ context: object = {},
+ ): StructuredLog
+ {
+ let str: string;
+
+ if ( msg !== null && typeof( msg ) === 'object' )
+ {
+ str = JSON.stringify( msg );
+ }
+ else
+ {
+ str = msg;
+ }
+
+ const structured_log = <StructuredLog>{
+ message: str,
+ timestamp: this._ts_ctr(),
+ service: 'quote-server',
+ env: this._env,
+ severity: LogLevel[level],
+ };
+
+ if ( Object.keys( context ).length > 0 )
+ {
+ structured_log[ "context" ] = context;
+ }
+
+ return structured_log;
+ }
+}
diff --git a/src/system/amqp/AmqpConnection.ts b/src/system/amqp/AmqpConnection.ts
new file mode 100644
index 0000000..13b9791
--- /dev/null
+++ b/src/system/amqp/AmqpConnection.ts
@@ -0,0 +1,154 @@
+/**
+ * Amqp Connection
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Amqp Connection
+ */
+import { AmqpConfig } from '../AmqpPublisher';
+import { EventEmitter } from "events";
+import {
+ connect as AmqpConnect,
+ Channel,
+ Connection,
+} from 'amqplib';
+
+
+export class AmqpConnection
+{
+ /** The amqp connection */
+ private _conn?: Connection;
+
+ /** The amqp channel */
+ private _channel?: Channel;
+
+
+ /**
+ * Amqp Connection
+ *
+ * @param _conf - amqp configuration
+ * @param _emitter - event emitter instance
+ */
+ constructor(
+ private readonly _conf: AmqpConfig,
+ private readonly _emitter: EventEmitter,
+ ) {}
+
+
+ /**
+ * Initialize connection
+ */
+ connect(): Promise<void>
+ {
+ return AmqpConnect( this._conf )
+ .then( conn =>
+ {
+ this._conn = conn;
+
+ /** If there is an error, attempt to reconnect
+ * Only hook this once because it will be re-hooked on each
+ * successive successful connection
+ */
+ this._conn.once( 'error', e =>
+ {
+ this._emitter.emit( 'amqp-conn-error', e );
+ this._reconnect();
+ } );
+
+ return this._conn.createChannel();
+ } )
+ .then( ( ch: Channel ) =>
+ {
+ this._channel = ch;
+
+ this._channel.assertExchange(
+ this._conf.exchange,
+ 'fanout',
+ { durable: true }
+ );
+ } );
+ }
+
+
+ /**
+ * Attempt to re-establish the connection
+ *
+ * @param retry_count - the number of retries attempted
+ */
+ private _reconnect( retry_count: number = 0 ): void
+ {
+ if ( retry_count >= this._conf.retries )
+ {
+ this._emitter.emit(
+ 'error',
+ new Error( 'Could not re-establish AMQP connection.' )
+ );
+
+ return;
+ }
+
+ this._emitter.emit( 'amqp-reconnect' );
+
+ this.connect()
+ .then( _ => { this._emitter.emit( 'amqp-reconnected' ) } )
+ .catch( _ =>
+ {
+ const wait_ms = this._conf.retry_wait;
+ setTimeout( () => this._reconnect( ++retry_count ), wait_ms );
+ } );
+ }
+
+
+ /**
+ * Returns the exchange to publish to
+ *
+ * @return exchange name
+ */
+ getExchangeName(): string
+ {
+ return this._conf.exchange;
+ }
+
+
+ /**
+ * Returns the amqp channel
+ *
+ * @return exchange name
+ */
+ getAmqpChannel(): Channel | undefined
+ {
+ if ( !this._channel )
+ {
+ this._reconnect();
+ }
+
+ return this._channel;
+ }
+
+
+ /**
+ * Close the amqp conenction
+ */
+ close(): void
+ {
+ if ( this._conn )
+ {
+ this._conn.close.bind(this._conn);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/system/avro/AvroFactory.ts b/src/system/avro/AvroFactory.ts
new file mode 100644
index 0000000..0a07a32
--- /dev/null
+++ b/src/system/avro/AvroFactory.ts
@@ -0,0 +1,90 @@
+/**
+ * Factory functions for avro
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import { Duplex } from 'stream';
+
+const avro = require( 'avro-js' );
+
+
+export interface AvroSchema
+{
+ /**
+ * Write data to a buffer
+ *
+ * @param data - the data to write
+ *
+ * @return the buffer if successful
+ */
+ toBuffer( data: Record<string, any> ): Buffer | null;
+
+
+ /**
+ * Check if data is valid against schema
+ *
+ * @param data - the data to validate
+ * @param opts - options specified as key/values
+ *
+ * @return the buffer if it is valid
+ */
+ isValid(
+ data: Record<string, any>,
+ opts?: Record<string, any>
+ ): Buffer | null;
+
+
+ /**
+ * Write to a buffer
+ *
+ * @param data - the data to write
+ * @param buffer - the buffer that will be written to
+ */
+ encode( data: Record<string, any>, buffer: Buffer ): void;
+
+
+ /**
+ * Output to a json string
+ *
+ * @param data - the data to format
+ *
+ * @return the formatted data
+ */
+ toString( data: Record<string, any> ): string;
+
+
+ /**
+ * Deserialize from a buffer
+ *
+ * @param buffer - the buffer to read from
+ *
+ * @return the resulting data
+ */
+ fromBuffer( buffer: Buffer ): any;
+}
+
+
+/** The avro encoder constructor type */
+export type AvroEncoderCtr = ( type: AvroSchema ) => Duplex;
+
+
+/** The avro encoder constructor */
+export function createAvroEncoder( schema: AvroSchema ): Duplex
+{
+ return new avro.streams.BlockEncoder( schema );
+} \ No newline at end of file
diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc
index 4a9a609..53e4a9d 100644
--- a/src/system/avro/schema.avsc
+++ b/src/system/avro/schema.avsc
@@ -78,23 +78,6 @@
}
},
{
- "name": "session",
- "type": {
- "type": "record",
- "name": "Session",
- "fields": [
- {
- "name": "entity_name",
- "type": "string"
- },
- {
- "name": "entity_id",
- "type": "int"
- }
- ]
- }
- },
- {
"name": "data",
"type": [
"null",
@@ -161,6 +144,13 @@
]
},
{
+ "name": "ratedata",
+ "type": [
+ "null",
+ "Data"
+ ]
+ },
+ {
"name": "delta",
"type": [
"null",
@@ -191,4 +181,3 @@
}
]
}
-
diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts
index d7d2544..881bc79 100644
--- a/src/system/db/DeltaDao.ts
+++ b/src/system/db/DeltaDao.ts
@@ -28,6 +28,7 @@
*/
import { DocumentId } from "../../document/Document";
+import { DeltaDocument } from "../../bucket/delta";
/** Manage deltas */
@@ -38,7 +39,7 @@ export interface DeltaDao
*
* @return documents in need of processing
*/
- getUnprocessedDocuments(): Promise<Record<string, any>[]>
+ getUnprocessedDocuments(): Promise<DeltaDocument[]>
/**
@@ -46,8 +47,6 @@ export interface DeltaDao
*
* @param doc_id - Document whose index will be set
* @param type - Delta type
- *
- * @return any errors that occurred
*/
advanceDeltaIndex(
doc_id: DocumentId,
@@ -61,8 +60,6 @@ export interface DeltaDao
*
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
- *
- * @return any errors that occurred
*/
markDocumentAsProcessed(
doc_id: DocumentId,
diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
index 4ab4e91..8d45684 100644
--- a/src/system/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -26,7 +26,7 @@ import { DeltaDao } from './DeltaDao';
import { MongoCollection } from 'mongodb';
import { context } from '../../error/ContextError';
import { DaoError } from '../../error/DaoError';
-import { DeltaType } from '../../bucket/delta';
+import { DeltaType, DeltaDocument } from '../../bucket/delta';
/** Manage deltas */
export class MongoDeltaDao implements DeltaDao
@@ -37,11 +37,21 @@ export class MongoDeltaDao implements DeltaDao
/** The data delta type */
static readonly DELTA_DATA: string = 'data';
+ /** The document fields to read */
+ readonly RESULT_FIELDS: Record<string, number> = {
+ id: 1,
+ lastUpdate: 1,
+ data: 1,
+ ratedata: 1,
+ rdelta: 1,
+ totalPublishDelta: 1,
+ };
+
/**
* Initialize connection
*
- * @param _db Mongo db
+ * @param _collection - Mongo db collection
*/
constructor(
private readonly _collection: MongoCollection,
@@ -53,7 +63,7 @@ export class MongoDeltaDao implements DeltaDao
*
* @return documents in need of processing
*/
- getUnprocessedDocuments(): Promise<Record<string, any>[]>
+ getUnprocessedDocuments(): Promise<DeltaDocument[]>
{
return new Promise( ( resolve, reject ) =>
{
@@ -62,7 +72,7 @@ export class MongoDeltaDao implements DeltaDao
published: false,
deltaError: false,
},
- {},
+ { fields: this.RESULT_FIELDS },
( e, cursor ) =>
{
if ( e )
@@ -75,7 +85,7 @@ export class MongoDeltaDao implements DeltaDao
return
}
- cursor.toArray( ( e: Error, data: any[] ) =>
+ cursor.toArray( ( e: Error, data: DeltaDocument[] ) =>
{
if ( e )
{
@@ -100,13 +110,8 @@ export class MongoDeltaDao implements DeltaDao
*
* @param doc_id - Document whose index will be set
* @param type - Delta type
- *
- * @return any errors that occurred
*/
- advanceDeltaIndex(
- doc_id: DocumentId,
- type: DeltaType,
- ): Promise<void>
+ advanceDeltaIndex( doc_id: DocumentId, type: DeltaType ): Promise<void>
{
return new Promise( ( resolve, reject ) =>
{
@@ -123,9 +128,7 @@ export class MongoDeltaDao implements DeltaDao
if ( e )
{
reject( context(
- new DaoError(
- 'Error advancing delta index: ' + e
- ),
+ new DaoError( 'Error advancing delta index: ' + e ),
{
doc_id: doc_id,
type: type,
@@ -149,8 +152,6 @@ export class MongoDeltaDao implements DeltaDao
*
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
- *
- * @return any errors that occurred
*/
markDocumentAsProcessed(
doc_id: DocumentId,
diff --git a/src/system/db/MongoFactory.ts b/src/system/db/MongoFactory.ts
new file mode 100644
index 0000000..5a3b03e
--- /dev/null
+++ b/src/system/db/MongoFactory.ts
@@ -0,0 +1,176 @@
+/**
+ * Mongo Factory functions
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * These definitions are for a very old mongodb library, which will be
+ * once we get around to updating node. Quite a failure on the maintenance
+ * front.
+ *
+ * instantiate objects for MongoDb
+ */
+import { MongoDb, MongoDbConfig, MongoCollection } from '../../types/mongodb';
+import { DaoError } from '../../error/DaoError';
+
+
+const {
+ Db: MongoDb,
+ Server: MongoServer,
+ ReplServers: ReplSetServers,
+} = require( 'mongodb' );
+
+
+/**
+ * Create a mongodb configuration from the environment
+ *
+ * @param env - the environment variables
+ *
+ * @return the mongo configuration
+ */
+export function createMongoConfig( env: NodeJS.ProcessEnv ): MongoDbConfig
+{
+ return <MongoDbConfig>{
+ 'port': +( env.MONGODB_PORT || 0 ),
+ 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1,
+ 'replset': env.LIZA_MONGODB_REPLSET,
+ 'host': env.MONGODB_HOST,
+ 'host_a': env.LIZA_MONGODB_HOST_A,
+ 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ),
+ 'host_b': env.LIZA_MONGODB_HOST_B,
+ 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ),
+ 'collection': 'quotes',
+ };
+}
+
+
+/**
+ * Create the database connection
+ *
+ * @param conf - the configuration from the environment
+ *
+ * @return the mongodb connection
+ */
+export function createMongoDB( conf: MongoDbConfig ): MongoDb
+{
+ if( conf.ha )
+ {
+ var mongodbPort = conf.port || 27017;
+ var mongodbReplSet = conf.replset || 'rs0';
+ var dbServers = new ReplSetServers(
+ [
+ new MongoServer( conf.host_a, conf.port_a || mongodbPort),
+ new MongoServer( conf.host_b, conf.port_b || mongodbPort),
+ ],
+ {rs_name: mongodbReplSet, auto_reconnect: true}
+ );
+ }
+ else
+ {
+ var dbServers = new MongoServer(
+ conf.host || '127.0.0.1',
+ conf.port || 27017,
+ {auto_reconnect: true}
+ );
+ }
+ var db = new MongoDb(
+ 'program',
+ dbServers,
+ {native_parser: false, safe: false}
+ );
+ return db;
+}
+
+
+/**
+ * Attempts to connect to the database and retrieve the collection
+ *
+ * connectError event will be emitted on failure.
+ *
+ * @param db - the mongo database
+ * @param conf - the mongo configuration
+ *
+ * @return the collection
+ */
+export function getMongoCollection(
+ db: MongoDb,
+ conf: MongoDbConfig
+): Promise<MongoCollection>
+{
+ return new Promise( ( resolve, reject ) =>
+ {
+ // attempt to connect to the database
+ db.open( ( e: any, db: any ) =>
+ {
+ // if there was an error, don't bother with anything else
+ if ( e )
+ {
+ // in some circumstances, it may just be telling us that
+ // we're already connected (even though the connection may
+ // have been broken)
+ if ( e.errno !== undefined )
+ {
+ reject( new Error(
+ 'Error opening mongo connection: ' + e
+ ) );
+ return;
+ }
+ } else if ( db == null )
+ {
+ reject( new DaoError( 'No database connection' ) );
+ return;
+ }
+
+ // quotes collection
+ db.collection(
+ conf.collection,
+ ( e: any, collection: MongoCollection ) =>
+ {
+ if ( e )
+ {
+ reject( new DaoError(
+ 'Error creating collection: ' + e
+ ) );
+ return;
+ }
+
+ // initialize indexes
+ collection.createIndex(
+ [
+ ['published', 1],
+ ['deltaError', 1],
+ ],
+ true,
+ ( e: any, _index: { [P: string]: any } ) =>
+ {
+ if ( e )
+ {
+ reject( new DaoError(
+ 'Error creating index: ' + e
+ ) );
+ return;
+ }
+
+ resolve( collection );
+ return;
+ }
+ );
+ }
+ );
+ } );
+ } );
+} \ No newline at end of file
diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts
index 4642223..bbbfa46 100644
--- a/src/types/mongodb.d.ts
+++ b/src/types/mongodb.d.ts
@@ -62,6 +62,15 @@ export interface MongoDb
* @param callback continuation on completion
*/
close( callback: MongoCallback ): void;
+
+
+ /**
+ * Hook events
+ *
+ * @param event_id - the event to hook
+ * @param callback - a function to call in response to the event
+ */
+ on( event_id: string, callback: ( err: Error ) => void ): void;
}
@@ -107,6 +116,9 @@ interface MongoFindOptions
/** Whether to project only id's */
id?: number,
+
+ /** Which fields to include in the result set */
+ fields?: Record<string, number>,
}
@@ -236,6 +248,10 @@ declare interface MongoCollection
/**
* Creates an index on the collection
+ *
+ * @param fieldOrSpec - indexes to create
+ * @param options - mongo options
+ * @param callback - continuation on completion
*/
createIndex(
fieldOrSpec: MongoIndexSpecification,
@@ -246,6 +262,9 @@ declare interface MongoCollection
/**
* Creates an index on the collection
+ *
+ * @param docs - documents to insert
+ * @param callback - continuation on completion
*/
insert(
docs: MongoInsertSpecification,
diff --git a/test/server/db/MongoServerDaoTest.js b/test/server/db/MongoServerDaoTest.js
deleted file mode 100644
index d6c8bf1..0000000
--- a/test/server/db/MongoServerDaoTest.js
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Tests MongoServerDao
- *
- * Copyright (C) 2010-2019 R-T Specialty, LLC.
- *
- * This file is part of the Liza Data Collection Framework.
- *
- * liza is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-'use strict';
-Object.defineProperty(exports, "__esModule", { value: true });
-var MongoServerDao_1 = require("../../../src/server/db/MongoServerDao");
-var chai_1 = require("chai");
-chai_1.use(require('chai-as-promised'));
-describe('MongoServerDao', function () {
- describe('#saveQuote', function () {
- describe("with no save data or push data", function () {
- it("saves entire metabucket record individually", function (done) {
- var metadata = {
- foo: ['bar', 'baz'],
- bar: [{ quux: 'quuux' }],
- };
- var quote = createStubQuote(metadata);
- var sut = new MongoServerDao_1.MongoServerDao(createMockDb(
- // update
- function (_selector, data) {
- chai_1.expect(data.$set['meta.foo'])
- .to.deep.equal(metadata.foo);
- chai_1.expect(data.$set['meta.bar'])
- .to.deep.equal(metadata.bar);
- chai_1.expect(data.$push).to.equal(undefined);
- done();
- }));
- sut.init(function () {
- return sut.saveQuote(quote, function () { }, function () { });
- });
- });
- });
- describe("with push data", function () {
- it("adds push data to the collection", function (done) {
- var push_data = {
- foo: ['bar', 'baz'],
- bar: [{ quux: 'quuux' }],
- };
- var quote = createStubQuote({});
- var sut = new MongoServerDao_1.MongoServerDao(createMockDb(
- // update
- function (_selector, data) {
- chai_1.expect(data.$push['foo'])
- .to.deep.equal(push_data.foo);
- chai_1.expect(data.$push['bar'])
- .to.deep.equal(push_data.bar);
- done();
- }));
- sut.init(function () {
- return sut.saveQuote(quote, function () { }, function () { }, undefined, push_data);
- });
- });
- it("skips push data when it is an empty object", function (done) {
- var push_data = {};
- var quote = createStubQuote({});
- var sut = new MongoServerDao_1.MongoServerDao(createMockDb(
- // update
- function (_selector, data) {
- chai_1.expect(data.$push).to.equal(undefined);
- done();
- }));
- sut.init(function () {
- return sut.saveQuote(quote, function () { }, function () { }, undefined, push_data);
- });
- });
- });
- });
-});
-function createMockDb(on_update) {
- var collection_quotes = {
- update: on_update,
- createIndex: function (_, __, c) { return c(); },
- };
- var collection_seq = {
- find: function (_, __, c) {
- c(null, {
- toArray: function (c) { return c(null, { length: 5 }); },
- });
- },
- };
- var db = {
- collection: function (id, c) {
- var coll = (id === 'quotes')
- ? collection_quotes
- : collection_seq;
- c(null, coll);
- },
- };
- var driver = {
- open: function (c) { return c(null, db); },
- on: function () { },
- };
- return driver;
-}
-function createStubQuote(metadata) {
- var program = {
- getId: function () { return '1'; },
- ineligibleLockCount: 0,
- apis: {},
- internal: {},
- meta: {
- arefs: {},
- fields: {},
- groups: {},
- qdata: {},
- qtypes: {},
- },
- mapis: {},
- initQuote: function () { },
- };
- var quote = {
- getBucket: function () { return ({
- getData: function () { },
- }); },
- getMetabucket: function () { return ({
- getData: function () { return metadata; },
- }); },
- getId: function () { return 123; },
- getProgramVersion: function () { return 'Foo'; },
- getLastPremiumDate: function () { return 0; },
- getRatedDate: function () { return 0; },
- getExplicitLockReason: function () { return ""; },
- getExplicitLockStep: function () { return 1; },
- isImported: function () { return false; },
- isBound: function () { return false; },
- getTopVisitedStepId: function () { return 1; },
- getTopSavedStepId: function () { return 1; },
- setRatedDate: function () { return quote; },
- setRateBucket: function () { return quote; },
- setRatingData: function () { return quote; },
- getRatingData: function () { return ({ _unavailable_all: '0' }); },
- getProgram: function () { return program; },
- setExplicitLock: function () { return quote; },
- getProgramId: function () { return 'Foo'; },
- getCurrentStepId: function () { return 0; },
- setLastPremiumDate: function () { return quote; },
- };
- return quote;
-}
-//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiTW9uZ29TZXJ2ZXJEYW9UZXN0LmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiTW9uZ29TZXJ2ZXJEYW9UZXN0LnRzIl0sIm5hbWVzIjpbXSwibWFwcGluZ3MiOiJBQUFBOzs7Ozs7Ozs7Ozs7Ozs7Ozs7O0dBbUJHO0FBRUgsWUFBWSxDQUFDOztBQUViLHdFQUE4RTtBQUU5RSw2QkFBK0M7QUFRL0MsVUFBUSxDQUFFLE9BQU8sQ0FBRSxrQkFBa0IsQ0FBRSxDQUFFLENBQUM7QUFHMUMsUUFBUSxDQUFFLGdCQUFnQixFQUFFO0lBRXhCLFFBQVEsQ0FBRSxZQUFZLEVBQUU7UUFFcEIsUUFBUSxDQUFFLGdDQUFnQyxFQUFFO1lBRXhDLEVBQUUsQ0FBRSw2Q0FBNkMsRUFBRSxVQUFBLElBQUk7Z0JBRW5ELElBQU0sUUFBUSxHQUFHO29CQUNiLEdBQUcsRUFBRSxDQUFFLEtBQUssRUFBRSxLQUFLLENBQUU7b0JBQ3JCLEdBQUcsRUFBRSxDQUFFLEVBQUUsSUFBSSxFQUFFLE9BQU8sRUFBRSxDQUFFO2lCQUM3QixDQUFDO2dCQUVGLElBQU0sS0FBSyxHQUFHLGVBQWUsQ0FBRSxRQUFRLENBQUUsQ0FBQztnQkFFMUMsSUFBTSxHQUFHLEdBQUcsSUFBSSwrQkFBRyxDQUFFLFlBQVk7Z0JBQzdCLFNBQVM7Z0JBQ1QsVUFBRSxTQUF3QixFQUFFLElBQWlCO29CQUV6QyxhQUFNLENBQUUsSUFBSSxDQUFDLElBQUksQ0FBRSxVQUFVLENBQUUsQ0FBRTt5QkFDNUIsRUFBRSxDQUFDLElBQUksQ0FBQyxLQUFLLENBQUUsUUFBUSxDQUFDLEdBQUcsQ0FBRSxDQUFDO29CQUVuQyxhQUFNLENBQUUsSUFBSSxDQUFDLElBQUksQ0FBRSxVQUFVLENBQUUsQ0FBRTt5QkFDNUIsRUFBRSxDQUFDLElBQUksQ0FBQyxLQUFLLENBQUUsUUFBUSxDQUFDLEdBQUcsQ0FBRSxDQUFDO29CQUduQyxhQUFNLENBQUUsSUFBSSxDQUFDLEtBQUssQ0FBRSxDQUFDLEVBQUUsQ0FBQyxLQUFLLENBQUUsU0FBUyxDQUFFLENBQUM7b0JBRTNDLElBQUksRUFBRSxDQUFDO2dCQUNYLENBQUMsQ0FDSixDQUFFLENBQUM7Z0JBRUosR0FBRyxDQUFDLElBQUksQ0FBRTtvQkFDTixPQUFBLEdBQUcsQ0FBQyxTQUFTLENBQUUsS0FBSyxFQUFFLGNBQU8sQ0FBQyxFQUFFLGNBQU8sQ0FBQyxDQUFFO2dCQUExQyxDQUEwQyxDQUM3QyxDQUFDO1lBQ04sQ0FBQyxDQUFFLENBQUM7UUFDUixDQUFDLENBQUUsQ0FBQztRQUVKLFFBQVEsQ0FBRSxnQkFBZ0IsRUFBRTtZQUV4QixFQUFFLENBQUUsa0NBQWtDLEVBQUUsVUFBQSxJQUFJO2dCQUV4QyxJQUFNLFNBQVMsR0FBRztvQkFDZCxHQUFHLEVBQUUsQ0FBRSxLQUFLLEVBQUUsS0FBSyxDQUFFO29CQUNyQixHQUFHLEVBQUUsQ0FBRSxFQUFFLElBQUksRUFBRSxPQUFPLEVBQUUsQ0FBRTtpQkFDN0IsQ0FBQztnQkFFRixJQUFNLEtBQUssR0FBRyxlQUFlLENBQUUsRUFBRSxDQUFFLENBQUM7Z0JBRXBDLElBQU0sR0FBRyxHQUFHLElBQUksK0JBQUcsQ0FBRSxZQUFZO2dCQUM3QixTQUFTO2dCQUNULFVBQUMsU0FBd0IsRUFBRSxJQUFpQjtvQkFFeEMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsS0FBSyxDQUFFLENBQUU7eUJBQ3hCLEVBQUUsQ0FBQyxJQUFJLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBQyxHQUFHLENBQUUsQ0FBQztvQkFFcEMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsS0FBSyxDQUFFLENBQUU7eUJBQ3hCLEVBQUUsQ0FBQyxJQUFJLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBQyxHQUFHLENBQUUsQ0FBQztvQkFFcEMsSUFBSSxFQUFFLENBQUM7Z0JBQ1gsQ0FBQyxDQUNKLENBQUUsQ0FBQztnQkFFSixHQUFHLENBQUMsSUFBSSxDQUFFO29CQUNOLE9BQUEsR0FBRyxDQUFDLFNBQVMsQ0FDVCxLQUFLLEVBQ0wsY0FBTyxDQUFDLEVBQ1IsY0FBTyxDQUFDLEVBQ1IsU0FBUyxFQUNULFNBQVMsQ0FDWjtnQkFORCxDQU1DLENBQ0osQ0FBQztZQUNOLENBQUMsQ0FBRSxDQUFDO1lBRUosRUFBRSxDQUFFLDRDQUE0QyxFQUFFLFVBQUEsSUFBSTtnQkFFbEQsSUFBTSxTQUFTLEdBQUcsRUFBRSxDQUFDO2dCQUVyQixJQUFNLEtBQUssR0FBRyxlQUFlLENBQUUsRUFBRSxDQUFFLENBQUM7Z0JBRXBDLElBQU0sR0FBRyxHQUFHLElBQUksK0JBQUcsQ0FBRSxZQUFZO2dCQUM3QixTQUFTO2dCQUNULFVBQUUsU0FBd0IsRUFBRSxJQUFpQjtvQkFFekMsYUFBTSxDQUFFLElBQUksQ0FBQyxLQUFLLENBQUUsQ0FBQyxFQUFFLENBQUMsS0FBSyxDQUFFLFNBQVMsQ0FBRSxDQUFDO29CQUUzQyxJQUFJLEVBQUUsQ0FBQztnQkFDWCxDQUFDLENBQ0osQ0FBRSxDQUFDO2dCQUVKLEdBQUcsQ0FBQyxJQUFJLENBQUU7b0JBQ04sT0FBQSxHQUFHLENBQUMsU0FBUyxDQUNULEtBQUssRUFDTCxjQUFPLENBQUMsRUFDUixjQUFPLENBQUMsRUFDUixTQUFTLEVBQ1QsU0FBUyxDQUNaO2dCQU5ELENBTUMsQ0FDSixDQUFDO1lBQ04sQ0FBQyxDQUFFLENBQUM7UUFDUixDQUFDLENBQUUsQ0FBQztJQUNSLENBQUMsQ0FBRSxDQUFDO0FBQ1IsQ0FBQyxDQUFFLENBQUM7QUFHSixTQUFTLFlBQVksQ0FBRSxTQUFjO0lBRWpDLElBQU0saUJBQWlCLEdBQUc7UUFDdEIsTUFBTSxFQUFFLFNBQVM7UUFDakIsV0FBVyxFQUFFLFVBQUUsQ0FBTSxFQUFFLEVBQU8sRUFBRSxDQUFNLElBQU0sT0FBQSxDQUFDLEVBQUUsRUFBSCxDQUFHO0tBQ2xELENBQUM7SUFFRixJQUFNLGNBQWMsR0FBRztRQUNuQixJQUFJLEVBQUosVUFBTSxDQUFNLEVBQUUsRUFBTyxFQUFFLENBQU07WUFFekIsQ0FBQyxDQUFFLElBQUksRUFBRTtnQkFDTCxPQUFPLEVBQUUsVUFBRSxDQUFNLElBQU0sT0FBQSxDQUFDLENBQUUsSUFBSSxFQUFFLEVBQUUsTUFBTSxFQUFFLENBQUMsRUFBRSxDQUFFLEVBQXhCLENBQXdCO2FBQ2xELENBQUUsQ0FBQztRQUNSLENBQUM7S0FDSixDQUFDO0lBRUYsSUFBTSxFQUFFLEdBQUc7UUFDUCxVQUFVLEVBQVYsVUFBWSxFQUFPLEVBQUUsQ0FBTTtZQUV2QixJQUFNLElBQUksR0FBRyxDQUFFLEVBQUUsS0FBSyxRQUFRLENBQUU7Z0JBQzVCLENBQUMsQ0FBQyxpQkFBaUI7Z0JBQ25CLENBQUMsQ0FBQyxjQUFjLENBQUM7WUFFckIsQ0FBQyxDQUFFLElBQUksRUFBRSxJQUFJLENBQUUsQ0FBQztRQUNwQixDQUFDO0tBQ0osQ0FBQztJQUVGLElBQU0sTUFBTSxHQUFHO1FBQ1gsSUFBSSxFQUFFLFVBQUUsQ0FBTSxJQUFNLE9BQUEsQ0FBQyxDQUFFLElBQUksRUFBRSxFQUFFLENBQUUsRUFBYixDQUFhO1FBQ2pDLEVBQUUsRUFBSSxjQUFPLENBQUM7S0FDakIsQ0FBQztJQUVGLE9BQU8sTUFBTSxDQUFDO0FBQ2xCLENBQUM7QUFHRCxTQUFTLGVBQWUsQ0FBRSxRQUE2QjtJQUVuRCxJQUFNLE9BQU8sR0FBWTtRQUNyQixLQUFLLEVBQWdCLGNBQU0sT0FBQSxHQUFHLEVBQUgsQ0FBRztRQUM5QixtQkFBbUIsRUFBRSxDQUFDO1FBQ3RCLElBQUksRUFBaUIsRUFBRTtRQUN2QixRQUFRLEVBQWEsRUFBRTtRQUN2QixJQUFJLEVBQWlCO1lBQ2pCLEtBQUssRUFBRyxFQUFFO1lBQ1YsTUFBTSxFQUFFLEVBQUU7WUFDVixNQUFNLEVBQUUsRUFBRTtZQUNWLEtBQUssRUFBRyxFQUFFO1lBQ1YsTUFBTSxFQUFFLEVBQUU7U0FDYjtRQUNELEtBQUssRUFBZ0IsRUFBRTtRQUN2QixTQUFTLEVBQVksY0FBTyxDQUFDO0tBQ2hDLENBQUM7SUFFRixJQUFNLEtBQUssR0FBb0I7UUFDM0IsU0FBUyxFQUFFLGNBQU0sT0FBaUIsQ0FBRTtZQUNoQyxPQUFPLEVBQUUsY0FBTyxDQUFDO1NBQ3BCLENBQUUsRUFGYyxDQUVkO1FBRUgsYUFBYSxFQUFFLGNBQU0sT0FBaUIsQ0FBRTtZQUNwQyxPQUFPLEVBQUUsY0FBTSxPQUFBLFFBQVEsRUFBUixDQUFRO1NBQzFCLENBQUUsRUFGa0IsQ0FFbEI7UUFFSCxLQUFLLEVBQWtCLGNBQU0sT0FBUyxHQUFHLEVBQVosQ0FBWTtRQUN6QyxpQkFBaUIsRUFBTSxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7UUFDbEMsa0JBQWtCLEVBQUssY0FBTSxPQUFlLENBQUMsRUFBaEIsQ0FBZ0I7UUFDN0MsWUFBWSxFQUFXLGNBQU0sT0FBZSxDQUFDLEVBQWhCLENBQWdCO1FBQzdDLHFCQUFxQixFQUFFLGNBQU0sT0FBQSxFQUFFLEVBQUYsQ0FBRTtRQUMvQixtQkFBbUIsRUFBSSxjQUFNLE9BQWlCLENBQUMsRUFBbEIsQ0FBa0I7UUFDL0MsVUFBVSxFQUFhLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxPQUFPLEVBQWdCLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxtQkFBbUIsRUFBSSxjQUFNLE9BQWlCLENBQUMsRUFBbEIsQ0FBa0I7UUFDL0MsaUJBQWlCLEVBQU0sY0FBTSxPQUFpQixDQUFDLEVBQWxCLENBQWtCO1FBQy9DLFlBQVksRUFBVyxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7UUFDbEMsYUFBYSxFQUFVLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxhQUFhLEVBQVUsY0FBTSxPQUFBLEtBQUssRUFBTCxDQUFLO1FBQ2xDLGFBQWEsRUFBVSxjQUFNLE9BQUEsQ0FBWSxFQUFFLGdCQUFnQixFQUFFLEdBQUcsRUFBRSxDQUFBLEVBQXJDLENBQXFDO1FBQ2xFLFVBQVUsRUFBYSxjQUFNLE9BQUEsT0FBTyxFQUFQLENBQU87UUFDcEMsZUFBZSxFQUFRLGNBQU0sT0FBQSxLQUFLLEVBQUwsQ0FBSztRQUNsQyxZQUFZLEVBQVcsY0FBTSxPQUFBLEtBQUssRUFBTCxDQUFLO1FBQ2xDLGdCQUFnQixFQUFPLGNBQU0sT0FBQSxDQUFDLEVBQUQsQ0FBQztRQUM5QixrQkFBa0IsRUFBSyxjQUFNLE9BQUEsS0FBSyxFQUFMLENBQUs7S0FDckMsQ0FBQztJQUVGLE9BQU8sS0FBSyxDQUFDO0FBQ2pCLENBQUMifQ== \ No newline at end of file
diff --git a/test/server/db/MongoServerDaoTest.ts b/test/server/db/MongoServerDaoTest.ts
index b0a83bb..58e6ab9 100644
--- a/test/server/db/MongoServerDaoTest.ts
+++ b/test/server/db/MongoServerDaoTest.ts
@@ -22,7 +22,7 @@
'use strict';
import { MongoServerDao as Sut } from "../../../src/server/db/MongoServerDao";
-import { MongoSelector, MongoUpdate } from "mongodb";
+import { MongoSelector, MongoUpdate, MongoDb } from "mongodb";
import { expect, use as chai_use } from 'chai';
import { ServerSideQuote } from "../../../src/server/quote/ServerSideQuote";
import { PositiveInteger } from "../../../src/numeric";
@@ -139,7 +139,7 @@ describe( 'MongoServerDao', () =>
} );
-function createMockDb( on_update: any )
+function createMockDb( on_update: any ): MongoDb
{
const collection_quotes = {
update: on_update,
@@ -166,8 +166,9 @@ function createMockDb( on_update: any )
},
};
- const driver = {
+ const driver = <MongoDb>{
open: ( c: any ) => c( null, db ),
+ close: () => {},
on: () => {},
};
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts
index d0cd8f8..3998d68 100644
--- a/test/system/DeltaProcessorTest.ts
+++ b/test/system/DeltaProcessorTest.ts
@@ -22,7 +22,8 @@
import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
import { AmqpPublisher } from '../../src/system/AmqpPublisher';
import { DeltaDao } from '../../src/system/db/DeltaDao';
-import { DeltaType } from "../../src/bucket/delta";
+import { DeltaType, DeltaDocument } from "../../src/bucket/delta";
+import { DocumentId } from '../../src/document/Document';
import { EventEmitter } from 'events';
import { expect, use as chai_use } from 'chai';
@@ -308,11 +309,12 @@ describe( 'system.DeltaProcessor', () =>
}[]>[
{
label: 'No deltas are processed',
- docs: [
+ given: [
{
id: 123,
lastUpdate: 123123123,
- bucket: {},
+ data: {},
+ ratedata: {},
rdelta: {},
},
],
@@ -324,7 +326,8 @@ describe( 'system.DeltaProcessor', () =>
{
id: 123,
lastUpdate: 123123123,
- bucket: { foo: [ 'start_bar' ] },
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
rdelta: {
data: [
{
@@ -341,14 +344,16 @@ describe( 'system.DeltaProcessor', () =>
],
expected: [
{
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
- doc_id: 123,
+ doc_id: 123,
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'first_bar' ] },
+ ratedata: {},
},
{
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
- doc_id: 123,
+ doc_id: 123,
+ delta: { foo: [ 'second_bar' ] },
+ bucket: { foo: [ 'second_bar' ] },
+ ratedata: {},
},
],
},
@@ -358,17 +363,18 @@ describe( 'system.DeltaProcessor', () =>
{
id: 123,
lastUpdate: 123123123,
- bucket: { foo: 'start_bar' },
+ data: { foo: [ 'start_bar_123' ] },
+ ratedata: {},
rdelta: {
data: [
{
- data: { foo: [ 'second_bar' ] },
+ data: { foo: [ 'second_bar_123' ] },
timestamp: 234,
},
],
ratedata: [
{
- data: { foo: [ 'first_bar' ] },
+ data: { foo: [ 'first_bar_123' ] },
timestamp: 123,
},
],
@@ -377,19 +383,20 @@ describe( 'system.DeltaProcessor', () =>
{
id: 234,
lastUpdate: 123123123,
- bucket: { foo: 'start_bar' },
+ data: { foo: [ 'start_bar_234' ] },
+ ratedata: {},
rdelta: {
data: [
{
- data: { foo: [ 'first_bar' ] },
+ data: { foo: [ 'first_bar_234' ] },
timestamp: 123,
},
{
- data: { foo: [ 'second_bar' ] },
+ data: { foo: [ 'second_bar_234' ] },
timestamp: 234,
},
{
- data: { foo: [ 'third_bar' ] },
+ data: { foo: [ 'third_bar_234' ] },
timestamp: 345,
},
],
@@ -398,15 +405,16 @@ describe( 'system.DeltaProcessor', () =>
{
id: 345,
lastUpdate: 123123123,
- bucket: { foo: 'start_bar' },
+ data: { foo: [ 'start_bar_345' ] },
+ ratedata: {},
rdelta: {
ratedata: [
{
- data: { foo: [ 'first_bar' ] },
+ data: { foo: [ 'first_bar_345' ] },
timestamp: 123,
},
{
- data: { foo: [ 'second_bar' ] },
+ data: { foo: [ 'second_bar_345' ] },
timestamp: 234,
},
],
@@ -415,60 +423,73 @@ describe( 'system.DeltaProcessor', () =>
],
expected: [
{
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
- doc_id: 123,
+ doc_id: 123,
+ delta: { foo: [ 'first_bar_123' ] },
+ bucket: { foo: [ 'start_bar_123' ] },
+ ratedata: { foo: [ 'first_bar_123' ] },
},
{
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
- doc_id: 123,
+ doc_id: 123,
+ delta: { foo: [ 'second_bar_123' ] },
+ bucket: { foo: [ 'second_bar_123' ] },
+ ratedata: { foo: [ 'first_bar_123' ] },
},
{
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
- doc_id: 234,
+ doc_id: 234,
+ delta: { foo: [ 'first_bar_234' ] },
+ bucket: { foo: [ 'first_bar_234' ] },
+ ratedata: {},
},
{
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
- doc_id: 234,
+ doc_id: 234,
+ delta: { foo: [ 'second_bar_234' ] },
+ bucket: { foo: [ 'second_bar_234' ] },
+ ratedata: {},
},
{
- delta: { foo: [ 'third_bar' ] },
- bucket: { foo: [ 'third_bar' ] },
- doc_id: 234,
+ doc_id: 234,
+ delta: { foo: [ 'third_bar_234' ] },
+ bucket: { foo: [ 'third_bar_234' ] },
+ ratedata: {},
},
{
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
- doc_id: 345,
+ doc_id: 345,
+ delta: { foo: [ 'first_bar_345' ] },
+ bucket: { foo: [ 'start_bar_345' ] },
+ ratedata: { foo: [ 'first_bar_345' ] },
},
{
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
- doc_id: 345,
+ doc_id: 345,
+ delta: { foo: [ 'second_bar_345' ] },
+ bucket: { foo: [ 'start_bar_345' ] },
+ ratedata: { foo: [ 'second_bar_345' ] },
},
],
},
- ] ).forEach( ( { given, expected, label } ) => it( label, () =>
+ ] ).forEach( ( { label, given, expected } ) => it( label, () =>
{
let published: any = [];
const dao = createMockDeltaDao();
const publisher = createMockDeltaPublisher();
const emitter = new EventEmitter();
- dao.getUnprocessedDocuments = (): Promise<Record<string, any>[]> =>
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
{
return Promise.resolve( given );
}
- publisher.publish = ( delta, bucket, doc_id ): Promise<void> =>
+ publisher.publish = (
+ doc_id,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
{
published.push( {
- delta: delta.data,
- bucket: bucket,
- doc_id: doc_id,
+ doc_id: doc_id,
+ delta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
} );
return Promise.resolve();
@@ -479,6 +500,203 @@ describe( 'system.DeltaProcessor', () =>
.then( _ => expect( published ).to.deep.equal( expected ) );
} ) );
} );
+
+
+ describe( 'Error handling', () =>
+ {
+ it( 'Marks document in error state and continues', () =>
+ {
+ let published: any = [];
+ let error_flag_set = false;
+ const dao = createMockDeltaDao();
+ const publisher = createMockDeltaPublisher();
+ const emitter = new EventEmitter();
+ const doc = <DeltaDocument[]>[ {
+ id: <DocumentId>123,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ },
+ {
+ id: <DocumentId>234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ } ];
+
+ const expected_published = [
+ {
+ doc_id: 123,
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'first_bar' ] },
+ ratedata: {},
+ },
+ {
+ doc_id: 234,
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'first_bar' ] },
+ ratedata: {},
+ }
+ ];
+
+ const expected_error = 'Uh oh';
+
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
+ Promise.resolve( doc );
+
+ dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise<void> =>
+ Promise.reject( new Error( expected_error ) );
+
+ dao.setErrorFlag = (): Promise<void> =>
+ {
+ error_flag_set = true;
+ return Promise.resolve();
+ }
+
+ publisher.publish = (
+ doc_id,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
+ {
+ published.push( {
+ doc_id: doc_id,
+ delta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
+ } );
+
+ return Promise.resolve();
+ }
+
+ // Prevent node from converting an error event into an error
+ emitter.on( 'error', () => {} );
+
+ return expect( new Sut( dao, publisher, emitter ).process() )
+ .to.eventually.deep.equal( undefined )
+ .then( _ =>
+ {
+ expect( error_flag_set ).to.be.true;
+ expect( published ).to.deep.equal( expected_published );
+ } );
+ } );
+ } );
+
+
+ describe( 'Error handling', () =>
+ {
+ it( 'Failure to set document error state further processing', () =>
+ {
+ let published: any = [];
+ let caught_error = '';
+ const dao = createMockDeltaDao();
+ const publisher = createMockDeltaPublisher();
+ const emitter = new EventEmitter();
+ const doc = <DeltaDocument[]>[ {
+ id: <DocumentId>123,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ },
+ {
+ id: <DocumentId>234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ } ];
+
+ // Only one is published
+ const expected_published = [ {
+ doc_id: 123,
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'first_bar' ] },
+ ratedata: {},
+ } ];
+
+ const expected_error = 'Uh oh';
+
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
+ Promise.resolve( doc );
+
+ dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise<void> =>
+ Promise.reject( new Error( 'Couldn\'t mark document' ) );
+
+ dao.setErrorFlag = (): Promise<void> =>
+ Promise.reject( new Error( expected_error ) );
+
+ publisher.publish = (
+ doc_id,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
+ {
+ published.push( {
+ doc_id: doc_id,
+ delta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
+ } );
+
+ return Promise.resolve();
+ }
+
+ // Prevent node from converting an error event into an error
+ emitter.on( 'error', () => {} );
+
+ return expect(
+ new Sut( dao, publisher, emitter ).process()
+ .catch( e => { caught_error = e.message } )
+ )
+ .to.eventually.deep.equal( undefined )
+ .then( _ =>
+ {
+ expect( caught_error ).to.equal( expected_error );
+ expect( published ).to.deep.equal( expected_published );
+ } );
+ } );
+ } );
} );
diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts
index 40663d0..fcb788c 100644
--- a/test/system/DeltaPublisherTest.ts
+++ b/test/system/DeltaPublisherTest.ts
@@ -19,13 +19,25 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+import { AmqpConnection } from '../../src/system/amqp/AmqpConnection';
+import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher';
-import { AmqpConfig } from '../../src/system/AmqpPublisher';
+import { DocumentId } from '../../src/document/Document';
+import { Duplex } from 'stream';
import { EventEmitter } from "events";
+import { hasContext } from '../../src/error/ContextError';
+import { AmqpError } from '../../src/error/AmqpError';
+import { Channel } from 'amqplib';
+import {
+ createAvroEncoder,
+ AvroEncoderCtr,
+ AvroSchema,
+} from '../../src/system/avro/AvroFactory';
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
+const sinon = require( 'sinon' );
describe( 'server.DeltaPublisher', () =>
{
@@ -33,24 +45,96 @@ describe( 'server.DeltaPublisher', () =>
{
it( 'sends a message', () =>
{
- const conf = createMockConf();
- const emitter = new EventEmitter();
+ let publish_called = false;
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const emitter = new EventEmitter();
+ const conn = createMockAmqpConnection();
+ conn.getAmqpChannel = () =>
+ {
+ return <Channel>{
+ publish: ( _: any, __: any, buf: any, ___: any ) =>
+ {
+ expect( buf instanceof Buffer ).to.be.true;
- console.log( new Sut( conf, emitter, ts_ctr ) );
- expect( true ).to.be.true
+ publish_called = true;
+
+ return true;
+ }
+ };
+ };
+
+ const sut = new Sut( emitter, ts_ctr, createAvroEncoder, conn );
+
+ return expect(
+ sut.publish( <DocumentId>123, delta, bucket, ratedata )
+ ).to.eventually.deep.equal( undefined )
+ .then( _ =>
+ {
+ expect( publish_called ).to.be.true;
+ } );
} );
- } );
- describe( '#sendMessage', () =>
- {
- it( 'sends a message', () =>
+ ( <[string, () => Channel | undefined, Error, string ][]>[
+ [
+ 'Throws an error when publishing was unsuccessful',
+ () =>
+ {
+ return <Channel>{
+ publish: ( _: any, __: any, _buf: any, ___: any ) =>
+ {
+ return false;
+ }
+ };
+ },
+ Error,
+ 'Delta publish failed'
+ ],
+ [
+ 'Throws an error when no amqp channel is found',
+ () =>
+ {
+ return undefined;
+ },
+ AmqpError,
+ 'Error sending message: No channel'
+ ]
+ ] ).forEach( ( [ label, getChannelF, error_type, err_msg ] ) =>
+ it( label, () =>
{
- const conf = createMockConf();
- const emitter = new EventEmitter();
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const emitter = new EventEmitter();
+ const conn = createMockAmqpConnection();
+ const doc_id = <DocumentId>123;
+ const expected = {
+ doc_id: doc_id,
+ delta_type: delta.type,
+ delta_ts: delta.timestamp
+ }
- console.log( new Sut( conf, emitter, ts_ctr ) );
- expect( true ).to.be.true
- } );
+ conn.getAmqpChannel = getChannelF;
+
+ const result = new Sut( emitter, ts_ctr, createAvroEncoder, conn )
+ .publish( doc_id, delta, bucket, ratedata );
+
+ return Promise.all( [
+ expect( result ).to.eventually.be.rejectedWith(
+ error_type, err_msg
+ ),
+ result.catch( e =>
+ {
+ if ( !hasContext( e ) )
+ {
+ return expect.fail();
+ }
+
+ return expect( e.context ).to.deep.equal( expected );
+ } )
+ ] );
+ } ) );
} );
describe( '#avroEncode parses', () =>
@@ -137,32 +221,26 @@ describe( 'server.DeltaPublisher', () =>
{
it( label, () =>
{
- let errorCalled = false;
+ const emitter = createMockEventEmitter();
+ const conn = createMockAmqpConnection();
+ const data = createMockData( delta_data );
+ const sut = new Sut(
+ emitter,
+ ts_ctr,
+ createAvroEncoder,
+ conn,
+ );
- const emitter = <EventEmitter>{
- emit( _event_id, _err )
+ sut.avroEncode( data )
+ .then( b =>
{
- errorCalled = true;
-
- console.log( 'server.DeltaPublisher.Error' + _err );
- }
- }
-
- const conf = createMockConf();
- const data = createMockData( delta_data );
- const sut = new Sut( conf, emitter, ts_ctr );
- const buffer = sut.avroEncode( data );
-
- if ( valid )
- {
- expect( typeof(buffer) ).to.equal( 'object' );
- }
- else
- {
- expect( buffer ).to.equal( null );
- }
-
- expect( valid ).to.equal( !errorCalled );
+ expect( typeof(b) ).to.equal( 'object' );
+ expect( valid ).to.be.true;
+ } )
+ .catch( _ =>
+ {
+ expect( valid ).to.be.false;
+ } );
} );
} );
} );
@@ -301,9 +379,16 @@ describe( 'server.DeltaPublisher', () =>
{
it( label, () =>
{
- const emitter = <EventEmitter>{}
- const conf = createMockConf();
- const sut = new Sut( conf, emitter, ts_ctr );
+ const encoded = 'FooBar';
+ const emitter = createMockEventEmitter();
+ const conn = createMockAmqpConnection();
+ const avroEncoderCtr = createMockEncoder( encoded );
+ const sut = new Sut(
+ emitter,
+ ts_ctr,
+ avroEncoderCtr,
+ conn,
+ );
const actual = sut.setDataTypes( delta_data );
expect( actual ).to.deep.equal( expected );
@@ -312,14 +397,39 @@ describe( 'server.DeltaPublisher', () =>
} );
} );
+
function ts_ctr(): UnixTimestamp
{
return <UnixTimestamp>Math.floor( new Date().getTime() / 1000 );
}
-function createMockConf(): AmqpConfig
+
+function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr
+{
+ return ( _schema: AvroSchema ) =>
+ {
+ const mock = sinon.mock( Duplex );
+
+ mock.on = ( _: string, __: any ) => {};
+ mock.end = ( _: any ) => { return mock_encoded_data; };
+
+ return mock;
+ };
+}
+
+
+function createMockEventEmitter(): EventEmitter
+{
+ return <EventEmitter>{};
+}
+
+
+function createMockAmqpConnection(): AmqpConnection
{
- return <AmqpConfig>{};
+ return <AmqpConnection>{
+ connect: () => {},
+ getExchangeName: () => { 'Foo' },
+ };
}
@@ -339,11 +449,8 @@ function createMockData( delta_data: any ): any
modified: 1573856916,
top_visited_step: '2',
},
- session: {
- entity_name: 'Foobar',
- entity_id: 123123 ,
- },
- data: null,
+ data: null,
+ ratedata: null,
delta: {
Data: {
bucket: delta_data,
@@ -356,4 +463,22 @@ function createMockData( delta_data: any ): any
},
},
};
+}
+
+
+function createMockBucketData(): Record<string, any>
+{
+ return {
+ foo: [ 'bar', 'baz' ]
+ }
+}
+
+
+function createMockDelta(): Delta<any>
+{
+ return <Delta<any>>{
+ type: <DeltaType>'data',
+ timestamp: <UnixTimestamp>123123123,
+ data: <DeltaResult<any>>{},
+ }
} \ No newline at end of file
diff --git a/test/system/EventLoggerTest.ts b/test/system/EventLoggerTest.ts
deleted file mode 100644
index b3d5f0f..0000000
--- a/test/system/EventLoggerTest.ts
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Event logger test
- *
- * Copyright (C) 2010-2019 R-T Specialty, LLC.
- *
- * This file is part of the Liza Data Collection Framework.
- *
- * liza is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-import { EventLogger as Sut } from '../../src/system/EventLogger';
-import { EventEmitter } from "events";
-import { expect } from 'chai';
-
-const sinon = require( 'sinon' );
-
-declare interface MockConsole extends Console {
- getLevel(): string,
-}
-
-describe( 'system.EventLogger captures and logs events', () =>
-{
- [
- {
- event_id: 'document-processed',
- console_level: 'log',
- },
- {
- event_id: 'delta-publish',
- console_level: 'log',
- },
- {
- event_id: 'amqp-conn-error',
- console_level: 'warn',
- },
- {
- event_id: 'amqp-reconnect',
- console_level: 'warn',
- },
- {
- event_id: 'amqp-reconnect-fail',
- console_level: 'error',
- },
- {
- event_id: 'avro-err',
- console_level: 'error',
- },
- {
- event_id: 'dao-err',
- console_level: 'error',
- },
- {
- event_id: 'publish-err',
- console_level: 'error',
- },
- ].forEach( ( { event_id, console_level } ) =>
- {
- it( event_id + ' triggers console output level: ' + console_level, () =>
- {
- const emitter = new EventEmitter();
- const con = createMockConsole();
- const env = 'test';
-
- new Sut( con, env, emitter, ts_ctr );
-
- emitter.emit( event_id );
-
- expect( con.getLevel() ).to.equal( console_level );
- } );
- } );
-} );
-
-
-function ts_ctr(): UnixTimestamp
-{
- return <UnixTimestamp>Math.floor( new Date().getTime() / 1000 );
-}
-
-
-function createMockConsole(): MockConsole
-{
- const mock = sinon.mock( console );
-
- mock.level = '';
- mock.info = ( _str: string ) => { mock.level = 'info'; };
- mock.log = ( _str: string ) => { mock.level = 'log'; };
- mock.warn = ( _str: string ) => { mock.level = 'warn'; };
- mock.error = ( _str: string ) => { mock.level = 'error'; };
- mock.getLevel = () => mock.level;
-
- return mock;
-} \ No newline at end of file
diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts
new file mode 100644
index 0000000..581437c
--- /dev/null
+++ b/test/system/EventMediatorTest.ts
@@ -0,0 +1,139 @@
+/**
+ * Event logger test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { EventMediator as Sut } from '../../src/system/EventMediator';
+import { context } from '../../src/error/ContextError';
+import { EventEmitter } from "events";
+import { expect } from 'chai';
+import { PsrLogger } from '../../src/system/PsrLogger';
+
+
+describe( 'system.EventLogger captures and logs events', () =>
+{
+ it( 'document-processed triggers log#notice', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'document-processed';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+
+ log.notice = ( _str: string ) => { method_called = true; };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id );
+
+ expect( method_called ).to.be.true;
+ } );
+
+ it( 'delta-publish triggers log#notice', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'delta-publish';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+
+ log.notice = ( _str: string ) => { method_called = true; };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id );
+
+ expect( method_called ).to.be.true;
+ } );
+
+ it( 'amqp-conn-error triggers log#warning', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'amqp-conn-error';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+
+ log.warning = ( _str: string ) => { method_called = true; };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id );
+
+ expect( method_called ).to.be.true;
+ } );
+
+ it( 'amqp-reconnect triggers log#warning', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'amqp-reconnect';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+
+ log.warning = ( _str: string ) => { method_called = true; };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id );
+
+ expect( method_called ).to.be.true;
+ } );
+
+ it( 'context is retrieved from error', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'error';
+ const err_msg = 'Foo';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+ const err_context = { bar: 'baz' };
+
+ log.error = ( str: string, context: any ) =>
+ {
+ method_called = true;
+
+ expect( str ).to.equal( err_msg );
+ expect( context ).to.equal( err_context );
+ };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id, context( new Error( err_msg ), err_context ) );
+
+ expect( method_called ).to.be.true;
+ } );
+} );
+
+
+function createMockLogger(): PsrLogger
+{
+ return <PsrLogger>{
+ debug( _msg: string | object, _context: object ){},
+ info( _msg: string | object, _context: object ){},
+ notice( _msg: string | object, _context: object ){ console.log( 'asdasd msg: ', _msg ); },
+ warning( _msg: string | object, _context: object ){},
+ error( _msg: string | object, _context: object ){},
+ critical( _msg: string | object, _context: object ){},
+ alert( _msg: string | object, _context: object ){},
+ emergency( _msg: string | object, _context: object ){},
+ log( _level: any, _msg: string | object, _context: object ){},
+ };
+} \ No newline at end of file
diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts
index 07867a8..eafc77d 100644
--- a/test/system/MetricsCollectorTest.ts
+++ b/test/system/MetricsCollectorTest.ts
@@ -19,13 +19,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-import { PrometheusFactory } from '../../src/system/PrometheusFactory';
+import {
+ PrometheusFactory,
+ PrometheusConfig,
+} from '../../src/system/PrometheusFactory';
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
import { EventEmitter } from 'events';
import { expect } from 'chai';
import {
MetricsCollector as Sut,
- PrometheusConfig,
MetricTimer,
} from '../../src/system/MetricsCollector';
@@ -35,8 +37,8 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () =>
{
it( 'process-complete event is hooked', () =>
{
- let histogram_called = false;
- let counter_called = false;
+ let histogram_called = false;
+ let counter_called = false;
const emitter = new EventEmitter();
const conf = createMockConfig();
@@ -46,18 +48,20 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () =>
counter_cb: () => { counter_called = true },
} );
- new Sut( factory, conf, emitter, timer );
+ const sut = new Sut( factory, conf, emitter, timer );
emitter.emit( 'delta-process-end' );
expect( histogram_called ).to.be.true;
expect( counter_called ).to.be.true;
+
+ sut.stop();
} );
it( 'process-error event is hooked', () =>
{
- let counter_called = false;
+ let counter_called = false;
const emitter = new EventEmitter();
const conf = createMockConfig();
@@ -66,11 +70,13 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () =>
counter_cb: () => { counter_called = true },
} );
- new Sut( factory, conf, emitter, timer );
+ const sut = new Sut( factory, conf, emitter, timer );
emitter.emit( 'delta-process-error' );
expect( counter_called ).to.be.true;
+
+ sut.stop();
} );
@@ -80,7 +86,7 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () =>
const uid = 'foo';
const start_time_ns = 1234;
const end_time_ns = 5678;
- const expected_ms = ( end_time_ns - start_time_ns ) / 1000;
+ const expected_ms = ( end_time_ns - start_time_ns ) / 1000000;
const emitter = new EventEmitter();
const conf = createMockConfig();
const timer = createMockTimer( start_time_ns, end_time_ns );
@@ -88,12 +94,14 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () =>
histogram_cb: ( n: number ) => { actual_ms = n },
} );
- new Sut( factory, conf, emitter, timer );
+ const sut = new Sut( factory, conf, emitter, timer );
emitter.emit( 'delta-process-start', uid );
emitter.emit( 'delta-process-end', uid );
expect( actual_ms ).to.be.equal( expected_ms );
+
+ sut.stop();
} );
} );
diff --git a/test/system/StandardLoggerTest.ts b/test/system/StandardLoggerTest.ts
new file mode 100644
index 0000000..918bfd1
--- /dev/null
+++ b/test/system/StandardLoggerTest.ts
@@ -0,0 +1,178 @@
+/**
+ * Event logger test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { StandardLogger as Sut } from '../../src/system/StandardLogger';
+import { LogLevel } from '../../src/system/PsrLogger';
+import { expect } from 'chai';
+
+const sinon = require( 'sinon' );
+
+declare interface MockConsole extends Console {
+ getLevel(): string,
+ getStr(): string,
+}
+
+describe( 'system.EventLogger captures and logs events', () =>
+{
+ it( 'debug triggers console output level: info', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.debug( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'info' );
+ } );
+
+ it( 'info triggers console output level: info', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.info( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'info' );
+ } );
+
+ it( 'notice triggers console output level: log', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.notice( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'log' );
+ } );
+
+ it( 'warning triggers console output level: warn', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.warning( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'warn' );
+ } );
+
+ it( 'error triggers console output level: error', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.error( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'critical triggers console output level: error', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.critical( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'alert triggers console output level: error', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.alert( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'emergency triggers console output level: error', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.emergency( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'log triggers corresponding log level', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.log( LogLevel.ERROR, 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'Context is included in structured output', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+ const context = { bar: 'baz' };
+ const expected_output = {
+ message: 'Foo',
+ timestamp: 123123,
+ service: 'quote-server',
+ env: 'test',
+ severity: 'NOTICE',
+ context: {
+ bar: 'baz',
+ },
+ };
+
+ sut.notice( 'Foo', context );
+
+ expect( con.getStr() ).to.deep.equal( expected_output );
+ } );
+} );
+
+
+function ts_ctr(): UnixTimestamp
+{
+ return <UnixTimestamp>123123;
+}
+
+
+function createMockConsole(): MockConsole
+{
+ const mock = sinon.mock( console );
+
+ mock.lvl = '';
+ mock.str = '';
+ mock.info = ( str: string ) => { mock.str = str; mock.lvl = 'info'; };
+ mock.log = ( str: string ) => { mock.str = str; mock.lvl = 'log'; };
+ mock.warn = ( str: string ) => { mock.str = str; mock.lvl = 'warn'; };
+ mock.error = ( str: string ) => { mock.str = str; mock.lvl = 'error'; };
+ mock.getLevel = () => mock.lvl;
+ mock.getStr = () => mock.str;
+
+ return mock;
+}