Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-12-10 17:24:19 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-12-12 10:27:09 -0500
commite3dded760d18013e4f91248b0d12ec6c6a9dc368 (patch)
tree71b93936bbd19fe6d70800bbce4299fdb27d3b94
parent9eb1f3afca4228fa9b69dde10336379094fe0bf9 (diff)
downloadliza-e3dded760d18013e4f91248b0d12ec6c6a9dc368.tar.gz
liza-e3dded760d18013e4f91248b0d12ec6c6a9dc368.tar.bz2
liza-e3dded760d18013e4f91248b0d12ec6c6a9dc368.zip
[DEV-5312] Define document meta data and restructure message writer.
Add better tests for message writer
-rw-r--r--.env12
-rw-r--r--bin/delta-processor.ts17
-rw-r--r--package.json.in7
-rw-r--r--src/bucket/delta.ts9
-rw-r--r--src/document/Document.ts26
-rw-r--r--src/server/db/MongoServerDao.ts2
-rw-r--r--src/system/AmqpPublisher.ts27
-rw-r--r--src/system/DeltaProcessor.ts58
-rw-r--r--src/system/DeltaPublisher.ts280
-rw-r--r--src/system/EventMediator.ts7
-rw-r--r--src/system/MessageWriter.ts44
-rw-r--r--src/system/MetricsCollector.ts45
-rw-r--r--src/system/PrometheusFactory.ts25
-rw-r--r--src/system/StandardLogger.ts1
-rw-r--r--src/system/amqp/AmqpConnection.ts5
-rw-r--r--src/system/avro/V1MessageWriter.ts259
-rw-r--r--src/system/avro/schema.avsc22
-rw-r--r--src/system/db/MongoDeltaDao.ts5
-rw-r--r--test/system/DeltaProcessorTest.ts226
-rw-r--r--test/system/DeltaPublisherTest.ts443
-rw-r--r--test/system/EventMediatorTest.ts4
-rw-r--r--test/system/MetricsCollectorTest.ts2
-rw-r--r--test/system/V1MessageWriterTest.ts532
-rw-r--r--test/system/amqp/AmqpConnectionTest.ts49
24 files changed, 1302 insertions, 805 deletions
diff --git a/.env b/.env
index 2959fba..21f018c 100644
--- a/.env
+++ b/.env
@@ -1,15 +1,17 @@
-NODE_ENV=dev
-AMQP_HOSTNAME=localhost
+AMQP_HOST=localhost
AMQP_PORT=5672
-AMQP_USERNAME=
-AMQP_PASSWORD=
+AMQP_USER=
+AMQP_PASS=
AMQP_FRAMEMAX=0
AMQP_HEARTBEAT=2
AMQP_VHOST=
AMQP_EXCHANGE=
AMQP_RETRIES=30
AMQP_RETRY_WAIT=1
-PROM_HOSTNAME=
+PROM_HOST=
PROM_PORT=9091
PROM_PUSH_INTERVAL_MS=5000
+PROM_BUCKETS_START=0
+PROM_BUCKETS_WIDTH=10
+PROM_BUCKETS_COUNT=10
PROCESS_INTERVAL_MS=2000
diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts
index 8a35e51..83e42e3 100644
--- a/bin/delta-processor.ts
+++ b/bin/delta-processor.ts
@@ -18,13 +18,14 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-import * as amqplib from "amqplib";
+import * as amqplib from 'amqplib';
import { createAmqpConfig } from '../src/system/AmqpPublisher';
import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao';
import { DeltaProcessor } from '../src/system/DeltaProcessor';
import { DeltaPublisher } from '../src/system/DeltaPublisher';
import { MongoCollection } from '../src/types/mongodb';
import { createAvroEncoder } from '../src/system/avro/AvroFactory';
+import { V1MessageWriter } from '../src/system/avro/V1MessageWriter';
import {
createMongoConfig,
createMongoDB,
@@ -39,8 +40,9 @@ import {
createPrometheusConfig,
} from '../src/system/PrometheusFactory';
import { AmqpConnection } from '../src/system/amqp/AmqpConnection';
-import { parse as avro_parse } from "avro-js";
+import { parse as avro_parse } from 'avro-js';
+require('dotenv-flow').config();
const amqp_conf = createAmqpConfig( process.env );
const prom_conf = createPrometheusConfig( process.env );
@@ -51,12 +53,17 @@ const env = process.env.NODE_ENV || 'Unknown Environment';
const emitter = new EventEmitter();
const log = new StandardLogger( console, ts_ctr, env );
const amqp_connection = new AmqpConnection( amqplib, amqp_conf, emitter );
-const publisher = new DeltaPublisher(
+
+const message_writer = new V1MessageWriter(
+ createAvroEncoder,
+ avro_parse( __dirname + '/../src/system/avro/schema.avsc' ),
+);
+
+const publisher = new DeltaPublisher(
emitter,
ts_ctr,
- createAvroEncoder,
amqp_connection,
- avro_parse( __dirname + '/../src/system/avro/schema.avsc' ),
+ message_writer,
);
// Prometheus Metrics
diff --git a/package.json.in b/package.json.in
index 898ec54..49ffc7b 100644
--- a/package.json.in
+++ b/package.json.in
@@ -24,9 +24,10 @@
},
"dependencies": {
- "easejs": "0.2.x",
- "mongodb": "1.2.14",
- "amqplib": "0.5.3"
+ "easejs": "0.2.x",
+ "mongodb": "1.2.14",
+ "dotenv-flow": "3.1.0",
+ "amqplib": "0.5.3"
},
"devDependencies": {
"typescript": "~3.7",
diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts
index b83a8e7..7c5dd6a 100644
--- a/src/bucket/delta.ts
+++ b/src/bucket/delta.ts
@@ -81,6 +81,15 @@ export interface DeltaDocument
/** The document id */
id: DocumentId,
+ /** The entity name */
+ agentName: string,
+
+ /** The entity id */
+ agentEntityId: number,
+
+ /** The time the document was created */
+ startDate: UnixTimestamp,
+
/** The time the document was updated */
lastUpdate: UnixTimestamp,
diff --git a/src/document/Document.ts b/src/document/Document.ts
index 0db893a..8f05bac 100644
--- a/src/document/Document.ts
+++ b/src/document/Document.ts
@@ -18,7 +18,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
- * The term "Quote" is synonymous with "Document"; this project is moving
+ * The term 'Quote' is synonymous with 'Document'; this project is moving
* more toward the latter as it is further generalized.
*/
@@ -31,7 +31,29 @@ export type DocumentId = NominalType<number, 'DocumentId'>;
/**
* Quote (Document) id
*
- * Where the term "Quote" is still used, this will allow for type
+ * Where the term 'Quote' is still used, this will allow for type
* compatibility and an easy transition.
*/
export type QuoteId = DocumentId;
+
+
+/**
+ * Document meta data
+ */
+export type DocumentMeta =
+{
+ /** The document id */
+ id: DocumentId,
+
+ /** The entity name */
+ entity_name: string,
+
+ /** The entity id */
+ entity_id: number,
+
+ /** The time the document was created */
+ startDate: UnixTimestamp,
+
+ /** The time the document was updated */
+ lastUpdate: UnixTimestamp,
+} \ No newline at end of file
diff --git a/src/server/db/MongoServerDao.ts b/src/server/db/MongoServerDao.ts
index dd1df2d..3f8128c 100644
--- a/src/server/db/MongoServerDao.ts
+++ b/src/server/db/MongoServerDao.ts
@@ -309,7 +309,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
save_data.id = id;
save_data.pver = quote.getProgramVersion();
save_data.importDirty = 1;
- save_data.published = 0;
+ save_data.published = false;
save_data.lastPremDate = quote.getLastPremiumDate();
save_data.initialRatedDate = quote.getRatedDate();
save_data.explicitLock = quote.getExplicitLockReason();
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
index edaa8c1..f41ee63 100644
--- a/src/system/AmqpPublisher.ts
+++ b/src/system/AmqpPublisher.ts
@@ -21,8 +21,8 @@
* Publish Amqp message to a queue
*/
-import { DeltaResult } from "../bucket/delta";
-import { DocumentId } from '../document/Document';
+import { DeltaResult } from '../bucket/delta';
+import { DocumentMeta } from '../document/Document';
import { Options } from 'amqplib';
@@ -37,10 +37,10 @@ export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig
{
return <AmqpConfig>{
protocol: 'amqp',
- hostname: env.AMQP_HOSTNAME,
+ hostname: env.AMQP_HOST,
port: +( env.AMQP_PORT || 0 ),
- username: env.AMQP_USERNAME,
- password: env.AMQP_PASSWORD,
+ username: env.AMQP_USER,
+ password: env.AMQP_PASS,
locale: 'en_US',
frameMax: +( env.AMQP_FRAMEMAX || 0 ),
heartbeat: +( env.AMQP_HEARTBEAT || 0 ),
@@ -52,8 +52,9 @@ export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig
}
-export interface AmqpConfig extends Options.Connect {
- /** The protocol to connect with (should always be "amqp") */
+export interface AmqpConfig extends Options.Connect
+{
+ /** The protocol to connect with (should always be 'amqp') */
protocol: string;
/** The hostname to connect to */
@@ -68,7 +69,7 @@ export interface AmqpConfig extends Options.Connect {
/** A password if one if required */
password?: string;
- /** Locale (should always be "en_US") */
+ /** Locale (should always be 'en_US') */
locale: string;
/** The size in bytes of the maximum frame allowed */
@@ -96,13 +97,13 @@ export interface AmqpPublisher
/**
* Publish quote message to exchange post-rating
*
- * @param doc_id - The doc_id
- * @param delta - The delta
- * @param bucket - The bucket
- * @param ratedata - The rate data bucket
+ * @param meta - document meta data
+ * @param delta - delta
+ * @param bucket - bucket
+ * @param ratedata - rate data bucket
*/
publish(
- doc_id: DocumentId,
+ meta: DocumentMeta,
delta: DeltaResult<any>,
bucket: Record<string, any>,
ratedata?: Record<string, any>,
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index 0e5201a..4eba003 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -19,17 +19,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-import { DeltaDao } from "../system/db/DeltaDao";
-import { DocumentId } from "../document/Document";
-import { AmqpPublisher } from "./AmqpPublisher";
-import { EventEmitter } from "events";
+import { DeltaDao } from '../system/db/DeltaDao';
+import { DocumentMeta } from '../document/Document';
+import { AmqpPublisher } from './AmqpPublisher';
+import { EventEmitter } from 'events';
import {
DeltaType,
applyDelta,
DeltaDocument,
Delta,
ReverseDelta,
-} from "../bucket/delta";
+} from '../bucket/delta';
/** Deltas and state of data prior to their application */
type DeltaState = [
@@ -77,6 +77,11 @@ export class DeltaProcessor
}
+ /**
+ * Process the next document
+ *
+ * @param docs - list of documents to process
+ */
private _processNext( docs: DeltaDocument[] ): Promise<void>
{
const doc = docs.shift();
@@ -91,28 +96,38 @@ export class DeltaProcessor
}
+ /**
+ * Process an individual document
+ *
+ * @param doc - individual document to process
+ */
private _processDocument( doc: DeltaDocument ): Promise<void>
{
const deltas = this._getTimestampSortedDeltas( doc );
- const doc_id = doc.id;
const bucket = doc.data;
const ratedata = doc.ratedata || {};
- const last_updated_ts = doc.lastUpdate;
+ const meta = {
+ id: doc.id,
+ entity_name: doc.agentName,
+ entity_id: +doc.agentEntityId,
+ startDate: doc.startDate,
+ lastUpdate: doc.lastUpdate,
+ };
const history = this._applyDeltas( deltas, bucket, ratedata );
- return this._processNextDelta( doc_id, history )
+ return this._processNextDelta( meta, history )
.then( _ =>
- this._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
+ this._dao.markDocumentAsProcessed( meta.id, meta.lastUpdate )
)
.then( _ =>
{
- this._emitter.emit( 'document-processed', { doc_id: doc_id } );
+ this._emitter.emit( 'document-processed', { doc_id: meta.id } );
} )
- .catch( e =>
+ .catch( ( e: Error ) =>
{
this._emitter.emit( 'error', e );
- return this._dao.setErrorFlag( doc_id );
+ return this._dao.setErrorFlag( meta.id );
} );
}
@@ -179,8 +194,14 @@ export class DeltaProcessor
}
+ /**
+ * Process the next delta from the history
+ *
+ * @param meta - document meta data
+ * @param history - a history of deltas and their buckets (data, ratedata)
+ */
private _processNextDelta(
- doc_id: DocumentId,
+ meta: DocumentMeta,
history: DeltaState[],
): Promise<void>
{
@@ -191,17 +212,14 @@ export class DeltaProcessor
const [ delta, bucket, ratedata ] = history[ 0 ];
- const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type;
+ const delta_uid = meta.id + '_' + delta.timestamp + '_' + delta.type;
this._emitter.emit( 'delta-process-start', delta_uid );
- return this._publisher.publish( doc_id, delta, bucket, ratedata )
- .then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) )
+ return this._publisher.publish( meta, delta, bucket, ratedata )
+ .then( _ => this._dao.advanceDeltaIndex( meta.id, delta.type ) )
.then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) )
- .then( _ => this._processNextDelta(
- doc_id,
- history.slice( 1 ),
- ) );
+ .then( _ => this._processNextDelta( meta, history.slice( 1 ) ) );
}
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index 3963934..57a5747 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -23,82 +23,43 @@
import { AmqpPublisher } from './AmqpPublisher';
import { Delta } from '../bucket/delta';
-import { EventEmitter } from "events";
-import { DocumentId } from '../document/Document';
+import { EventEmitter } from 'events';
+import { DocumentMeta } from '../document/Document';
import { context } from '../error/ContextError';
import { AmqpError } from '../error/AmqpError';
-import { AvroEncoderCtr } from './avro/AvroFactory';
+import { MessageWriter } from './MessageWriter';
+
import { AmqpConnection } from './amqp/AmqpConnection';
-import { AvroSchema } from "avro-js";
export class DeltaPublisher implements AmqpPublisher
{
- /** A mapping of which delta type translated to which avro event */
- readonly DELTA_MAP: Record<string, string> = {
- data: 'STEP_SAVE',
- ratedata: 'RATE',
- };
-
-
/**
* Delta publisher
*
- * @param _emitter - event emitter instance
- * @param _ts_ctr - a timestamp constructor
- * @param _encoder_ctr - a factory function to create an avro encoder
- * @param _conn - the amqp connection
+ * @param _emitter - event emitter instance
+ * @param _ts_ctr - a timestamp constructor
+ * @param _conn - the amqp connection
+ * @param _writer - message writer
*/
constructor(
- private readonly _emitter: EventEmitter,
- private readonly _ts_ctr: () => UnixTimestamp,
- private readonly _encoder_ctor: AvroEncoderCtr,
- private readonly _conn: AmqpConnection,
- private readonly _schema: AvroSchema,
+ private readonly _emitter: EventEmitter,
+ private readonly _ts_ctr: () => UnixTimestamp,
+ private readonly _conn: AmqpConnection,
+ private readonly _writer: MessageWriter,
) {}
/**
* Publish quote message to exchange post-rating
*
- * @param doc_id - The doc_id
- * @param delta - The delta
- * @param bucket - The bucket
- * @param ratedata - The ratedata bucket
+ * @param meta - document meta data
+ * @param delta - delta
+ * @param bucket - bucket
+ * @param ratedata - rate data bucket
*/
- publish(
- doc_id: DocumentId,
- delta: Delta<any>,
- bucket: Record<string, any>,
- ratedata: Record<string, any> = {},
- ): Promise<void>
- {
- return this._sendMessage( doc_id, delta, bucket, ratedata )
- .then( _ =>
- {
- this._emitter.emit(
- 'delta-publish',
- {
- delta: delta,
- exchange: this._conn.getExchangeName(),
- }
- );
- } );
- }
-
-
- /**
- * Send message to exchange
- *
- * @param doc_id - The doc_id
- * @param delta - The delta to publish
- * @param bucket - The bucket
- * @param ratedata - The ratedata bucket
- *
- * @return whether publish was successful
- */
- private _sendMessage(
- doc_id: DocumentId,
+ publish(
+ meta: DocumentMeta,
delta: Delta<any>,
bucket: Record<string, any>,
ratedata: Record<string, any>,
@@ -106,16 +67,14 @@ export class DeltaPublisher implements AmqpPublisher
{
const ts = this._ts_ctr();
const headers = { version: 1, created: ts };
- const avro_object = this._avroFormat(
+
+ return this._writer.write(
ts,
- doc_id,
+ meta,
delta,
bucket,
- ratedata,
- );
-
- return this.avroEncode( avro_object )
- .then( ( avro_buffer ) =>
+ ratedata
+ ).then( ( avro_buffer: Buffer ) =>
{
const channel = this._conn.getAmqpChannel();
@@ -124,7 +83,7 @@ export class DeltaPublisher implements AmqpPublisher
throw context(
new AmqpError( 'Error sending message: No channel' ),
{
- doc_id: doc_id,
+ doc_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp,
},
@@ -144,193 +103,22 @@ export class DeltaPublisher implements AmqpPublisher
throw context(
new Error ( 'Delta publish failed' ),
{
- doc_id: doc_id,
+ doc_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp,
}
);
}
- } );
- }
-
-
- /**
- * Throw an error with specific information if the schema is invalid
- *
- * @param schema - Avro schema
- * @param data - Data to encode
- */
- private _assertValidAvro(
- schema: AvroSchema,
- data: Record<string, any>,
- ): void
- {
- schema.isValid( data, { errorHook: hook } );
-
- function hook( keys: any, vals: any) {
- throw context( new Error( 'Invalid Avro Schema' ),
- {
- invalid_paths: keys,
- invalid_data: vals,
- }
- );
- }
- }
-
-
- /**
- * Format the avro data with data type labels
- *
- * @param ts - a timestamp
- * @param doc_id - the document id
- * @param delta - the current delta
- * @param bucket - the data bucket
- * @param ratedata - the ratedata bucket
- *
- * @return the formatted data
- */
- private _avroFormat(
- ts: UnixTimestamp,
- doc_id: DocumentId,
- delta: Delta<any>,
- bucket: Record<string, any>,
- ratedata: Record<string, any>,
- ): any
- {
- const delta_formatted = this.setDataTypes( delta.data );
- const bucket_formatted = this.setDataTypes( bucket );
- const ratedata_formatted = this.setDataTypes( ratedata );
- const event_id = this.DELTA_MAP[ delta.type ];
-
- return {
- event: {
- id: event_id,
- ts: ts,
- actor: 'SERVER',
- step: null,
- },
- document: {
- id: doc_id
- },
- data: {
- Data: {
- bucket: bucket_formatted,
- },
- },
- ratedata: {
- Data: {
- bucket: ratedata_formatted,
- },
- },
- delta: {
- Data: {
- bucket: delta_formatted,
- },
- },
- program: {
- Program: {
- id: 'quote_server',
- version: '',
- },
- },
- }
- }
-
-
- /**
- * Encode the data in an avro buffer
- *
- * @param data - the data to encode
- *
- * @return the avro buffer or null if there is an error
- */
- avroEncode( data: Record<string, any> ): Promise<Buffer>
- {
- return new Promise<Buffer>( ( resolve, reject ) =>
- {
- const bufs: Buffer[] = [];
-
- try
- {
- this._assertValidAvro( this._schema, data )
-
- const encoder = this._encoder_ctor( this._schema )
-
- encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } )
- encoder.on('error', ( err: Error ) => { reject( err ); } )
- encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } )
- encoder.end( data );
- }
- catch ( e )
+ } )
+ .then( ( _: any ) =>
{
- reject( e );
- }
- } );
- }
-
-
- /**
- * Format the data for avro by add type specifications to the data
- *
- * @param data - the data to format
- *
- * @return the formatted data
- */
- setDataTypes( data: any, top_level: boolean = true ): any
- {
- let data_formatted: any = {};
-
- switch( typeof( data ) )
- {
- case 'object':
- if ( data == null )
- {
- return null;
- }
- else if ( Array.isArray( data ) )
- {
- let arr: any[] = [];
-
- data.forEach( ( datum ) =>
- {
- arr.push( this.setDataTypes( datum, false ) );
- } );
-
- data_formatted = ( top_level )
- ? arr
- : { 'array': arr };
- }
- else
- {
- let datum_formatted: any = {};
-
- Object.keys( data).forEach( ( key: string ) =>
+ this._emitter.emit(
+ 'delta-publish',
{
- const datum = this.setDataTypes( data[ key ], false );
-
- datum_formatted[ key ] = datum;
-
- } );
-
- data_formatted = ( top_level )
- ? datum_formatted
- : { "map": datum_formatted };
- }
- break;
-
- case 'boolean':
- return { 'boolean': data };
-
- case 'number':
- return { 'double': data };
-
- case 'string':
- return { 'string': data };
-
- case 'undefined':
- return null;
- }
-
- return data_formatted;
+ delta: delta,
+ exchange: this._conn.getExchangeName(),
+ }
+ );
+ } );
}
}
diff --git a/src/system/EventMediator.ts b/src/system/EventMediator.ts
index cd5b703..b95536c 100644
--- a/src/system/EventMediator.ts
+++ b/src/system/EventMediator.ts
@@ -48,7 +48,7 @@ export class EventMediator
msg
) );
- this._emitter.on( 'amqp-conn-error', ( msg ) =>
+ this._emitter.on( 'amqp-conn-warn', ( msg ) =>
this._log.warning( 'AMQP Connection Error', msg ) );
this._emitter.on( 'amqp-reconnect', () =>
@@ -68,6 +68,11 @@ export class EventMediator
}
+ /**
+ * Handle an error event
+ *
+ * @param e - any
+ */
private _handleError( e: any ): void
{
let msg: string = '';
diff --git a/src/system/MessageWriter.ts b/src/system/MessageWriter.ts
new file mode 100644
index 0000000..8ee2b84
--- /dev/null
+++ b/src/system/MessageWriter.ts
@@ -0,0 +1,44 @@
+/**
+ * Message Writer
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Write a message to be published to a queue
+ */
+import { DocumentMeta } from '../document/Document';
+import { DeltaResult } from '../bucket/delta';
+
+export interface MessageWriter
+{
+ /**
+ * Write the data to a message
+ *
+ * @param ts - timestamp
+ * @param meta - document meta data
+ * @param delta - current delta
+ * @param bucket - data bucket
+ * @param ratedata - ratedata bucket
+ */
+ write(
+ ts: UnixTimestamp,
+ meta: DocumentMeta,
+ delta: DeltaResult<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): Promise<Buffer>
+} \ No newline at end of file
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
index bc4a6cc..9432a9d 100644
--- a/src/system/MetricsCollector.ts
+++ b/src/system/MetricsCollector.ts
@@ -22,7 +22,7 @@
*/
import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
-import { EventEmitter } from "events";
+import { EventEmitter } from 'events';
import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory';
const client = require( 'prom-client' )
@@ -72,6 +72,7 @@ export class MetricsCollector
* @param _factory - A factory to create prometheus components
* @param _conf - Prometheus configuration
* @param _emitter - Event emitter
+ * @param _timer - A timer function to create a tuple timestamp
*/
constructor(
private readonly _factory: PrometheusFactory,
@@ -96,9 +97,9 @@ export class MetricsCollector
client,
this._process_time_name,
this._process_time_help,
- 0,
- 10,
- 10,
+ this._conf.buckets_start,
+ this._conf.buckets_width,
+ this._conf.buckets_count,
);
this._total_error = this._factory.createCounter(
@@ -123,7 +124,8 @@ export class MetricsCollector
this._push_interval = setInterval( () =>
{
this._gateway.pushAdd(
- { jobName: 'liza_delta_metrics' }, this.pushCallback
+ { jobName: 'liza_delta_metrics' },
+ this.getPushCallback( this )
);
}, this._conf.push_interval_ms
);
@@ -145,7 +147,7 @@ export class MetricsCollector
/**
* List to events to update metrics
*/
- private hookMetrics()
+ private hookMetrics(): void
{
this._emitter.on(
'delta-process-start',
@@ -165,35 +167,36 @@ export class MetricsCollector
}
);
- this._emitter.on(
- 'delta-process-error',
- ( _ ) => this._total_error.inc()
- );
+ this._emitter.on( 'error', ( _ ) => this._total_error.inc() );
}
/**
* Handle push error
*
- * @param error - Any errors that occurred
- * @param response - The http response
- * @param body - The resposne body
+ * @param self - Metrics Collector object
+ *
+ * @return a function to handle the pushAdd callback
*/
- private pushCallback(
- error?: Error | undefined,
- _response?: any,
- _body?: any
- ): void
+ private getPushCallback( self: MetricsCollector ): () => void
{
- if ( error )
+ return (
+ error?: Error | undefined,
+ _response?: any,
+ _body?: any
+ ): void =>
{
- this._emitter.emit( 'error', error );
+ if ( error )
+ {
+ self._emitter.emit( 'error', error );
+ }
}
}
-
/**
* Update metrics with current error count
+ *
+ * @param count - the number of errors found
*/
updateErrorCount( count: number ): void
{
diff --git a/src/system/PrometheusFactory.ts b/src/system/PrometheusFactory.ts
index 0cf059f..7330ea9 100644
--- a/src/system/PrometheusFactory.ts
+++ b/src/system/PrometheusFactory.ts
@@ -35,6 +35,15 @@ export declare type PrometheusConfig = {
/** The rate (in milliseconds) at which metrics are pushed */
push_interval_ms: number;
+
+ /** The starting point for process time buckets */
+ buckets_start: number;
+
+ /** The width of process time buckets */
+ buckets_width: number;
+
+ /** The number of process time buckets */
+ buckets_count: number;
}
@@ -50,10 +59,13 @@ export function createPrometheusConfig(
): PrometheusConfig
{
return <PrometheusConfig>{
- 'hostname': env.prom_hostname,
- 'port': +( env.prom_port || 0 ),
- 'env': process.env.NODE_ENV,
- 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ),
+ hostname: env.PROM_HOST,
+ port: +( env.PROM_PORT || 0 ),
+ env: process.env.NODE_ENV,
+ push_interval_ms: +( process.env.PROM_PUSH_INTERVAL_MS || 5000 ),
+ buckets_start: +( process.env.PROM_BUCKETS_START || 0 ),
+ buckets_width: +( process.env.PROM_BUCKETS_WIDTH || 10 ),
+ buckets_count: +( process.env.PROM_BUCKETS_COUNT || 10 ),
};
}
@@ -63,8 +75,9 @@ export class PrometheusFactory
/**
* Create a PushGateway
*
- * @param client - prometheus client
- * @param url - the url of the push gateway
+ * @param client - prometheus client
+ * @param hostname - push gateway url
+ * @param port - push gateway port
*
* @return the gateway
*/
diff --git a/src/system/StandardLogger.ts b/src/system/StandardLogger.ts
index d69c3d4..cdd062d 100644
--- a/src/system/StandardLogger.ts
+++ b/src/system/StandardLogger.ts
@@ -160,6 +160,7 @@ export class StandardLogger implements PsrLogger
*
* @param msg - the string or object to log
* @param level - the log level
+ * @param context - additional message context
*
* @returns a structured logging object
*/
diff --git a/src/system/amqp/AmqpConnection.ts b/src/system/amqp/AmqpConnection.ts
index 6d50bfc..410f808 100644
--- a/src/system/amqp/AmqpConnection.ts
+++ b/src/system/amqp/AmqpConnection.ts
@@ -24,8 +24,6 @@ import * as amqplib from "amqplib";
/**
* Connection to AMQP exchange
- *
- * XXX: Needs tests!
*/
export class AmqpConnection
{
@@ -39,6 +37,7 @@ export class AmqpConnection
/**
* Amqp Connection
*
+ * @param _conf - amqp library
* @param _conf - amqp configuration
* @param _emitter - event emitter instance
*/
@@ -65,7 +64,7 @@ export class AmqpConnection
*/
this._conn.once( 'error', e =>
{
- this._emitter.emit( 'amqp-conn-error', e );
+ this._emitter.emit( 'amqp-conn-warn', e );
this._reconnect();
} );
diff --git a/src/system/avro/V1MessageWriter.ts b/src/system/avro/V1MessageWriter.ts
new file mode 100644
index 0000000..09ee64a
--- /dev/null
+++ b/src/system/avro/V1MessageWriter.ts
@@ -0,0 +1,259 @@
+/**
+ * Message Writer
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Write a message to be published to a queue
+ */
+import { DocumentMeta } from '../../document/Document';
+import { Delta } from '../../bucket/delta';
+import { AvroEncoderCtr } from '../avro/AvroFactory';
+import { AvroSchema } from 'avro-js';
+import { MessageWriter } from '../MessageWriter';
+import { context } from '../../error/ContextError';
+
+
+export class V1MessageWriter implements MessageWriter
+{
+ /** A mapping of which delta type translated to which avro event */
+ readonly DELTA_MAP: Record<string, string> = {
+ data: 'STEP_SAVE',
+ ratedata: 'RATE',
+ };
+
+
+ /**
+ * Delta publisher
+ *
+ * @param _encoder_ctr - a factory function to create an avro encoder
+ * @param _conn - the amqp connection
+ */
+ constructor(
+ private readonly _encoder_ctor: AvroEncoderCtr,
+ private readonly _schema: AvroSchema,
+ ) {}
+
+
+ /**
+ * Write the data to a message
+ *
+ * @param ts - timestamp
+ * @param meta - document meta data
+ * @param delta - current delta
+ * @param bucket - data bucket
+ * @param ratedata - ratedata bucket
+ */
+ write(
+ ts: UnixTimestamp,
+ meta: DocumentMeta,
+ delta: Delta<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): Promise<Buffer>
+ {
+ const avro_object = this._avroFormat(
+ ts,
+ meta,
+ delta,
+ bucket,
+ ratedata,
+ );
+
+ return this.avroEncode( avro_object );
+ }
+
+
+ /**
+ * Format the avro data with data type labels
+ *
+ * @param ts - timestamp
+ * @param meta - document meta data
+ * @param delta - current delta
+ * @param bucket - data bucket
+ * @param ratedata - ratedata bucket
+ *
+ * @return the formatted data
+ */
+ private _avroFormat(
+ ts: UnixTimestamp,
+ meta: DocumentMeta,
+ delta: Delta<any>,
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): any
+ {
+ const delta_formatted = this.setDataTypes( delta.data );
+ const bucket_formatted = this.setDataTypes( bucket );
+ const ratedata_formatted = this.setDataTypes( ratedata );
+ const event_id = this.DELTA_MAP[ delta.type ];
+
+ return {
+ event: {
+ id: event_id,
+ ts: ts,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: meta.id,
+ created: meta.startDate,
+ modified: meta.lastUpdate,
+ },
+ session: {
+ Session: {
+ entity_id: meta.entity_id,
+ entity_name: meta.entity_name,
+ },
+ },
+ data: {
+ Data: {
+ bucket: bucket_formatted,
+ },
+ },
+ ratedata: {
+ Data: {
+ bucket: ratedata_formatted,
+ },
+ },
+ delta: {
+ Data: {
+ bucket: delta_formatted,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: '',
+ },
+ },
+ }
+ }
+
+
+ /**
+ * Encode the data in an avro buffer
+ *
+ * @param data - the data to encode
+ *
+ * @return the avro buffer or null if there is an error
+ */
+ avroEncode( data: Record<string, any> ): Promise<Buffer>
+ {
+ return new Promise<Buffer>( ( resolve, reject ) =>
+ {
+ const bufs: Buffer[] = [];
+
+ try
+ {
+ this._schema.isValid(
+ data,
+ {
+ errorHook: ( keys: any, vals: any) =>
+ {
+ throw context(
+ new Error( 'Invalid Avro Schema' ),
+ {
+ invalid_paths: keys,
+ invalid_data: vals,
+ }
+ );
+ }
+ }
+ );
+
+ const encoder = this._encoder_ctor( this._schema )
+
+ encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } )
+ encoder.on('error', ( err: Error ) => { reject( err ); } )
+ encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } )
+ encoder.end( data );
+ }
+ catch ( e )
+ {
+ reject( e );
+ }
+ } );
+ }
+
+
+ /**
+ * Format the data for avro by add type specifications to the data
+ *
+ * @param data - the data to format
+ * @param top_level - whether we are at the top level of the recursion
+ *
+ * @return the formatted data
+ */
+ setDataTypes( data: any, top_level: boolean = true ): any
+ {
+ let data_formatted: any = {};
+
+ switch( typeof( data ) )
+ {
+ case 'object':
+ if ( data == null )
+ {
+ return null;
+ }
+ else if ( Array.isArray( data ) )
+ {
+ let arr: any[] = [];
+
+ data.forEach( ( datum ) =>
+ {
+ arr.push( this.setDataTypes( datum, false ) );
+ } );
+
+ data_formatted = ( top_level )
+ ? arr
+ : { 'array': arr };
+ }
+ else
+ {
+ let datum_formatted: any = {};
+
+ Object.keys( data).forEach( ( key: string ) =>
+ {
+ const datum = this.setDataTypes( data[ key ], false );
+
+ datum_formatted[ key ] = datum;
+
+ } );
+
+ data_formatted = ( top_level )
+ ? datum_formatted
+ : { 'map': datum_formatted };
+ }
+ break;
+
+ case 'boolean':
+ return { 'boolean': data };
+
+ case 'number':
+ return { 'double': data };
+
+ case 'string':
+ return { 'string': data };
+
+ case 'undefined':
+ return null;
+ }
+
+ return data_formatted;
+ }
+} \ No newline at end of file
diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc
index 53e4a9d..f432e9a 100644
--- a/src/system/avro/schema.avsc
+++ b/src/system/avro/schema.avsc
@@ -34,7 +34,7 @@
},
{
"name": "step",
- "type":[
+ "type": [
"null",
{
"type": "record",
@@ -78,6 +78,26 @@
}
},
{
+ "name": "session",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "Session",
+ "fields": [
+ {
+ "name": "entity_name",
+ "type": "string"
+ },
+ {
+ "name": "entity_id",
+ "type": "int"
+ }
+ ]
+ }
+ ]
+ },
+ {
"name": "data",
"type": [
"null",
diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
index 8d45684..e058980 100644
--- a/src/system/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -40,6 +40,9 @@ export class MongoDeltaDao implements DeltaDao
/** The document fields to read */
readonly RESULT_FIELDS: Record<string, number> = {
id: 1,
+ agentName: 1,
+ agentEntityId: 1,
+ startDate: 1,
lastUpdate: 1,
data: 1,
ratedata: 1,
@@ -70,7 +73,7 @@ export class MongoDeltaDao implements DeltaDao
this._collection.find(
{
published: false,
- deltaError: false,
+ deltaError: { $ne: true },
},
{ fields: this.RESULT_FIELDS },
( e, cursor ) =>
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts
index 2cd68e2..1587bb2 100644
--- a/test/system/DeltaProcessorTest.ts
+++ b/test/system/DeltaProcessorTest.ts
@@ -22,7 +22,7 @@
import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
import { AmqpPublisher } from '../../src/system/AmqpPublisher';
import { DeltaDao } from '../../src/system/db/DeltaDao';
-import { DeltaDocument } from "../../src/bucket/delta";
+import { DeltaDocument } from '../../src/bucket/delta';
import { DocumentId } from '../../src/document/Document';
import { EventEmitter } from 'events';
@@ -40,7 +40,7 @@ describe( 'system.DeltaProcessor', () =>
expected: any
}[]>[
{
- label: "No deltas are processed",
+ label: 'No deltas are processed',
given: [
{
id: 123,
@@ -53,9 +53,9 @@ describe( 'system.DeltaProcessor', () =>
expected: [],
},
- // when quote is initialized: { foo: [ "" ], state: [ "a" ] }
+ // when quote is initialized: { foo: [ '' ], state: [ 'a' ] }
{
- label: "Publishes deltas in order",
+ label: 'Publishes deltas in order',
given: [
{
@@ -63,13 +63,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 123123123,
data: {
- foo: [ "third" ],
- state: [ "a", "b", "c", "d" ],
+ foo: [ 'third' ],
+ state: [ 'a', 'b', 'c', 'd' ],
},
ratedata: {
- prem: [ "rate_second" ],
- state: [ "i", "ii", "iii" ],
+ prem: [ 'rate_second' ],
+ state: [ 'i', 'ii', 'iii' ],
},
rdelta: {
@@ -77,21 +77,21 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 1,
data: {
- foo: [ "" ],
+ foo: [ '' ],
state: [ undefined, null ],
},
},
{
timestamp: 3,
data: {
- foo: [ "first" ],
+ foo: [ 'first' ],
state: [ undefined, undefined, null ],
},
},
{
timestamp: 5,
data: {
- foo: [ "second" ],
+ foo: [ 'second' ],
state: [ undefined, undefined, undefined, null ],
},
},
@@ -101,14 +101,14 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 2,
data: {
- prem: [ "" ],
+ prem: [ '' ],
state: [ undefined, null ],
},
},
{
timestamp: 4,
data: {
- prem: [ "rate_first" ],
+ prem: [ 'rate_first' ],
state: [ undefined, undefined, null ],
},
},
@@ -122,12 +122,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- foo: [ "" ],
+ foo: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {},
},
@@ -136,16 +136,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- prem: [ "" ],
+ prem: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
},
@@ -153,12 +153,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- foo: [ "first" ],
+ foo: [ 'first' ],
state: [ undefined, undefined, null ],
},
bucket: {
- foo: [ "second" ],
- state: [ "a", "b", "c" ],
+ foo: [ 'second' ],
+ state: [ 'a', 'b', 'c' ],
},
ratedata: {},
},
@@ -167,16 +167,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- prem: [ "rate_first" ],
+ prem: [ 'rate_first' ],
state: [ undefined, undefined, null ],
},
bucket: {
- foo: [ "second" ],
- state: [ "a", "b", "c" ],
+ foo: [ 'second' ],
+ state: [ 'a', 'b', 'c' ],
},
ratedata: {
- prem: [ "rate_second" ],
- state: [ "i", "ii", "iii" ],
+ prem: [ 'rate_second' ],
+ state: [ 'i', 'ii', 'iii' ],
},
},
@@ -184,12 +184,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- foo: [ "second" ],
+ foo: [ 'second' ],
state: [ undefined, undefined, undefined, null ],
},
bucket: {
- foo: [ "third" ],
- state: [ "a", "b", "c", "d" ],
+ foo: [ 'third' ],
+ state: [ 'a', 'b', 'c', 'd' ],
},
ratedata: {},
},
@@ -197,7 +197,7 @@ describe( 'system.DeltaProcessor', () =>
},
{
- label: "Publishes deltas in order for multiple documents",
+ label: 'Publishes deltas in order for multiple documents',
given: [
{
@@ -205,13 +205,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 123123123,
data: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
rdelta: {
@@ -219,7 +219,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 1,
data: {
- foo: [ "" ],
+ foo: [ '' ],
state: [ undefined, null ],
},
},
@@ -229,7 +229,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 4,
data: {
- prem: [ "" ],
+ prem: [ '' ],
state: [ undefined, null ],
},
},
@@ -245,13 +245,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 121212123,
data: {
- foo2: [ "first" ],
- state: [ "a", "b" ],
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem2: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem2: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
rdelta: {
@@ -259,7 +259,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 2,
data: {
- foo2: [ "" ],
+ foo2: [ '' ],
state: [ undefined, null ],
},
},
@@ -269,7 +269,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 3,
data: {
- prem2: [ "" ],
+ prem2: [ '' ],
state: [ undefined, null ],
},
},
@@ -283,12 +283,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- foo: [ "" ],
+ foo: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {},
},
@@ -297,16 +297,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- prem: [ "" ],
+ prem: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
},
@@ -314,12 +314,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 234,
rdelta: {
- foo2: [ "" ],
+ foo2: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo2: [ "first" ],
- state: [ "a", "b" ],
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {},
},
@@ -328,37 +328,37 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 234,
rdelta: {
- prem2: [ "" ],
+ prem2: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo2: [ "first" ],
- state: [ "a", "b" ],
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem2: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem2: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
},
],
},
{
- label: "trims delta array based on index",
+ label: 'trims delta array based on index',
given: [
{
id: 111,
lastUpdate: 123123123,
- data: { foo: [ "second" ] },
+ data: { foo: [ 'second' ] },
ratedata: {},
rdelta: {
data: [
{
- data: { foo: [ "" ] },
+ data: { foo: [ '' ] },
timestamp: 123,
},
{
- data: { foo: [ "first" ] },
+ data: { foo: [ 'first' ] },
timestamp: 234,
},
],
@@ -371,8 +371,8 @@ describe( 'system.DeltaProcessor', () =>
expected: [
{
doc_id: 111,
- rdelta: { foo: [ "first" ] },
- bucket: { foo: [ "second" ] },
+ rdelta: { foo: [ 'first' ] },
+ bucket: { foo: [ 'second' ] },
ratedata: {}
},
],
@@ -390,14 +390,14 @@ describe( 'system.DeltaProcessor', () =>
}
publisher.publish = (
- doc_id,
+ meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
- doc_id: doc_id,
+ doc_id: meta.id,
rdelta: delta.data,
bucket: bucket,
ratedata: ratedata,
@@ -422,12 +422,19 @@ describe( 'system.DeltaProcessor', () =>
const dao = createMockDeltaDao();
const publisher = createMockDeltaPublisher();
const emitter = new EventEmitter();
+ const entity_num = 'Some Agency';
+ const entity_id = 4321;
+ const lastUpdate = <UnixTimestamp>123123123;
+ const createdData = <UnixTimestamp>234234234;
const doc = <DeltaDocument[]>[ {
- id: <DocumentId>123,
- lastUpdate: <UnixTimestamp>123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
- rdelta: {
+ id: <DocumentId>123,
+ agentName: entity_num,
+ agentEntityId: entity_id,
+ startDate: createdData,
+ lastUpdate: lastUpdate,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@@ -439,11 +446,14 @@ describe( 'system.DeltaProcessor', () =>
},
},
{
- id: <DocumentId>234,
- lastUpdate: <UnixTimestamp>123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
- rdelta: {
+ id: <DocumentId>234,
+ agentName: entity_num,
+ agentEntityId: entity_id,
+ startDate: createdData,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@@ -457,13 +467,25 @@ describe( 'system.DeltaProcessor', () =>
const expected_published = [
{
- doc_id: 123,
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 123,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
},
{
- doc_id: 234,
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 234,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
@@ -485,14 +507,14 @@ describe( 'system.DeltaProcessor', () =>
}
publisher.publish = (
- doc_id,
+ meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
- doc_id: doc_id,
+ meta: meta,
delta: delta.data,
bucket: bucket,
ratedata: ratedata,
@@ -525,11 +547,14 @@ describe( 'system.DeltaProcessor', () =>
const publisher = createMockDeltaPublisher();
const emitter = new EventEmitter();
const doc = <DeltaDocument[]>[ {
- id: <DocumentId>123,
- lastUpdate: <UnixTimestamp>123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
- rdelta: {
+ id: <DocumentId>123,
+ agentName: 'Some Agency',
+ agentEntityId: 4321,
+ startDate: <UnixTimestamp>234234234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@@ -541,11 +566,14 @@ describe( 'system.DeltaProcessor', () =>
},
},
{
- id: <DocumentId>234,
- lastUpdate: <UnixTimestamp>123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
- rdelta: {
+ id: <DocumentId>234,
+ agentName: 'Some Agency',
+ agentEntityId: 4321,
+ startDate: <UnixTimestamp>234234234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@@ -559,7 +587,13 @@ describe( 'system.DeltaProcessor', () =>
// Only one is published
const expected_published = [ {
- doc_id: 123,
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 123,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
@@ -577,14 +611,14 @@ describe( 'system.DeltaProcessor', () =>
Promise.reject( new Error( expected_error ) );
publisher.publish = (
- doc_id,
+ meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
- doc_id: doc_id,
+ meta,
delta: delta.data,
bucket: bucket,
ratedata: ratedata,
diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts
index 314c7da..f352b0c 100644
--- a/test/system/DeltaPublisherTest.ts
+++ b/test/system/DeltaPublisherTest.ts
@@ -22,20 +22,17 @@
import { AmqpConnection } from '../../src/system/amqp/AmqpConnection';
import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher';
-import { DocumentId } from '../../src/document/Document';
-import { Duplex } from 'stream';
-import { EventEmitter } from "events";
+import { DocumentId, DocumentMeta } from '../../src/document/Document';
+import { EventEmitter } from 'events';
import { hasContext } from '../../src/error/ContextError';
import { AmqpError } from '../../src/error/AmqpError';
import { Channel } from 'amqplib';
-import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
+import { MessageWriter } from '../../src/system/MessageWriter';
-import { AvroSchema } from "avro-js";
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
-const sinon = require( 'sinon' );
describe( 'server.DeltaPublisher', () =>
{
@@ -49,6 +46,15 @@ describe( 'server.DeltaPublisher', () =>
const ratedata = createMockBucketData();
const emitter = new EventEmitter();
const conn = createMockAmqpConnection();
+ const writer = createMockWriter();
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
conn.getAmqpChannel = () =>
{
return <Channel>{
@@ -63,23 +69,10 @@ describe( 'server.DeltaPublisher', () =>
};
};
- const stub_schema = <AvroSchema>(<unknown>{
- isValid()
- {
- // TODO: test me
- },
- } );
-
- const sut = new Sut(
- emitter,
- ts_ctr,
- createMockEncoderCtor( stub_schema ),
- conn,
- stub_schema,
- );
+ const sut = new Sut( emitter, ts_ctr, conn, writer );
return expect(
- sut.publish( <DocumentId>123, delta, bucket, ratedata )
+ sut.publish( meta, delta, bucket, ratedata )
).to.eventually.deep.equal( undefined )
.then( _ =>
{
@@ -119,29 +112,25 @@ describe( 'server.DeltaPublisher', () =>
const ratedata = createMockBucketData();
const emitter = new EventEmitter();
const conn = createMockAmqpConnection();
- const doc_id = <DocumentId>123;
+ const writer = createMockWriter();
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
const expected = {
- doc_id: doc_id,
+ doc_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp
}
conn.getAmqpChannel = getChannelF;
- const stub_schema = <AvroSchema>(<unknown>{
- isValid()
- {
- // TODO: test me
- },
- } );
-
- const result = new Sut(
- emitter,
- ts_ctr,
- createMockEncoderCtor( stub_schema ),
- conn,
- stub_schema,
- ).publish( doc_id, delta, bucket, ratedata );
+ const result = new Sut( emitter, ts_ctr, conn, writer )
+ .publish( meta, delta, bucket, ratedata );
return Promise.all( [
expect( result ).to.eventually.be.rejectedWith(
@@ -158,276 +147,47 @@ describe( 'server.DeltaPublisher', () =>
} )
] );
} ) );
- } );
-
- describe( '#avroEncode parses', () =>
- {
- [
- {
- label: 'Null value',
- valid: true,
- delta_data: { foo: null },
- },
- {
- label: 'Null array',
- valid: true,
- delta_data: { foo: { "array": [ null ] } },
- },
- {
- label: 'Boolean value',
- valid: true,
- delta_data: { foo: { "array": [
- { "boolean": true },
- ] } },
- },
- {
- label: 'Simple string',
- valid: true,
- delta_data: { foo: { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- ] } },
- },
- {
- label: 'Simple int',
- valid: true,
- delta_data: { foo: { "array": [
- { "double": 123 },
- ] } },
- },
- {
- label: 'Nested array',
- valid: true,
- delta_data: { foo: { "array": [
- { "array": [
- { "string": 'bar' },
- ] },
- ] } },
- },
- {
- label: 'Array with nulls',
- valid: true,
- delta_data: { foo: { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- null,
- ] } },
- },
- {
- label: 'Nested Array with mixed values',
- valid: true,
- delta_data: { foo: { "array": [
- { "array": [
- { "string": 'bar' },
- { "double": 123321 },
- null,
- ] }
- ] } },
- },
- {
- label: 'Non-array',
- valid: false,
- delta_data: { foo: 'bar' },
- },
- {
- label: 'Map objects',
- valid: true,
- delta_data: { "foo": { "array": [
- { "map": {
- "bar": { "map": {
- "baz": { "double": 1572903485000 },
- } }
- } }
- ] } },
- }
- ].forEach( ( { label, delta_data, valid } ) =>
- {
- it( label, () =>
- {
- const emitter = createMockEventEmitter();
- const conn = createMockAmqpConnection();
- const data = createMockData( delta_data );
-
- const stub_schema = <AvroSchema>(<unknown>{
- isValid()
- {
- // TODO: test me
- },
- } );
-
- const sut = new Sut(
- emitter,
- ts_ctr,
- createMockEncoderCtor( stub_schema ),
- conn,
- stub_schema
- );
-
- sut.avroEncode( data )
- .then( b =>
- {
- expect( typeof(b) ).to.equal( 'object' );
- expect( valid ).to.be.true;
- } )
- .catch( _ =>
- {
- expect( valid ).to.be.false;
- } );
- } );
- } );
- } );
- describe( '#setDataTypes annotates', () =>
- {
- [
- {
- label: 'Null',
- delta_data: null,
- expected: null,
- },
- {
- label: 'Null Value',
- delta_data: { foo: null },
- expected: { foo: null },
- },
- {
- label: 'Boolean Value',
- delta_data: { foo: [ true ] },
- expected: { foo: { "array": [
- { "boolean": true },
- ] } },
- },
- {
- label: 'Simple string',
- delta_data: { foo: [
- 'bar',
- 'baz',
- ] },
- expected: { foo: { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- ] } },
- },
- {
- label: 'Simple int',
- delta_data: { foo: [
- 123
- ] },
- expected: { foo: { "array": [
- { "double": 123 },
- ] } },
- },
- {
- label: 'Nested array',
- delta_data: { foo: [
- [
- 'bar',
- 'baz',
- ]
- ] },
- expected: { foo: { "array": [
- { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- ] },
- ] } },
- },
- {
- label: 'Double nested array',
- delta_data: { foo: [
- [
- [
- 'bar',
- 123,
- null
- ],
- ],
- ] },
- expected: { foo: { "array": [
- { "array": [
- { "array": [
- { "string": 'bar' },
- { "double": 123 },
- null,
- ] },
- ] },
- ] } },
- },
- {
- label: 'Array with nulls',
- delta_data: { foo: [
- 'bar',
- 'baz',
- null
- ] },
- expected: { foo: { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- null
- ] } },
- },
- {
- label: 'Nested Array with mixed values',
- delta_data: { foo: [
- [
- 'bar',
- 123321,
- null,
- ]
- ] },
- expected: { foo: { "array": [
- { "array": [
- { "string": 'bar' },
- { "double": 123321 },
- null,
- ] },
- ] } },
- },
- {
- label: 'Nested Array with mixed values',
- delta_data: { foo: [
- {
- "bar": {
- "wer": 'qaz',
- "qwe": 1572903485000,
- "asd": true,
- "zxc": null,
- },
- },
- ] },
- expected: { "foo": { "array": [
- { "map": {
- "bar": { "map": {
- "wer": { "string": 'qaz' },
- "qwe": { "double": 1572903485000 },
- "asd": { "boolean": true },
- "zxc": null,
- } },
- } },
- ] } },
- },
- ].forEach( ( { label, delta_data, expected } ) =>
+ it( 'writer#write rejects', () =>
{
- it( label, () =>
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const emitter = new EventEmitter();
+ const conn = createMockAmqpConnection();
+ const writer = createMockWriter();
+ const error = new Error( 'Bad thing happened' );
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
+ writer.write = (
+ _: any,
+ __: any,
+ ___: any,
+ ____: any,
+ _____: any
+ ): Promise<Buffer> =>
{
- const encoded = 'FooBar';
- const emitter = createMockEventEmitter();
- const conn = createMockAmqpConnection();
- const avroEncoderCtr = createMockEncoder( encoded );
- const stub_schema = <AvroSchema>{};
- const sut = new Sut(
- emitter,
- ts_ctr,
- avroEncoderCtr,
- conn,
- stub_schema,
- );
- const actual = sut.setDataTypes( delta_data );
+ return Promise.reject( error );
+ };
- expect( actual ).to.deep.equal( expected );
- } );
- } );
+ const result = new Sut( emitter, ts_ctr, conn, writer )
+ .publish( meta, delta, bucket, ratedata );
+
+ return Promise.all( [
+ expect( result ).to.eventually.be.rejectedWith( error ),
+ result.catch( e =>
+ {
+ return expect( e ).to.deep.equal( error );
+ } )
+ ] );
+ } )
} );
} );
@@ -438,26 +198,6 @@ function ts_ctr(): UnixTimestamp
}
-function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr
-{
- return ( _schema: AvroSchema ) =>
- {
- const mock = sinon.mock( Duplex );
-
- mock.on = ( _: string, __: any ) => {};
- mock.end = ( _: any ) => { return mock_encoded_data; };
-
- return mock;
- };
-}
-
-
-function createMockEventEmitter(): EventEmitter
-{
- return <EventEmitter>{};
-}
-
-
function createMockAmqpConnection(): AmqpConnection
{
return <AmqpConnection>{
@@ -467,39 +207,6 @@ function createMockAmqpConnection(): AmqpConnection
}
-function createMockData( delta_data: any ): any
-{
-
- return {
- event: {
- id: 'RATE',
- ts: 1573856916,
- actor: 'SERVER',
- step: null,
- },
- document: {
- id: 123123,
- created: 1573856916,
- modified: 1573856916,
- top_visited_step: '2',
- },
- data: null,
- ratedata: null,
- delta: {
- Data: {
- bucket: delta_data,
- },
- },
- program: {
- Program: {
- id: 'quote_server',
- version: 'dadaddwafdwa',
- },
- },
- };
-}
-
-
function createMockBucketData(): Record<string, any>
{
return {
@@ -518,26 +225,12 @@ function createMockDelta(): Delta<any>
}
-function createMockEncoderCtor( stub_schema: AvroSchema ):
- ( schema: AvroSchema ) => Duplex
+function createMockWriter(): MessageWriter
{
- const events = <Record<string, () => void>>{};
-
- const mock_duplex = <Duplex>(<unknown>{
- on( event_name: string, callback: () => void )
+ return <MessageWriter>{
+ write( _: any, __:any, ___:any, ____:any, _____:any ): Promise<Buffer>
{
- events[ event_name ] = callback;
- },
-
- end()
- {
- events.end();
- },
- } );
-
- return ( schema: AvroSchema ): Duplex =>
- {
- expect( schema ).to.equal( stub_schema );
- return mock_duplex;
+ return Promise.resolve( Buffer.from( '' ) );
+ }
};
-}
+} \ No newline at end of file
diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts
index caab191..abfbef8 100644
--- a/test/system/EventMediatorTest.ts
+++ b/test/system/EventMediatorTest.ts
@@ -62,11 +62,11 @@ describe( 'system.EventLogger captures and logs events', () =>
expect( method_called ).to.be.true;
} );
- it( 'amqp-conn-error triggers log#warning', () =>
+ it( 'amqp-conn-warn triggers log#warning', () =>
{
let method_called = false;
- const event_id = 'amqp-conn-error';
+ const event_id = 'amqp-conn-warn';
const emitter = new EventEmitter();
const log = createMockLogger();
diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts
index eafc77d..9a36584 100644
--- a/test/system/MetricsCollectorTest.ts
+++ b/test/system/MetricsCollectorTest.ts
@@ -72,7 +72,7 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () =>
const sut = new Sut( factory, conf, emitter, timer );
- emitter.emit( 'delta-process-error' );
+ emitter.emit( 'error' );
expect( counter_called ).to.be.true;
diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts
new file mode 100644
index 0000000..271d735
--- /dev/null
+++ b/test/system/V1MessageWriterTest.ts
@@ -0,0 +1,532 @@
+/**
+ * V1 Message Writer
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Tests for Version 1 of the avro message writer
+ */
+
+import { V1MessageWriter as Sut } from '../../src/system/avro/V1MessageWriter';
+import { hasContext, context } from '../../src/error/ContextError';
+import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
+import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
+import { DocumentMeta, DocumentId } from '../../src/document/Document';
+import { Duplex } from 'stream';
+import { AvroSchema } from 'avro-js';
+
+import { expect, use as chai_use } from 'chai';
+chai_use( require( 'chai-as-promised' ) );
+
+const sinon = require( 'sinon' );
+
+describe( 'system.V1MessageWriter', () =>
+{
+ it( 'Rejects improperly formatted data', () =>
+ {
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const error = new Error( 'Oh no' );
+ const schema = createMockAvroSchema();
+ const ts = <UnixTimestamp>123;
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
+ const expected = {
+ invalid_paths: 'Foo',
+ invalid_data: 'Bar',
+ };
+
+ const error_context = context( error, expected );
+
+ schema.isValid = () => { throw error_context; };
+
+ const result = new Sut(
+ createMockEncoderCtor( schema ),
+ schema,
+ ).write( ts, meta, delta, bucket, ratedata );
+
+ return Promise.all( [
+ expect( result ).to.eventually.be.rejectedWith( error ),
+ result.catch( e =>
+ {
+ if ( !hasContext( e ) )
+ {
+ return expect.fail();
+ }
+
+ return expect( e.context ).to.deep.equal( expected );
+ } )
+ ] );
+ } );
+
+
+ describe( '#avroEncode parses', () =>
+ {
+ [
+ {
+ label: 'Null value',
+ valid: true,
+ delta_data: { foo: null },
+ },
+ {
+ label: 'Null array',
+ valid: true,
+ delta_data: { foo: { 'array': [ null ] } },
+ },
+ {
+ label: 'Boolean value',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'boolean': true },
+ ] } },
+ },
+ {
+ label: 'Simple string',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] } },
+ },
+ {
+ label: 'Simple int',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'double': 123 },
+ ] } },
+ },
+ {
+ label: 'Nested array',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Array with nulls',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ null,
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123321 },
+ null,
+ ] }
+ ] } },
+ },
+ {
+ label: 'Non-array',
+ valid: false,
+ delta_data: { foo: 'bar' },
+ },
+ {
+ label: 'Map objects',
+ valid: true,
+ delta_data: { 'foo': { 'array': [
+ { 'map': {
+ 'bar': { 'map': {
+ 'baz': { 'double': 1572903485000 },
+ } }
+ } }
+ ] } },
+ }
+ ].forEach( ( { label, delta_data, valid } ) =>
+ {
+ it( label, () =>
+ {
+ const data = createMockData( delta_data );
+ const schema = createMockAvroSchema();
+
+ const sut = new Sut(
+ createMockEncoderCtor( schema ),
+ schema
+ );
+
+ sut.avroEncode( data )
+ .then( b =>
+ {
+ expect( typeof(b) ).to.equal( 'object' );
+ expect( valid ).to.be.true;
+ } )
+ .catch( _ =>
+ {
+ expect( valid ).to.be.false;
+ } );
+ } );
+ } );
+ } );
+
+
+ describe( '#setDataTypes annotates', () =>
+ {
+ [
+ {
+ label: 'Null',
+ delta_data: null,
+ expected: null,
+ },
+ {
+ label: 'Null Value',
+ delta_data: { foo: null },
+ expected: { foo: null },
+ },
+ {
+ label: 'Boolean Value',
+ delta_data: { foo: [ true ] },
+ expected: { foo: { 'array': [
+ { 'boolean': true },
+ ] } },
+ },
+ {
+ label: 'Simple string',
+ delta_data: { foo: [
+ 'bar',
+ 'baz',
+ ] },
+ expected: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] } },
+ },
+ {
+ label: 'Simple int',
+ delta_data: { foo: [
+ 123
+ ] },
+ expected: { foo: { 'array': [
+ { 'double': 123 },
+ ] } },
+ },
+ {
+ label: 'Nested array',
+ delta_data: { foo: [
+ [
+ 'bar',
+ 'baz',
+ ]
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Double nested array',
+ delta_data: { foo: [
+ [
+ [
+ 'bar',
+ 123,
+ null
+ ],
+ ],
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123 },
+ null,
+ ] },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Array with nulls',
+ delta_data: { foo: [
+ 'bar',
+ 'baz',
+ null
+ ] },
+ expected: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ null
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ delta_data: { foo: [
+ [
+ 'bar',
+ 123321,
+ null,
+ ]
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123321 },
+ null,
+ ] },
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ delta_data: { foo: [
+ {
+ 'bar': {
+ 'wer': 'qaz',
+ 'qwe': 1572903485000,
+ 'asd': true,
+ 'zxc': null,
+ },
+ },
+ ] },
+ expected: { 'foo': { 'array': [
+ { 'map': {
+ 'bar': { 'map': {
+ 'wer': { 'string': 'qaz' },
+ 'qwe': { 'double': 1572903485000 },
+ 'asd': { 'boolean': true },
+ 'zxc': null,
+ } },
+ } },
+ ] } },
+ },
+ ].forEach( ( { label, delta_data, expected } ) =>
+ {
+ it( label, () =>
+ {
+ const encoded = 'FooBar';
+ const avroEncoderCtr = createMockEncoder( encoded );
+ const stub_schema = <AvroSchema>{};
+ const sut = new Sut(
+ avroEncoderCtr,
+ stub_schema,
+ );
+ const actual = sut.setDataTypes( delta_data );
+
+ expect( actual ).to.deep.equal( expected );
+ } );
+ } );
+ } );
+
+
+ it( 'Message is formatted correctly', () =>
+ {
+ const bucket = { foo: [ 'bar', 'baz' ] };
+ const ratedata = {};
+ const doc_id = <DocumentId>123;
+ const entity_name = 'Some Agency';
+ const entity_id = 123;
+ const startDate = <UnixTimestamp>345;
+ const lastUpdate = <UnixTimestamp>456;
+ const schema = createMockAvroSchema();
+ const ts = <UnixTimestamp>123;
+ const encoder = createMockEncoderCtor( schema );
+ const meta = <DocumentMeta>{
+ id: doc_id,
+ entity_name: entity_name,
+ entity_id: entity_id,
+ startDate: startDate,
+ lastUpdate: lastUpdate,
+ };
+
+ const delta = <Delta<any>>{
+ type: <DeltaType>'data',
+ timestamp: <UnixTimestamp>123123123,
+ data: <DeltaResult<any>>{},
+ };
+
+ const expected = {
+ event: {
+ id: 'STEP_SAVE',
+ ts: ts,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: doc_id,
+ created: startDate,
+ modified: lastUpdate,
+ },
+ session: {
+ Session: {
+ entity_name: entity_name,
+ entity_id: entity_id,
+ },
+ },
+ data: {
+ Data: {
+ bucket: {
+ 'foo': { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] }
+ },
+ },
+ },
+ ratedata: {
+ Data: {
+ bucket: {},
+ },
+ },
+ delta: {
+ Data: {
+ bucket: delta.data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: '',
+ },
+ },
+ };
+
+ let is_valid_called = false;
+
+ schema.isValid = ( data: Record<string, any>, _:any ) =>
+ {
+ expect( data ).to.deep.equal( expected );
+
+ is_valid_called = true;
+
+ return null;
+ }
+
+ return expect( new Sut( encoder, schema )
+ .write( ts, meta, delta, bucket, ratedata ) )
+ .to.eventually.deep.equal( Buffer.from( '' ) )
+ .then( _ =>
+ {
+ expect( is_valid_called ).to.be.true;
+ } )
+ } );
+} );
+
+
+function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr
+{
+ return ( _schema: AvroSchema ) =>
+ {
+ const mock = sinon.mock( Duplex );
+
+ mock.on = ( _: string, __: any ) => {};
+ mock.end = ( _: any ) => { return mock_encoded_data; };
+
+ return mock;
+ };
+}
+
+
+function createMockData( delta_data: any ): any
+{
+
+ return {
+ event: {
+ id: 'RATE',
+ ts: 1573856916,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: 123123,
+ created: 1573856916,
+ modified: 1573856916,
+ top_visited_step: '2',
+ },
+ data: null,
+ ratedata: null,
+ delta: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: 'dadaddwafdwa',
+ },
+ },
+ };
+}
+
+
+function createMockDelta(): Delta<any>
+{
+ return <Delta<any>>{
+ type: <DeltaType>'data',
+ timestamp: <UnixTimestamp>123123123,
+ data: <DeltaResult<any>>{},
+ }
+}
+
+
+function createMockBucketData(): Record<string, any>
+{
+ return {
+ foo: [ 'bar', 'baz' ]
+ }
+}
+
+
+function createMockEncoderCtor( stub_schema: AvroSchema ):
+ ( schema: AvroSchema ) => Duplex
+{
+ const events = <Record<string, () => void>>{};
+
+ const mock_duplex = <Duplex>(<unknown>{
+ on( event_name: string, callback: () => void )
+ {
+ events[ event_name ] = callback;
+ },
+
+ end()
+ {
+ events.end();
+ },
+ } );
+
+ return ( schema: AvroSchema ): Duplex =>
+ {
+ expect( schema ).to.equal( stub_schema );
+ return mock_duplex;
+ };
+}
+
+
+function createMockAvroSchema(): AvroSchema
+{
+ return <AvroSchema>{
+ toBuffer() { return null },
+ isValid() { return null },
+ encode() {},
+ toString() { return '' },
+ fromBuffer() { return {} },
+ };
+}
diff --git a/test/system/amqp/AmqpConnectionTest.ts b/test/system/amqp/AmqpConnectionTest.ts
index 36a332c..1e4237d 100644
--- a/test/system/amqp/AmqpConnectionTest.ts
+++ b/test/system/amqp/AmqpConnectionTest.ts
@@ -41,7 +41,7 @@ describe( 'AmqpConnection', () =>
assertExchange() {
return Promise.reject( expected_err );
},
- });
+ } );
const mock_connection = <amqplib.Connection>(<unknown>{
once() {},
@@ -49,13 +49,13 @@ describe( 'AmqpConnection', () =>
createChannel() {
return Promise.resolve( mock_channel );
},
- });
+ } );
const mock_amqp = <typeof amqplib>(<unknown>{
connect() {
return Promise.resolve( mock_connection );
}
- });
+ } );
const emitter = new EventEmitter();
const conf = <AmqpConfig>{};
@@ -65,5 +65,48 @@ describe( 'AmqpConnection', () =>
.to.eventually.be.rejectedWith( expected_err );
} );
} );
+
+
+ describe( '#reconnect', () =>
+ {
+ it( "is called when there is an error with the connection", () =>
+ {
+ let reconnect_called = false;
+
+ const mock_channel = <amqplib.Channel>(<unknown>{
+ assertExchange() {
+ return Promise.resolve();
+ },
+ } );
+
+ const mock_connection = <amqplib.Connection>Object.create(
+ new EventEmitter()
+ );
+
+ mock_connection.createChannel = (): any => {
+ return Promise.resolve( mock_channel );
+ };
+
+ const mock_amqp = <typeof amqplib>(<unknown>{
+ connect() {
+ return Promise.resolve( mock_connection );
+ }
+ } );
+
+ const emitter = new EventEmitter();
+
+ emitter.on( 'amqp-reconnect', () => { reconnect_called = true } );
+
+ const conf = <AmqpConfig>{};
+ const sut = new Sut( mock_amqp, conf, emitter );
+
+ const result = sut.connect()
+ .then( () => mock_connection.emit( 'error' ) )
+
+ return expect( result )
+ .to.eventually.deep.equal( true )
+ .then( _ => expect( reconnect_called ).to.be.true );
+ } );
+ } );
} );