Diffstat (limited to 'src')
-rw-r--r--  src/system/AmqpPublisher.ts          |   2
-rw-r--r--  src/system/DeltaLogger.ts            | 135
-rw-r--r--  src/system/DeltaProcessor.ts         | 123
-rw-r--r--  src/system/DeltaPublisher.ts         | 217
-rw-r--r--  src/system/MetricsCollector.ts       |  80
-rw-r--r--  src/system/avro/schema.avsc          | 113
-rw-r--r--  src/system/db/DeltaDao.ts            |  24
-rw-r--r--  src/system/db/MongoDeltaDao.ts       | 212
-rw-r--r--  src/system/event/EventDispatcher.ts  |  44
-rw-r--r--  src/system/event/EventSubscriber.ts  |  44
-rw-r--r--  src/types/mongodb.d.ts               |  28
11 files changed, 786 insertions(+), 236 deletions(-)
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
index bfd6dc3..019ea0d 100644
--- a/src/system/AmqpPublisher.ts
+++ b/src/system/AmqpPublisher.ts
@@ -38,5 +38,5 @@ export interface AmqpPublisher
*
* @param delta - The delta to publish
*/
- publish( delta: DeltaResult<any> ): void;
+ publish( delta: DeltaResult<any> ): Promise<null>;
}
diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts
new file mode 100644
index 0000000..50e616e
--- /dev/null
+++ b/src/system/DeltaLogger.ts
@@ -0,0 +1,135 @@
+/**
+ * Delta logger
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Logger for delta events
+ */
+
+import { EventSubscriber } from "./event/EventSubscriber";
+
+enum LogLevel {
+ DEBUG,
+ INFO,
+ NOTICE,
+ WARNING,
+ ERROR,
+ CRITICAL,
+ ALERT,
+ EMERGENCY,
+};
+
+declare type StructuredLog = {
+ message: string;
+ timestamp: UnixTimestamp;
+ service: string;
+ env: string;
+ severity: string;
+}
+
+export class DeltaLogger
+{
+ /**
+ * Initialize delta logger
+ *
+ * @param _env - The environment ( dev, test, demo, live )
+ * @param _subscriber - An event subscriber
+ * @param _ts_ctr - a timestamp constructor
+ */
+ constructor(
+ private readonly _env: string,
+ private readonly _subscriber: EventSubscriber,
+ private readonly _ts_ctr : () => UnixTimestamp,
+ ) {}
+
+
+ /**
+ * Initialize the logger to look for specific events
+ */
+ init(): void
+ {
+ this._registerEvent( 'document-processed', LogLevel.NOTICE );
+ this._registerEvent( 'delta-publish', LogLevel.NOTICE );
+ this._registerEvent( 'avro-parse-err', LogLevel.ERROR );
+ this._registerEvent( 'mongodb-err', LogLevel.ERROR );
+ this._registerEvent( 'publish-err', LogLevel.ERROR );
+ }
+
+
+ /**
+ * Register an event at a specific log level
+ *
+ * @param event_id - the event id
+ * @param level - the log level
+ */
+ private _registerEvent( event_id: string, level: LogLevel ): void
+ {
+ const logF = this._getLogLevelFunction( level );
+
+ this._subscriber.subscribe( event_id, logF );
+ }
+
+
+ /**
+ * Get a logging function for the specified log level
+ *
+ * @param level - the log level
+ *
+ * @return the function to log with
+ */
+ private _getLogLevelFunction( level: LogLevel ): ( str: string ) => void
+ {
+ switch( level )
+ {
+ case LogLevel.DEBUG:
+ case LogLevel.INFO:
+ return ( _ ) => console.info( this._formatLog( _, level ) );
+ case LogLevel.NOTICE:
+ return ( _ ) => console.log( this._formatLog( _, level ) );
+ case LogLevel.WARNING:
+ return ( _ ) => console.warn( this._formatLog( _, level ) );
+ case LogLevel.ERROR:
+ case LogLevel.CRITICAL:
+ case LogLevel.ALERT:
+ case LogLevel.EMERGENCY:
+ return ( _ ) => console.error( this._formatLog( _, level ) );
+ default:
+ return ( _ ) => console.log( "UNKNOWN LOG LEVEL: " + _ );
+ }
+ }
+
+
+ /**
+ * Get structured log object
+ *
+ * @param str - the string to log
+ * @param level - the log level
+ *
+ * @returns a structured logging object
+ */
+ private _formatLog( str: string, level: LogLevel ): StructuredLog
+ {
+ return <StructuredLog>{
+ message: str,
+ timestamp: this._ts_ctr(),
+ service: 'quote-server',
+ env: this._env,
+ severity: LogLevel[level],
+ };
+ }
+}
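A minimal wiring sketch for the new logger (not part of this commit; EventDispatcher and EventSubscriber appear further down in this diff, and the environment string, timestamp constructor, and UnixTimestamp cast are illustrative assumptions):

    import { EventEmitter } from "events";
    import { DeltaLogger } from "./DeltaLogger";
    import { EventDispatcher } from "./event/EventDispatcher";
    import { EventSubscriber } from "./event/EventSubscriber";

    // one emitter shared by dispatcher and subscriber so that dispatched
    // events reach the logger's subscriptions
    const emitter    = new EventEmitter();
    const dispatcher = new EventDispatcher( emitter );
    const subscriber = new EventSubscriber( emitter );

    // timestamp constructor expected by DeltaLogger
    const ts_ctr = () => <UnixTimestamp>Math.floor( Date.now() / 1000 );

    new DeltaLogger( 'dev', subscriber, ts_ctr ).init();

    // any event dispatched on the shared emitter is now logged as a StructuredLog
    dispatcher.dispatch( 'delta-publish', 'example message' );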
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index 678e700..39cdf3d 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -24,7 +24,7 @@ import { MongoDeltaType } from "../system/db/MongoDeltaDao";
import { DeltaResult } from "../bucket/delta";
import { DocumentId } from "../document/Document";
import { AmqpPublisher } from "./AmqpPublisher";
-
+import { EventDispatcher } from "./event/EventDispatcher";
/**
* Process deltas for a quote and publish to a queue
@@ -41,11 +41,14 @@ export class DeltaProcessor
/**
* Initialize processor
*
- * @param _collection Mongo collection
+ * @param _dao - Delta DAO
+ * @param _publisher - Amqp Publisher
+ * @param _dispatcher - Event dispatcher instance
*/
constructor(
- private readonly _dao: DeltaDao,
- private readonly _publisher: AmqpPublisher,
+ private readonly _dao: DeltaDao,
+ private readonly _publisher: AmqpPublisher,
+ private readonly _dispatcher: EventDispatcher
) {}
@@ -56,31 +59,48 @@ export class DeltaProcessor
{
let self = this;
- this._dao.getUnprocessedDocuments( function( docs )
+ this._dao.getUnprocessedDocuments()
+ .then( docs =>
{
- docs.forEach( doc => {
-
- const deltas = self.getTimestampSortedDeltas( doc );
-
- deltas.forEach( delta => {
-
- self._publisher.publish( delta );
-
- });
-
- const last_updated_ts = doc.lastUpdated;
+ docs.forEach( doc =>
+ {
+ const deltas = self.getTimestampSortedDeltas( doc );
const doc_id: DocumentId = doc.id;
+ const last_updated_ts = doc.lastUpdate;
- self._dao.markDocumentAsProcessed(
- doc_id,
- last_updated_ts,
- function( err, markedSuccessfully )
+ deltas.forEach( delta =>
+ {
+ self._publisher.publish( delta )
+ .then( _ =>
+ {
+ self._dao.advanceDeltaIndex( doc_id, delta.type );
+ } )
+ .catch( _ =>
{
- console.log( err, markedSuccessfully );
- },
- );
+ // TODO: blow up?
+ } );
+ });
+
+ self._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
+ .then( _ =>
+ {
+ this._dispatcher.dispatch(
+ 'document-processed',
+ 'Deltas on document ' + doc_id + ' processed '
+ + 'successfully. Document has been marked as '
+ + 'completely processed.'
+ );
+ } )
+ .catch( err =>
+ {
+ this._dispatcher.dispatch( 'mongodb-err', err );
+ } );
});
- });
+ } )
+ .catch( err =>
+ {
+ this._dispatcher.dispatch( 'mongodb-err', err );
+ } );
}
@@ -91,9 +111,7 @@ export class DeltaProcessor
*
* @return a list of deltas sorted by timestamp
*/
- getTimestampSortedDeltas(
- doc: any,
- ): DeltaResult<any>[]
+ getTimestampSortedDeltas( doc: any ): DeltaResult<any>[]
{
const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA );
const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA );
@@ -113,32 +131,26 @@ export class DeltaProcessor
*
* @return a trimmed list of deltas
*/
- getDeltas(
- doc: any,
- type: MongoDeltaType,
- ): DeltaResult<any>[]
+ getDeltas( doc: any, type: MongoDeltaType ): DeltaResult<any>[]
{
- // Get objects so we can get the index by type
- const deltas_obj = doc.rdelta || {};
+ const deltas_obj = doc.rdelta || {};
+ const deltas: DeltaResult<any>[] = deltas_obj[ type ] || [];
- // Get type specific deltas
+ // Get type specific delta index
let last_published_index = 0;
if ( doc.lastPublishDelta )
{
- const last_published_indexes = doc.lastPublishDelta;
-
- last_published_index = last_published_indexes[ type ] || 0;
+ last_published_index = doc.lastPublishDelta[ type ] || 0;
}
- const deltas: DeltaResult<any>[] = deltas_obj[ type ] || [];
-
// Only return the unprocessed deltas
const deltas_trimmed = deltas.slice( last_published_index );
// Mark each delta with its type
- deltas_trimmed.forEach( delta => {
+ deltas_trimmed.forEach( delta =>
+ {
delta.type = type;
- });
+ } );
return deltas_trimmed;
}
@@ -148,14 +160,11 @@ export class DeltaProcessor
* Sort an array of deltas by timestamp
*
* @param a - The first delta to compare
- * @param a - The second delta to compare
+ * @param b - The second delta to compare
*
* @return a sort value
*/
- private _sortByTimestamp(
- a: DeltaResult<any>,
- b: DeltaResult<any>,
- ): number
+ private _sortByTimestamp( a: DeltaResult<any>, b: DeltaResult<any> ): number
{
if ( a.timestamp < b.timestamp )
{
@@ -168,26 +177,4 @@ export class DeltaProcessor
return 0;
}
-
-
- /**
- * Generate amqp config from environment variables
- *
- * @returns the amqp configuration
- */
- // generateConfigFromEnv(): AmqpConfig
- // {
- // return <AmqpConfig>{
- // "protocol": "amqp",
- // "hostname": process.env.hostname,
- // "port": process.env.port,
- // "username": process.env.username,
- // "password": process.env.password,
- // "locale": "en_US",
- // "frameMax": 0,
- // "heartbeat": 0,
- // "vhost": process.env.vhost,
- // "exchange": process.env.exchange,
- // };
- // }
}
\ No newline at end of file
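For reference, a rough sketch of the quote document shape that process() and getDeltas() expect, with field names taken from the code above and values invented for illustration:

    // hypothetical document from the quotes collection
    const doc = {
        id:               123,                        // DocumentId
        lastUpdate:       1572902411,                 // passed to markDocumentAsProcessed
        published:        false,                      // selected by getUnprocessedDocuments
        lastPublishDelta: { data: 1, ratedata: 0 },   // per-type publish indexes
        rdelta: {
            data: [
                { timestamp: 1572902400, data: { foo: [ "already published" ] } },
                { timestamp: 1572902410, data: { foo: [ "pending" ] } },
            ],
            ratedata: [],
        },
    };

    // getDeltas( doc, 'data' ) slices from lastPublishDelta.data (index 1),
    // so only the second delta is returned, tagged with type 'data'.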
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index 57a74b6..c67cde4 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -21,8 +21,9 @@
* Publish delta message to a queue
*/
-import { AmqpPublisher } from "./AmqpPublisher";
-import { DeltaResult } from "../bucket/delta";
+import { AmqpPublisher } from './AmqpPublisher';
+import { DeltaResult } from '../bucket/delta';
+import { EventDispatcher } from './event/EventDispatcher';
import {
connect as amqpConnect,
Options,
@@ -31,7 +32,6 @@ import {
const avro = require( 'avro-js' );
-
export interface AmqpConfig extends Options.Connect {
/** The name of a queue or exchange to publish to */
exchange: string;
@@ -41,24 +41,26 @@ export interface AmqpConfig extends Options.Connect {
export class DeltaPublisher implements AmqpPublisher
{
/** The path to the avro schema */
- readonly SCHEMA_PATH = './avro/schema.avsc';
+ readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc';
/** A mapping of which delta type translated to which avro event */
readonly DELTA_MAP: Record<string, string> = {
- data: 'rate',
- ratedata: 'update',
+ data: 'STEP_SAVE',
+ ratedata: 'RATE',
};
/**
- * Initialize trait
+ * Initialize publisher
*
- * @param _conf - amqp configuration
- * @param _logger - logger instance
+ * @param _conf - amqp configuration
+ * @param _dispatcher - event dispatcher instance
+ * @param _ts_ctr - a timestamp constructor
*/
constructor(
- private readonly _conf: AmqpConfig,
- private readonly _logger: any
+ private readonly _conf: AmqpConfig,
+ private readonly _dispatcher: EventDispatcher,
+ private readonly _ts_ctr : () => UnixTimestamp,
) {}
@@ -66,35 +68,65 @@ export class DeltaPublisher implements AmqpPublisher
* Publish quote message to exchange post-rating
*
* @param delta - The delta to publish
+ *
+ * @return a promise that resolves once the delta has been published
*/
- publish( delta: DeltaResult<any> ): void
+ publish( delta: DeltaResult<any> ): Promise<null>
{
- // check both as we transition from one to the other
const exchange = this._conf.exchange;
- amqpConnect( this._conf )
+ return new Promise<null>( ( resolve, reject ) =>
+ {
+ amqpConnect( this._conf )
.then( conn =>
{
setTimeout( () => conn.close(), 10000 );
return conn.createChannel();
} )
- .then( ch => {
+ .then( ch =>
+ {
ch.assertExchange( exchange, 'fanout', { durable: true } );
- return this._sendMessage( ch, exchange, delta );
+ return this.sendMessage( ch, exchange, delta );
+ } )
+ .then( sentSuccessfully =>
+ {
+ console.log('sentSuccessfully', sentSuccessfully);
+ if ( sentSuccessfully )
+ {
+ this._dispatcher.dispatch(
+ 'delta-publish',
+ "Published " + delta.type + " delta with ts '"
+ + delta.timestamp + "' to '" + exchange
+ + '" exchange',
+ );
+
+ resolve();
+ }
+ else
+ {
+ this._dispatcher.dispatch(
+ 'publish-err',
+ "Error publishing " + delta.type + " delta with ts '"
+ + delta.timestamp + "' to '" + exchange
+ + "' exchange",
+ );
+
+ reject();
+ }
} )
- .then( () => this._logger.log(
- this._logger.PRIORITY_INFO,
- "Published " + delta.type + " delta with timestamp '" +
- delta.timestamp + "' to quote-update exchange '"+
- exchange + "'"
- ) )
- .catch( e => this._logger.log(
- this._logger.PRIORITY_ERROR,
- "Error publishing " + delta.type + " delta with timestamp '" +
- delta.timestamp + "' to quote-update exchange '"+
- exchange + "'" + ": " + e
- ) );
+ .catch( e =>
+ {
+ this._dispatcher.dispatch(
+ 'publish-err',
+ "Error publishing " + delta.type + " delta with ts '"
+ + delta.timestamp + "' to '" + exchange + "' exchange: "
+ + e,
+ );
+
+ reject();
+ } );
+ } );
}
@@ -107,7 +139,7 @@ export class DeltaPublisher implements AmqpPublisher
*
* @return whether publish was successful
*/
- _sendMessage(
+ sendMessage(
channel: Channel,
exchange: string,
delta: DeltaResult<any>,
@@ -118,14 +150,48 @@ export class DeltaPublisher implements AmqpPublisher
created: Date.now(),
};
- const event_id = this.DELTA_MAP[ delta.type ];
+ // Convert all delta datums to string for avro
+ const delta_data = this.avroFormat( delta.data );
+ const event_id = this.DELTA_MAP[ delta.type ];
const data = {
- delta: delta,
- event: event_id,
+ event: {
+ id: event_id,
+ ts: this._ts_ctr(),
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: 123123, // Fix
+ },
+ session: {
+ entity_name: 'Foobar', // Fix
+ entity_id: 123123, // Fix
+ },
+ data: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ delta: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: 'dadaddwafdwa', // Fix
+ },
+ },
};
- const avro_buffer = this._avroEncode( data );
+ const avro_buffer = this.avroEncode( data );
+
+ if ( !avro_buffer )
+ {
+ return false;
+ }
// we don't use a routing key; fanout exchange
const routing_key = '';
@@ -144,14 +210,91 @@ export class DeltaPublisher implements AmqpPublisher
*
* @param data - the data to encode
*
- * @return the avro buffer
+ * @return the avro buffer or null if there is an error
*/
- _avroEncode( data: Record<string, any> ): Buffer
+ avroEncode( data: Record<string, any> ): Buffer | null
{
- const type = avro.parse( this.SCHEMA_PATH );
+ let buffer = null;
- const buffer = type.toBuffer( data );
+ try
+ {
+ const type = avro.parse( this.SCHEMA_PATH );
+ buffer = type.toBuffer( data );
+ }
+ catch( e )
+ {
+ this._dispatcher.dispatch(
+ 'avro-parse-err',
+ 'Error encoding data to avro: ' + e,
+ );
+ }
return buffer;
}
+
+
+ /**
+ * Format the data for avro by adding type specifications to the data
+ *
+ * @param data - the data to format
+ *
+ * @return the formatted data
+ */
+ avroFormat( data: any, top_level: boolean = true ): any
+ {
+ let data_formatted: any = {};
+
+ switch( typeof( data ) )
+ {
+ case 'object': // Typescript treats arrays as objects
+ if ( data == null )
+ {
+ return null;
+ }
+ else if ( Array.isArray( data ) )
+ {
+ let arr: any[] = [];
+
+ data.forEach( ( datum ) =>
+ {
+ arr.push( this.avroFormat( datum, false ) );
+ } );
+
+ data_formatted = ( top_level )
+ ? arr
+ : { 'array': arr };
+ }
+ else
+ {
+ let datum_formatted: any = {};
+
+ Object.keys( data ).forEach( ( key: string ) =>
+ {
+ const datum = this.avroFormat( data[ key ], false );
+
+ datum_formatted[ key ] = datum;
+
+ } );
+
+ data_formatted = ( top_level )
+ ? datum_formatted
+ : { "map": datum_formatted };
+ }
+ break;
+
+ case 'boolean':
+ return { 'boolean': data };
+
+ case 'number':
+ return { 'double': data };
+
+ case 'string':
+ return { 'string': data };
+
+ case 'undefined':
+ return null;
+ }
+
+ return data_formatted;
+ }
}
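avroFormat() wraps nested values in avro union branch tags while leaving the top-level container untagged; a quick sketch of the transformation, with invented values:

    const input = { foo: [ "bar", null, 42 ] };

    // publisher.avroFormat( input ) yields:
    //
    //   {
    //       foo: {
    //           array: [ { string: "bar" }, null, { double: 42 } ],
    //       },
    //   }
    //
    // which lines up with the nested union bucket schema added to
    // schema.avsc below.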
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
new file mode 100644
index 0000000..583b5a6
--- /dev/null
+++ b/src/system/MetricsCollector.ts
@@ -0,0 +1,80 @@
+/**
+ * Metrics Collector
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Collect Metrics for Prometheus
+ */
+
+import { EventSubscriber } from "./event/EventSubscriber";
+
+const client = require('prom-client');
+
+declare type MetricStructure = {
+ path: string;
+ code: number;
+ service: string;
+ env: string;
+}
+
+export class MetricsCollector
+{
+ /**
+ * Initialize metrics collector
+ */
+ constructor(
+ private readonly _env: string,
+ private readonly _subscriber: EventSubscriber,
+ ) {}
+
+
+ /**
+ * Initialize the collector to look for specific events
+ */
+ init(): void
+ {
+ const collectDefaultMetrics = client.collectDefaultMetrics;
+
+ console.log( this._subscriber, collectDefaultMetrics );
+ this._formatLog( '', 123 );
+ // this._registerEvent( 'document-processed', LogLevel.NOTICE );
+ // this._registerEvent( 'delta-publish', LogLevel.NOTICE );
+ // this._registerEvent( 'avro-parse-err', LogLevel.ERROR );
+ // this._registerEvent( 'mongodb-err', LogLevel.ERROR );
+ // this._registerEvent( 'publish-err', LogLevel.ERROR );
+ }
+
+
+ /**
+ * Get structured metric object
+ *
+ * @param path - the endpoint being hit
+ * @param code - the response code
+ *
+ * @returns a structured metric object
+ */
+ private _formatLog( path: string, code: number ): MetricStructure
+ {
+ return <MetricStructure>{
+ path: path,
+ code: code,
+ service: 'quote-server',
+ env: this._env,
+ };
+ }
+}
diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc
index ee793a6..4a9a609 100644
--- a/src/system/avro/schema.avsc
+++ b/src/system/avro/schema.avsc
@@ -34,28 +34,31 @@
},
{
"name": "step",
- "type": {
- "type": "record",
- "name": "EventStep",
- "fields": [
- {
- "name": "transition",
- "type": {
- "type": "enum",
- "name": "EventStepTransition",
- "symbols": [ "BACK", "FORWARD", "END" ]
+ "type":[
+ "null",
+ {
+ "type": "record",
+ "name": "EventStep",
+ "fields": [
+ {
+ "name": "transition",
+ "type": {
+ "type": "enum",
+ "name": "EventStepTransition",
+ "symbols": [ "BACK", "FORWARD", "END" ]
+ }
+ },
+ {
+ "name": "src",
+ "type": "string"
+ },
+ {
+ "name": "dest",
+ "type": "string"
}
- },
- {
- "name": "src",
- "type": "string"
- },
- {
- "name": "dest",
- "type": "string"
- }
- ]
- }
+ ]
+ }
+ ]
}
]
}
@@ -70,20 +73,6 @@
{
"name": "id",
"type": "int"
- },
- {
- "name": "created",
- "type": "long",
- "logicalType": "timestamp-millis"
- },
- {
- "name": "modified",
- "type": "long",
- "logicalType": "timestamp-millis"
- },
- {
- "name": "top_visited_step",
- "type": "string"
}
]
}
@@ -115,12 +104,56 @@
"fields": [
{
"name": "bucket",
- "type": {
+ "type":{
"type": "map",
- "values": {
- "type" : "array",
- "items" : "string"
- }
+ "values": [
+ "null",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ "boolean",
+ "double",
+ "string",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ "boolean",
+ "double",
+ "string",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ "boolean",
+ "double",
+ "string"
+ ]
+ }
+ ]
+ },
+ {
+ "type": "map",
+ "values": [
+ "null",
+ "boolean",
+ "double",
+ "string",
+ {
+ "type": "map",
+ "values": [
+ "null",
+ "boolean",
+ "double",
+ "string"
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
}
}
]
diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts
index 53cd8f5..64e1f0f 100644
--- a/src/system/db/DeltaDao.ts
+++ b/src/system/db/DeltaDao.ts
@@ -28,7 +28,6 @@
*/
import { DocumentId } from "../../document/Document";
-import { PositiveInteger } from "../../numeric";
/** Manage deltas */
@@ -39,23 +38,21 @@ export interface DeltaDao
*
* @return documents in need of processing
*/
- getUnprocessedDocuments(
- callback: ( data: Record<string, any>[] ) => void,
- ): this;
+ getUnprocessedDocuments(): Promise<Record<string, any>[]>;
/**
* Set the document's processed index
*
- * @param doc_id - The document whose index will be set
- * @param index - The index to set
+ * @param doc_id - Document whose index will be set
+ * @param type - Delta type
+ *
+ * @return any errors that occurred
*/
- advanceDeltaIndexByType(
+ advanceDeltaIndex(
doc_id: DocumentId,
type: string,
- index: PositiveInteger,
- callback: ( err: NullableError, indexHasAdvanced: boolean ) => void,
- ): this;
+ ): Promise<NullableError>;
/**
@@ -68,9 +65,8 @@ export interface DeltaDao
* @return true if the document was successfully marked as processed
*/
markDocumentAsProcessed(
- doc_id: DocumentId,
- last_update_ts: UnixTimestamp,
- callback: ( err: NullableError, markedSuccessfully: boolean ) => void,
- ): this;
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ ): Promise<NullableError>;
}
diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
index cebf453..443ae85 100644
--- a/src/system/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -22,10 +22,8 @@
*/
import { DocumentId } from "../../document/Document";
-import { PositiveInteger } from "../../numeric";
-import { MongoCollection } from "mongodb";
import { DeltaDao } from "./DeltaDao";
-
+import { MongoCollection } from "mongodb";
export type MongoDeltaType = 'ratedata' | 'data';
@@ -33,56 +31,134 @@ export type MongoDeltaType = 'ratedata' | 'data';
/** Manage deltas */
export class MongoDeltaDao implements DeltaDao
{
+ /** Collection used to store quotes */
+ readonly COLLECTION: string = 'quotes';
+
/** The ratedata delta type */
static readonly DELTA_RATEDATA: string = 'ratedata';
/** The data delta type */
static readonly DELTA_DATA: string = 'data';
+ /** The mongo quotes collection */
+ private _collection?: MongoCollection | null;
+
/**
* Initialize connection
*
- * @param _collection Mongo collection
+ * @param _db Mongo db
*/
constructor(
- private readonly _collection: MongoCollection,
+ private readonly _db: any,
) {}
/**
- * Get documents in need of processing
+ * Attempts to connect to the database
*
- * @return documents in need of processing
+ * The returned promise will reject on failure.
+ *
+ * @return any errors that occurred
*/
- getUnprocessedDocuments(
- callback: ( data: Record<string, any>[] ) => void,
- ): this
+ init(): Promise<NullableError>
{
- var self = this;
+ var dao = this;
- this._collection.find(
- { published: false },
- {},
- function( _err, cursor )
+ return new Promise( ( resolve, reject ) =>
+ {
+ // attempt to connect to the database
+ this._db.open( function( err: any, db: any )
{
- cursor.toArray( function( _err: NullableError, data: any[] )
+ // if there was an error, don't bother with anything else
+ if ( err )
{
- // was the quote found?
- if ( data.length == 0 )
+ // in some circumstances, it may just be telling us that we're
+ // already connected (even though the connection may have been
+ // broken)
+ if ( err.errno !== undefined )
{
- callback.call( self, [] );
-
+ reject( 'Error opening mongo connection: ' + err );
return;
}
+ }
- // return the quote data
- callback.call( self, data );
- });
+ // quotes collection
+ db.collection(
+ dao.COLLECTION,
+ function(
+ _err: any,
+ collection: MongoCollection,
+ ) {
+ // for some reason this gets called more than once
+ if ( collection == null )
+ {
+ return;
+ }
+
+ // initialize indexes
+ collection.createIndex(
+ [ ['id', 1] ],
+ true,
+ function( err: any, _index: { [P: string]: any } )
+ {
+ if ( err )
+ {
+ reject( 'Error creating index: ' + err );
+ return;
+ }
+
+ // mark the DAO as ready to be used
+ dao._collection = collection;
+ resolve();
+ return;
+ }
+ );
+ }
+ );
+ });
+ } );
+ }
+
+
+ /**
+ * Get documents in need of processing
+ *
+ * @return documents in need of processing
+ */
+ getUnprocessedDocuments(): Promise<Record<string, any>[]>
+ {
+ var self = this;
+
+ return new Promise( ( resolve, reject ) =>
+ {
+ if ( !self._collection )
+ {
+ reject( 'Database not ready' );
+ return;
}
- )
- return this;
+
+ this._collection!.find(
+ { published: false },
+ {},
+ function( _err, cursor )
+ {
+ cursor.toArray( function( _err: NullableError, data: any[] )
+ {
+ // was the quote found?
+ if ( data.length == 0 )
+ {
+ resolve( [] );
+ return;
+ }
+
+ // return the quote data
+ resolve( data );
+ });
+ }
+ )
+ } );
}
@@ -91,42 +167,35 @@ export class MongoDeltaDao implements DeltaDao
*
* @param doc_id - Document whose index will be set
* @param type - Delta type
- * @param index - Index to set
- * @param callback - Callback function
*/
- advanceDeltaIndexByType(
+ advanceDeltaIndex(
doc_id: DocumentId,
type: MongoDeltaType,
- index: PositiveInteger,
- callback: ( err: NullableError, indexAdvanced: boolean ) => void,
- ): this
+ ): Promise<NullableError>
{
- var self = this;
+ return new Promise( ( resolve, reject ) =>
+ {
+ const inc_data: Record<string, any> = {};
- const set_data: Record<string, any> = {};
+ inc_data[ 'lastPublishDelta.' + type ] = 1;
- set_data[ 'lastPublishDelta.' + type ] = index;
-
- this._collection.update(
- { id: doc_id },
- { $set: set_data },
- { upsert: true },
- function( err )
- {
- if ( err )
+ this._collection!.update(
+ { id: doc_id },
+ { $inc: inc_data },
+ { upsert: false },
+ function( err )
{
- callback.call( self, err, false );
+ if ( err )
+ {
+ reject( 'Error advancing delta index: ' + err )
+ return;
+ }
+ resolve();
return;
}
-
- callback.call( self, null, true );
-
- return;
- }
- );
-
- return this;
+ );
+ } );
}
@@ -140,35 +209,30 @@ export class MongoDeltaDao implements DeltaDao
* @return true if the document was successfully marked as processed
*/
markDocumentAsProcessed(
- doc_id: DocumentId,
- last_update_ts: UnixTimestamp,
- callback: ( err: NullableError, indexAdvanced: boolean ) => void,
- ): this
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ ): Promise<NullableError>
{
- var self = this;
-
- this._collection.update(
- { id: doc_id, lastUpdate: { $gt: last_update_ts } },
- { $set: { processed: true } },
- { upsert: false },
- function( err, result )
- {
- if ( err )
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection!.update(
+ { id: doc_id, lastUpdate: { $lte: last_update_ts } },
+ { $set: { published: true } },
+ { upsert: false },
+ function( err )
{
- callback.call( self, err, false );
+ if ( err )
+ {
+ reject( "Error marking document as processed: " + err );
+ return;
+ }
+ resolve();
return;
}
+ );
- console.log( '-------', result );
-
- callback.call( self, null, true );
-
- return;
- }
- );
-
- return this;
+ } );
}
}
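A hypothetical sketch of driving the new promise-based DAO (how the db handle itself is constructed is outside this diff; it only needs to expose open() as described by the MongoDb interface added below):

    const dao = new MongoDeltaDao( db );

    dao.init()
        .then( () => dao.getUnprocessedDocuments() )
        .then( docs => console.log( docs.length + ' documents awaiting processing' ) )
        .catch( err => console.error( err ) );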
diff --git a/src/system/event/EventDispatcher.ts b/src/system/event/EventDispatcher.ts
new file mode 100644
index 0000000..45a15b8
--- /dev/null
+++ b/src/system/event/EventDispatcher.ts
@@ -0,0 +1,44 @@
+/**
+ * Event Dispatcher
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Dispatch events
+ */
+
+import { EventEmitter } from "events";
+
+export class EventDispatcher extends EventEmitter
+{
+ /**
+ * Initialize dispatcher
+ *
+ * @param _emitter - the event emitter
+ */
+ constructor(
+ private readonly _emitter: EventEmitter
+ ) {
+ super();
+ }
+
+
+ dispatch( event_id: string, arg: any ): void
+ {
+ this._emitter.emit( event_id, arg );
+ }
+}
diff --git a/src/system/event/EventSubscriber.ts b/src/system/event/EventSubscriber.ts
new file mode 100644
index 0000000..2950460
--- /dev/null
+++ b/src/system/event/EventSubscriber.ts
@@ -0,0 +1,44 @@
+/**
+ * Event Subscriber
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Subscribe to events
+ */
+
+import { EventEmitter } from "events";
+
+export class EventSubscriber extends EventEmitter
+{
+ /**
+ * Initialize subscriber
+ *
+ * @param _emitter - the event emitter
+ */
+ constructor(
+ private readonly _emitter: EventEmitter
+ ) {
+ super();
+ }
+
+
+ subscribe( event_id: string, callback: ( arg: any ) => void ): void
+ {
+ this._emitter.on( event_id, callback );
+ }
+}
diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts
index 6cc221f..61d764e 100644
--- a/src/types/mongodb.d.ts
+++ b/src/types/mongodb.d.ts
@@ -28,6 +28,32 @@ import { PositiveInteger } from "../numeric";
declare module "mongodb";
+export interface MongoDbConfig extends Record<string, any> {
+ /** Host */
+ host?: string;
+
+ /** Port number */
+ port?: number;
+
+ /** High availability */
+ ha: boolean;
+}
+
+
+/**
+ * Interface for the mongo database
+ */
+export interface MongoDb
+{
+ /**
+ * Initialize the database connection
+ *
+ * @param callback continuation on completion
+ */
+ open( callback: MongoCallback ): void;
+}
+
+
/**
* Node-style callback for queries
*/
@@ -139,8 +165,6 @@ declare interface MongoCollection
* @param data update data
* @param options query options
* @param callback continuation on completion
- *
- * @return callback return value
*/
update(
selector: MongoSelector,