Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSchaffer, Austin <austin.schaffer@ryansg.com>2019-12-12 15:21:04 -0500
committerSchaffer, Austin <austin.schaffer@ryansg.com>2019-12-12 15:21:04 -0500
commit6c38f7d164b2b9d79e926fa47affc4edfd1f7e8a (patch)
treed828128f751774eecbe8458ec3e39038f86c9169
parent31a820a34f2694c3d144488e75580279c85929bc (diff)
parentcf0ca51cefcdee1455c24fbd178608a4939bab7c (diff)
downloadliza-6c38f7d164b2b9d79e926fa47affc4edfd1f7e8a.tar.gz
liza-6c38f7d164b2b9d79e926fa47affc4edfd1f7e8a.tar.bz2
liza-6c38f7d164b2b9d79e926fa47affc4edfd1f7e8a.zip
Merge branch 'jira-5312' into 'master'
[DEV-5312] Process to publish quote modifications to new exchange See merge request floss/liza!68
-rw-r--r--.env17
-rw-r--r--.gitignore4
-rw-r--r--bin/delta-processor.in33
-rw-r--r--bin/delta-processor.ts175
-rw-r--r--bin/server.ts (renamed from bin/server.js)49
-rw-r--r--conf/vanilla-server.json1
-rw-r--r--configure.ac4
-rw-r--r--package.json.in17
-rw-r--r--src/bucket/delta.ts169
-rw-r--r--src/conf/ConfLoader.ts (renamed from src/conf/ConfLoader.js)51
-rw-r--r--src/conf/ConfStore.d.ts32
-rw-r--r--src/conf/ConfStore.js2
-rw-r--r--src/document/Document.ts26
-rw-r--r--src/error/AmqpError.ts27
-rw-r--r--src/error/DaoError.ts27
-rw-r--r--src/quote/BaseQuote.d.ts53
-rw-r--r--src/server/daemon/controller.js6
-rw-r--r--src/server/db/MongoServerDao.ts (renamed from src/server/db/MongoServerDao.js)489
-rw-r--r--src/server/db/ServerDao.d.ts4
-rw-r--r--src/server/log/Log.js2
-rw-r--r--src/server/quote/ServerSideQuote.d.ts16
-rw-r--r--src/server/rater/DslRaterContext.js6
-rw-r--r--src/server/request/UserSession.d.ts27
-rw-r--r--src/server/service/RatingService.ts2
-rw-r--r--src/server/token/MongoTokenDao.ts6
-rw-r--r--src/store/Store.d.ts114
-rw-r--r--src/system/AmqpPublisher.ts111
-rw-r--r--src/system/DeltaProcessor.ts300
-rw-r--r--src/system/DeltaPublisher.ts124
-rw-r--r--src/system/EventMediator.ts95
-rw-r--r--src/system/MessageWriter.ts44
-rw-r--r--src/system/MetricsCollector.ts205
-rw-r--r--src/system/PrometheusFactory.ts171
-rw-r--r--src/system/PsrLogger.ts117
-rw-r--r--src/system/StandardLogger.ts199
-rw-r--r--src/system/amqp/AmqpConnection.ts153
-rw-r--r--src/system/avro/AvroFactory.ts32
-rw-r--r--src/system/avro/V1MessageWriter.ts259
-rw-r--r--src/system/avro/schema.avsc215
-rw-r--r--src/system/db/DeltaDao.ts87
-rw-r--r--src/system/db/MongoDeltaDao.ts278
-rw-r--r--src/system/db/MongoFactory.ts176
-rw-r--r--src/types/avro-js.d.ts89
-rw-r--r--src/types/misc.d.ts5
-rw-r--r--src/types/mongodb.d.ts128
-rw-r--r--src/version.d.ts39
-rw-r--r--test/bucket/delta.ts205
-rw-r--r--test/conf/ConfLoaderTest.ts (renamed from test/conf/ConfLoaderTest.js)76
-rw-r--r--test/server/dapi/TokenedDataApiTest.ts2
-rw-r--r--test/server/db/MongoServerDaoTest.ts (renamed from test/server/db/MongoServerDaoTest.js)88
-rw-r--r--test/server/request/DataProcessorTest.ts41
-rw-r--r--test/server/service/RatingServiceTest.ts35
-rw-r--r--test/server/token/MongoTokenDaoTest.ts14
-rw-r--r--test/system/DeltaProcessorTest.ts665
-rw-r--r--test/system/DeltaPublisherTest.ts236
-rw-r--r--test/system/EventMediatorTest.ts145
-rw-r--r--test/system/MetricsCollectorTest.ts165
-rw-r--r--test/system/StandardLoggerTest.ts178
-rw-r--r--test/system/V1MessageWriterTest.ts532
-rw-r--r--test/system/amqp/AmqpConnectionTest.ts112
-rw-r--r--tsconfig.json1
61 files changed, 6206 insertions, 475 deletions
diff --git a/.env b/.env
new file mode 100644
index 0000000..21f018c
--- /dev/null
+++ b/.env
@@ -0,0 +1,17 @@
+AMQP_HOST=localhost
+AMQP_PORT=5672
+AMQP_USER=
+AMQP_PASS=
+AMQP_FRAMEMAX=0
+AMQP_HEARTBEAT=2
+AMQP_VHOST=
+AMQP_EXCHANGE=
+AMQP_RETRIES=30
+AMQP_RETRY_WAIT=1
+PROM_HOST=
+PROM_PORT=9091
+PROM_PUSH_INTERVAL_MS=5000
+PROM_BUCKETS_START=0
+PROM_BUCKETS_WIDTH=10
+PROM_BUCKETS_COUNT=10
+PROCESS_INTERVAL_MS=2000
diff --git a/.gitignore b/.gitignore
index 61aba11..cc6afae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ Makefile.in
# generated by configure
bin/server
+bin/delta-processor
src/version.js
/config.*
Makefile
@@ -27,6 +28,7 @@ src/**/index.js
# npm
node_modules
-# typescript
+# typescript output
+bin/*.js
tsconfig.tsbuildinfo
diff --git a/bin/delta-processor.in b/bin/delta-processor.in
new file mode 100644
index 0000000..f0984e1
--- /dev/null
+++ b/bin/delta-processor.in
@@ -0,0 +1,33 @@
+#!/bin/sh
+# Start Liza delta processor using Node.js executable determined at
+# configure-time
+#
+# Copyright (C) 2010-2019 R-T Specialty, LLC.
+#
+# This file is part of liza.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# In addition to the configure-time NODE_FLAGS, the NODE_FLAGS environment
+# variable can be used to add additional arguments to this script.
+# WARNING: NODE_FLAGS arguments provided via environment varialbes are _not_
+# escaped, so be mindful of word expansion!
+#
+# @AUTOGENERATED@
+##
+
+cd "$( dirname $( readlink -f "$0" ) )"
+
+exec "@NODE@" @NODE_FLAGS@ $NODE_FLAGS delta-processor.js "$@"
+
diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts
new file mode 100644
index 0000000..83e42e3
--- /dev/null
+++ b/bin/delta-processor.ts
@@ -0,0 +1,175 @@
+/**
+ * Start the Liza delta processor
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import * as amqplib from 'amqplib';
+import { createAmqpConfig } from '../src/system/AmqpPublisher';
+import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao';
+import { DeltaProcessor } from '../src/system/DeltaProcessor';
+import { DeltaPublisher } from '../src/system/DeltaPublisher';
+import { MongoCollection } from '../src/types/mongodb';
+import { createAvroEncoder } from '../src/system/avro/AvroFactory';
+import { V1MessageWriter } from '../src/system/avro/V1MessageWriter';
+import {
+ createMongoConfig,
+ createMongoDB,
+ getMongoCollection,
+} from '../src/system/db/MongoFactory';
+import { EventMediator } from '../src/system/EventMediator';
+import { EventEmitter } from 'events';
+import { StandardLogger } from '../src/system/StandardLogger';
+import { MetricsCollector } from '../src/system/MetricsCollector';
+import {
+ PrometheusFactory,
+ createPrometheusConfig,
+} from '../src/system/PrometheusFactory';
+import { AmqpConnection } from '../src/system/amqp/AmqpConnection';
+import { parse as avro_parse } from 'avro-js';
+
+require('dotenv-flow').config();
+
+const amqp_conf = createAmqpConfig( process.env );
+const prom_conf = createPrometheusConfig( process.env );
+const db_conf = createMongoConfig( process.env );
+const db = createMongoDB( db_conf );
+const process_interval_ms = +( process.env.process_interval_ms || 2000 );
+const env = process.env.NODE_ENV || 'Unknown Environment';
+const emitter = new EventEmitter();
+const log = new StandardLogger( console, ts_ctr, env );
+const amqp_connection = new AmqpConnection( amqplib, amqp_conf, emitter );
+
+const message_writer = new V1MessageWriter(
+ createAvroEncoder,
+ avro_parse( __dirname + '/../src/system/avro/schema.avsc' ),
+);
+
+const publisher = new DeltaPublisher(
+ emitter,
+ ts_ctr,
+ amqp_connection,
+ message_writer,
+);
+
+// Prometheus Metrics
+const prom_factory = new PrometheusFactory();
+const metrics = new MetricsCollector(
+ prom_factory,
+ prom_conf,
+ emitter,
+ process.hrtime,
+);
+
+// Structured logging
+new EventMediator( log, emitter );
+
+let process_interval: NodeJS.Timer;
+let dao: MongoDeltaDao;
+
+getMongoCollection( db, db_conf )
+ .then( ( conn: MongoCollection ) => { return new MongoDeltaDao( conn ); } )
+ .then( ( mongoDao: MongoDeltaDao ) => { dao = mongoDao; } )
+ .then( _ => amqp_connection.connect() )
+ .then( _ =>
+ {
+ log.info( 'Liza Delta Processor' );
+
+ handleShutdown();
+
+ const processor = new DeltaProcessor( dao, publisher, emitter );
+
+ return new Promise( ( _resolve, reject ) =>
+ {
+ process_interval = setInterval( () =>
+ {
+ try
+ {
+ processor.process()
+ .catch( err => reject( err ) );
+ }
+ catch ( err )
+ {
+ reject( err );
+ }
+
+ dao.getErrorCount()
+ .then( count => { metrics.updateErrorCount( count ) } );
+ }, process_interval_ms );
+ } );
+ } )
+ .catch( e =>
+ {
+ emitter.emit( 'error', e );
+ process.exit( 1 );
+ } );
+
+
+/**
+ * Hook shutdown events
+ */
+function handleShutdown(): void
+{
+ process.on( 'SIGINT', () => { shutdown( 'SIGINT' ); } )
+ .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } );
+}
+
+
+/**
+ * Perform a graceful shutdown
+ *
+ * @param signal - the signal that caused the shutdown
+ */
+function shutdown( signal: string ): void
+{
+ log.info( 'Received ' + signal + '. Beginning graceful shutdown:' );
+ log.info( '...Stopping processing interval' );
+
+ clearInterval( process_interval );
+
+ log.info( '...Closing MongoDb connection' );
+
+ db.close( ( err, _data ) =>
+ {
+ if ( err )
+ {
+ console.error( ' Error closing connection: ' + err );
+ }
+ } );
+
+ log.info( '...Closing AMQP connection...' );
+
+ amqp_connection.close();
+
+ log.info( '...Stopping the metrics collector...' );
+
+ metrics.stop();
+
+ log.info( 'Shutdown complete. Exiting.' );
+
+ process.exit();
+}
+
+
+/** Timestamp constructor
+ *
+ * @return a timestamp
+ */
+function ts_ctr(): UnixTimestamp
+{
+ return <UnixTimestamp>Math.floor( new Date().getTime() / 1000 );
+}
diff --git a/bin/server.js b/bin/server.ts
index ec93fb7..b14a7c7 100644
--- a/bin/server.js
+++ b/bin/server.ts
@@ -1,7 +1,7 @@
/**
* Start the Liza Server
*
- * Copyright (C) 2017 R-T Specialty, LLC.
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
*
* This file is part of the Liza Data Collection Framework.
*
@@ -19,19 +19,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-'use strict';
+import fs = require( 'fs' );
+import path = require( 'path' );
-const fs = require( 'fs' );
-const path = require( 'path' );
-
-const {
- conf: {
- ConfLoader,
- ConfStore,
- },
- server,
- version,
-} = require( '../' );
+import { ConfLoader } from "../src/conf/ConfLoader";
+import { ConfStore } from "../src/conf/ConfStore";
+import * as version from "../src/version";
// kluge for now
const conf_path = (
@@ -42,7 +35,7 @@ const conf_path = (
const conf_dir = path.dirname( conf_path );
-ConfLoader( fs, ConfStore )
+new ConfLoader( fs, ConfStore )
.fromFile( conf_path )
.then( conf => Promise.all( [
conf.get( 'name' ),
@@ -70,12 +63,12 @@ ConfLoader( fs, ConfStore )
* Produce an absolute path if `path` is absolute, otherwise a path relative
* to the configuration directory
*
- * @param {string} conf_path configuration path (for relative `path`)
- * @param {string} path path to resolve
+ * @param conf_path - configuration path (for relative `path`)
+ * @param path - path to resolve
*
* @return resolved path
*/
-function _resolvePath( conf_path, path )
+function _resolvePath( conf_path: string, path: string ): string
{
return ( path[ 0 ] === '/' )
? path
@@ -83,15 +76,29 @@ function _resolvePath( conf_path, path )
}
-function writePidFile( pid_path )
+/**
+ * Write process id (PID) file
+ *
+ * @param pid_path - path to pid file
+ */
+function writePidFile( pid_path: string ): void
{
- fs.writeFile( pid_path, process.pid );
+ fs.writeFileSync( pid_path, process.pid );
- process.on( 'exit', () => fs.unlink( pid_path ) );
+ process.on( 'exit', () => fs.unlink( pid_path, () => {} ) );
}
-function greet( name, pid_path )
+/**
+ * Output greeting
+ *
+ * The greeting contains the program name, version, configuration path,
+ * and PID file path.
+ *
+ * @param name - program name
+ * @param pid_path - path to PID file
+ */
+function greet( name: string, pid_path: string ): void
{
console.log( `${name} (liza-${version})`);
console.log( `Server configuration: ${conf_path}` );
diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json
index da222bb..523bb7b 100644
--- a/conf/vanilla-server.json
+++ b/conf/vanilla-server.json
@@ -62,6 +62,7 @@
"vhost": "/",
"queueName": "postrate"
}
+
},
"c1export": {
"host": "localhost",
diff --git a/configure.ac b/configure.ac
index 1f1675f..3a0fbc5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,6 +1,6 @@
## For use my automake and autoconf
#
-# Copyright (C) 2014--2017 R-T Specialty, LLC.
+# Copyright (C) 2010-2019 R-T Specialty, LLC.
#
# This file is part of liza.
#
@@ -88,6 +88,8 @@ AC_CONFIG_FILES([Makefile package.json
src/version.js])
AC_CONFIG_FILES([bin/server],
[chmod +x bin/server])
+AC_CONFIG_FILES([bin/delta-processor],
+ [chmod +x bin/delta-processor])
AC_OUTPUT
diff --git a/package.json.in b/package.json.in
index c0db61e..f7bed5e 100644
--- a/package.json.in
+++ b/package.json.in
@@ -16,7 +16,8 @@
},
"bin": {
- "liza-server": "bin/server"
+ "liza-server": "bin/server",
+ "delta-processor": "bin/delta-processor"
},
"scripts": {
@@ -24,13 +25,14 @@
},
"dependencies": {
- "easejs": "0.2.x",
- "mongodb": "1.2.14",
- "amqplib": "0.5.3"
+ "easejs": "0.2.x",
+ "mongodb": "1.2.14",
+ "dotenv-flow": "3.1.0",
+ "amqplib": "0.5.3"
},
"devDependencies": {
"typescript": "~3.7",
- "@types/node": "@TS_NODE_VERSION@",
+ "@types/node": "12.12.11",
"chai": ">=1.9.1 < 4",
"@types/chai": ">=1.9.1 < 4",
"chai-as-promised": "7.1.0",
@@ -38,7 +40,10 @@
"mocha": "5.2.0",
"@types/mocha": "5.2.0",
"sinon": ">=1.17.4",
- "es6-promise": "~3"
+ "es6-promise": "~3",
+ "@types/amqplib": "0.5.13",
+ "avro-js": "1.9.1",
+ "prom-client": "11.0.0"
},
"licenses": [
diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts
index 58c75ec..7c5dd6a 100644
--- a/src/bucket/delta.ts
+++ b/src/bucket/delta.ts
@@ -18,14 +18,21 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+import { DocumentId } from '../document/Document';
+
/** The data structure expected for a document's internal key/value store */
export type Kv<T = any> = Record<string, T[]>;
+
/** Possible delta values for Kv array indexes */
export type DeltaDatum<T> = T | null | undefined;
+/** Possible delta types */
+export type DeltaType = 'ratedata' | 'data';
+
+
/**
* The constructor type for a delta generating function
*
@@ -44,7 +51,63 @@ export type DeltaConstructor<T = any, U extends Kv<T> = Kv<T>, V extends Kv<T> =
export type DeltaResult<T> = { [K in keyof T]: DeltaDatum<T[K]> | null };
- /**
+/** Complete delta type */
+export type Delta<T> = {
+ type: DeltaType,
+ timestamp: UnixTimestamp,
+ data: DeltaResult<T>,
+}
+
+
+/** Reverse delta type */
+export type ReverseDelta<T> = {
+ data: Delta<T>[],
+ ratedata: Delta<T>[],
+}
+
+
+/** Structure for Published delta count */
+export type PublishDeltaCount = {
+ data?: number,
+ ratedata?: number,
+}
+
+
+/**
+ * Document structure
+ */
+export interface DeltaDocument
+{
+ /** The document id */
+ id: DocumentId,
+
+ /** The entity name */
+ agentName: string,
+
+ /** The entity id */
+ agentEntityId: number,
+
+ /** The time the document was created */
+ startDate: UnixTimestamp,
+
+ /** The time the document was updated */
+ lastUpdate: UnixTimestamp,
+
+ /** The data bucket */
+ data: Record<string, any>,
+
+ /** The rate data bucket */
+ ratedata?: Record<string, any>,
+
+ /** The calculated reverse deltas */
+ rdelta?: ReverseDelta<any>,
+
+ /** A count of how many of each delta type have been processed */
+ totalPublishDelta?: PublishDeltaCount,
+};
+
+
+/**
* Create delta to transform from src into dest
*
* @param src - the source data set
@@ -98,12 +161,114 @@ export function createDelta<T, U extends Kv<T>, V extends Kv<T>>(
/**
+ * Apply a delta to a bucket
+ *
+ * @param bucket - The bucket data
+ * @param delta - The delta to apply
+ *
+ * @return the bucket with the delta applied
+ */
+export function applyDelta<T, U extends Kv<T>, V extends Kv<T>>(
+ bucket: U = <U>{},
+ delta: DeltaResult<U & V>,
+): U
+{
+ const appliedDelta: DeltaResult<any> = {};
+
+ if( !delta )
+ {
+ return bucket;
+ }
+
+ // Loop through all keys
+ const key_set = new Set(
+ Object.keys( bucket ).concat( Object.keys( delta ) ) );
+
+ key_set.forEach( key =>
+ {
+ const bucket_data = bucket[ key ];
+ const delta_data = delta[ key ];
+
+ // If bucket does not contain the key, use entire delta data
+ if ( !bucket_data || !bucket_data.length )
+ {
+ appliedDelta[ key ] = delta_data;
+
+ return;
+ }
+
+ // If delta does not contain the key then retain bucket data
+ if ( delta_data === null )
+ {
+ return;
+ }
+
+ // If delta does not contain the key then retain bucket data
+ if ( delta_data === undefined )
+ {
+ appliedDelta[ key ] = bucket_data;
+
+ return;
+ }
+
+ // If neither condition above is true then create the key iteratively
+ appliedDelta[ key ] = _applyDeltaKey( bucket_data, delta_data );
+ } );
+
+ return <U>appliedDelta;
+}
+
+
+/**
+ * Apply the delta key iteratively
+ *
+ * @param bucket - The bucket data array
+ * @param delta - The delta data array
+ *
+ * @return the applied delta
+ */
+function _applyDeltaKey<T>(
+ bucket: T[],
+ delta: T[],
+): DeltaDatum<T>[]
+{
+ const data = [];
+ const max_size = Math.max( delta.length, bucket.length );
+
+ for ( let i = 0; i < max_size; i++ )
+ {
+ const delta_datum = delta[ i ];
+ const bucket_datum = bucket[ i ];
+
+ if ( delta_datum === null )
+ {
+ break;
+ }
+ else if ( delta_datum === undefined )
+ {
+ data[ i ] = bucket_datum;
+ }
+ else if ( _deepEqual( delta_datum, bucket_datum ) )
+ {
+ data[ i ] = bucket_datum;
+ }
+ else
+ {
+ data[ i ] = delta_datum;
+ }
+ }
+
+ return data;
+}
+
+
+/**
* Build the delta key iteratively
*
* @param src - the source data array
* @param dest - the destination data array
*
- * @return an object with an identical flag and a data array
+ * @return an object with an changed flag and a data array
*/
function _createDeltaKey<T>(
src: T[],
diff --git a/src/conf/ConfLoader.js b/src/conf/ConfLoader.ts
index 17f26ed..7d556e5 100644
--- a/src/conf/ConfLoader.js
+++ b/src/conf/ConfLoader.ts
@@ -19,9 +19,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-'use strict';
-
-const { Class } = require( 'easejs' );
+import { readFile } from "fs";
+import { Store } from "../store/Store";
/**
@@ -35,36 +34,22 @@ const { Class } = require( 'easejs' );
* TODO: Merging multiple configuration files would be convenient for
* modular configuration.
*/
-module.exports = Class( 'ConfLoader',
+export class ConfLoader
{
/**
- * Filesystem module
- * @type {fs}
- */
- 'private _fs': null,
-
- /**
- * Store object constructor
- * @type {function():Store}
- */
- 'private _storeCtor': null,
-
-
- /**
* Initialize with provided filesystem module and Store constructor
*
* The module should implement `#readFile` compatible with
* Node.js'. The Store constructor `store_ctor` is used to instantiate
* new stores to be populated with configuration data.
*
- * @param {fs} fs filesystem module
- * @param {function():Store} store_ctor Store object constructor
+ * @param fs - filesystem module
+ * @param store_ctor - Store object constructor
*/
- constructor( fs, store_ctor )
- {
- this._fs = fs;
- this._storeCtor = store_ctor;
- },
+ constructor(
+ private _fs: { readFile: typeof readFile },
+ private _storeCtor: () => Store,
+ ) {}
/**
@@ -72,11 +57,11 @@ module.exports = Class( 'ConfLoader',
*
* A Store will be produced, populated with the configuration data.
*
- * @param {string} filename path to configuration JSON
+ * @param filename - path to configuration JSON
*
- * @return {Promise.<Store>} a promise of a populated Store
+ * @return a promise of a populated Store
*/
- 'public fromFile'( filename )
+ fromFile( filename: string ): Promise<Store>
{
return new Promise( ( resolve, reject ) =>
{
@@ -104,7 +89,7 @@ module.exports = Class( 'ConfLoader',
}
} );
} );
- },
+ }
/**
@@ -112,12 +97,12 @@ module.exports = Class( 'ConfLoader',
*
* Parses configuration string as JSON.
*
- * @param {string} data raw configuration data
+ * @param data raw configuration data
*
- * @return {Promise.<Object>} `data` parsed as JSON
+ * @return `data` parsed as JSON
*/
- 'virtual protected parseConfData'( data )
+ protected parseConfData( data: string ): Promise<any>
{
return Promise.resolve( JSON.parse( data ) );
- },
-} );
+ }
+}
diff --git a/src/conf/ConfStore.d.ts b/src/conf/ConfStore.d.ts
new file mode 100644
index 0000000..8140736
--- /dev/null
+++ b/src/conf/ConfStore.d.ts
@@ -0,0 +1,32 @@
+/**
+ * Ideal Store for system configuration
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { Store } from "../store/Store";
+
+/**
+ * A store that recursively instantiates itself
+ *
+ * This store is ideal for nested configurations, and handles cases where
+ * configuration might be asynchronously retrieved. Nested values may be
+ * retrieved by delimiting the key with `.` (e.g. `foo.bar.baz`); see
+ * trait `DelimitedKey` for more information and examples.
+ */
+export declare function ConfStore(): Store;
diff --git a/src/conf/ConfStore.js b/src/conf/ConfStore.js
index 0c0569f..d51ad38 100644
--- a/src/conf/ConfStore.js
+++ b/src/conf/ConfStore.js
@@ -36,7 +36,7 @@ const {
* retrieved by delimiting the key with `.` (e.g. `foo.bar.baz`); see
* trait `DelimitedKey` for more information and examples.
*/
-module.exports = function ConfStore()
+exports.ConfStore = function ConfStore()
{
return MemoryStore
.use( AutoObjectStore( ConfStore ) )
diff --git a/src/document/Document.ts b/src/document/Document.ts
index 0db893a..8f05bac 100644
--- a/src/document/Document.ts
+++ b/src/document/Document.ts
@@ -18,7 +18,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
- * The term "Quote" is synonymous with "Document"; this project is moving
+ * The term 'Quote' is synonymous with 'Document'; this project is moving
* more toward the latter as it is further generalized.
*/
@@ -31,7 +31,29 @@ export type DocumentId = NominalType<number, 'DocumentId'>;
/**
* Quote (Document) id
*
- * Where the term "Quote" is still used, this will allow for type
+ * Where the term 'Quote' is still used, this will allow for type
* compatibility and an easy transition.
*/
export type QuoteId = DocumentId;
+
+
+/**
+ * Document meta data
+ */
+export type DocumentMeta =
+{
+ /** The document id */
+ id: DocumentId,
+
+ /** The entity name */
+ entity_name: string,
+
+ /** The entity id */
+ entity_id: number,
+
+ /** The time the document was created */
+ startDate: UnixTimestamp,
+
+ /** The time the document was updated */
+ lastUpdate: UnixTimestamp,
+} \ No newline at end of file
diff --git a/src/error/AmqpError.ts b/src/error/AmqpError.ts
new file mode 100644
index 0000000..8bf308e
--- /dev/null
+++ b/src/error/AmqpError.ts
@@ -0,0 +1,27 @@
+/**
+ * Amqp error
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * This still uses ease.js because it does a good job of transparently
+ * creating Error subtypes.
+ */
+
+const { Class } = require( 'easejs' );
+
+export const AmqpError = Class( 'AmqpError' ).extend( Error, {} );
diff --git a/src/error/DaoError.ts b/src/error/DaoError.ts
new file mode 100644
index 0000000..2939b69
--- /dev/null
+++ b/src/error/DaoError.ts
@@ -0,0 +1,27 @@
+/**
+ * Dao error
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * This still uses ease.js because it does a good job of transparently
+ * creating Error subtypes.
+ */
+
+const { Class } = require( 'easejs' );
+
+export const DaoError = Class( 'DaoError' ).extend( Error, {} );
diff --git a/src/quote/BaseQuote.d.ts b/src/quote/BaseQuote.d.ts
index f2271f4..eb2204a 100644
--- a/src/quote/BaseQuote.d.ts
+++ b/src/quote/BaseQuote.d.ts
@@ -24,6 +24,7 @@
import { Program } from "../program/Program";
import { Quote, QuoteId } from "./Quote";
import { QuoteDataBucket } from "../bucket/QuoteDataBucket";
+import { PositiveInteger } from "../numeric";
export declare class BaseQuote implements Quote
@@ -98,5 +99,55 @@ export declare class BaseQuote implements Quote
*
* @return the data bucket
*/
- getBucket(): QuoteDataBucket
+ getBucket(): QuoteDataBucket;
+
+
+ /**
+ * Retrieves the reason for an explicit lock
+ *
+ * @return lock reason
+ */
+ getExplicitLockReason(): string;
+
+
+ /**
+ * Returns the maximum step to which the explicit lock applies
+ *
+ * If no step restriction is set, then 0 will be returned.
+ *
+ * @return {number} locked max step or 0 if not applicable
+ */
+ getExplicitLockStep(): PositiveInteger;
+
+
+ /**
+ * Returns whether the quote has been imported
+ *
+ * @return true if imported, otherwise false
+ */
+ isImported(): boolean;
+
+
+ /**
+ * Returns whether the quote has been bound
+ *
+ * @return true if bound, otherwise false
+ */
+ isBound(): boolean;
+
+
+ /**
+ * Returns the id of the highest step the quote has reached
+ *
+ * @return top visited step id
+ */
+ getTopVisitedStepId(): PositiveInteger;
+
+
+ /**
+ * Returns the id of the highest step the quote has saved
+ *
+ * @return top saved step id
+ */
+ getTopSavedStepId(): PositiveInteger;
}
diff --git a/src/server/daemon/controller.js b/src/server/daemon/controller.js
index 06b7a5e..116ed39 100644
--- a/src/server/daemon/controller.js
+++ b/src/server/daemon/controller.js
@@ -69,7 +69,7 @@ const {
DocumentServer,
db: {
- MongoServerDao,
+ MongoServerDao: { MongoServerDao },
},
lock: {
@@ -126,8 +126,8 @@ exports.post_rate_publish = {};
exports.init = function( logger, enc_service, conf )
{
- var db = _createDB( logger );
- const dao = MongoServerDao( db );
+ var db = _createDB( logger );
+ const dao = new MongoServerDao( db );
db.collection( 'quotes', function( err, collection )
{
diff --git a/src/server/db/MongoServerDao.js b/src/server/db/MongoServerDao.ts
index db6140e..3f8128c 100644
--- a/src/server/db/MongoServerDao.js
+++ b/src/server/db/MongoServerDao.ts
@@ -19,83 +19,56 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-var Class = require( 'easejs' ).Class,
- EventEmitter = require( '../../events' ).EventEmitter,
- ServerDao = require( './ServerDao' ).ServerDao;
+import { ServerDao, Callback } from "./ServerDao";
+import { MongoCollection, MongoUpdate, MongoDb } from "mongodb";
+import { PositiveInteger } from "../../numeric";
+import { ServerSideQuote } from "../quote/ServerSideQuote";
+import { QuoteId } from "../../document/Document";
+import { WorksheetData } from "../rater/Rater";
+const EventEmitter = require( 'events' ).EventEmitter;
+
+type ErrorCallback = ( err: NullableError ) => void;
/**
* Uses MongoDB as a data store
*/
-module.exports = Class( 'MongoServerDao' )
- .implement( ServerDao )
- .extend( EventEmitter,
+export class MongoServerDao extends EventEmitter implements ServerDao
{
- /**
- * Collection used to store quotes
- * @type String
- */
- 'const COLLECTION': 'quotes',
+ /** Collection used to store quotes */
+ readonly COLLECTION: string = 'quotes';
- /**
- * Sequence (auto-increment) collection
- * @type {string}
- */
- 'const COLLECTION_SEQ': 'seq',
+ /** Sequence (auto-increment) collection */
+ readonly COLLECTION_SEQ: string = 'seq';
- /**
- * Sequence key for quote ids
- *
- * @type {string}
- * @const
- */
- 'const SEQ_QUOTE_ID': 'quoteId',
+ /** Sequence key for quote ids */
+ readonly SEQ_QUOTE_ID: string = 'quoteId';
- /**
- * Sequence quoteId default
- *
- * @type {number}
- * @const
- */
- 'const SEQ_QUOTE_ID_DEFAULT': 200000,
+ /** Sequence quoteId default */
+ readonly SEQ_QUOTE_ID_DEFAULT: number = 200000;
- /**
- * Database instance
- * @type Mongo.Db
- */
- 'private _db': null,
+ /** Whether the DAO is initialized and ready to be used */
+ private _ready: boolean = false;
- /**
- * Whether the DAO is initialized and ready to be used
- * @type Boolean
- */
- 'private _ready': false,
+ /** Collection to save data to */
+ private _collection?: MongoCollection | null;
- /**
- * Collection to save data to
- * @type null|Collection
- */
- 'private _collection': null,
-
- /**
- * Collection to read sequences (auto-increments) from
- * @type {null|Collection}
- */
- 'private _seqCollection': null,
+ /** Collection to read sequences (auto-increments) from */
+ private _seqCollection?: MongoCollection | null;
/**
* Initializes DAO
*
* @param {Mongo.Db} db mongo database connection
- *
- * @return undefined
*/
- 'public __construct': function( db )
+ constructor(
+ private readonly _db: MongoDb
+ )
{
- this._db = db;
- },
+ super();
+ }
/**
@@ -108,12 +81,12 @@ module.exports = Class( 'MongoServerDao' )
*
* @return MongoServerDao self to allow for method chaining
*/
- 'public init': function( callback )
+ init( callback: () => void ): this
{
var dao = this;
// map db error event (on connection error) to our connectError event
- this._db.on( 'error', function( err )
+ this._db.on( 'error', function( err: Error )
{
dao._ready = false;
dao._collection = null;
@@ -123,7 +96,7 @@ module.exports = Class( 'MongoServerDao' )
this.connect( callback );
return this;
- },
+ }
/**
@@ -136,12 +109,12 @@ module.exports = Class( 'MongoServerDao' )
*
* @return MongoServerDao self to allow for method chaining
*/
- 'public connect': function( callback )
+ connect( callback: () => void ): this
{
var dao = this;
// attempt to connect to the database
- this._db.open( function( err, db )
+ this._db.open( function( err: any, db: any )
{
// if there was an error, don't bother with anything else
if ( err )
@@ -176,84 +149,97 @@ module.exports = Class( 'MongoServerDao' )
}
// quotes collection
- db.collection( dao.__self.$('COLLECTION'), function( err, collection )
- {
- // for some reason this gets called more than once
- if ( collection == null )
- {
- return;
- }
-
- // initialize indexes
- collection.createIndex(
- [ ['id', 1] ],
- true,
- function( err, index )
+ db.collection(
+ dao.COLLECTION,
+ function(
+ _err: any,
+ collection: MongoCollection,
+ ) {
+ // for some reason this gets called more than once
+ if ( collection == null )
{
- // mark the DAO as ready to be used
- dao._collection = collection;
- check_ready();
+ return;
}
- );
- });
-
- // seq collection
- db.collection( dao.__self.$('COLLECTION_SEQ'), function( err, collection )
- {
- if ( err )
- {
- dao.emit( 'seqError', err );
- return;
- }
- if ( collection == null )
- {
- return;
+ // initialize indexes
+ collection.createIndex(
+ [ ['id', 1] ],
+ true,
+ function(
+ _err: NullableError,
+ _index: { [P: string]: any,
+ } )
+ {
+ // mark the DAO as ready to be used
+ dao._collection = collection;
+ check_ready();
+ }
+ );
}
+ );
- dao._seqCollection = collection;
+ // seq collection
+ db.collection(
+ dao.COLLECTION_SEQ,
+ function(
+ err: Error,
+ collection: MongoCollection,
+ ) {
+ if ( err )
+ {
+ dao.emit( 'seqError', err );
+ return;
+ }
- // has the sequence we'll be referencing been initialized?
- collection.find(
- { _id: dao.__self.$('SEQ_QUOTE_ID') },
- { limit: 1 },
- function( err, cursor )
+ if ( collection == null )
{
- if ( err )
- {
- dao.initQuoteIdSeq( check_ready )
- return;
- }
+ return;
+ }
+
+ dao._seqCollection = collection;
- cursor.toArray( function( err, data )
+ // has the sequence we'll be referencing been initialized?
+ collection.find(
+ { _id: dao.SEQ_QUOTE_ID },
+ { limit: <PositiveInteger>1 },
+ function( err: NullableError, cursor )
{
- if ( data.length == 0 )
+ if ( err )
{
- dao.initQuoteIdSeq( check_ready );
+ dao._initQuoteIdSeq( check_ready )
return;
}
- check_ready();
- });
- }
- );
- });
+ cursor.toArray( function( _err: Error, data: any[] )
+ {
+ if ( data.length == 0 )
+ {
+ dao._initQuoteIdSeq( check_ready );
+ return;
+ }
+
+ check_ready();
+ });
+ }
+ );
+ }
+ );
});
return this;
- },
+ }
- 'public initQuoteIdSeq': function( callback )
+ private _initQuoteIdSeq( callback: () => void )
{
var dao = this;
- this._seqCollection.insert(
+ this._seqCollection!.insert(
{
- _id: this.__self.$('SEQ_QUOTE_ID'),
- val: this.__self.$('SEQ_QUOTE_ID_DEFAULT'),
+ _id: this.SEQ_QUOTE_ID,
+ val: this.SEQ_QUOTE_ID_DEFAULT,
},
- function( err, docs )
+ function( err: NullableError, _docs: any )
{
if ( err )
{
@@ -261,11 +247,11 @@ module.exports = Class( 'MongoServerDao' )
return;
}
- dao.emit( 'seqInit', this.__self.$('SEQ_QUOTE_ID') );
- callback.call( this );
+ dao.emit( 'seqInit', dao.SEQ_QUOTE_ID );
+ callback.call( dao );
}
);
- },
+ }
/**
@@ -281,15 +267,17 @@ module.exports = Class( 'MongoServerDao' )
* @param Function failure_callback function to call if save fails
* @param Object save_data quote data to save (optional)
* @param Object push_data quote data to push (optional)
- *
- * @return MongoServerDao self to allow for method chaining
*/
- 'public saveQuote': function(
- quote, success_callback, failure_callback, save_data, push_data
- )
+ saveQuote(
+ quote: ServerSideQuote,
+ success_callback: Callback,
+ failure_callback: Callback,
+ save_data?: any,
+ push_data?: any,
+ ): this
{
- var dao = this;
- var meta = {};
+ var dao = this;
+ var meta: Record<string, any> = {};
// if we're not ready, then we can't save the quote!
if ( this._ready === false )
@@ -301,7 +289,7 @@ module.exports = Class( 'MongoServerDao' )
);
failure_callback.call( this, quote );
- return;
+ return dao;
}
if ( save_data === undefined )
@@ -321,6 +309,7 @@ module.exports = Class( 'MongoServerDao' )
save_data.id = id;
save_data.pver = quote.getProgramVersion();
save_data.importDirty = 1;
+ save_data.published = false;
save_data.lastPremDate = quote.getLastPremiumDate();
save_data.initialRatedDate = quote.getRatedDate();
save_data.explicitLock = quote.getExplicitLockReason();
@@ -349,14 +338,14 @@ module.exports = Class( 'MongoServerDao' )
// update the quote data if it already exists (same id), otherwise
// insert it
- this._collection.update( { id: id },
+ this._collection!.update( { id: id },
document,
// create record if it does not yet exist
{ upsert: true },
// on complete
- function( err, docs )
+ function( err, _docs )
{
// if an error occurred, then we cannot continue
if ( err )
@@ -381,7 +370,7 @@ module.exports = Class( 'MongoServerDao' )
);
return this;
- },
+ }
/**
@@ -391,21 +380,24 @@ module.exports = Class( 'MongoServerDao' )
* @param {Object} data quote data
* @param {Function} scallback successful callback
* @param {Function} fcallback failure callback
- *
- * @return {MongoServerDao} self
*/
- 'public mergeData': function( quote, data, scallback, fcallback )
+ mergeData(
+ quote: ServerSideQuote,
+ data: MongoUpdate,
+ scallback: Callback,
+ fcallback: Callback,
+ ): this
{
// we do not want to alter the original data; use it as a prototype
var update = data;
// save the stack so we can track this call via the oplog
var _self = this;
- this._collection.update( { id: quote.getId() },
+ this._collection!.update( { id: quote.getId() },
{ '$set': update },
{},
- function( err, docs )
+ function( err, _docs )
{
if ( err )
{
@@ -427,7 +419,7 @@ module.exports = Class( 'MongoServerDao' )
);
return this;
- },
+ }
/**
@@ -441,9 +433,14 @@ module.exports = Class( 'MongoServerDao' )
*
* @return {MongoServerDao} self
*/
- 'public mergeBucket': function( quote, data, scallback, fcallback )
+ mergeBucket(
+ quote: ServerSideQuote,
+ data: MongoUpdate,
+ success: Callback,
+ failure: Callback,
+ ): this
{
- var update = {};
+ var update: MongoUpdate = {};
for ( var field in data )
{
@@ -455,8 +452,8 @@ module.exports = Class( 'MongoServerDao' )
update[ 'data.' + field ] = data[ field ];
}
- return this.mergeData( quote, update, scallback, fcallback );
- },
+ return this.mergeData( quote, update, success, failure );
+ }
/**
@@ -471,8 +468,10 @@ module.exports = Class( 'MongoServerDao' )
*
* @return MongoServerDao self
*/
- 'public saveQuoteState': function(
- quote, success_callback, failure_callback
+ saveQuoteState(
+ quote: ServerSideQuote,
+ success_callback: Callback,
+ failure_callback: Callback,
)
{
var update = {
@@ -484,10 +483,15 @@ module.exports = Class( 'MongoServerDao' )
return this.mergeData(
quote, update, success_callback, failure_callback
);
- },
+ }
- 'public saveQuoteClasses': function( quote, classes, success, failure )
+ saveQuoteClasses(
+ quote: ServerSideQuote,
+ classes: any,
+ success: Callback,
+ failure: Callback,
+ )
{
return this.mergeData(
quote,
@@ -495,7 +499,7 @@ module.exports = Class( 'MongoServerDao' )
success,
failure
);
- },
+ }
/**
@@ -511,9 +515,14 @@ module.exports = Class( 'MongoServerDao' )
*
* @return {undefined}
*/
- 'public saveQuoteMeta'( quote, new_meta, success, failure )
+ saveQuoteMeta(
+ quote: ServerSideQuote,
+ new_meta: any,
+ success: Callback,
+ failure: Callback,
+ ): void
{
- const update = {};
+ const update: MongoUpdate = {};
for ( var key in new_meta )
{
@@ -521,13 +530,12 @@ module.exports = Class( 'MongoServerDao' )
for ( var i in meta )
{
- update[ 'meta.' + key + '.' + i ] =
- new_meta[ key ][ i ];
+ update[ 'meta.' + key + '.' + i ] = new_meta[ key ][ i ];
}
}
this.mergeData( quote, update, success, failure );
- },
+ }
/**
@@ -539,13 +547,20 @@ module.exports = Class( 'MongoServerDao' )
*
* @return MongoServerDao self
*/
- 'public saveQuoteLockState': function(
- quote, success_callback, failure_callback
- )
+ saveQuoteLockState(
+ quote: ServerSideQuote,
+ success_callback: Callback,
+ failure_callback: Callback,
+ ): this
{
// lock state is saved by default
- return this.saveQuote( quote, success_callback, failure_callback, {} );
- },
+ return this.saveQuote(
+ quote,
+ success_callback,
+ failure_callback,
+ {}
+ );
+ }
/**
@@ -556,16 +571,19 @@ module.exports = Class( 'MongoServerDao' )
*
* @return MongoServerDao self to allow for method chaining
*/
- 'public pullQuote': function( quote_id, callback )
+ pullQuote(
+ quote_id: PositiveInteger,
+ callback: ( data: Record<string, any> | null ) => void
+ ): this
{
var dao = this;
// XXX: TODO: Do not read whole of record into memory; filter out
// revisions!
- this._collection.find( { id: quote_id }, { limit: 1 },
- function( err, cursor )
+ this._collection!.find( { id: quote_id }, { limit: <PositiveInteger>1 },
+ function( _err, cursor )
{
- cursor.toArray( function( err, data )
+ cursor.toArray( function( _err: NullableError, data: any[] )
{
// was the quote found?
if ( data.length == 0 )
@@ -581,27 +599,28 @@ module.exports = Class( 'MongoServerDao' )
);
return this;
- },
+ }
- 'public getMinQuoteId': function( callback )
+ getMinQuoteId( callback: ( min_id: number ) => void ): this
{
// just in case it's asynchronous later on
- callback.call( this, this.__self.$('SEQ_QUOTE_ID_DEFAULT') );
+ callback.call( this, this.SEQ_QUOTE_ID_DEFAULT );
+
return this;
- },
+ }
- 'public getMaxQuoteId': function( callback )
+ getMaxQuoteId( callback: ( min_id: number ) => void ): void
{
var dao = this;
- this._seqCollection.find(
- { _id: this.__self.$('SEQ_QUOTE_ID') },
- { limit: 1 },
- function( err, cursor )
+ this._seqCollection!.find(
+ { _id: this.SEQ_QUOTE_ID },
+ { limit: <PositiveInteger>1 },
+ function( _err, cursor )
{
- cursor.toArray( function( err, data )
+ cursor.toArray( function( _err: NullableError, data: any[] )
{
if ( data.length == 0 )
{
@@ -614,15 +633,15 @@ module.exports = Class( 'MongoServerDao' )
});
}
);
- },
+ }
- 'public getNextQuoteId': function( callback )
+ getNextQuoteId( callback: ( quote_id: number ) => void ): this
{
var dao = this;
- this._seqCollection.findAndModify(
- { _id: this.__self.$('SEQ_QUOTE_ID') },
+ this._seqCollection!.findAndModify(
+ { _id: this.SEQ_QUOTE_ID },
[ [ 'val', 'descending' ] ],
{ $inc: { val: 1 } },
{ 'new': true },
@@ -643,7 +662,7 @@ module.exports = Class( 'MongoServerDao' )
);
return this;
- },
+ }
/**
@@ -654,13 +673,16 @@ module.exports = Class( 'MongoServerDao' )
* model of storing the deltas in previous revisions and the whole of the
* bucket in the most recently created revision).
*/
- 'public createRevision': function( quote, callback )
+ createRevision(
+ quote: ServerSideQuote,
+ callback: ErrorCallback,
+ ): void
{
var _self = this,
qid = quote.getId(),
data = quote.getBucket().getData();
- this._collection.update( { id: qid },
+ this._collection!.update( { id: qid },
{ '$push': { revisions: { data: data } } },
// create record if it does not yet exist
@@ -678,20 +700,24 @@ module.exports = Class( 'MongoServerDao' )
return;
}
);
- },
+ }
- 'public getRevision': function( quote, revid, callback )
+ getRevision(
+ quote: ServerSideQuote,
+ revid: PositiveInteger,
+ callback: ErrorCallback,
+ ): void
{
- revid = +revid;
+ revid = <PositiveInteger>+revid;
// XXX: TODO: Filter out all but the revision we want
- this._collection.find(
+ this._collection!.find(
{ id: quote.getId() },
- { limit: 1 },
- function( err, cursor )
+ { limit: <PositiveInteger>1 },
+ function( _err, cursor )
{
- cursor.toArray( function( err, data )
+ cursor.toArray( function( _err: NullableError, data: any[] )
{
// was the quote found?
if ( ( data.length === 0 )
@@ -707,12 +733,16 @@ module.exports = Class( 'MongoServerDao' )
});
}
);
- },
+ }
- 'public setWorksheets': function( qid, data, callback )
+ setWorksheets(
+ qid: QuoteId,
+ data: MongoUpdate,
+ callback: NodeCallback<void>,
+ ): void
{
- this._collection.update( { id: qid },
+ this._collection!.update( { id: qid },
{ '$set': { worksheets: { data: data } } },
// create record if it does not yet exist
@@ -725,17 +755,22 @@ module.exports = Class( 'MongoServerDao' )
return;
}
);
- },
+ }
- 'public getWorksheet': function( qid, supplier, index, callback )
+ getWorksheet(
+ qid: QuoteId,
+ supplier: string,
+ index: PositiveInteger,
+ callback: ( data: WorksheetData | null ) => void,
+ ): void
{
- this._collection.find(
+ this._collection!.find(
{ id: qid },
- { limit: 1 },
- function( err, cursor )
+ { limit: <PositiveInteger>1 },
+ function( _err, cursor )
{
- cursor.toArray( function( err, data )
+ cursor.toArray( function( _err: NullableError, data: any[] )
{
// was the quote found?
if ( ( data.length === 0 )
@@ -750,74 +785,8 @@ module.exports = Class( 'MongoServerDao' )
// return the quote data
callback( data[ 0 ].worksheets.data[ supplier ][ index ] );
- });
- }
- );
- },
-
-
- /**
- * Set arbitrary data on a document
- *
- * @param {number} qid quote/document id
- * @param {string} key field key
- * @param {*} value field value
- * @param {function(?Error)} callback completion callback
- *
- * @return {undefined}
- */
- 'public setDocumentField'( qid, key, value, callback )
- {
- this._collection.update(
- { id: qid },
- { '$set': { [key]: value } },
-
- // create record if it does not yet exist
- { upsert: true },
-
- // on complete
- function( err )
- {
- callback && callback( err );
- return;
- }
- );
- },
-
-
- /**
- * Retrieve arbitrary data on a document
- *
- * @param {number} qid quote/document id
- * @param {string} key field key
- * @param {function(?Error)} callback completion callback
- *
- * @return {undefined}
- */
- 'public getDocumentField'( qid, key, callback )
- {
- this._collection.find(
- { id: qid },
- { limit: 1 },
- function( err, cursor )
- {
- if ( err !== null )
- {
- callback( err, null );
- return;
- }
-
- cursor.toArray( function( err, data )
- {
- if ( err !== null )
- {
- callback( err, null );
- return;
- }
-
- callback( null, ( data[ 0 ] || {} )[ key ] );
} );
}
);
- },
-} );
+ }
+};
diff --git a/src/server/db/ServerDao.d.ts b/src/server/db/ServerDao.d.ts
index 6cc8025..59228c3 100644
--- a/src/server/db/ServerDao.d.ts
+++ b/src/server/db/ServerDao.d.ts
@@ -131,7 +131,7 @@ export interface ServerDao
qid: QuoteId,
data: WorksheetData,
callback: NodeCallback<void>,
- ): this;
+ ): void;
/**
@@ -147,5 +147,5 @@ export interface ServerDao
supplier: string,
index: PositiveInteger,
callback: ( data: WorksheetData | null ) => void,
- ): this;
+ ): void;
}
diff --git a/src/server/log/Log.js b/src/server/log/Log.js
index 85a7f57..ffede67 100644
--- a/src/server/log/Log.js
+++ b/src/server/log/Log.js
@@ -116,7 +116,7 @@ module.exports = Class( 'Log',
if ( this._fd !== null )
{
var buffer = new Buffer( sprintf.apply( this, args ) + "\n" );
- fs.write( this._fd, buffer, 0, buffer.length, null );
+ fs.writeSync( this._fd, buffer, 0, buffer.length, null );
}
return this;
diff --git a/src/server/quote/ServerSideQuote.d.ts b/src/server/quote/ServerSideQuote.d.ts
index c8bdf2f..1e00d3f 100644
--- a/src/server/quote/ServerSideQuote.d.ts
+++ b/src/server/quote/ServerSideQuote.d.ts
@@ -68,4 +68,20 @@ export declare class ServerSideQuote extends BaseQuote
* @return rating data
*/
getRatingData(): Record<string, any>;
+
+
+ /**
+ * Metadata bucket
+ *
+ * @return the metadata bucket
+ */
+ getMetabucket(): QuoteDataBucket;
+
+
+ /**
+ * Get the program version
+ *
+ * @return program version
+ */
+ getProgramVersion(): string;
}
diff --git a/src/server/rater/DslRaterContext.js b/src/server/rater/DslRaterContext.js
index e5113b8..79dfaef 100644
--- a/src/server/rater/DslRaterContext.js
+++ b/src/server/rater/DslRaterContext.js
@@ -28,6 +28,12 @@ module.exports = Class( 'DslRaterContext' )
.extend( EventEmitter,
{
/**
+ * TODO: Remove workaround for bug extending class across
+ * multiple easejs instances
+ */
+ 'public _events': {},
+
+ /**
* Hash of classes that will result in a global submit
* @type {Object}
*/
diff --git a/src/server/request/UserSession.d.ts b/src/server/request/UserSession.d.ts
index 01937d7..f7d08ae 100644
--- a/src/server/request/UserSession.d.ts
+++ b/src/server/request/UserSession.d.ts
@@ -20,6 +20,9 @@
*/
+import { PositiveInteger } from "../../numeric";
+
+
/**
* Session management
*/
@@ -31,4 +34,28 @@ export declare class UserSession
* @return true if internal user, otherwise false
*/
isInternal(): boolean;
+
+
+ /**
+ * Gets the agent id, if available
+ *
+ * @return agent id or undefined if unavailable
+ */
+ agentId(): PositiveInteger | undefined
+
+
+ /**
+ * Gets the broker entity id, if available
+ *
+ * @return agent entity id or undefined if unavailable
+ */
+ agentEntityId(): PositiveInteger | undefined
+
+
+ /**
+ * Gets the agent name, if available
+ *
+ * @return agent name or undefined if unavailable
+ */
+ agentName(): string | undefined
}
diff --git a/src/server/service/RatingService.ts b/src/server/service/RatingService.ts
index ea4667e..83cbd10 100644
--- a/src/server/service/RatingService.ts
+++ b/src/server/service/RatingService.ts
@@ -433,7 +433,7 @@ export class RatingService
}
}
- this._dao.setWorksheets( qid, worksheets, ( err: Error | null ) =>
+ this._dao.setWorksheets( qid, worksheets, ( err: NullableError ) =>
{
if ( err )
{
diff --git a/src/server/token/MongoTokenDao.ts b/src/server/token/MongoTokenDao.ts
index ee90d7a..5d3dede 100644
--- a/src/server/token/MongoTokenDao.ts
+++ b/src/server/token/MongoTokenDao.ts
@@ -34,6 +34,8 @@ import { DocumentId } from "../../document/Document";
import { TokenId, TokenNamespace, TokenState } from "./Token";
import { UnknownTokenError } from "./UnknownTokenError";
import { context } from "../../error/ContextError";
+import { MongoCollection } from "mongodb";
+
/**
@@ -118,7 +120,7 @@ export class MongoTokenDao implements TokenDao
},
},
- ( err: Error|null, prev_data ) =>
+ ( err: NullableError, prev_data ) =>
{
if ( err )
{
@@ -250,7 +252,7 @@ export class MongoTokenDao implements TokenDao
this._collection.findOne(
{ id: +doc_id },
{ fields: fields },
- ( err: Error|null, data: TokenQueryResult ) =>
+ ( err: NullableError, data: TokenQueryResult ) =>
{
if ( err || !data )
{
diff --git a/src/store/Store.d.ts b/src/store/Store.d.ts
new file mode 100644
index 0000000..0b62417
--- /dev/null
+++ b/src/store/Store.d.ts
@@ -0,0 +1,114 @@
+/**
+ * Generic key/value store
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework
+ *
+ * Liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/** Store key type */
+type K = string;
+
+
+/**
+ * Generic key/value store with bulk clear
+ *
+ * @todo There's a lot of overlap between this concept and that of the
+ * Bucket. Maybe have the Bucket layer atop of simple Store
+ * interface as a step toward a new, simpler Bucket
+ * implementation. This was not implemented atop of the Bucket
+ * interface because its haphazard implementation would
+ * overcomplicate this.
+ */
+export interface Store<T = any>
+{
+ /**
+ * Add item to store under `key` with value `value`
+ *
+ * The promise will be fulfilled with an object containing the
+ * `key` and `value` added to the store; this is convenient for
+ * promises.
+ *
+ * @param key - store key
+ * @param value - value for key
+ *
+ * @return promise to add item to store, resolving to self (for
+ * chaining)
+ */
+ add( key: K, value: T ): Promise<Store>;
+
+
+ /**
+ * Populate store with each element in object `obj`
+ *
+ * This is simply a convenient way to call `#add` for each element in an
+ * object. This does directly call `#add`, so overriding that method
+ * will also affect this one.
+ *
+ * If the intent is to change the behavior of what happens when an item
+ * is added to the store, override the `#add` method instead of this one
+ * so that it affects _all_ adds, not just calls to this method.
+ *
+ * @param obj - object with which to populate store
+ *
+ * @return array of #add promises
+ */
+ populate( obj: Record<K, T> ): Promise<Store>[];
+
+
+ /**
+ * Retrieve item from store under `key`
+ *
+ * The promise will be rejected if the key is unavailable.
+ *
+ * @param key - store key
+ *
+ * @return promise for the key value
+ */
+ get( key: K ): Promise<T>;
+
+
+ /**
+ * Clear all items in store
+ *
+ * @return promise to clear store, resolving to self (for chaining)
+ */
+ clear(): Promise<Store>;
+
+
+ /**
+ * Fold (reduce) all stored values
+ *
+ * This provides a way to iterate through all stored values and
+ * their keys while providing a useful functional result (folding).
+ *
+ * The order of folding is undefined.
+ *
+ * The ternary function `callback` is of the same form as
+ * {@link Array#fold}: the first argument is the value of the
+ * accumulator (initialized to the value of `initial`; the second
+ * is the stored item; and the third is the key of that item.
+ *
+ * @param callback - folding function
+ * @param initial - initial value for accumulator
+ *
+ * @return promise of a folded value (final accumulator value)
+ */
+ reduce(
+ callback: ( accum: T, value: T, key: K ) => T,
+ initial: T,
+ ): Promise<T>;
+}
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
new file mode 100644
index 0000000..f41ee63
--- /dev/null
+++ b/src/system/AmqpPublisher.ts
@@ -0,0 +1,111 @@
+/**
+ * Amqp Publisher
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Publish Amqp message to a queue
+ */
+
+import { DeltaResult } from '../bucket/delta';
+import { DocumentMeta } from '../document/Document';
+import { Options } from 'amqplib';
+
+
+/**
+ * Create an amqp configuration from the environment
+ *
+ * @param env - the environment variables
+ *
+ * @return the amqp configuration
+ */
+export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig
+{
+ return <AmqpConfig>{
+ protocol: 'amqp',
+ hostname: env.AMQP_HOST,
+ port: +( env.AMQP_PORT || 0 ),
+ username: env.AMQP_USER,
+ password: env.AMQP_PASS,
+ locale: 'en_US',
+ frameMax: +( env.AMQP_FRAMEMAX || 0 ),
+ heartbeat: +( env.AMQP_HEARTBEAT || 0 ),
+ vhost: env.AMQP_VHOST,
+ exchange: env.AMQP_EXCHANGE,
+ retries: env.AMQP_RETRIES || 30,
+ retry_wait: env.AMQP_RETRY_WAIT || 1000,
+ };
+}
+
+
+export interface AmqpConfig extends Options.Connect
+{
+ /** The protocol to connect with (should always be 'amqp') */
+ protocol: string;
+
+ /** The hostname to connect to */
+ hostname: string;
+
+ /** The port to connect to */
+ port: number;
+
+ /** A username if one if required */
+ username?: string;
+
+ /** A password if one if required */
+ password?: string;
+
+ /** Locale (should always be 'en_US') */
+ locale: string;
+
+ /** The size in bytes of the maximum frame allowed */
+ frameMax: number;
+
+ /** How often to check for a live connection */
+ heartbeat: number;
+
+ /** The virtual host we are on (e.g. live, demo, test) */
+ vhost?: string;
+
+ /** The name of a queue or exchange to publish to */
+ exchange: string;
+
+ /** The number of times to retry connecting */
+ retries: number;
+
+ /** The time to wait in between retries */
+ retry_wait: number;
+}
+
+
+export interface AmqpPublisher
+{
+ /**
+ * Publish quote message to exchange post-rating
+ *
+ * @param meta - document meta data
+ * @param delta - delta
+ * @param bucket - bucket
+ * @param ratedata - rate data bucket
+ */
+ publish(
+ meta: DocumentMeta,
+ delta: DeltaResult<any>,
+ bucket: Record<string, any>,
+ ratedata?: Record<string, any>,
+ ): Promise<void>
+}
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
new file mode 100644
index 0000000..4eba003
--- /dev/null
+++ b/src/system/DeltaProcessor.ts
@@ -0,0 +1,300 @@
+/**
+ * Delta Processor
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { DeltaDao } from '../system/db/DeltaDao';
+import { DocumentMeta } from '../document/Document';
+import { AmqpPublisher } from './AmqpPublisher';
+import { EventEmitter } from 'events';
+import {
+ DeltaType,
+ applyDelta,
+ DeltaDocument,
+ Delta,
+ ReverseDelta,
+} from '../bucket/delta';
+
+/** Deltas and state of data prior to their application */
+type DeltaState = [
+ Delta<any>,
+ Record<string, any>,
+ Record<string, any>,
+];
+
+
+/**
+ * Process deltas for a quote and publish to a queue
+ *
+ * TODO: Decouple from applyDelta
+ */
+export class DeltaProcessor
+{
+ /** The ratedata delta type */
+ readonly DELTA_RATEDATA: DeltaType = 'ratedata';
+
+ /** The data delta type */
+ readonly DELTA_DATA: DeltaType = 'data';
+
+
+ /**
+ * Initialize processor
+ *
+ * @param _dao - Delta dao
+ * @param _publisher - Amqp Publisher
+ * @param _emitter - Event emiter instance
+ */
+ constructor(
+ private readonly _dao: DeltaDao,
+ private readonly _publisher: AmqpPublisher,
+ private readonly _emitter: EventEmitter,
+ ) {}
+
+
+ /**
+ * Process unpublished deltas
+ */
+ process(): Promise<void>
+ {
+ return this._dao.getUnprocessedDocuments()
+ .then( docs => this._processNext( docs ) );
+ }
+
+
+ /**
+ * Process the next document
+ *
+ * @param docs - list of documents to process
+ */
+ private _processNext( docs: DeltaDocument[] ): Promise<void>
+ {
+ const doc = docs.shift();
+
+ if ( !doc )
+ {
+ return Promise.resolve();
+ }
+
+ return this._processDocument( doc )
+ .then( _ => this._processNext( docs ) )
+ }
+
+
+ /**
+ * Process an individual document
+ *
+ * @param doc - individual document to process
+ */
+ private _processDocument( doc: DeltaDocument ): Promise<void>
+ {
+ const deltas = this._getTimestampSortedDeltas( doc );
+ const bucket = doc.data;
+ const ratedata = doc.ratedata || {};
+ const meta = {
+ id: doc.id,
+ entity_name: doc.agentName,
+ entity_id: +doc.agentEntityId,
+ startDate: doc.startDate,
+ lastUpdate: doc.lastUpdate,
+ };
+
+ const history = this._applyDeltas( deltas, bucket, ratedata );
+
+ return this._processNextDelta( meta, history )
+ .then( _ =>
+ this._dao.markDocumentAsProcessed( meta.id, meta.lastUpdate )
+ )
+ .then( _ =>
+ {
+ this._emitter.emit( 'document-processed', { doc_id: meta.id } );
+ } )
+ .catch( ( e: Error ) =>
+ {
+ this._emitter.emit( 'error', e );
+ return this._dao.setErrorFlag( meta.id );
+ } );
+ }
+
+
+ /**
+ * Produce states of buckets at each point in history
+ *
+ * For bucket data, each tuple will contain the state of the bucket
+ * prior to the corresponding delta having been applied. For rate data,
+ * the tuple will also contain the state of the bucket at the point of
+ * rating.
+ *
+ * @param deltas - deltas to apply
+ * @param bucket - current state of bucket prior to deltas
+ * @param ratedata - current state of ratedata prior to deltas
+ *
+ * @return deltas paired with state prior to its application
+ */
+ private _applyDeltas(
+ deltas: Delta<any>[],
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): DeltaState[]
+ {
+ const pairs: DeltaState[] = [];
+
+ let bucket_state = bucket;
+ let ratedata_state = ratedata;
+ let i = deltas.length;
+
+ while ( i-- )
+ {
+ let delta = deltas[ i ];
+
+ pairs[ i ] = [
+ delta,
+ bucket_state,
+ ( delta.type === this.DELTA_RATEDATA ) ? ratedata_state : {},
+ ];
+
+ // Don't apply the final delta, since we won't use it
+ if ( i === 0 )
+ {
+ break;
+ }
+
+ if ( delta.type === this.DELTA_DATA )
+ {
+ bucket_state = applyDelta(
+ Object.create( bucket_state ),
+ deltas[ i ].data,
+ );
+ }
+ else
+ {
+ ratedata_state = applyDelta(
+ Object.create( ratedata_state ),
+ deltas[ i ].data,
+ );
+ }
+ }
+
+ return pairs;
+ }
+
+
+ /**
+ * Process the next delta from the history
+ *
+ * @param meta - document meta data
+ * @param history - a history of deltas and their buckets (data, ratedata)
+ */
+ private _processNextDelta(
+ meta: DocumentMeta,
+ history: DeltaState[],
+ ): Promise<void>
+ {
+ if ( history.length === 0 )
+ {
+ return Promise.resolve();
+ }
+
+ const [ delta, bucket, ratedata ] = history[ 0 ];
+
+ const delta_uid = meta.id + '_' + delta.timestamp + '_' + delta.type;
+
+ this._emitter.emit( 'delta-process-start', delta_uid );
+
+ return this._publisher.publish( meta, delta, bucket, ratedata )
+ .then( _ => this._dao.advanceDeltaIndex( meta.id, delta.type ) )
+ .then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) )
+ .then( _ => this._processNextDelta( meta, history.slice( 1 ) ) );
+ }
+
+
+
+ /**
+ * Get sorted list of deltas
+ *
+ * @param doc - the document
+ *
+ * @return a list of deltas sorted by timestamp
+ */
+ private _getTimestampSortedDeltas( doc: DeltaDocument ): Delta<any>[]
+ {
+ const data_deltas = this._getDeltas( doc, this.DELTA_RATEDATA );
+ const ratedata_deltas = this._getDeltas( doc, this.DELTA_DATA );
+ const deltas = data_deltas.concat( ratedata_deltas );
+
+ deltas.sort( this._sortByTimestamp );
+
+ return deltas;
+ }
+
+
+ /**
+ * Get trimmed delta list
+ *
+ * @param doc - the document
+ * @param type - the delta type to get
+ *
+ * @return a trimmed list of deltas
+ */
+ private _getDeltas( doc: DeltaDocument, type: DeltaType ): Delta<any>[]
+ {
+ const deltas_obj = doc.rdelta || <ReverseDelta<any>>{};
+ const deltas: Delta<any>[] = deltas_obj[ type ] || [];
+
+ // Get type specific delta index
+ let published_count = 0;
+ if ( doc.totalPublishDelta )
+ {
+ published_count = doc.totalPublishDelta[ type ] || 0;
+ }
+
+ // Only return the unprocessed deltas
+ const deltas_trimmed = deltas.slice( published_count );
+
+ // Mark each delta with its type
+ deltas_trimmed.forEach( delta =>
+ {
+ delta.type = type;
+ } );
+
+ return deltas_trimmed;
+ }
+
+
+ /**
+ * Sort an array of deltas by timestamp
+ *
+ * @param a - The first delta to compare
+ * @param b - The second delta to compare
+ *
+ * @return a sort value
+ */
+ private _sortByTimestamp( a: Delta<any>, b: Delta<any> ): number
+ {
+ if ( a.timestamp < b.timestamp )
+ {
+ return -1;
+ }
+
+ if ( a.timestamp > b.timestamp ) {
+ return 1;
+ }
+
+ return 0;
+ }
+}
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
new file mode 100644
index 0000000..57a5747
--- /dev/null
+++ b/src/system/DeltaPublisher.ts
@@ -0,0 +1,124 @@
+/**
+ * Delta Publisher
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Publish delta message to a queue
+ */
+
+import { AmqpPublisher } from './AmqpPublisher';
+import { Delta } from '../bucket/delta';
+import { EventEmitter } from 'events';
+import { DocumentMeta } from '../document/Document';
+import { context } from '../error/ContextError';
+import { AmqpError } from '../error/AmqpError';
+import { MessageWriter } from './MessageWriter';
+
+import { AmqpConnection } from './amqp/AmqpConnection';
+
+
+export class DeltaPublisher implements AmqpPublisher
+{
+ /**
+ * Delta publisher
+ *
+ * @param _emitter - event emitter instance
+ * @param _ts_ctr - a timestamp constructor
+ * @param _conn - the amqp connection
+ * @param _writer - message writer
+ */
+ constructor(
+ private readonly _emitter: EventEmitter,
+ private readonly _ts_ctr: () => UnixTimestamp,
+ private readonly _conn: AmqpConnection,
+ private readonly _writer: MessageWriter,
+ ) {}
+
+
+ /**
+ * Publish quote message to exchange post-rating
+ *
+ * @param meta - document meta data
+ * @param delta - delta
+ * @param bucket - bucket
+ * @param ratedata - rate data bucket
+ */
+ publish(
+ meta: DocumentMeta,
+ delta: Delta<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): Promise<void>
+ {
+ const ts = this._ts_ctr();
+ const headers = { version: 1, created: ts };
+
+ return this._writer.write(
+ ts,
+ meta,
+ delta,
+ bucket,
+ ratedata
+ ).then( ( avro_buffer: Buffer ) =>
+ {
+ const channel = this._conn.getAmqpChannel();
+
+ if ( !channel )
+ {
+ throw context(
+ new AmqpError( 'Error sending message: No channel' ),
+ {
+ doc_id: meta.id,
+ delta_type: delta.type,
+ delta_ts: delta.timestamp,
+ },
+ );
+ }
+
+ // we don't use a routing key; fanout exchange
+ const published_successfully = channel.publish(
+ this._conn.getExchangeName(),
+ '',
+ avro_buffer,
+ { headers: headers },
+ );
+
+ if ( !published_successfully )
+ {
+ throw context(
+ new Error ( 'Delta publish failed' ),
+ {
+ doc_id: meta.id,
+ delta_type: delta.type,
+ delta_ts: delta.timestamp,
+ }
+ );
+ }
+ } )
+ .then( ( _: any ) =>
+ {
+ this._emitter.emit(
+ 'delta-publish',
+ {
+ delta: delta,
+ exchange: this._conn.getExchangeName(),
+ }
+ );
+ } );
+ }
+}
diff --git a/src/system/EventMediator.ts b/src/system/EventMediator.ts
new file mode 100644
index 0000000..b95536c
--- /dev/null
+++ b/src/system/EventMediator.ts
@@ -0,0 +1,95 @@
+/**
+ * Event Meditator
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Hook events and log them
+ */
+
+import { EventEmitter } from 'events';
+import { PsrLogger } from './PsrLogger';
+import { hasContext } from '../error/ContextError';
+
+export class EventMediator
+{
+ /**
+ * Initialize mediator
+ *
+ * @param _log - A PSR-3 style logger
+ * @param _emitter - An event emitter
+ */
+ constructor(
+ private readonly _log: PsrLogger,
+ private readonly _emitter: EventEmitter,
+ ) {
+ this._emitter.on( 'delta-publish', ( msg ) => this._log.notice(
+ 'Published delta to exchange',
+ msg
+ ) );
+
+ this._emitter.on( 'document-processed', ( msg ) => this._log.notice(
+ 'Deltas on document processed successfully. Document has been '
+ + 'marked as completely processed.',
+ msg
+ ) );
+
+ this._emitter.on( 'amqp-conn-warn', ( msg ) =>
+ this._log.warning( 'AMQP Connection Error', msg ) );
+
+ this._emitter.on( 'amqp-reconnect', () =>
+ this._log.warning(
+ '...attempting to re-establish AMQP connection'
+ )
+ );
+
+ this._emitter.on( 'amqp-reconnected', () =>
+ this._log.warning(
+ 'AMQP re-connected'
+ )
+ );
+
+ this._emitter.on( 'error', ( arg ) =>
+ this._handleError( arg ) );
+ }
+
+
+ /**
+ * Handle an error event
+ *
+ * @param e - any
+ */
+ private _handleError( e: any ): void
+ {
+ let msg: string = '';
+ let context: Record<string, any> = {};
+
+ if ( e instanceof( Error ) )
+ {
+ msg = e.message;
+
+ if ( hasContext( e ) )
+ {
+ context = e.context;
+ }
+
+ context.stack = e.stack;
+ }
+
+ this._log.error( msg, context );
+ }
+}
diff --git a/src/system/MessageWriter.ts b/src/system/MessageWriter.ts
new file mode 100644
index 0000000..8ee2b84
--- /dev/null
+++ b/src/system/MessageWriter.ts
@@ -0,0 +1,44 @@
+/**
+ * Message Writer
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Write a message to be published to a queue
+ */
+import { DocumentMeta } from '../document/Document';
+import { DeltaResult } from '../bucket/delta';
+
+export interface MessageWriter
+{
+ /**
+ * Write the data to a message
+ *
+ * @param ts - timestamp
+ * @param meta - document meta data
+ * @param delta - current delta
+ * @param bucket - data bucket
+ * @param ratedata - ratedata bucket
+ */
+ write(
+ ts: UnixTimestamp,
+ meta: DocumentMeta,
+ delta: DeltaResult<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): Promise<Buffer>
+} \ No newline at end of file
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
new file mode 100644
index 0000000..9432a9d
--- /dev/null
+++ b/src/system/MetricsCollector.ts
@@ -0,0 +1,205 @@
+/**
+ * Metrics Collector
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Collect Metrics for Prometheus
+ */
+
+import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
+import { EventEmitter } from 'events';
+import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory';
+
+const client = require( 'prom-client' )
+
+
+export type MetricTimer = (
+ _start_time?: [ number, number ]
+) => [ number, number ];
+
+
+export class MetricsCollector
+{
+ /** The prometheus PushGateway */
+ private _gateway: Pushgateway;
+
+ /** Delta processed time histogram */
+ private _process_time: Histogram;
+ private _process_time_name: string = 'liza_delta_process_time';
+ private _process_time_help: string = 'Delta process time in ms';
+
+ /** Delta error counter */
+ private _total_error: Counter;
+ private _total_error_name: string = 'liza_delta_error';
+ private _total_error_help: string = 'Total errors from delta processing';
+
+ /** Delta current error gauge */
+ private _current_error: Gauge;
+ private _current_error_name: string = 'liza_delta_current_error';
+ private _current_error_help: string =
+ 'The current number of documents in an error state';
+
+ /** Delta error counter */
+ private _total_processed: Counter;
+ private _total_processed_name: string = 'liza_delta_success';
+ private _total_processed_help: string =
+ 'Total deltas successfully processed';
+
+ /** Timing map */
+ private _timing_map: Record<string, [ number, number ]> = {};
+
+ private _push_interval: NodeJS.Timer;
+
+
+ /**
+ * Initialize delta logger
+ *
+ * @param _factory - A factory to create prometheus components
+ * @param _conf - Prometheus configuration
+ * @param _emitter - Event emitter
+ * @param _timer - A timer function to create a tuple timestamp
+ */
+ constructor(
+ private readonly _factory: PrometheusFactory,
+ private readonly _conf: PrometheusConfig,
+ private readonly _emitter: EventEmitter,
+ private readonly _timer: MetricTimer,
+ ) {
+ // Set labels
+ client.register.setDefaultLabels( {
+ env: this._conf.env,
+ service: 'delta_processor',
+ } );
+
+ // Create metrics
+ this._gateway = this._factory.createGateway(
+ client,
+ this._conf.hostname,
+ this._conf.port,
+ );
+
+ this._process_time = this._factory.createHistogram(
+ client,
+ this._process_time_name,
+ this._process_time_help,
+ this._conf.buckets_start,
+ this._conf.buckets_width,
+ this._conf.buckets_count,
+ );
+
+ this._total_error = this._factory.createCounter(
+ client,
+ this._total_error_name,
+ this._total_error_help,
+ );
+
+ this._current_error = this._factory.createGauge(
+ client,
+ this._current_error_name,
+ this._current_error_help,
+ );
+
+ this._total_processed = this._factory.createCounter(
+ client,
+ this._total_processed_name,
+ this._total_processed_help,
+ );
+
+ // Push metrics on a specific interval
+ this._push_interval = setInterval( () =>
+ {
+ this._gateway.pushAdd(
+ { jobName: 'liza_delta_metrics' },
+ this.getPushCallback( this )
+ );
+ }, this._conf.push_interval_ms
+ );
+
+ // Subsribe metrics to events
+ this.hookMetrics();
+ }
+
+
+ /**
+ * Stop the push interval
+ */
+ stop(): void
+ {
+ clearInterval( this._push_interval );
+ }
+
+
+ /**
+ * List to events to update metrics
+ */
+ private hookMetrics(): void
+ {
+ this._emitter.on(
+ 'delta-process-start',
+ ( uid: string ) => { this._timing_map[ uid ] = this._timer(); }
+ );
+
+ this._emitter.on(
+ 'delta-process-end',
+ ( uid: string ) =>
+ {
+ const start_time_ms = this._timing_map[ uid ] || [ -1, -1 ];
+ const t = this._timer( start_time_ms );
+ const total_time_ms = t[ 0 ] * 1000 + t[ 1 ] / 1000000;
+
+ this._process_time.observe( total_time_ms );
+ this._total_processed.inc();
+ }
+ );
+
+ this._emitter.on( 'error', ( _ ) => this._total_error.inc() );
+ }
+
+
+ /**
+ * Handle push error
+ *
+ * @param self - Metrics Collector object
+ *
+ * @return a function to handle the pushAdd callback
+ */
+ private getPushCallback( self: MetricsCollector ): () => void
+ {
+ return (
+ error?: Error | undefined,
+ _response?: any,
+ _body?: any
+ ): void =>
+ {
+ if ( error )
+ {
+ self._emitter.emit( 'error', error );
+ }
+ }
+ }
+
+ /**
+ * Update metrics with current error count
+ *
+ * @param count - the number of errors found
+ */
+ updateErrorCount( count: number ): void
+ {
+ this._current_error.set( +count );
+ }
+}
diff --git a/src/system/PrometheusFactory.ts b/src/system/PrometheusFactory.ts
new file mode 100644
index 0000000..7330ea9
--- /dev/null
+++ b/src/system/PrometheusFactory.ts
@@ -0,0 +1,171 @@
+/**
+ * Prometheus Factory functions
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Prometheus Metrics
+ */
+import { Pushgateway, Histogram, Counter, Gauge } from 'prom-client';
+
+
+export declare type PrometheusConfig = {
+ /** The hostname to connect to */
+ hostname: string;
+
+ /** The port to connect to */
+ port: number;
+
+ /** The environment ( dev, test, demo, live ) */
+ env: string;
+
+ /** The rate (in milliseconds) at which metrics are pushed */
+ push_interval_ms: number;
+
+ /** The starting point for process time buckets */
+ buckets_start: number;
+
+ /** The width of process time buckets */
+ buckets_width: number;
+
+ /** The number of process time buckets */
+ buckets_count: number;
+}
+
+
+/**
+ * Create a prometheus configuration from the environment
+ *
+ * @param env - the environment variables
+ *
+ * @return the prometheus configuration
+ */
+export function createPrometheusConfig(
+ env: NodeJS.ProcessEnv
+): PrometheusConfig
+{
+ return <PrometheusConfig>{
+ hostname: env.PROM_HOST,
+ port: +( env.PROM_PORT || 0 ),
+ env: process.env.NODE_ENV,
+ push_interval_ms: +( process.env.PROM_PUSH_INTERVAL_MS || 5000 ),
+ buckets_start: +( process.env.PROM_BUCKETS_START || 0 ),
+ buckets_width: +( process.env.PROM_BUCKETS_WIDTH || 10 ),
+ buckets_count: +( process.env.PROM_BUCKETS_COUNT || 10 ),
+ };
+}
+
+
+export class PrometheusFactory
+{
+ /**
+ * Create a PushGateway
+ *
+ * @param client - prometheus client
+ * @param hostname - push gateway url
+ * @param port - push gateway port
+ *
+ * @return the gateway
+ */
+ createGateway(
+ client: any,
+ hostname: string,
+ port: number,
+ ): Pushgateway
+ {
+ const url = 'http://' + hostname + ':' + port;
+
+ return new client.Pushgateway( url );
+ }
+
+
+ /**
+ * Create a histogram metric
+ *
+ * @param client - prometheus client
+ * @param name - metric name
+ * @param help - a description of the metric
+ * @param bucket_start - where to start the range of buckets
+ * @param bucket_width - the size of each bucket
+ * @param bucket_count - the total number of buckets
+ *
+ * @return the metric
+ */
+ createHistogram(
+ client: any,
+ name: string,
+ help: string,
+ bucket_start: number,
+ bucket_width: number,
+ bucket_count: number,
+ ): Histogram
+ {
+ return new client.Histogram( {
+ name: name,
+ help: help,
+ buckets: client.linearBuckets(
+ bucket_start,
+ bucket_width,
+ bucket_count
+ ),
+ } );
+ }
+
+
+ /**
+ * Create a counter metric
+ *
+ * @param client - prometheus client
+ * @param name - metric name
+ * @param help - a description of the metric
+ *
+ * @return the metric
+ */
+ createCounter(
+ client: any,
+ name: string,
+ help: string,
+ ): Counter
+ {
+ return new client.Counter( {
+ name: name,
+ help: help,
+ } );
+ }
+
+
+ /**
+ * Create a gauge metric
+ *
+ * @param client - prometheus client
+ * @param name - metric name
+ * @param help - a description of the metric
+ *
+ * @return the metric
+ */
+ createGauge(
+ client: any,
+ name: string,
+ help: string,
+ ): Gauge
+ {
+ return new client.Gauge( {
+ name: name,
+ help: help,
+ } );
+ }
+} \ No newline at end of file
diff --git a/src/system/PsrLogger.ts b/src/system/PsrLogger.ts
new file mode 100644
index 0000000..276c78e
--- /dev/null
+++ b/src/system/PsrLogger.ts
@@ -0,0 +1,117 @@
+/**
+ * PSR logger
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * PSR-3 style logger
+ */
+
+export enum LogLevel {
+ DEBUG,
+ INFO,
+ NOTICE,
+ WARNING,
+ ERROR,
+ CRITICAL,
+ ALERT,
+ EMERGENCY,
+};
+
+
+export interface PsrLogger
+{
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ debug( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ info( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ notice( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ warning( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ error( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ critical( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ alert( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ emergency( msg: string | object, context?: object ): void
+
+
+ /**
+ * Log a message
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ log( level: LogLevel, msg: string | object, context?: object ): void
+}
diff --git a/src/system/StandardLogger.ts b/src/system/StandardLogger.ts
new file mode 100644
index 0000000..cdd062d
--- /dev/null
+++ b/src/system/StandardLogger.ts
@@ -0,0 +1,199 @@
+/**
+ * Stdout logger
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Standard out logger implementing PSR-3 standards
+ */
+import { PsrLogger, LogLevel } from './PsrLogger';
+
+declare type StructuredLog = {
+ message: string;
+ timestamp: UnixTimestamp;
+ service: string;
+ env: string;
+ severity: string;
+ context?: Record<string, any>;
+}
+
+export class StandardLogger implements PsrLogger
+{
+ /**
+ * Initialize logger
+ *
+ * @param _console
+ * @param _ts_ctr - a timestamp constructor
+ * @param _env - The environment ( dev, test, demo, live )
+ */
+ constructor(
+ private readonly _console: Console,
+ private readonly _ts_ctr: () => UnixTimestamp,
+ private readonly _env: string,
+ ) {}
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ debug( msg: string | object, context?: object ): void
+ {
+ this._console.info( this._format( LogLevel.DEBUG, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ info( msg: string | object, context?: object ): void
+ {
+ this._console.info( this._format( LogLevel.INFO, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ notice( msg: string | object, context?: object ): void
+ {
+ this._console.log( this._format( LogLevel.NOTICE, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ warning( msg: string | object, context?: object ): void
+ {
+ this._console.warn( this._format( LogLevel.WARNING, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ error( msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( LogLevel.ERROR, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ critical( msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( LogLevel.CRITICAL, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ alert( msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( LogLevel.ALERT, msg, context ) );
+ }
+
+
+ /**
+ * Log at a debug level
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ emergency( msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( LogLevel.EMERGENCY, msg, context ) );
+ }
+
+
+ /**
+ * Log a message
+ *
+ * @param msg - the message to log
+ * @param context - additional message context
+ */
+ log( level: LogLevel, msg: string | object, context?: object ): void
+ {
+ this._console.error( this._format( level, msg, context ) );
+ }
+
+
+ /**
+ * Get structured log object
+ *
+ * @param msg - the string or object to log
+ * @param level - the log level
+ * @param context - additional message context
+ *
+ * @returns a structured logging object
+ */
+ private _format(
+ level: LogLevel,
+ msg: string | object,
+ context: object = {},
+ ): StructuredLog
+ {
+ let str: string;
+
+ if ( msg !== null && typeof( msg ) === 'object' )
+ {
+ str = JSON.stringify( msg );
+ }
+ else
+ {
+ str = msg;
+ }
+
+ const structured_log = <StructuredLog>{
+ message: str,
+ timestamp: this._ts_ctr(),
+ service: 'quote-server',
+ env: this._env,
+ severity: LogLevel[level],
+ };
+
+ if ( Object.keys( context ).length > 0 )
+ {
+ structured_log[ "context" ] = context;
+ }
+
+ return structured_log;
+ }
+}
diff --git a/src/system/amqp/AmqpConnection.ts b/src/system/amqp/AmqpConnection.ts
new file mode 100644
index 0000000..410f808
--- /dev/null
+++ b/src/system/amqp/AmqpConnection.ts
@@ -0,0 +1,153 @@
+/**
+ * Amqp Connection
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import { AmqpConfig } from '../AmqpPublisher';
+import { EventEmitter } from "events";
+import * as amqplib from "amqplib";
+
+/**
+ * Connection to AMQP exchange
+ */
+export class AmqpConnection
+{
+ /** The amqp connection */
+ private _conn?: amqplib.Connection;
+
+ /** The amqp channel */
+ private _channel?: amqplib.Channel;
+
+
+ /**
+ * Amqp Connection
+ *
+ * @param _conf - amqp library
+ * @param _conf - amqp configuration
+ * @param _emitter - event emitter instance
+ */
+ constructor(
+ private readonly _amqp: typeof amqplib,
+ private readonly _conf: AmqpConfig,
+ private readonly _emitter: EventEmitter,
+ ) {}
+
+
+ /**
+ * Initialize connection
+ */
+ connect(): Promise<void>
+ {
+ return this._amqp.connect( this._conf )
+ .then( conn =>
+ {
+ this._conn = conn;
+
+ /** If there is an error, attempt to reconnect
+ * Only hook this once because it will be re-hooked on each
+ * successive successful connection
+ */
+ this._conn.once( 'error', e =>
+ {
+ this._emitter.emit( 'amqp-conn-warn', e );
+ this._reconnect();
+ } );
+
+ return this._conn.createChannel();
+ } )
+ .then( ( ch: amqplib.Channel ) =>
+ {
+ this._channel = ch;
+
+ return this._channel.assertExchange(
+ this._conf.exchange,
+ 'fanout',
+ { durable: true }
+ );
+ } )
+ .then( _ => {} );
+ }
+
+
+ /**
+ * Attempt to re-establish the connection
+ *
+ * @param retry_count - the number of retries attempted
+ */
+ private _reconnect( retry_count: number = 0 ): void
+ {
+ if ( retry_count >= this._conf.retries )
+ {
+ this._emitter.emit(
+ 'error',
+ new Error( 'Could not re-establish AMQP connection.' )
+ );
+
+ return;
+ }
+
+ this._emitter.emit( 'amqp-reconnect' );
+
+ this.connect()
+ .then( _ => { this._emitter.emit( 'amqp-reconnected' ) } )
+ .catch( _ =>
+ {
+ const wait_ms = this._conf.retry_wait;
+ setTimeout( () => this._reconnect( ++retry_count ), wait_ms );
+ } );
+ }
+
+
+ /**
+ * Returns the exchange to publish to
+ *
+ * @return exchange name
+ */
+ getExchangeName(): string
+ {
+ return this._conf.exchange;
+ }
+
+
+ /**
+ * Returns the amqp channel
+ *
+ * @return exchange name
+ */
+ getAmqpChannel(): amqplib.Channel | undefined
+ {
+ if ( !this._channel )
+ {
+ this._reconnect();
+ }
+
+ return this._channel;
+ }
+
+
+ /**
+ * Close the amqp conenction
+ */
+ close(): void
+ {
+ if ( this._conn )
+ {
+ this._conn.close.bind(this._conn);
+ }
+ }
+}
diff --git a/src/system/avro/AvroFactory.ts b/src/system/avro/AvroFactory.ts
new file mode 100644
index 0000000..32ba1ec
--- /dev/null
+++ b/src/system/avro/AvroFactory.ts
@@ -0,0 +1,32 @@
+/**
+ * Factory functions for avro
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import { Duplex } from 'stream';
+
+import * as avro from "avro-js";
+
+/** The avro encoder constructor type */
+export type AvroEncoderCtr = ( type: avro.AvroSchema ) => Duplex;
+
+/** The avro encoder constructor */
+export function createAvroEncoder( schema: avro.AvroSchema ): Duplex
+{
+ return new avro.streams.BlockEncoder( schema );
+}
diff --git a/src/system/avro/V1MessageWriter.ts b/src/system/avro/V1MessageWriter.ts
new file mode 100644
index 0000000..09ee64a
--- /dev/null
+++ b/src/system/avro/V1MessageWriter.ts
@@ -0,0 +1,259 @@
+/**
+ * Message Writer
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Write a message to be published to a queue
+ */
+import { DocumentMeta } from '../../document/Document';
+import { Delta } from '../../bucket/delta';
+import { AvroEncoderCtr } from '../avro/AvroFactory';
+import { AvroSchema } from 'avro-js';
+import { MessageWriter } from '../MessageWriter';
+import { context } from '../../error/ContextError';
+
+
+export class V1MessageWriter implements MessageWriter
+{
+ /** A mapping of which delta type translated to which avro event */
+ readonly DELTA_MAP: Record<string, string> = {
+ data: 'STEP_SAVE',
+ ratedata: 'RATE',
+ };
+
+
+ /**
+ * Delta publisher
+ *
+ * @param _encoder_ctr - a factory function to create an avro encoder
+ * @param _conn - the amqp connection
+ */
+ constructor(
+ private readonly _encoder_ctor: AvroEncoderCtr,
+ private readonly _schema: AvroSchema,
+ ) {}
+
+
+ /**
+ * Write the data to a message
+ *
+ * @param ts - timestamp
+ * @param meta - document meta data
+ * @param delta - current delta
+ * @param bucket - data bucket
+ * @param ratedata - ratedata bucket
+ */
+ write(
+ ts: UnixTimestamp,
+ meta: DocumentMeta,
+ delta: Delta<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): Promise<Buffer>
+ {
+ const avro_object = this._avroFormat(
+ ts,
+ meta,
+ delta,
+ bucket,
+ ratedata,
+ );
+
+ return this.avroEncode( avro_object );
+ }
+
+
+ /**
+ * Format the avro data with data type labels
+ *
+ * @param ts - timestamp
+ * @param meta - document meta data
+ * @param delta - current delta
+ * @param bucket - data bucket
+ * @param ratedata - ratedata bucket
+ *
+ * @return the formatted data
+ */
+ private _avroFormat(
+ ts: UnixTimestamp,
+ meta: DocumentMeta,
+ delta: Delta<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): any
+ {
+ const delta_formatted = this.setDataTypes( delta.data );
+ const bucket_formatted = this.setDataTypes( bucket );
+ const ratedata_formatted = this.setDataTypes( ratedata );
+ const event_id = this.DELTA_MAP[ delta.type ];
+
+ return {
+ event: {
+ id: event_id,
+ ts: ts,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: meta.id,
+ created: meta.startDate,
+ modified: meta.lastUpdate,
+ },
+ session: {
+ Session: {
+ entity_id: meta.entity_id,
+ entity_name: meta.entity_name,
+ },
+ },
+ data: {
+ Data: {
+ bucket: bucket_formatted,
+ },
+ },
+ ratedata: {
+ Data: {
+ bucket: ratedata_formatted,
+ },
+ },
+ delta: {
+ Data: {
+ bucket: delta_formatted,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: '',
+ },
+ },
+ }
+ }
+
+
+ /**
+ * Encode the data in an avro buffer
+ *
+ * @param data - the data to encode
+ *
+ * @return the avro buffer or null if there is an error
+ */
+ avroEncode( data: Record<string, any> ): Promise<Buffer>
+ {
+ return new Promise<Buffer>( ( resolve, reject ) =>
+ {
+ const bufs: Buffer[] = [];
+
+ try
+ {
+ this._schema.isValid(
+ data,
+ {
+ errorHook: ( keys: any, vals: any) =>
+ {
+ throw context(
+ new Error( 'Invalid Avro Schema' ),
+ {
+ invalid_paths: keys,
+ invalid_data: vals,
+ }
+ );
+ }
+ }
+ );
+
+ const encoder = this._encoder_ctor( this._schema )
+
+ encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } )
+ encoder.on('error', ( err: Error ) => { reject( err ); } )
+ encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } )
+ encoder.end( data );
+ }
+ catch ( e )
+ {
+ reject( e );
+ }
+ } );
+ }
+
+
+ /**
+ * Format the data for avro by add type specifications to the data
+ *
+ * @param data - the data to format
+ * @param top_level - whether we are at the top level of the recursion
+ *
+ * @return the formatted data
+ */
+ setDataTypes( data: any, top_level: boolean = true ): any
+ {
+ let data_formatted: any = {};
+
+ switch( typeof( data ) )
+ {
+ case 'object':
+ if ( data == null )
+ {
+ return null;
+ }
+ else if ( Array.isArray( data ) )
+ {
+ let arr: any[] = [];
+
+ data.forEach( ( datum ) =>
+ {
+ arr.push( this.setDataTypes( datum, false ) );
+ } );
+
+ data_formatted = ( top_level )
+ ? arr
+ : { 'array': arr };
+ }
+ else
+ {
+ let datum_formatted: any = {};
+
+ Object.keys( data).forEach( ( key: string ) =>
+ {
+ const datum = this.setDataTypes( data[ key ], false );
+
+ datum_formatted[ key ] = datum;
+
+ } );
+
+ data_formatted = ( top_level )
+ ? datum_formatted
+ : { 'map': datum_formatted };
+ }
+ break;
+
+ case 'boolean':
+ return { 'boolean': data };
+
+ case 'number':
+ return { 'double': data };
+
+ case 'string':
+ return { 'string': data };
+
+ case 'undefined':
+ return null;
+ }
+
+ return data_formatted;
+ }
+} \ No newline at end of file
diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc
new file mode 100644
index 0000000..63bbc7d
--- /dev/null
+++ b/src/system/avro/schema.avsc
@@ -0,0 +1,215 @@
+{
+ "type": "record",
+ "name": "update",
+ "fields": [
+ {
+ "name": "event",
+ "type": {
+ "type": "record",
+ "name": "Event",
+ "fields": [
+ {
+ "name": "id",
+ "type": {
+ "name": "EventId",
+ "type": "enum",
+ "symbols": [
+ "STEP_SAVE",
+ "RATE"
+ ]
+ }
+ },
+ {
+ "name": "ts",
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ },
+ {
+ "name": "actor",
+ "type": {
+ "type": "enum",
+ "name": "EventActor",
+ "symbols": [ "USER", "CLIENT", "SERVER" ]
+ }
+ },
+ {
+ "name": "step",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "EventStep",
+ "fields": [
+ {
+ "name": "transition",
+ "type": {
+ "type": "enum",
+ "name": "EventStepTransition",
+ "symbols": [ "BACK", "FORWARD", "END" ]
+ }
+ },
+ {
+ "name": "src",
+ "type": "string"
+ },
+ {
+ "name": "dest",
+ "type": "string"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "document",
+ "type": {
+ "type": "record",
+ "name": "Document",
+ "doc": "Source document (quote)",
+ "fields": [
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "created",
+ "type": ["null", "long"],
+ "logicalType": "timestamp-millis",
+ "default": null
+ },
+ {
+ "name": "modified",
+ "type": ["null", "long"],
+ "logicalType": "timestamp-millis",
+ "default": null
+ }
+ ]
+ }
+ },
+ {
+ "name": "session",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "Session",
+ "fields": [
+ {
+ "name": "entity_name",
+ "type": "string"
+ },
+ {
+ "name": "entity_id",
+ "type": "int"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "data",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "Data",
+ "fields": [
+ {
+ "name": "bucket",
+ "type":{
+ "type": "map",
+ "values": [
+ "null",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ "boolean",
+ "double",
+ "string",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ "boolean",
+ "double",
+ "string",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ "boolean",
+ "double",
+ "string"
+ ]
+ }
+ ]
+ },
+ {
+ "type": "map",
+ "values": [
+ "null",
+ "boolean",
+ "double",
+ "string",
+ {
+ "type": "map",
+ "values": [
+ "null",
+ "boolean",
+ "double",
+ "string"
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "ratedata",
+ "type": [
+ "null",
+ "Data"
+ ]
+ },
+ {
+ "name": "delta",
+ "type": [
+ "null",
+ "Data"
+ ]
+ },
+ {
+ "name": "program",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "Program",
+ "fields": [
+ {
+ "type": "string",
+ "name": "id",
+ "doc": "Program id"
+ },
+ {
+ "type": "string",
+ "name": "version",
+ "doc": "Program version"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts
new file mode 100644
index 0000000..881bc79
--- /dev/null
+++ b/src/system/db/DeltaDao.ts
@@ -0,0 +1,87 @@
+/**
+ * Delta data access
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * These types are used to describe the structure of the token data as it
+ * is stored in Mongo. It has a number of undesirable properties and
+ * duplicates data---this was intended to make querying easier and work
+ * around Mongo limitations.
+ *
+ * This structure can be changed in the future, but we'll need to maintain
+ * compatibility with the existing data.
+ */
+
+import { DocumentId } from "../../document/Document";
+import { DeltaDocument } from "../../bucket/delta";
+
+
+/** Manage deltas */
+export interface DeltaDao
+{
+ /**
+ * Get documents in need of processing
+ *
+ * @return documents in need of processing
+ */
+ getUnprocessedDocuments(): Promise<DeltaDocument[]>
+
+
+ /**
+ * Set the document's processed index
+ *
+ * @param doc_id - Document whose index will be set
+ * @param type - Delta type
+ */
+ advanceDeltaIndex(
+ doc_id: DocumentId,
+ type: string,
+ ): Promise<void>
+
+
+ /**
+ * Mark a given document as processed. First does a check to make sure that
+ * the document does not have a newer update timestamp than the provided one
+ *
+ * @param doc_id - The document to mark
+ * @param last_update_ts - The last time this document was updated
+ */
+ markDocumentAsProcessed(
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ ): Promise<void>
+
+
+ /**
+ * Flag the document as being in an error state
+ *
+ * @param doc_id - The document to flag
+ *
+ * @return any errors that occurred
+ */
+ setErrorFlag( doc_id: DocumentId ): Promise<void>
+
+
+ /**
+ * Get a count of documents in an error state
+ *
+ * @return a count of the documents in an error state
+ */
+ getErrorCount(): Promise<number>
+}
+
diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
new file mode 100644
index 0000000..e058980
--- /dev/null
+++ b/src/system/db/MongoDeltaDao.ts
@@ -0,0 +1,278 @@
+/**
+ * Delta data access
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Get deltas from the mongo document in order to process and publish them
+ */
+
+import { DocumentId } from '../../document/Document';
+import { DeltaDao } from './DeltaDao';
+import { MongoCollection } from 'mongodb';
+import { context } from '../../error/ContextError';
+import { DaoError } from '../../error/DaoError';
+import { DeltaType, DeltaDocument } from '../../bucket/delta';
+
+/** Manage deltas */
+export class MongoDeltaDao implements DeltaDao
+{
+ /** The ratedata delta type */
+ static readonly DELTA_RATEDATA: string = 'ratedata';
+
+ /** The data delta type */
+ static readonly DELTA_DATA: string = 'data';
+
+ /** The document fields to read */
+ readonly RESULT_FIELDS: Record<string, number> = {
+ id: 1,
+ agentName: 1,
+ agentEntityId: 1,
+ startDate: 1,
+ lastUpdate: 1,
+ data: 1,
+ ratedata: 1,
+ rdelta: 1,
+ totalPublishDelta: 1,
+ };
+
+
+ /**
+ * Initialize connection
+ *
+ * @param _collection - Mongo db collection
+ */
+ constructor(
+ private readonly _collection: MongoCollection,
+ ) {}
+
+
+ /**
+ * Get documents in need of processing
+ *
+ * @return documents in need of processing
+ */
+ getUnprocessedDocuments(): Promise<DeltaDocument[]>
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection.find(
+ {
+ published: false,
+ deltaError: { $ne: true },
+ },
+ { fields: this.RESULT_FIELDS },
+ ( e, cursor ) =>
+ {
+ if ( e )
+ {
+ reject(
+ new DaoError(
+ 'Error fetching unprocessed documents: ' + e
+ )
+ );
+ return
+ }
+
+ cursor.toArray( ( e: Error, data: DeltaDocument[] ) =>
+ {
+ if ( e )
+ {
+ reject(
+ new DaoError(
+ 'Error fetching array from cursor: ' + e
+ )
+ );
+ return;
+ }
+
+ resolve( data );
+ } );
+ }
+ )
+ } );
+ }
+
+
+ /**
+ * Set the document's processed index
+ *
+ * @param doc_id - Document whose index will be set
+ * @param type - Delta type
+ */
+ advanceDeltaIndex( doc_id: DocumentId, type: DeltaType ): Promise<void>
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ const inc_data: Record<string, any> = {};
+
+ inc_data[ 'totalPublishDelta.' + type ] = 1;
+
+ this._collection.update(
+ { id: doc_id },
+ { $inc: inc_data },
+ { upsert: false },
+ e =>
+ {
+ if ( e )
+ {
+ reject( context(
+ new DaoError( 'Error advancing delta index: ' + e ),
+ {
+ doc_id: doc_id,
+ type: type,
+ }
+ ) );
+ return;
+ }
+
+ resolve();
+ }
+ );
+ } );
+ }
+
+
+ /**
+ * Mark a given document as processed.
+ *
+ * First does a check to make sure that
+ * the document does not have a newer update timestamp than the provided one
+ *
+ * @param doc_id - The document to mark
+ * @param last_update_ts - The last time this document was updated
+ */
+ markDocumentAsProcessed(
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ ): Promise<void>
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection.update(
+ { id: doc_id, lastUpdate: { $lte: last_update_ts } },
+ { $set: { published: true } },
+ { upsert: false },
+ e =>
+ {
+ if ( e )
+ {
+ reject( context(
+ new DaoError(
+ 'Error marking document as processed: ' + e
+ ),
+ {
+ doc_id: doc_id,
+ last_update_ts: last_update_ts,
+ }
+ ) );
+ return;
+ }
+
+ resolve();
+ return;
+ }
+ );
+ } );
+ }
+
+
+ /**
+ * Flag the document as being in an error state
+ *
+ * @param doc_id - The document to flag
+ *
+ * @return any errors that occurred
+ */
+ setErrorFlag( doc_id: DocumentId ): Promise<void>
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection.update(
+ { id: doc_id },
+ { $set: { deltaError: true } },
+ { upsert: false },
+ e =>
+ {
+ if ( e )
+ {
+ reject( context(
+ new DaoError(
+ 'Failed setting error flag: ' + e
+ ),
+ {
+ doc_id: doc_id,
+ }
+ ) );
+ return;
+ }
+
+ resolve();
+ return;
+ }
+ );
+ } );
+ }
+
+
+ /**
+ * Get a count of documents in an error state
+ *
+ * @return a count of the documents in an error state
+ */
+ getErrorCount(): Promise<number>
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection.find(
+ { deltaError: true },
+ {},
+ ( e, cursor ) =>
+ {
+ if ( e )
+ {
+ reject(
+ new Error(
+ 'Failed getting error count: ' + e
+ )
+ );
+ return;
+ }
+
+ cursor.toArray( ( e: NullableError, data: any[] ) =>
+ {
+ if ( e )
+ {
+ reject( context(
+ new DaoError(
+ 'Failed getting error count: ' + e
+ ),
+ {
+ cursor: cursor,
+ }
+ ) );
+ return;
+ }
+
+ resolve( data.length );
+ });
+ }
+ )
+ } );
+ }
+}
+
diff --git a/src/system/db/MongoFactory.ts b/src/system/db/MongoFactory.ts
new file mode 100644
index 0000000..5a3b03e
--- /dev/null
+++ b/src/system/db/MongoFactory.ts
@@ -0,0 +1,176 @@
+/**
+ * Mongo Factory functions
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * These definitions are for a very old mongodb library, which will be
+ * once we get around to updating node. Quite a failure on the maintenance
+ * front.
+ *
+ * instantiate objects for MongoDb
+ */
+import { MongoDb, MongoDbConfig, MongoCollection } from '../../types/mongodb';
+import { DaoError } from '../../error/DaoError';
+
+
+const {
+ Db: MongoDb,
+ Server: MongoServer,
+ ReplServers: ReplSetServers,
+} = require( 'mongodb' );
+
+
+/**
+ * Create a mongodb configuration from the environment
+ *
+ * @param env - the environment variables
+ *
+ * @return the mongo configuration
+ */
+export function createMongoConfig( env: NodeJS.ProcessEnv ): MongoDbConfig
+{
+ return <MongoDbConfig>{
+ 'port': +( env.MONGODB_PORT || 0 ),
+ 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1,
+ 'replset': env.LIZA_MONGODB_REPLSET,
+ 'host': env.MONGODB_HOST,
+ 'host_a': env.LIZA_MONGODB_HOST_A,
+ 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ),
+ 'host_b': env.LIZA_MONGODB_HOST_B,
+ 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ),
+ 'collection': 'quotes',
+ };
+}
+
+
+/**
+ * Create the database connection
+ *
+ * @param conf - the configuration from the environment
+ *
+ * @return the mongodb connection
+ */
+export function createMongoDB( conf: MongoDbConfig ): MongoDb
+{
+ if( conf.ha )
+ {
+ var mongodbPort = conf.port || 27017;
+ var mongodbReplSet = conf.replset || 'rs0';
+ var dbServers = new ReplSetServers(
+ [
+ new MongoServer( conf.host_a, conf.port_a || mongodbPort),
+ new MongoServer( conf.host_b, conf.port_b || mongodbPort),
+ ],
+ {rs_name: mongodbReplSet, auto_reconnect: true}
+ );
+ }
+ else
+ {
+ var dbServers = new MongoServer(
+ conf.host || '127.0.0.1',
+ conf.port || 27017,
+ {auto_reconnect: true}
+ );
+ }
+ var db = new MongoDb(
+ 'program',
+ dbServers,
+ {native_parser: false, safe: false}
+ );
+ return db;
+}
+
+
+/**
+ * Attempts to connect to the database and retrieve the collection
+ *
+ * connectError event will be emitted on failure.
+ *
+ * @param db - the mongo database
+ * @param conf - the mongo configuration
+ *
+ * @return the collection
+ */
+export function getMongoCollection(
+ db: MongoDb,
+ conf: MongoDbConfig
+): Promise<MongoCollection>
+{
+ return new Promise( ( resolve, reject ) =>
+ {
+ // attempt to connect to the database
+ db.open( ( e: any, db: any ) =>
+ {
+ // if there was an error, don't bother with anything else
+ if ( e )
+ {
+ // in some circumstances, it may just be telling us that
+ // we're already connected (even though the connection may
+ // have been broken)
+ if ( e.errno !== undefined )
+ {
+ reject( new Error(
+ 'Error opening mongo connection: ' + e
+ ) );
+ return;
+ }
+ } else if ( db == null )
+ {
+ reject( new DaoError( 'No database connection' ) );
+ return;
+ }
+
+ // quotes collection
+ db.collection(
+ conf.collection,
+ ( e: any, collection: MongoCollection ) =>
+ {
+ if ( e )
+ {
+ reject( new DaoError(
+ 'Error creating collection: ' + e
+ ) );
+ return;
+ }
+
+ // initialize indexes
+ collection.createIndex(
+ [
+ ['published', 1],
+ ['deltaError', 1],
+ ],
+ true,
+ ( e: any, _index: { [P: string]: any } ) =>
+ {
+ if ( e )
+ {
+ reject( new DaoError(
+ 'Error creating index: ' + e
+ ) );
+ return;
+ }
+
+ resolve( collection );
+ return;
+ }
+ );
+ }
+ );
+ } );
+ } );
+} \ No newline at end of file
diff --git a/src/types/avro-js.d.ts b/src/types/avro-js.d.ts
new file mode 100644
index 0000000..a2eff67
--- /dev/null
+++ b/src/types/avro-js.d.ts
@@ -0,0 +1,89 @@
+/**
+ * avro-js type definitions
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import { Duplex } from 'stream';
+
+declare module "avro-js";
+
+declare function parse( schema: string ): AvroSchema;
+
+export declare interface AvroSchema
+{
+ /**
+ * Write data to a buffer
+ *
+ * @param data - the data to write
+ *
+ * @return the buffer if successful
+ */
+ toBuffer( data: Record<string, any> ): Buffer | null;
+
+
+ /**
+ * Check if data is valid against schema
+ *
+ * @param data - the data to validate
+ * @param opts - options specified as key/values
+ *
+ * @return the buffer if it is valid
+ */
+ isValid(
+ data: Record<string, any>,
+ opts?: Record<string, any>
+ ): Buffer | null;
+
+
+ /**
+ * Write to a buffer
+ *
+ * @param data - the data to write
+ * @param buffer - the buffer that will be written to
+ */
+ encode( data: Record<string, any>, buffer: Buffer ): void;
+
+
+ /**
+ * Output to a json string
+ *
+ * @param data - the data to format
+ *
+ * @return the formatted data
+ */
+ toString( data: Record<string, any> ): string;
+
+
+ /**
+ * Deserialize from a buffer
+ *
+ * @param buffer - the buffer to read from
+ *
+ * @return the resulting data
+ */
+ fromBuffer( buffer: Buffer ): any;
+}
+
+declare class BlockEncoder extends Duplex
+{
+ constructor( schema: AvroSchema );
+}
+
+export declare const streams: {
+ BlockEncoder: typeof BlockEncoder,
+};
diff --git a/src/types/misc.d.ts b/src/types/misc.d.ts
index 5572739..b26d827 100644
--- a/src/types/misc.d.ts
+++ b/src/types/misc.d.ts
@@ -65,4 +65,7 @@ type UnixTimestampMillis = NominalType<Milliseconds, 'UnixTimestampMillis'>;
* reduce the boilerplate of these function definitions, and to clearly
* document that this pattern is something that used to be done frequently.
*/
-type NodeCallback<T, R = void> = ( e: Error | null, result: T | null ) => R;
+type NodeCallback<T, R = void> = ( e: NullableError, result: T | null ) => R;
+
+/** Nullable error */
+type NullableError = Error | null;
diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts
index 808b458..bbbfa46 100644
--- a/src/types/mongodb.d.ts
+++ b/src/types/mongodb.d.ts
@@ -23,13 +23,59 @@
* front.
*/
+import { PositiveInteger } from "../numeric";
+
declare module "mongodb";
+export interface MongoDbConfig extends Record<string, any> {
+ /** Host */
+ host?: string;
+
+ /** Port number */
+ port?: number;
+
+ /** High availability */
+ ha: boolean;
+
+ /** The mongodb collection to read from */
+ collection: string;
+}
+
+
/**
- * Node-style callback for queries
+ * Interface for the mongo database
*/
-type MongoCallback = ( err: Error|null, data: { [P: string]: any } ) => void;
+export interface MongoDb
+{
+ /**
+ * Initialize the database connection
+ *
+ * @param callback continuation on completion
+ */
+ open( callback: MongoCallback ): void;
+
+
+ /**
+ * Close the database connection
+ *
+ * @param callback continuation on completion
+ */
+ close( callback: MongoCallback ): void;
+
+
+ /**
+ * Hook events
+ *
+ * @param event_id - the event to hook
+ * @param callback - a function to call in response to the event
+ */
+ on( event_id: string, callback: ( err: Error ) => void ): void;
+}
+
+
+/** Node-style callback for queries */
+type MongoCallback = ( err: NullableError, data: { [P: string]: any } ) => void;
/**
@@ -52,11 +98,31 @@ interface MongoQueryUpdateOptions
*/
interface MongoFindOneOptions
{
+ /** Fields to select */
fields?: MongoFieldSelector,
}
/**
+ * Options for `find` queries
+ *
+ * This is not at all comprehensive; it covers only the fields we actually
+ * make use of.
+ */
+interface MongoFindOptions
+{
+ /** Limit results returned */
+ limit?: PositiveInteger,
+
+ /** Whether to project only id's */
+ id?: number,
+
+ /** Which fields to include in the result set */
+ fields?: Record<string, number>,
+}
+
+
+/**
* Options for `findAndModify` queries
*
* This is not at all comprehensive; it covers only the fields we actually
@@ -76,21 +142,26 @@ interface MongoFindAndModifyOptions
/** Mongo query selector */
-type MongoSelector = { [P: string]: any };
-
+export type MongoSelector = { [P: string]: any };
/** Field selector */
type MongoFieldSelector = { [P: string]: number };
+/** Mongo index specification */
+type MongoIndexSpecification = Array< Array < string | number >>;
/** Mongo update clause */
-type MongoUpdate = MongoSelector;
+export type MongoUpdate = MongoSelector;
+/** Mongo object */
+type MongoObject = { [P: string]: any };
+
+/** Mongo update clause */
+type MongoInsertSpecification = MongoObject | MongoObject[];
/** Sorting clause **/
type MongoSortClause = Array<string | [ string, MongoSortDirection ]>;
-
/** Sort direction */
type MongoSortDirection = -1 | 1 | 'ascending' | 'descending' | 'asc' | 'desc';
@@ -115,8 +186,6 @@ declare interface MongoCollection
* @param data update data
* @param options query options
* @param callback continuation on completion
- *
- * @return callback return value
*/
update(
selector: MongoSelector,
@@ -127,6 +196,23 @@ declare interface MongoCollection
/**
+ * Execute a query and return the results
+ *
+ * Unlike `update`, the callback return value is not propagated, and so
+ * the callback ought not return anything.
+ *
+ * @param selector document query
+ * @param fields fields to return
+ * @param callback continuation on completion
+ */
+ find(
+ selector: MongoSelector,
+ fields: MongoFindOptions,
+ callback: MongoCallback
+ ): void;
+
+
+ /**
* Execute a query and return the first result
*
* Unlike `update`, the callback return value is not propagated, and so
@@ -158,4 +244,30 @@ declare interface MongoCollection
options: MongoFindAndModifyOptions,
callback: MongoCallback,
): void;
+
+
+ /**
+ * Creates an index on the collection
+ *
+ * @param fieldOrSpec - indexes to create
+ * @param options - mongo options
+ * @param callback - continuation on completion
+ */
+ createIndex(
+ fieldOrSpec: MongoIndexSpecification,
+ options: boolean,
+ callback: MongoCallback,
+ ): void;
+
+
+ /**
+ * Creates an index on the collection
+ *
+ * @param docs - documents to insert
+ * @param callback - continuation on completion
+ */
+ insert(
+ docs: MongoInsertSpecification,
+ callback: MongoCallback,
+ ): void;
}
diff --git a/src/version.d.ts b/src/version.d.ts
new file mode 100644
index 0000000..f7a4b0e
--- /dev/null
+++ b/src/version.d.ts
@@ -0,0 +1,39 @@
+/**
+ * Version information
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework
+ *
+ * Liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/** Format of version array */
+type VersionTuple = [ number, number, number, string ];
+
+/** Version information */
+declare interface Version extends VersionTuple
+{
+ major: number;
+ minor: number;
+ rev: number;
+ suffix: string;
+
+ toString(): string;
+}
+
+/** Exported version data */
+declare const version: Version;
+
+export = version;
diff --git a/test/bucket/delta.ts b/test/bucket/delta.ts
index ba1d192..cc9a790 100644
--- a/test/bucket/delta.ts
+++ b/test/bucket/delta.ts
@@ -19,12 +19,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
-import { createDelta as sut, Kv , DeltaResult} from "../../src/bucket/delta";
+import {
+ createDelta as sutCreate,
+ applyDelta as sutApply,
+ Kv,
+ DeltaResult,
+} from "../../src/bucket/delta";
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
-interface SutTestCase<T>
+
+interface SutCreateTestCase<T>
{
label: string;
src_data: T;
@@ -32,68 +38,149 @@ interface SutTestCase<T>
expected: DeltaResult<T>;
}
+
+interface SutApplyTestCase<T>
+{
+ label: string;
+ bucket: T;
+ delta: DeltaResult<T>;
+ expected: T;
+}
+
+
describe( 'Delta', () =>
{
- ( <SutTestCase<Kv<string>>[]>[
- {
- label: "No changes are made, key is dropped",
- src_data: { foo: [ 'bar', 'baz' ] },
- dest_data: { foo: [ 'bar', 'baz' ] },
- expected: {},
- },
- {
- label: "Only the unchanged key is dropped",
- src_data: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] },
- dest_data: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] },
- expected: { bar: [ 'asd' ] },
- },
- {
- label: "Changed values are updated by index with old value",
- src_data: { foo: [ "bar", "baz", "quux" ] },
- dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
- expected: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] },
- },
- {
- label: "The keys are null when they don't exist in first set",
- src_data: {},
- dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
- expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
- },
- {
- label: "Removed keys in new set show up",
- src_data: { foo: [ "bar" ] },
- dest_data: {},
- expected: { foo: null },
- },
- {
- label: "Indexes after a null terminator aren't included",
- src_data: { foo: [ "one", "two", "three", "four" ] },
- dest_data: { foo: [ "one", "done" ] },
- expected: { foo: [ undefined, "done", null ] },
- },
- {
- label: "Consider nested arrays to be scalar values",
- src_data: { foo: [ [ "one" ], [ "two", "three" ] ] },
- dest_data: { foo: [ [ "one" ], [ "two" ] ] },
- expected: { foo: [ undefined, [ "two" ] ] },
- },
- {
- label: "Don't evaluate zeros as falsy",
- src_data: { foo: [ 0 ] },
- dest_data: { foo: [ 0 ] },
- expected: {},
- },
+ describe( '#createDelta', () =>
+ {
+ ( <SutCreateTestCase<Kv<string>>[]>[
+ {
+ label: "No changes are made, key is dropped",
+ src_data: { foo: [ 'bar', 'baz' ] },
+ dest_data: { foo: [ 'bar', 'baz' ] },
+ expected: {},
+ },
+ {
+ label: "Only the unchanged key is dropped",
+ src_data: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] },
+ dest_data: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] },
+ expected: { bar: [ 'asd' ] },
+ },
+ {
+ label: "Changed values are updated by index with old value",
+ src_data: { foo: [ "bar", "baz", "quux" ] },
+ dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
+ expected: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] },
+ },
+ {
+ label: "The keys are null when they don't exist in first set",
+ src_data: {},
+ dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
+ expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
+ },
+ {
+ label: "Removed keys in new set show up",
+ src_data: { foo: [ "bar" ] },
+ dest_data: {},
+ expected: { foo: null },
+ },
+ {
+ label: "Indexes after a null terminator aren't included",
+ src_data: { foo: [ "one", "two", "three", "four" ] },
+ dest_data: { foo: [ "one", "done" ] },
+ expected: { foo: [ undefined, "done", null ] },
+ },
+ {
+ label: "Consider nested arrays to be scalar values",
+ src_data: { foo: [ [ "one" ], [ "two", "three" ] ] },
+ dest_data: { foo: [ [ "one" ], [ "two" ] ] },
+ expected: { foo: [ undefined, [ "two" ] ] },
+ },
+ {
+ label: "Don't evaluate zeros as falsy",
+ src_data: { foo: [ 0 ] },
+ dest_data: { foo: [ 0 ] },
+ expected: {},
+ },
+ {
+ label: "Don't evaluate empty strings as falsy",
+ src_data: { foo: [ '' ] },
+ dest_data: { foo: [ '' ] },
+ expected: {},
+ },
+ ] ).forEach( ( { label, src_data, dest_data, expected } ) =>
{
- label: "Don't evaluate empty strings as falsy",
- src_data: { foo: [ '' ] },
- dest_data: { foo: [ '' ] },
- expected: {},
- },
- ] ).forEach( ( { label, src_data, dest_data, expected } ) =>
+ it( label, () =>
+ {
+ expect( sutCreate( src_data, dest_data ) )
+ .to.deep.equal( expected );
+ } );
+ } );
+ } );
+
+
+ describe( '#applyDelta', () =>
{
- it( label, () =>
+ ( <SutApplyTestCase<Kv<string>>[]>[
+ {
+ label: "Empty delta changes nothing",
+ bucket: { foo: [ 'bar', 'baz' ] },
+ delta: {},
+ expected: { foo: [ 'bar', 'baz' ] },
+ },
+ {
+ label: "Field not in delta is unchanged",
+ bucket: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] },
+ delta: { bar: [ 'asd' ] },
+ expected: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] },
+ },
+ {
+ label: "Undefined doesn't affect its corresponding index",
+ bucket: { foo: [ "bar", "baz", "quux" ] },
+ delta: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] },
+ expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
+ },
+ {
+ label: "Delta applys correctly on empty bucket",
+ bucket: {},
+ delta: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
+ expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] },
+ },
+ {
+ label: "Keys are removed properly",
+ bucket: { foo: [ "bar" ] },
+ delta: { foo: null },
+ expected: {},
+ },
+ {
+ label: "Indexes after a null terminator aren't included",
+ bucket: { foo: [ "one", "two", "three", "four" ] },
+ delta: { foo: [ undefined, "done", null ] },
+ expected: { foo: [ "one", "done" ] },
+ },
+ {
+ label: "Consider nested arrays to be scalar values",
+ bucket: { foo: [ [ "one" ], [ "two", "three" ] ] },
+ delta: { foo: [ undefined, [ "two" ] ] },
+ expected: { foo: [ [ "one" ], [ "two" ] ] },
+ },
+ {
+ label: "Don't evaluate zeros as falsy",
+ bucket: { foo: [ 0 ] },
+ delta: {},
+ expected: { foo: [ 0 ] },
+ },
+ {
+ label: "Don't evaluate empty strings as falsy",
+ bucket: { foo: [ '' ] },
+ delta: {},
+ expected: { foo: [ '' ] },
+ },
+ ] ).forEach( ( { label, bucket, delta, expected } ) =>
{
- expect( sut( src_data, dest_data ) ).to.deep.equal( expected );
+ it( label, () =>
+ {
+ expect( sutApply( bucket, delta ) ).to.deep.equal( expected );
+ } );
} );
} );
} );
diff --git a/test/conf/ConfLoaderTest.js b/test/conf/ConfLoaderTest.ts
index b942216..4d71301 100644
--- a/test/conf/ConfLoaderTest.js
+++ b/test/conf/ConfLoaderTest.ts
@@ -1,15 +1,34 @@
/**
* Tests ConfLoader
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-'use strict';
+const chai = require( 'chai' );
+const expect = chai.expect;
+
+import { readFile } from "fs";
+
+import { ConfLoader as Sut } from "../../src/conf/ConfLoader";
+
+type FsLike = { readFile: typeof readFile };
-const chai = require( 'chai' );
-const expect = chai.expect;
const {
- conf: {
- ConfLoader: Sut,
- },
store: {
MemoryStore: Store,
},
@@ -25,8 +44,8 @@ describe( 'ConfLoader', () =>
const expected_path = "/foo/bar/baz.json";
const expected_data = '{ "foo": "bar" }';
- const fs = {
- readFile( path, encoding, callback )
+ const fs = <FsLike>{
+ readFile( path: string, encoding: string, callback: any )
{
expect( path ).to.equal( expected_path );
expect( encoding ).to.equal( 'utf8' );
@@ -36,7 +55,7 @@ describe( 'ConfLoader', () =>
};
return expect(
- Sut( fs, Store )
+ new Sut( fs, Store )
.fromFile( expected_path )
.then( conf => conf.get( 'foo' ) )
).to.eventually.deep.equal( JSON.parse( expected_data ).foo );
@@ -47,14 +66,14 @@ describe( 'ConfLoader', () =>
{
const expected_err = Error( 'rejected' );
- const fs = {
- readFile( _, __, callback )
+ const fs = <FsLike>{
+ readFile( _: any, __: any, callback: any )
{
callback( expected_err, null );
},
};
- return expect( Sut( fs ).fromFile( '' ) )
+ return expect( new Sut( fs, Store ).fromFile( '' ) )
.to.eventually.be.rejectedWith( expected_err );
} );
@@ -64,21 +83,21 @@ describe( 'ConfLoader', () =>
const result = { foo: {} };
const input = "foo";
- const fs = {
- readFile( _, __, callback )
+ const fs = <FsLike>{
+ readFile( _: any, __: any, callback: any )
{
callback( null, input );
},
};
- const sut = Sut.extend(
+ const sut = new class extends Sut
{
- 'override parseConfData'( given_input )
+ parseConfData( given_input: string )
{
expect( given_input ).to.equal( input );
return Promise.resolve( result );
- },
- } )( fs, Store );
+ }
+ }( fs, Store );
return expect(
sut.fromFile( '' )
@@ -91,8 +110,8 @@ describe( 'ConfLoader', () =>
{
const expected_err = SyntaxError( 'test parsing error' );
- const fs = {
- readFile( _, __, callback )
+ const fs = <FsLike>{
+ readFile( _: any, __: any, callback: any )
{
// make async so that we clear the stack, and therefore
// try/catch
@@ -100,13 +119,13 @@ describe( 'ConfLoader', () =>
},
};
- const sut = Sut.extend(
+ const sut = new class extends Sut
{
- 'override parseConfData'( given_input )
+ parseConfData( _given_input: string ): never
{
throw expected_err;
- },
- } )( fs, Store );
+ }
+ }( fs, Store );
return expect( sut.fromFile( '' ) )
.to.eventually.be.rejectedWith( expected_err );
@@ -117,20 +136,21 @@ describe( 'ConfLoader', () =>
{
const expected_err = Error( 'test Store ctor error' );
- const fs = {
- readFile: ( _, __, callback ) => callback( null, '' ),
+ const fs = <FsLike>{
+ readFile: ( _: any, __: any, callback: any ) =>
+ callback( null, '' ),
};
const badstore = () => { throw expected_err };
- return expect( Sut( fs, badstore ).fromFile( '' ) )
+ return expect( new Sut( fs, badstore ).fromFile( '' ) )
.to.eventually.be.rejectedWith( expected_err );
} );
it( "rejects promise on bad fs call", () =>
{
- return expect( Sut( {}, Store ).fromFile( '' ) )
+ return expect( new Sut( <FsLike>{}, Store ).fromFile( '' ) )
.to.eventually.be.rejected;
} );
} );
diff --git a/test/server/dapi/TokenedDataApiTest.ts b/test/server/dapi/TokenedDataApiTest.ts
index 27375fa..0d0a3ad 100644
--- a/test/server/dapi/TokenedDataApiTest.ts
+++ b/test/server/dapi/TokenedDataApiTest.ts
@@ -40,7 +40,7 @@ describe( 'TokenedDataApi', () =>
const expected_ns = 'foo_ns';
- ( <[string, boolean, ( e: Error|null ) => void][]>[
+ ( <[string, boolean, ( e: NullableError ) => void][]>[
[
"creates token and returns data if last_created",
true,
diff --git a/test/server/db/MongoServerDaoTest.js b/test/server/db/MongoServerDaoTest.ts
index 0fa5d30..58e6ab9 100644
--- a/test/server/db/MongoServerDaoTest.js
+++ b/test/server/db/MongoServerDaoTest.ts
@@ -21,9 +21,17 @@
'use strict';
-const chai = require( 'chai' );
-const expect = chai.expect;
-const { MongoServerDao: Sut } = require( '../../../' ).server.db;
+import { MongoServerDao as Sut } from "../../../src/server/db/MongoServerDao";
+import { MongoSelector, MongoUpdate, MongoDb } from "mongodb";
+import { expect, use as chai_use } from 'chai';
+import { ServerSideQuote } from "../../../src/server/quote/ServerSideQuote";
+import { PositiveInteger } from "../../../src/numeric";
+import { Program } from "../../../src/program/Program";
+import { RateResult } from "../../../src/server/rater/Rater";
+import { QuoteDataBucket } from "../../../src/bucket/QuoteDataBucket";
+import { QuoteId } from "../../../src/quote/Quote";
+
+chai_use( require( 'chai-as-promised' ) );
describe( 'MongoServerDao', () =>
@@ -41,9 +49,9 @@ describe( 'MongoServerDao', () =>
const quote = createStubQuote( metadata );
- const sut = Sut( createMockDb(
+ const sut = new Sut( createMockDb(
// update
- ( selector, data ) =>
+ ( _selector: MongoSelector, data: MongoUpdate ) =>
{
expect( data.$set[ 'meta.foo' ] )
.to.deep.equal( metadata.foo );
@@ -75,9 +83,9 @@ describe( 'MongoServerDao', () =>
const quote = createStubQuote( {} );
- const sut = Sut( createMockDb(
+ const sut = new Sut( createMockDb(
// update
- ( selector, data ) =>
+ (_selector: MongoSelector, data: MongoUpdate ) =>
{
expect( data.$push[ 'foo' ] )
.to.deep.equal( push_data.foo );
@@ -106,9 +114,9 @@ describe( 'MongoServerDao', () =>
const quote = createStubQuote( {} );
- const sut = Sut( createMockDb(
+ const sut = new Sut( createMockDb(
// update
- ( selector, data ) =>
+ ( _selector: MongoSelector, data: MongoUpdate ) =>
{
expect( data.$push ).to.equal( undefined );
@@ -131,24 +139,24 @@ describe( 'MongoServerDao', () =>
} );
-function createMockDb( on_update )
+function createMockDb( on_update: any ): MongoDb
{
const collection_quotes = {
update: on_update,
- createIndex: ( _, __, c ) => c(),
+ createIndex: ( _: any, __: any, c: any ) => c(),
};
const collection_seq = {
- find( _, __, c )
+ find( _: any, __: any, c: any )
{
c( null, {
- toArray: c => c( null, { length: 5 } ),
+ toArray: ( c: any ) => c( null, { length: 5 } ),
} );
},
};
const db = {
- collection( id, c )
+ collection( id: any, c: any )
{
const coll = ( id === 'quotes' )
? collection_quotes
@@ -158,8 +166,9 @@ function createMockDb( on_update )
},
};
- const driver = {
- open: c => c( null, db ),
+ const driver = <MongoDb>{
+ open: ( c: any ) => c( null, db ),
+ close: () => {},
on: () => {},
};
@@ -167,24 +176,53 @@ function createMockDb( on_update )
}
-function createStubQuote( metadata )
+function createStubQuote( metadata: Record<string, any> )
{
- return {
- getBucket: () => ( {
+ const program = <Program>{
+ getId: () => '1',
+ ineligibleLockCount: 0,
+ apis: {},
+ internal: {},
+ meta: {
+ arefs: {},
+ fields: {},
+ groups: {},
+ qdata: {},
+ qtypes: {},
+ },
+ mapis: {},
+ initQuote: () => {},
+ };
+
+ const quote = <ServerSideQuote>{
+ getBucket: () => <QuoteDataBucket>( {
getData: () => {},
} ),
- getMetabucket: () => ( {
+ getMetabucket: () => <QuoteDataBucket>( {
getData: () => metadata,
} ),
- getId: () => 1,
- getProgramVersion: () => 0,
- getLastPremiumDate: () => 0,
- getRatedDate: () => 0,
+ getId: () => <QuoteId>123,
+ getProgramVersion: () => 'Foo',
+ getLastPremiumDate: () => <UnixTimestamp>0,
+ getRatedDate: () => <UnixTimestamp>0,
getExplicitLockReason: () => "",
- getExplicitLockStep: () => 0,
+ getExplicitLockStep: () => <PositiveInteger>1,
isImported: () => false,
isBound: () => false,
+ getTopVisitedStepId: () => <PositiveInteger>1,
+ getTopSavedStepId: () => <PositiveInteger>1,
+ setRatedDate: () => quote,
+ setRateBucket: () => quote,
+ setRatingData: () => quote,
+ getRatingData: () => <RateResult>{ _unavailable_all: '0' },
+ getProgram: () => program,
+ setExplicitLock: () => quote,
+ getProgramId: () => 'Foo',
+ getCurrentStepId: () => 0,
+ setLastPremiumDate: () => quote,
};
+
+ return quote;
}
diff --git a/test/server/request/DataProcessorTest.ts b/test/server/request/DataProcessorTest.ts
index d0e52d8..2be9d61 100644
--- a/test/server/request/DataProcessorTest.ts
+++ b/test/server/request/DataProcessorTest.ts
@@ -503,7 +503,7 @@ function createStubs(
function createStubUserRequest( internal: boolean )
{
- return {
+ return <UserRequest>{
getSession: () => ( {
isInternal: () => internal
} )
@@ -713,7 +713,44 @@ function createStubQuote()
getBucket()
{
return new QuoteDataBucket();
- }
+ },
+
+ getMetabucket(){
+ return new QuoteDataBucket();
+ },
+
+ getProgramVersion(){
+ return 'Foo';
+ },
+
+ getExplicitLockReason(){
+ return 'Reason';
+ },
+
+ getExplicitLockStep()
+ {
+ return <PositiveInteger>1;
+ },
+
+ isImported()
+ {
+ return true;
+ },
+
+ isBound()
+ {
+ return true;
+ },
+
+ getTopVisitedStepId()
+ {
+ return <PositiveInteger>1;
+ },
+
+ getTopSavedStepId()
+ {
+ return <PositiveInteger>1;
+ },
};
}
diff --git a/test/server/service/RatingServiceTest.ts b/test/server/service/RatingServiceTest.ts
index 8896c49..546118c 100644
--- a/test/server/service/RatingServiceTest.ts
+++ b/test/server/service/RatingServiceTest.ts
@@ -36,6 +36,7 @@ import { UserRequest } from "../../../src/server/request/UserRequest";
import { UserResponse } from "../../../src/server/request/UserResponse";
import { UserSession } from "../../../src/server/request/UserSession";
import { QuoteDataBucket } from "../../../src/bucket/QuoteDataBucket";
+import { PositiveInteger } from "../../../src/numeric";
import { Kv } from "../../../src/bucket/delta";
import {
@@ -573,19 +574,27 @@ function getStubs()
const response = <UserResponse>{};
const quote = <ServerSideQuote>{
- getProgramId: () => program_id,
- getProgram: () => program,
- getId: () => <QuoteId>0,
- setLastPremiumDate: () => quote,
- setRatedDate: () => quote,
- getRatedDate: () => <UnixTimestamp>0,
- getLastPremiumDate: () => <UnixTimestamp>0,
- getCurrentStepId: () => 0,
- setExplicitLock: () => quote,
- setRateBucket: () => quote,
- setRatingData: () => quote,
- getRatingData: () => stub_rate_data,
- getBucket: () => new QuoteDataBucket(),
+ getProgramId: () => program_id,
+ getProgram: () => program,
+ getId: () => <QuoteId>0,
+ setLastPremiumDate: () => quote,
+ setRatedDate: () => quote,
+ getRatedDate: () => <UnixTimestamp>0,
+ getLastPremiumDate: () => <UnixTimestamp>0,
+ getCurrentStepId: () => 0,
+ setExplicitLock: () => quote,
+ setRateBucket: () => quote,
+ setRatingData: () => quote,
+ getRatingData: () => stub_rate_data,
+ getBucket: () => new QuoteDataBucket(),
+ getMetabucket: () => new QuoteDataBucket(),
+ getProgramVersion: () => 'Foo',
+ getExplicitLockReason: () => 'Reason',
+ getExplicitLockStep: () => <PositiveInteger>1,
+ isImported: () => true,
+ isBound: () => true,
+ getTopVisitedStepId: () => <PositiveInteger>1,
+ getTopSavedStepId: () => <PositiveInteger>1,
};
return {
diff --git a/test/server/token/MongoTokenDaoTest.ts b/test/server/token/MongoTokenDaoTest.ts
index 167685a..23e403e 100644
--- a/test/server/token/MongoTokenDaoTest.ts
+++ b/test/server/token/MongoTokenDaoTest.ts
@@ -26,7 +26,7 @@ import {
} from "../../../src/server/token/TokenDao";
import { MongoTokenDao as Sut } from "../../../src/server/token/MongoTokenDao";
-
+import { MongoCollection } from "mongodb";
import {
TokenId,
TokenNamespace,
@@ -248,6 +248,9 @@ describe( 'server.token.TokenDao', () =>
update() {},
findOne() {},
+ find() {},
+ createIndex() {},
+ insert() {},
};
return expect(
@@ -269,6 +272,9 @@ describe( 'server.token.TokenDao', () =>
update() {},
findOne() {},
+ find() {},
+ createIndex() {},
+ insert() {},
};
return expect(
@@ -477,6 +483,9 @@ describe( 'server.token.TokenDao', () =>
update() {},
findAndModify() {},
+ find() {},
+ createIndex() {},
+ insert() {},
};
const result = new Sut( coll, field, () => <UnixTimestamp>0 )
@@ -520,6 +529,9 @@ describe( 'server.token.TokenDao', () =>
update() {},
findAndModify() {},
+ find() {},
+ createIndex() {},
+ insert() {},
};
return expect(
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts
new file mode 100644
index 0000000..1587bb2
--- /dev/null
+++ b/test/system/DeltaProcessorTest.ts
@@ -0,0 +1,665 @@
+/**
+ * Delta Processor test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
+import { AmqpPublisher } from '../../src/system/AmqpPublisher';
+import { DeltaDao } from '../../src/system/db/DeltaDao';
+import { DeltaDocument } from '../../src/bucket/delta';
+import { DocumentId } from '../../src/document/Document';
+import { EventEmitter } from 'events';
+
+import { expect, use as chai_use } from 'chai';
+chai_use( require( 'chai-as-promised' ) );
+
+
+describe( 'system.DeltaProcessor', () =>
+{
+ describe( '#process', () =>
+ {
+ ( <{
+ label: string,
+ given: any[],
+ expected: any
+ }[]>[
+ {
+ label: 'No deltas are processed',
+ given: [
+ {
+ id: 123,
+ lastUpdate: 123123123,
+ data: {},
+ ratedata: {},
+ rdelta: {},
+ },
+ ],
+ expected: [],
+ },
+
+ // when quote is initialized: { foo: [ '' ], state: [ 'a' ] }
+ {
+ label: 'Publishes deltas in order',
+
+ given: [
+ {
+ id: 123,
+ lastUpdate: 123123123,
+
+ data: {
+ foo: [ 'third' ],
+ state: [ 'a', 'b', 'c', 'd' ],
+ },
+
+ ratedata: {
+ prem: [ 'rate_second' ],
+ state: [ 'i', 'ii', 'iii' ],
+ },
+
+ rdelta: {
+ data: [
+ {
+ timestamp: 1,
+ data: {
+ foo: [ '' ],
+ state: [ undefined, null ],
+ },
+ },
+ {
+ timestamp: 3,
+ data: {
+ foo: [ 'first' ],
+ state: [ undefined, undefined, null ],
+ },
+ },
+ {
+ timestamp: 5,
+ data: {
+ foo: [ 'second' ],
+ state: [ undefined, undefined, undefined, null ],
+ },
+ },
+ ],
+
+ ratedata: [
+ {
+ timestamp: 2,
+ data: {
+ prem: [ '' ],
+ state: [ undefined, null ],
+ },
+ },
+ {
+ timestamp: 4,
+ data: {
+ prem: [ 'rate_first' ],
+ state: [ undefined, undefined, null ],
+ },
+ },
+ ],
+ },
+ },
+ ],
+
+ expected: [
+ // bucket
+ {
+ doc_id: 123,
+ rdelta: {
+ foo: [ '' ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
+ },
+ ratedata: {},
+ },
+
+ // rate
+ {
+ doc_id: 123,
+ rdelta: {
+ prem: [ '' ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
+ },
+ ratedata: {
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
+ },
+ },
+
+ // bucket
+ {
+ doc_id: 123,
+ rdelta: {
+ foo: [ 'first' ],
+ state: [ undefined, undefined, null ],
+ },
+ bucket: {
+ foo: [ 'second' ],
+ state: [ 'a', 'b', 'c' ],
+ },
+ ratedata: {},
+ },
+
+ // rate
+ {
+ doc_id: 123,
+ rdelta: {
+ prem: [ 'rate_first' ],
+ state: [ undefined, undefined, null ],
+ },
+ bucket: {
+ foo: [ 'second' ],
+ state: [ 'a', 'b', 'c' ],
+ },
+ ratedata: {
+ prem: [ 'rate_second' ],
+ state: [ 'i', 'ii', 'iii' ],
+ },
+ },
+
+ // bucket
+ {
+ doc_id: 123,
+ rdelta: {
+ foo: [ 'second' ],
+ state: [ undefined, undefined, undefined, null ],
+ },
+ bucket: {
+ foo: [ 'third' ],
+ state: [ 'a', 'b', 'c', 'd' ],
+ },
+ ratedata: {},
+ },
+ ],
+ },
+
+ {
+ label: 'Publishes deltas in order for multiple documents',
+
+ given: [
+ {
+ id: 123,
+ lastUpdate: 123123123,
+
+ data: {
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
+ },
+
+ ratedata: {
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
+ },
+
+ rdelta: {
+ data: [
+ {
+ timestamp: 1,
+ data: {
+ foo: [ '' ],
+ state: [ undefined, null ],
+ },
+ },
+ ],
+
+ ratedata: [
+ {
+ timestamp: 4,
+ data: {
+ prem: [ '' ],
+ state: [ undefined, null ],
+ },
+ },
+ ],
+ },
+ },
+
+ // timestamps of this document are sandwiched between
+ // the above to make sure documents are processed
+ // independently (without splicing their deltas together)
+ {
+ id: 234,
+ lastUpdate: 121212123,
+
+ data: {
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
+ },
+
+ ratedata: {
+ prem2: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
+ },
+
+ rdelta: {
+ data: [
+ {
+ timestamp: 2,
+ data: {
+ foo2: [ '' ],
+ state: [ undefined, null ],
+ },
+ },
+ ],
+
+ ratedata: [
+ {
+ timestamp: 3,
+ data: {
+ prem2: [ '' ],
+ state: [ undefined, null ],
+ },
+ },
+ ],
+ },
+ },
+ ],
+
+ expected: [
+ // bucket
+ {
+ doc_id: 123,
+ rdelta: {
+ foo: [ '' ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
+ },
+ ratedata: {},
+ },
+
+ // rate
+ {
+ doc_id: 123,
+ rdelta: {
+ prem: [ '' ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
+ },
+ ratedata: {
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
+ },
+ },
+
+ // bucket
+ {
+ doc_id: 234,
+ rdelta: {
+ foo2: [ '' ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
+ },
+ ratedata: {},
+ },
+
+ // rate
+ {
+ doc_id: 234,
+ rdelta: {
+ prem2: [ '' ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
+ },
+ ratedata: {
+ prem2: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
+ },
+ },
+ ],
+ },
+
+ {
+ label: 'trims delta array based on index',
+ given: [
+ {
+ id: 111,
+ lastUpdate: 123123123,
+ data: { foo: [ 'second' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ '' ] },
+ timestamp: 123,
+ },
+ {
+ data: { foo: [ 'first' ] },
+ timestamp: 234,
+ },
+ ],
+ },
+ totalPublishDelta: {
+ data: 1,
+ },
+ },
+ ],
+ expected: [
+ {
+ doc_id: 111,
+ rdelta: { foo: [ 'first' ] },
+ bucket: { foo: [ 'second' ] },
+ ratedata: {}
+ },
+ ],
+ },
+ ] ).forEach( ( { label, given, expected } ) => it( label, () =>
+ {
+ let published: any = [];
+ const dao = createMockDeltaDao();
+ const publisher = createMockDeltaPublisher();
+ const emitter = new EventEmitter();
+
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
+ {
+ return Promise.resolve( given );
+ }
+
+ publisher.publish = (
+ meta,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
+ {
+ published.push( {
+ doc_id: meta.id,
+ rdelta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
+ } );
+
+ return Promise.resolve();
+ }
+
+ return expect( new Sut( dao, publisher, emitter ).process() )
+ .to.eventually.deep.equal( undefined )
+ .then( _ => expect( published ).to.deep.equal( expected ) );
+ } ) );
+ } );
+
+
+ describe( 'Error handling', () =>
+ {
+ it( 'Marks document in error state and continues', () =>
+ {
+ let published: any = [];
+ let error_flag_set = false;
+ const dao = createMockDeltaDao();
+ const publisher = createMockDeltaPublisher();
+ const emitter = new EventEmitter();
+ const entity_num = 'Some Agency';
+ const entity_id = 4321;
+ const lastUpdate = <UnixTimestamp>123123123;
+ const createdData = <UnixTimestamp>234234234;
+ const doc = <DeltaDocument[]>[ {
+ id: <DocumentId>123,
+ agentName: entity_num,
+ agentEntityId: entity_id,
+ startDate: createdData,
+ lastUpdate: lastUpdate,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ },
+ {
+ id: <DocumentId>234,
+ agentName: entity_num,
+ agentEntityId: entity_id,
+ startDate: createdData,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ } ];
+
+ const expected_published = [
+ {
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 123,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ },
+ {
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 234,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ }
+ ];
+
+ const expected_error = 'Uh oh';
+
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
+ Promise.resolve( doc );
+
+ dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise<void> =>
+ Promise.reject( new Error( expected_error ) );
+
+ dao.setErrorFlag = (): Promise<void> =>
+ {
+ error_flag_set = true;
+ return Promise.resolve();
+ }
+
+ publisher.publish = (
+ meta,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
+ {
+ published.push( {
+ meta: meta,
+ delta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
+ } );
+
+ return Promise.resolve();
+ }
+
+ // Prevent node from converting an error event into an error
+ emitter.on( 'error', () => {} );
+
+ return expect( new Sut( dao, publisher, emitter ).process() )
+ .to.eventually.deep.equal( undefined )
+ .then( _ =>
+ {
+ expect( error_flag_set ).to.be.true;
+ expect( published ).to.deep.equal( expected_published );
+ } );
+ } );
+ } );
+
+
+ describe( 'Error handling', () =>
+ {
+ it( 'Failure to set document error state further processing', () =>
+ {
+ let published: any = [];
+ let caught_error = '';
+ const dao = createMockDeltaDao();
+ const publisher = createMockDeltaPublisher();
+ const emitter = new EventEmitter();
+ const doc = <DeltaDocument[]>[ {
+ id: <DocumentId>123,
+ agentName: 'Some Agency',
+ agentEntityId: 4321,
+ startDate: <UnixTimestamp>234234234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ },
+ {
+ id: <DocumentId>234,
+ agentName: 'Some Agency',
+ agentEntityId: 4321,
+ startDate: <UnixTimestamp>234234234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ } ];
+
+ // Only one is published
+ const expected_published = [ {
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 123,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ } ];
+
+ const expected_error = 'Uh oh';
+
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
+ Promise.resolve( doc );
+
+ dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise<void> =>
+ Promise.reject( new Error( 'Couldn\'t mark document' ) );
+
+ dao.setErrorFlag = (): Promise<void> =>
+ Promise.reject( new Error( expected_error ) );
+
+ publisher.publish = (
+ meta,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
+ {
+ published.push( {
+ meta,
+ delta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
+ } );
+
+ return Promise.resolve();
+ }
+
+ // Prevent node from converting an error event into an error
+ emitter.on( 'error', () => {} );
+
+ return expect(
+ new Sut( dao, publisher, emitter ).process()
+ .catch( e => { caught_error = e.message } )
+ )
+ .to.eventually.deep.equal( undefined )
+ .then( _ =>
+ {
+ expect( caught_error ).to.equal( expected_error );
+ expect( published ).to.deep.equal( expected_published );
+ } );
+ } );
+ } );
+} );
+
+
+function createMockDeltaDao(): DeltaDao
+{
+ return <DeltaDao>{
+ getUnprocessedDocuments() { return Promise.resolve( [] ); },
+ advanceDeltaIndex() { return Promise.resolve(); },
+ markDocumentAsProcessed() { return Promise.resolve(); },
+ setErrorFlag() { return Promise.resolve(); },
+ getErrorCount() { return Promise.resolve( 0 ); },
+ };
+}
+
+
+function createMockDeltaPublisher(): AmqpPublisher
+{
+ return <AmqpPublisher>{
+ publish() { return Promise.resolve(); },
+ };
+}
diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts
new file mode 100644
index 0000000..f352b0c
--- /dev/null
+++ b/test/system/DeltaPublisherTest.ts
@@ -0,0 +1,236 @@
+/**
+ * Delta publisher test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { AmqpConnection } from '../../src/system/amqp/AmqpConnection';
+import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
+import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher';
+import { DocumentId, DocumentMeta } from '../../src/document/Document';
+import { EventEmitter } from 'events';
+import { hasContext } from '../../src/error/ContextError';
+import { AmqpError } from '../../src/error/AmqpError';
+import { Channel } from 'amqplib';
+import { MessageWriter } from '../../src/system/MessageWriter';
+
+
+import { expect, use as chai_use } from 'chai';
+chai_use( require( 'chai-as-promised' ) );
+
+
+describe( 'server.DeltaPublisher', () =>
+{
+ describe( '#publish', () =>
+ {
+ it( 'sends a message', () =>
+ {
+ let publish_called = false;
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const emitter = new EventEmitter();
+ const conn = createMockAmqpConnection();
+ const writer = createMockWriter();
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
+ conn.getAmqpChannel = () =>
+ {
+ return <Channel>{
+ publish: ( _: any, __: any, buf: any, ___: any ) =>
+ {
+ expect( buf instanceof Buffer ).to.be.true;
+
+ publish_called = true;
+
+ return true;
+ }
+ };
+ };
+
+ const sut = new Sut( emitter, ts_ctr, conn, writer );
+
+ return expect(
+ sut.publish( meta, delta, bucket, ratedata )
+ ).to.eventually.deep.equal( undefined )
+ .then( _ =>
+ {
+ expect( publish_called ).to.be.true;
+ } );
+ } );
+
+ ( <[string, () => Channel | undefined, Error, string ][]>[
+ [
+ 'Throws an error when publishing was unsuccessful',
+ () =>
+ {
+ return <Channel>{
+ publish: ( _: any, __: any, _buf: any, ___: any ) =>
+ {
+ return false;
+ }
+ };
+ },
+ Error,
+ 'Delta publish failed'
+ ],
+ [
+ 'Throws an error when no amqp channel is found',
+ () =>
+ {
+ return undefined;
+ },
+ AmqpError,
+ 'Error sending message: No channel'
+ ]
+ ] ).forEach( ( [ label, getChannelF, error_type, err_msg ] ) =>
+ it( label, () =>
+ {
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const emitter = new EventEmitter();
+ const conn = createMockAmqpConnection();
+ const writer = createMockWriter();
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
+ const expected = {
+ doc_id: meta.id,
+ delta_type: delta.type,
+ delta_ts: delta.timestamp
+ }
+
+ conn.getAmqpChannel = getChannelF;
+
+ const result = new Sut( emitter, ts_ctr, conn, writer )
+ .publish( meta, delta, bucket, ratedata );
+
+ return Promise.all( [
+ expect( result ).to.eventually.be.rejectedWith(
+ error_type, err_msg
+ ),
+ result.catch( e =>
+ {
+ if ( !hasContext( e ) )
+ {
+ return expect.fail();
+ }
+
+ return expect( e.context ).to.deep.equal( expected );
+ } )
+ ] );
+ } ) );
+
+
+ it( 'writer#write rejects', () =>
+ {
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const emitter = new EventEmitter();
+ const conn = createMockAmqpConnection();
+ const writer = createMockWriter();
+ const error = new Error( 'Bad thing happened' );
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
+ writer.write = (
+ _: any,
+ __: any,
+ ___: any,
+ ____: any,
+ _____: any
+ ): Promise<Buffer> =>
+ {
+ return Promise.reject( error );
+ };
+
+ const result = new Sut( emitter, ts_ctr, conn, writer )
+ .publish( meta, delta, bucket, ratedata );
+
+ return Promise.all( [
+ expect( result ).to.eventually.be.rejectedWith( error ),
+ result.catch( e =>
+ {
+ return expect( e ).to.deep.equal( error );
+ } )
+ ] );
+ } )
+ } );
+} );
+
+
+function ts_ctr(): UnixTimestamp
+{
+ return <UnixTimestamp>Math.floor( new Date().getTime() / 1000 );
+}
+
+
+function createMockAmqpConnection(): AmqpConnection
+{
+ return <AmqpConnection>{
+ connect: () => {},
+ getExchangeName: () => { 'Foo' },
+ };
+}
+
+
+function createMockBucketData(): Record<string, any>
+{
+ return {
+ foo: [ 'bar', 'baz' ]
+ }
+}
+
+
+function createMockDelta(): Delta<any>
+{
+ return <Delta<any>>{
+ type: <DeltaType>'data',
+ timestamp: <UnixTimestamp>123123123,
+ data: <DeltaResult<any>>{},
+ }
+}
+
+
+function createMockWriter(): MessageWriter
+{
+ return <MessageWriter>{
+ write( _: any, __:any, ___:any, ____:any, _____:any ): Promise<Buffer>
+ {
+ return Promise.resolve( Buffer.from( '' ) );
+ }
+ };
+} \ No newline at end of file
diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts
new file mode 100644
index 0000000..abfbef8
--- /dev/null
+++ b/test/system/EventMediatorTest.ts
@@ -0,0 +1,145 @@
+/**
+ * Event logger test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { EventMediator as Sut } from '../../src/system/EventMediator';
+import { context } from '../../src/error/ContextError';
+import { EventEmitter } from "events";
+import { expect } from 'chai';
+import { PsrLogger } from '../../src/system/PsrLogger';
+
+
+describe( 'system.EventLogger captures and logs events', () =>
+{
+ it( 'document-processed triggers log#notice', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'document-processed';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+
+ log.notice = ( _str: string ) => { method_called = true; };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id );
+
+ expect( method_called ).to.be.true;
+ } );
+
+ it( 'delta-publish triggers log#notice', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'delta-publish';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+
+ log.notice = ( _str: string ) => { method_called = true; };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id );
+
+ expect( method_called ).to.be.true;
+ } );
+
+ it( 'amqp-conn-warn triggers log#warning', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'amqp-conn-warn';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+
+ log.warning = ( _str: string ) => { method_called = true; };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id );
+
+ expect( method_called ).to.be.true;
+ } );
+
+ it( 'amqp-reconnect triggers log#warning', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'amqp-reconnect';
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+
+ log.warning = ( _str: string ) => { method_called = true; };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id );
+
+ expect( method_called ).to.be.true;
+ } );
+
+ it( 'context and stack are retrieved from error', () =>
+ {
+ let method_called = false;
+
+ const event_id = 'error';
+ const err_msg = 'Foo';
+ const stub_err = new Error( err_msg );
+ const emitter = new EventEmitter();
+ const log = createMockLogger();
+ const err_context = { bar: 'baz' };
+
+ const expected_context = {
+ bar: err_context.bar,
+ stack: stub_err.stack,
+ };
+
+ log.error = ( str: string, context: any ) =>
+ {
+ expect( str ).to.equal( err_msg );
+ expect( context ).to.deep.equal( expected_context );
+
+ method_called = true;
+ };
+
+ new Sut( log, emitter );
+
+ emitter.emit( event_id, context( stub_err, err_context ) );
+
+ expect( method_called ).to.be.true;
+ } );
+} );
+
+
+function createMockLogger(): PsrLogger
+{
+ return <PsrLogger>{
+ debug( _msg: string | object, _context: object ){},
+ info( _msg: string | object, _context: object ){},
+ notice( _msg: string | object, _context: object ){ console.log( 'asdasd msg: ', _msg ); },
+ warning( _msg: string | object, _context: object ){},
+ error( _msg: string | object, _context: object ){},
+ critical( _msg: string | object, _context: object ){},
+ alert( _msg: string | object, _context: object ){},
+ emergency( _msg: string | object, _context: object ){},
+ log( _level: any, _msg: string | object, _context: object ){},
+ };
+}
diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts
new file mode 100644
index 0000000..9a36584
--- /dev/null
+++ b/test/system/MetricsCollectorTest.ts
@@ -0,0 +1,165 @@
+/**
+ * Metrics collector test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import {
+ PrometheusFactory,
+ PrometheusConfig,
+} from '../../src/system/PrometheusFactory';
+import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
+import { EventEmitter } from 'events';
+import { expect } from 'chai';
+import {
+ MetricsCollector as Sut,
+ MetricTimer,
+} from '../../src/system/MetricsCollector';
+
+const sinon = require( 'sinon' );
+
+describe( 'system.MetricsCollector captures events and pushes metrics', () =>
+{
+ it( 'process-complete event is hooked', () =>
+ {
+ let histogram_called = false;
+ let counter_called = false;
+
+ const emitter = new EventEmitter();
+ const conf = createMockConfig();
+ const timer = createMockTimer();
+ const factory = createMockFactory( {
+ histogram_cb: () => { histogram_called = true },
+ counter_cb: () => { counter_called = true },
+ } );
+
+ const sut = new Sut( factory, conf, emitter, timer );
+
+ emitter.emit( 'delta-process-end' );
+
+ expect( histogram_called ).to.be.true;
+ expect( counter_called ).to.be.true;
+
+ sut.stop();
+ } );
+
+
+ it( 'process-error event is hooked', () =>
+ {
+ let counter_called = false;
+
+ const emitter = new EventEmitter();
+ const conf = createMockConfig();
+ const timer = createMockTimer();
+ const factory = createMockFactory( {
+ counter_cb: () => { counter_called = true },
+ } );
+
+ const sut = new Sut( factory, conf, emitter, timer );
+
+ emitter.emit( 'error' );
+
+ expect( counter_called ).to.be.true;
+
+ sut.stop();
+ } );
+
+
+ it( 'process-complete is timed properly', () =>
+ {
+ let actual_ms = 0;
+ const uid = 'foo';
+ const start_time_ns = 1234;
+ const end_time_ns = 5678;
+ const expected_ms = ( end_time_ns - start_time_ns ) / 1000000;
+ const emitter = new EventEmitter();
+ const conf = createMockConfig();
+ const timer = createMockTimer( start_time_ns, end_time_ns );
+ const factory = createMockFactory( {
+ histogram_cb: ( n: number ) => { actual_ms = n },
+ } );
+
+ const sut = new Sut( factory, conf, emitter, timer );
+
+ emitter.emit( 'delta-process-start', uid );
+ emitter.emit( 'delta-process-end', uid );
+
+ expect( actual_ms ).to.be.equal( expected_ms );
+
+ sut.stop();
+ } );
+} );
+
+
+function createMockFactory(
+ {
+ gateway_cb = () => {},
+ counter_cb = () => {},
+ histogram_cb = ( _n: number = 0 ) => {},
+ gauge_cb = ( _n: number = 0 ) => {},
+ }:
+ {
+ gateway_cb ?: () => void;
+ counter_cb ?: () => void;
+ histogram_cb ?: ( _n: number ) => void;
+ gauge_cb ?: ( _n: number ) => void;
+ }
+): PrometheusFactory
+{
+ const gateway = sinon.mock( Pushgateway );
+ const counter = sinon.mock( Counter );
+ const histogram = sinon.mock( Histogram );
+ const gauge = sinon.mock( Gauge );
+
+ gateway.pushAdd = gateway_cb;
+ counter.inc = counter_cb;
+ histogram.observe = histogram_cb;
+ gauge.set = gauge_cb;
+
+ return <PrometheusFactory>{
+ createGateway() { return gateway },
+ createCounter() { return counter },
+ createHistogram(){ return histogram },
+ createGauge() { return gauge },
+ };
+}
+
+
+function createMockConfig(): PrometheusConfig
+{
+ return <PrometheusConfig>{
+ hostname: 'foo.com',
+ port: 123,
+ env: 'test',
+ push_interval_ms: 1000,
+ }
+}
+
+
+function createMockTimer( _start: number = 0, _end: number = 0 ): MetricTimer
+{
+ return ( _start_time?: [ number, number ] ) =>
+ {
+ if ( !_start_time )
+ {
+ return [ 0, _start ];
+ }
+
+ return [ 0, _end - _start_time[ 1 ] ];
+ };
+} \ No newline at end of file
diff --git a/test/system/StandardLoggerTest.ts b/test/system/StandardLoggerTest.ts
new file mode 100644
index 0000000..918bfd1
--- /dev/null
+++ b/test/system/StandardLoggerTest.ts
@@ -0,0 +1,178 @@
+/**
+ * Event logger test
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { StandardLogger as Sut } from '../../src/system/StandardLogger';
+import { LogLevel } from '../../src/system/PsrLogger';
+import { expect } from 'chai';
+
+const sinon = require( 'sinon' );
+
+declare interface MockConsole extends Console {
+ getLevel(): string,
+ getStr(): string,
+}
+
+describe( 'system.EventLogger captures and logs events', () =>
+{
+ it( 'debug triggers console output level: info', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.debug( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'info' );
+ } );
+
+ it( 'info triggers console output level: info', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.info( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'info' );
+ } );
+
+ it( 'notice triggers console output level: log', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.notice( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'log' );
+ } );
+
+ it( 'warning triggers console output level: warn', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.warning( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'warn' );
+ } );
+
+ it( 'error triggers console output level: error', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.error( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'critical triggers console output level: error', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.critical( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'alert triggers console output level: error', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.alert( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'emergency triggers console output level: error', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.emergency( 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'log triggers corresponding log level', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+
+ sut.log( LogLevel.ERROR, 'Foo' );
+
+ expect( con.getLevel() ).to.equal( 'error' );
+ } );
+
+ it( 'Context is included in structured output', () =>
+ {
+ const con = createMockConsole();
+ const env = 'test';
+ const sut = new Sut( con, ts_ctr, env );
+ const context = { bar: 'baz' };
+ const expected_output = {
+ message: 'Foo',
+ timestamp: 123123,
+ service: 'quote-server',
+ env: 'test',
+ severity: 'NOTICE',
+ context: {
+ bar: 'baz',
+ },
+ };
+
+ sut.notice( 'Foo', context );
+
+ expect( con.getStr() ).to.deep.equal( expected_output );
+ } );
+} );
+
+
+function ts_ctr(): UnixTimestamp
+{
+ return <UnixTimestamp>123123;
+}
+
+
+function createMockConsole(): MockConsole
+{
+ const mock = sinon.mock( console );
+
+ mock.lvl = '';
+ mock.str = '';
+ mock.info = ( str: string ) => { mock.str = str; mock.lvl = 'info'; };
+ mock.log = ( str: string ) => { mock.str = str; mock.lvl = 'log'; };
+ mock.warn = ( str: string ) => { mock.str = str; mock.lvl = 'warn'; };
+ mock.error = ( str: string ) => { mock.str = str; mock.lvl = 'error'; };
+ mock.getLevel = () => mock.lvl;
+ mock.getStr = () => mock.str;
+
+ return mock;
+}
diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts
new file mode 100644
index 0000000..271d735
--- /dev/null
+++ b/test/system/V1MessageWriterTest.ts
@@ -0,0 +1,532 @@
+/**
+ * V1 Message Writer
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Tests for Version 1 of the avro message writer
+ */
+
+import { V1MessageWriter as Sut } from '../../src/system/avro/V1MessageWriter';
+import { hasContext, context } from '../../src/error/ContextError';
+import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
+import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
+import { DocumentMeta, DocumentId } from '../../src/document/Document';
+import { Duplex } from 'stream';
+import { AvroSchema } from 'avro-js';
+
+import { expect, use as chai_use } from 'chai';
+chai_use( require( 'chai-as-promised' ) );
+
+const sinon = require( 'sinon' );
+
+describe( 'system.V1MessageWriter', () =>
+{
+ it( 'Rejects improperly formatted data', () =>
+ {
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const error = new Error( 'Oh no' );
+ const schema = createMockAvroSchema();
+ const ts = <UnixTimestamp>123;
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
+ const expected = {
+ invalid_paths: 'Foo',
+ invalid_data: 'Bar',
+ };
+
+ const error_context = context( error, expected );
+
+ schema.isValid = () => { throw error_context; };
+
+ const result = new Sut(
+ createMockEncoderCtor( schema ),
+ schema,
+ ).write( ts, meta, delta, bucket, ratedata );
+
+ return Promise.all( [
+ expect( result ).to.eventually.be.rejectedWith( error ),
+ result.catch( e =>
+ {
+ if ( !hasContext( e ) )
+ {
+ return expect.fail();
+ }
+
+ return expect( e.context ).to.deep.equal( expected );
+ } )
+ ] );
+ } );
+
+
+ describe( '#avroEncode parses', () =>
+ {
+ [
+ {
+ label: 'Null value',
+ valid: true,
+ delta_data: { foo: null },
+ },
+ {
+ label: 'Null array',
+ valid: true,
+ delta_data: { foo: { 'array': [ null ] } },
+ },
+ {
+ label: 'Boolean value',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'boolean': true },
+ ] } },
+ },
+ {
+ label: 'Simple string',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] } },
+ },
+ {
+ label: 'Simple int',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'double': 123 },
+ ] } },
+ },
+ {
+ label: 'Nested array',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Array with nulls',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ null,
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123321 },
+ null,
+ ] }
+ ] } },
+ },
+ {
+ label: 'Non-array',
+ valid: false,
+ delta_data: { foo: 'bar' },
+ },
+ {
+ label: 'Map objects',
+ valid: true,
+ delta_data: { 'foo': { 'array': [
+ { 'map': {
+ 'bar': { 'map': {
+ 'baz': { 'double': 1572903485000 },
+ } }
+ } }
+ ] } },
+ }
+ ].forEach( ( { label, delta_data, valid } ) =>
+ {
+ it( label, () =>
+ {
+ const data = createMockData( delta_data );
+ const schema = createMockAvroSchema();
+
+ const sut = new Sut(
+ createMockEncoderCtor( schema ),
+ schema
+ );
+
+ sut.avroEncode( data )
+ .then( b =>
+ {
+ expect( typeof(b) ).to.equal( 'object' );
+ expect( valid ).to.be.true;
+ } )
+ .catch( _ =>
+ {
+ expect( valid ).to.be.false;
+ } );
+ } );
+ } );
+ } );
+
+
+ describe( '#setDataTypes annotates', () =>
+ {
+ [
+ {
+ label: 'Null',
+ delta_data: null,
+ expected: null,
+ },
+ {
+ label: 'Null Value',
+ delta_data: { foo: null },
+ expected: { foo: null },
+ },
+ {
+ label: 'Boolean Value',
+ delta_data: { foo: [ true ] },
+ expected: { foo: { 'array': [
+ { 'boolean': true },
+ ] } },
+ },
+ {
+ label: 'Simple string',
+ delta_data: { foo: [
+ 'bar',
+ 'baz',
+ ] },
+ expected: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] } },
+ },
+ {
+ label: 'Simple int',
+ delta_data: { foo: [
+ 123
+ ] },
+ expected: { foo: { 'array': [
+ { 'double': 123 },
+ ] } },
+ },
+ {
+ label: 'Nested array',
+ delta_data: { foo: [
+ [
+ 'bar',
+ 'baz',
+ ]
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Double nested array',
+ delta_data: { foo: [
+ [
+ [
+ 'bar',
+ 123,
+ null
+ ],
+ ],
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123 },
+ null,
+ ] },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Array with nulls',
+ delta_data: { foo: [
+ 'bar',
+ 'baz',
+ null
+ ] },
+ expected: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ null
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ delta_data: { foo: [
+ [
+ 'bar',
+ 123321,
+ null,
+ ]
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123321 },
+ null,
+ ] },
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ delta_data: { foo: [
+ {
+ 'bar': {
+ 'wer': 'qaz',
+ 'qwe': 1572903485000,
+ 'asd': true,
+ 'zxc': null,
+ },
+ },
+ ] },
+ expected: { 'foo': { 'array': [
+ { 'map': {
+ 'bar': { 'map': {
+ 'wer': { 'string': 'qaz' },
+ 'qwe': { 'double': 1572903485000 },
+ 'asd': { 'boolean': true },
+ 'zxc': null,
+ } },
+ } },
+ ] } },
+ },
+ ].forEach( ( { label, delta_data, expected } ) =>
+ {
+ it( label, () =>
+ {
+ const encoded = 'FooBar';
+ const avroEncoderCtr = createMockEncoder( encoded );
+ const stub_schema = <AvroSchema>{};
+ const sut = new Sut(
+ avroEncoderCtr,
+ stub_schema,
+ );
+ const actual = sut.setDataTypes( delta_data );
+
+ expect( actual ).to.deep.equal( expected );
+ } );
+ } );
+ } );
+
+
+ it( 'Message is formatted correctly', () =>
+ {
+ const bucket = { foo: [ 'bar', 'baz' ] };
+ const ratedata = {};
+ const doc_id = <DocumentId>123;
+ const entity_name = 'Some Agency';
+ const entity_id = 123;
+ const startDate = <UnixTimestamp>345;
+ const lastUpdate = <UnixTimestamp>456;
+ const schema = createMockAvroSchema();
+ const ts = <UnixTimestamp>123;
+ const encoder = createMockEncoderCtor( schema );
+ const meta = <DocumentMeta>{
+ id: doc_id,
+ entity_name: entity_name,
+ entity_id: entity_id,
+ startDate: startDate,
+ lastUpdate: lastUpdate,
+ };
+
+ const delta = <Delta<any>>{
+ type: <DeltaType>'data',
+ timestamp: <UnixTimestamp>123123123,
+ data: <DeltaResult<any>>{},
+ };
+
+ const expected = {
+ event: {
+ id: 'STEP_SAVE',
+ ts: ts,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: doc_id,
+ created: startDate,
+ modified: lastUpdate,
+ },
+ session: {
+ Session: {
+ entity_name: entity_name,
+ entity_id: entity_id,
+ },
+ },
+ data: {
+ Data: {
+ bucket: {
+ 'foo': { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] }
+ },
+ },
+ },
+ ratedata: {
+ Data: {
+ bucket: {},
+ },
+ },
+ delta: {
+ Data: {
+ bucket: delta.data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: '',
+ },
+ },
+ };
+
+ let is_valid_called = false;
+
+ schema.isValid = ( data: Record<string, any>, _:any ) =>
+ {
+ expect( data ).to.deep.equal( expected );
+
+ is_valid_called = true;
+
+ return null;
+ }
+
+ return expect( new Sut( encoder, schema )
+ .write( ts, meta, delta, bucket, ratedata ) )
+ .to.eventually.deep.equal( Buffer.from( '' ) )
+ .then( _ =>
+ {
+ expect( is_valid_called ).to.be.true;
+ } )
+ } );
+} );
+
+
+function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr
+{
+ return ( _schema: AvroSchema ) =>
+ {
+ const mock = sinon.mock( Duplex );
+
+ mock.on = ( _: string, __: any ) => {};
+ mock.end = ( _: any ) => { return mock_encoded_data; };
+
+ return mock;
+ };
+}
+
+
+function createMockData( delta_data: any ): any
+{
+
+ return {
+ event: {
+ id: 'RATE',
+ ts: 1573856916,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: 123123,
+ created: 1573856916,
+ modified: 1573856916,
+ top_visited_step: '2',
+ },
+ data: null,
+ ratedata: null,
+ delta: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: 'dadaddwafdwa',
+ },
+ },
+ };
+}
+
+
+function createMockDelta(): Delta<any>
+{
+ return <Delta<any>>{
+ type: <DeltaType>'data',
+ timestamp: <UnixTimestamp>123123123,
+ data: <DeltaResult<any>>{},
+ }
+}
+
+
+function createMockBucketData(): Record<string, any>
+{
+ return {
+ foo: [ 'bar', 'baz' ]
+ }
+}
+
+
+function createMockEncoderCtor( stub_schema: AvroSchema ):
+ ( schema: AvroSchema ) => Duplex
+{
+ const events = <Record<string, () => void>>{};
+
+ const mock_duplex = <Duplex>(<unknown>{
+ on( event_name: string, callback: () => void )
+ {
+ events[ event_name ] = callback;
+ },
+
+ end()
+ {
+ events.end();
+ },
+ } );
+
+ return ( schema: AvroSchema ): Duplex =>
+ {
+ expect( schema ).to.equal( stub_schema );
+ return mock_duplex;
+ };
+}
+
+
+function createMockAvroSchema(): AvroSchema
+{
+ return <AvroSchema>{
+ toBuffer() { return null },
+ isValid() { return null },
+ encode() {},
+ toString() { return '' },
+ fromBuffer() { return {} },
+ };
+}
diff --git a/test/system/amqp/AmqpConnectionTest.ts b/test/system/amqp/AmqpConnectionTest.ts
new file mode 100644
index 0000000..1e4237d
--- /dev/null
+++ b/test/system/amqp/AmqpConnectionTest.ts
@@ -0,0 +1,112 @@
+/**
+ * Tests AmqpConnection
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Amqp Connection
+ */
+
+import { AmqpConnection as Sut } from "../../../src/system/amqp/AmqpConnection";
+import { AmqpConfig } from "../../../src/system/AmqpPublisher";
+import { EventEmitter } from "events";
+import * as amqplib from "amqplib";
+
+import { expect, use as chai_use } from 'chai';
+chai_use( require( 'chai-as-promised' ) );
+
+describe( 'AmqpConnection', () =>
+{
+ describe( '#connect', () =>
+ {
+ it( "fails when exchange cannot be asserted", () =>
+ {
+ const expected_err = new Error( "test failure" );
+
+ const mock_channel = <amqplib.Channel>(<unknown>{
+ assertExchange() {
+ return Promise.reject( expected_err );
+ },
+ } );
+
+ const mock_connection = <amqplib.Connection>(<unknown>{
+ once() {},
+
+ createChannel() {
+ return Promise.resolve( mock_channel );
+ },
+ } );
+
+ const mock_amqp = <typeof amqplib>(<unknown>{
+ connect() {
+ return Promise.resolve( mock_connection );
+ }
+ } );
+
+ const emitter = new EventEmitter();
+ const conf = <AmqpConfig>{};
+ const sut = new Sut( mock_amqp, conf, emitter );
+
+ return expect( sut.connect() )
+ .to.eventually.be.rejectedWith( expected_err );
+ } );
+ } );
+
+
+ describe( '#reconnect', () =>
+ {
+ it( "is called when there is an error with the connection", () =>
+ {
+ let reconnect_called = false;
+
+ const mock_channel = <amqplib.Channel>(<unknown>{
+ assertExchange() {
+ return Promise.resolve();
+ },
+ } );
+
+ const mock_connection = <amqplib.Connection>Object.create(
+ new EventEmitter()
+ );
+
+ mock_connection.createChannel = (): any => {
+ return Promise.resolve( mock_channel );
+ };
+
+ const mock_amqp = <typeof amqplib>(<unknown>{
+ connect() {
+ return Promise.resolve( mock_connection );
+ }
+ } );
+
+ const emitter = new EventEmitter();
+
+ emitter.on( 'amqp-reconnect', () => { reconnect_called = true } );
+
+ const conf = <AmqpConfig>{};
+ const sut = new Sut( mock_amqp, conf, emitter );
+
+ const result = sut.connect()
+ .then( () => mock_connection.emit( 'error' ) )
+
+ return expect( result )
+ .to.eventually.deep.equal( true )
+ .then( _ => expect( reconnect_called ).to.be.true );
+ } );
+ } );
+} );
+
diff --git a/tsconfig.json b/tsconfig.json
index 83b8f5e..ee03891 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -18,6 +18,7 @@
}
},
"include": [
+ "bin/*",
"src/**/*",
"test/**/*"
]