diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/system/AmqpPublisher.ts | 2 | ||||
-rw-r--r-- | src/system/DeltaLogger.ts | 135 | ||||
-rw-r--r-- | src/system/DeltaProcessor.ts | 123 | ||||
-rw-r--r-- | src/system/DeltaPublisher.ts | 217 | ||||
-rw-r--r-- | src/system/MetricsCollector.ts | 80 | ||||
-rw-r--r-- | src/system/avro/schema.avsc | 113 | ||||
-rw-r--r-- | src/system/db/DeltaDao.ts | 24 | ||||
-rw-r--r-- | src/system/db/MongoDeltaDao.ts | 212 | ||||
-rw-r--r-- | src/system/event/EventDispatcher.ts | 44 | ||||
-rw-r--r-- | src/system/event/EventSubscriber.ts | 44 | ||||
-rw-r--r-- | src/types/mongodb.d.ts | 28 |
11 files changed, 786 insertions, 236 deletions
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts index bfd6dc3..019ea0d 100644 --- a/src/system/AmqpPublisher.ts +++ b/src/system/AmqpPublisher.ts @@ -38,5 +38,5 @@ export interface AmqpPublisher * * @param delta - The delta to publish */ - publish( delta: DeltaResult<any> ): void; + publish( delta: DeltaResult<any> ): Promise<null>; } diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts new file mode 100644 index 0000000..50e616e --- /dev/null +++ b/src/system/DeltaLogger.ts @@ -0,0 +1,135 @@ +/** + * Delta logger + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Logger for delta events + */ + +import { EventSubscriber } from "./event/EventSubscriber"; + +enum LogLevel { + DEBUG, + INFO, + NOTICE, + WARNING, + ERROR, + CRITICAL, + ALERT, + EMERGENCY, +}; + +declare type StructuredLog = { + message: string; + timestamp: UnixTimestamp; + service: string; + env: string; + severity: string; +} + +export class DeltaLogger +{ + /** + * Initialize delta logger + * + * @param _env - The environment ( dev, test, demo, live ) + * @param _subscriber - An event subscriber + * @param _ts_ctr - a timestamp constructor + */ + constructor( + private readonly _env: string, + private readonly _subscriber: EventSubscriber, + private readonly _ts_ctr : () => UnixTimestamp, + ) {} + + + /** + * Initialize the logger to look for specific events + */ + init(): void + { + this._registerEvent( 'document-processed', LogLevel.NOTICE ); + this._registerEvent( 'delta-publish', LogLevel.NOTICE ); + this._registerEvent( 'avro-parse-err', LogLevel.ERROR ); + this._registerEvent( 'mongodb-err', LogLevel.ERROR ); + this._registerEvent( 'publish-err', LogLevel.ERROR ); + } + + + /** + * Register an event at a specific log level + * + * @param event_id - the event id + * @param level - the log level + */ + private _registerEvent( event_id: string, level: LogLevel ): void + { + const logF = this._getLogLevelFunction( level ) + + this._subscriber.subscribe( event_id, logF ); + } + + + /** + * Get a logging function for the specified log level + * + * @param event_id - the event id + * + * @return the function to log with + */ + private _getLogLevelFunction( level: LogLevel ): ( str: string ) => void + { + switch( level ) + { + case LogLevel.DEBUG: + case LogLevel.INFO: + return ( _ ) => console.info( this._formatLog( _, level ) ); + case LogLevel.NOTICE: + return ( _ ) => console.log( this._formatLog( _, level ) ); + case LogLevel.WARNING: + return ( _ ) => console.warn( this._formatLog( _, level ) ); + case LogLevel.ERROR: + case LogLevel.CRITICAL: + case LogLevel.ALERT: + case LogLevel.EMERGENCY: + return ( _ ) => console.error( this._formatLog( _, level ) ); + default: + return ( _ ) => console.log( "UNKNOWN LOG LEVEL: " + _ ); + } + } + + + /** + * Get structured log object + * + * @param str - the string to log + * @param level - the log level + * + * @returns a structured logging object + */ + private _formatLog( str: string, level: LogLevel ): StructuredLog + { + return <StructuredLog>{ + message: str, + timestamp: this._ts_ctr(), + service: 'quote-server', + env: this._env, + severity: LogLevel[level], + }; + } +} diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index 678e700..39cdf3d 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -24,7 +24,7 @@ import { MongoDeltaType } from "../system/db/MongoDeltaDao"; import { DeltaResult } from "../bucket/delta"; import { DocumentId } from "../document/Document"; import { AmqpPublisher } from "./AmqpPublisher"; - +import { EventDispatcher } from "./event/EventDispatcher"; /** * Process deltas for a quote and publish to a queue @@ -41,11 +41,14 @@ export class DeltaProcessor /** * Initialize processor * - * @param _collection Mongo collection + * @param _dao - Mongo collection + * @param _publisher - Amqp Publisher + * @param _dispatcher - Event dispatcher instance */ constructor( - private readonly _dao: DeltaDao, - private readonly _publisher: AmqpPublisher, + private readonly _dao: DeltaDao, + private readonly _publisher: AmqpPublisher, + private readonly _dispatcher: EventDispatcher ) {} @@ -56,31 +59,48 @@ export class DeltaProcessor { let self = this; - this._dao.getUnprocessedDocuments( function( docs ) + this._dao.getUnprocessedDocuments() + .then( docs => { - docs.forEach( doc => { - - const deltas = self.getTimestampSortedDeltas( doc ); - - deltas.forEach( delta => { - - self._publisher.publish( delta ); - - }); - - const last_updated_ts = doc.lastUpdated; + docs.forEach( doc => + { + const deltas = self.getTimestampSortedDeltas( doc ); const doc_id: DocumentId = doc.id; + const last_updated_ts = doc.lastUpdate; - self._dao.markDocumentAsProcessed( - doc_id, - last_updated_ts, - function( err, markedSuccessfully ) + deltas.forEach( delta => + { + self._publisher.publish( delta ) + .then( _ => + { + self._dao.advanceDeltaIndex( doc_id, delta.type ); + } ) + .catch( _ => { - console.log( err, markedSuccessfully ); - }, - ); + // TODO: blow up? + } ); + }); + + self._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) + .then( _ => + { + this._dispatcher.dispatch( + 'document-processed', + 'Deltas on document ' + doc_id + ' processed ' + + 'successfully. Document has been marked as ' + + 'completely processed.' + ); + } ) + .catch( err => + { + this._dispatcher.dispatch( 'mongodb-err', err ); + } ); }); - }); + } ) + .catch( err => + { + this._dispatcher.dispatch( 'mongodb-err', err ); + } ); } @@ -91,9 +111,7 @@ export class DeltaProcessor * * @return a list of deltas sorted by timestamp */ - getTimestampSortedDeltas( - doc: any, - ): DeltaResult<any>[] + getTimestampSortedDeltas( doc: any ): DeltaResult<any>[] { const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA ); const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA ); @@ -113,32 +131,26 @@ export class DeltaProcessor * * @return a trimmed list of deltas */ - getDeltas( - doc: any, - type: MongoDeltaType, - ): DeltaResult<any>[] + getDeltas( doc: any, type: MongoDeltaType ): DeltaResult<any>[] { - // Get objects so we can get the index by type - const deltas_obj = doc.rdelta || {}; + const deltas_obj = doc.rdelta || {}; + const deltas: DeltaResult<any>[] = deltas_obj[ type ] || []; - // Get type specific deltas + // Get type specific delta index let last_published_index = 0; if ( doc.lastPublishDelta ) { - const last_published_indexes = doc.lastPublishDelta; - - last_published_index = last_published_indexes[ type ] || 0; + last_published_index = doc.lastPublishDelta[ type ] || 0; } - const deltas: DeltaResult<any>[] = deltas_obj[ type ] || []; - // Only return the unprocessed deltas const deltas_trimmed = deltas.slice( last_published_index ); // Mark each delta with its type - deltas_trimmed.forEach( delta => { + deltas_trimmed.forEach( delta => + { delta.type = type; - }); + } ); return deltas_trimmed; } @@ -148,14 +160,11 @@ export class DeltaProcessor * Sort an array of deltas by timestamp * * @param a - The first delta to compare - * @param a - The second delta to compare + * @param b - The second delta to compare * * @return a sort value */ - private _sortByTimestamp( - a: DeltaResult<any>, - b: DeltaResult<any>, - ): number + private _sortByTimestamp( a: DeltaResult<any>, b: DeltaResult<any> ): number { if ( a.timestamp < b.timestamp ) { @@ -168,26 +177,4 @@ export class DeltaProcessor return 0; } - - - /** - * Generate amqp config from environment variables - * - * @returns the amqp configuration - */ - // generateConfigFromEnv(): AmqpConfig - // { - // return <AmqpConfig>{ - // "protocol": "amqp", - // "hostname": process.env.hostname, - // "port": process.env.port, - // "username": process.env.username, - // "password": process.env.password, - // "locale": "en_US", - // "frameMax": 0, - // "heartbeat": 0, - // "vhost": process.env.vhost, - // "exchange": process.env.exchange, - // }; - // } }
\ No newline at end of file diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts index 57a74b6..c67cde4 100644 --- a/src/system/DeltaPublisher.ts +++ b/src/system/DeltaPublisher.ts @@ -21,8 +21,9 @@ * Publish delta message to a queue */ -import { AmqpPublisher } from "./AmqpPublisher"; -import { DeltaResult } from "../bucket/delta"; +import { AmqpPublisher } from './AmqpPublisher'; +import { DeltaResult } from '../bucket/delta'; +import { EventDispatcher } from './event/EventDispatcher'; import { connect as amqpConnect, Options, @@ -31,7 +32,6 @@ import { const avro = require( 'avro-js' ); - export interface AmqpConfig extends Options.Connect { /** The name of a queue or exchange to publish to */ exchange: string; @@ -41,24 +41,26 @@ export interface AmqpConfig extends Options.Connect { export class DeltaPublisher implements AmqpPublisher { /** The path to the avro schema */ - readonly SCHEMA_PATH = './avro/schema.avsc'; + readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc'; /** A mapping of which delta type translated to which avro event */ readonly DELTA_MAP: Record<string, string> = { - data: 'rate', - ratedata: 'update', + data: 'STEP_SAVE', + ratedata: 'RATE', }; /** - * Initialize trait + * Initialize publisher * - * @param _conf - amqp configuration - * @param _logger - logger instance + * @param _conf - amqp configuration + * @param _emitter - event emitter instance + * @param _ts_ctr - a timestamp constructor */ constructor( - private readonly _conf: AmqpConfig, - private readonly _logger: any + private readonly _conf: AmqpConfig, + private readonly _dispatcher: EventDispatcher, + private readonly _ts_ctr : () => UnixTimestamp, ) {} @@ -66,35 +68,65 @@ export class DeltaPublisher implements AmqpPublisher * Publish quote message to exchange post-rating * * @param delta - The delta to publish + * + * @return whether the message was published successfully */ - publish( delta: DeltaResult<any> ): void + publish( delta: DeltaResult<any> ): Promise<null> { - // check both as we transition from one to the other const exchange = this._conf.exchange; - amqpConnect( this._conf ) + return new Promise<null>( ( resolve, reject ) => + { + amqpConnect( this._conf ) .then( conn => { setTimeout( () => conn.close(), 10000 ); return conn.createChannel(); } ) - .then( ch => { + .then( ch => + { ch.assertExchange( exchange, 'fanout', { durable: true } ); - return this._sendMessage( ch, exchange, delta ); + return this.sendMessage( ch, exchange, delta ); + } ) + .then( sentSuccessfully => + { + console.log('sentSuccessfully', sentSuccessfully); + if ( sentSuccessfully ) + { + this._dispatcher.dispatch( + 'delta-publish', + "Published " + delta.type + " delta with ts '" + + delta.timestamp + "' to '" + exchange + + '" exchange', + ); + + resolve(); + } + else + { + this._dispatcher.dispatch( + 'publish-err', + "Error publishing " + delta.type + " delta with ts '" + + delta.timestamp + "' to '" + exchange + + "' exchange", + ); + + reject(); + } } ) - .then( () => this._logger.log( - this._logger.PRIORITY_INFO, - "Published " + delta.type + " delta with timestamp '" + - delta.timestamp + "' to quote-update exchange '"+ - exchange + "'" - ) ) - .catch( e => this._logger.log( - this._logger.PRIORITY_ERROR, - "Error publishing " + delta.type + " delta with timestamp '" + - delta.timestamp + "' to quote-update exchange '"+ - exchange + "'" + ": " + e - ) ); + .catch( e => + { + this._dispatcher.dispatch( + 'publish-err', + "Error publishing " + delta.type + " delta with ts '" + + delta.timestamp + '" to "' + exchange + "' exchange '" + + e, + ) + + reject(); + } ); + } ); } @@ -107,7 +139,7 @@ export class DeltaPublisher implements AmqpPublisher * * @return whether publish was successful */ - _sendMessage( + sendMessage( channel: Channel, exchange: string, delta: DeltaResult<any>, @@ -118,14 +150,48 @@ export class DeltaPublisher implements AmqpPublisher created: Date.now(), }; - const event_id = this.DELTA_MAP[ delta.type ]; + // Convert all delta datums to string for avro + const delta_data = this.avroFormat( delta.data ); + const event_id = this.DELTA_MAP[ delta.type ]; const data = { - delta: delta, - event: event_id, + event: { + id: event_id, + ts: this._ts_ctr(), + actor: 'SERVER', + step: null, + }, + document: { + id: 123123, // Fix + }, + session: { + entity_name: 'Foobar', // Fix + entity_id: 123123, // Fix + }, + data: { + Data: { + bucket: delta_data, + }, + }, + delta: { + Data: { + bucket: delta_data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: 'dadaddwafdwa', // Fix + }, + }, }; - const avro_buffer = this._avroEncode( data ); + const avro_buffer = this.avroEncode( data ); + + if ( !avro_buffer ) + { + return false; + } // we don't use a routing key; fanout exchange const routing_key = ''; @@ -144,14 +210,91 @@ export class DeltaPublisher implements AmqpPublisher * * @param data - the data to encode * - * @return the avro buffer + * @return the avro buffer or null if there is an error */ - _avroEncode( data: Record<string, any> ): Buffer + avroEncode( data: Record<string, any> ): Buffer | null { - const type = avro.parse( this.SCHEMA_PATH ); + let buffer = null; - const buffer = type.toBuffer( data ); + try + { + const type = avro.parse( this.SCHEMA_PATH ); + buffer = type.toBuffer( data ); + } + catch( e ) + { + this._dispatcher.dispatch( + 'avro-parse-err', + 'Error encoding data to avro: ' + e, + ); + } return buffer; } + + + /** + * Format the data for avro by add type specifications to the data + * + * @param data - the data to format + * + * @return the formatted data + */ + avroFormat( data: any, top_level: boolean = true ): any + { + let data_formatted: any = {}; + + switch( typeof( data ) ) + { + case 'object': // Typescript treats arrays as objects + if ( data == null ) + { + return null; + } + else if ( Array.isArray( data ) ) + { + let arr: any[] = []; + + data.forEach( ( datum ) => + { + arr.push( this.avroFormat( datum, false ) ); + } ); + + data_formatted = ( top_level ) + ? arr + : { 'array': arr }; + } + else + { + let datum_formatted: any = {}; + + Object.keys( data).forEach( ( key: string ) => + { + const datum = this.avroFormat( data[ key ], false ); + + datum_formatted[ key ] = datum; + + } ); + + data_formatted = ( top_level ) + ? datum_formatted + : { "map": datum_formatted }; + } + break; + + case 'boolean': + return { 'boolean': data }; + + case 'number': + return { 'double': data }; + + case 'string': + return { 'string': data }; + + case 'undefined': + return null; + } + + return data_formatted; + } } diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts new file mode 100644 index 0000000..583b5a6 --- /dev/null +++ b/src/system/MetricsCollector.ts @@ -0,0 +1,80 @@ +/** + * Metrics Collector + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Collect Metrics for Prometheus + */ + +import { EventSubscriber } from "./event/EventSubscriber"; + +const client = require('prom-client'); + +declare type MetricStructure = { + path: string; + code: number; + service: string; + env: string; +} + +export class MetricsCollector +{ + /** + * Initialize delta logger + */ + constructor( + private readonly _env: string, + private readonly _subscriber: EventSubscriber, + ) {} + + + /** + * Initialize the logger to look for specific events + */ + init(): void + { + const collectDefaultMetrics = client.collectDefaultMetrics; + + console.log( this._subscriber, collectDefaultMetrics) + this._formatLog( '', 123 ); + // this._registerEvent( 'document-processed', LogLevel.NOTICE ); + // this._registerEvent( 'delta-publish', LogLevel.NOTICE ); + // this._registerEvent( 'avro-parse-err', LogLevel.ERROR ); + // this._registerEvent( 'mongodb-err', LogLevel.ERROR ); + // this._registerEvent( 'publish-err', LogLevel.ERROR ); + } + + + /** + * Get structured metric object + * + * @param path - the endpoint being hit + * @param code - the response code + * + * @returns a structured logging object + */ + private _formatLog( path: string, code: number ): MetricStructure + { + return <MetricStructure>{ + path: path, + code: code, + service: 'quote-server', + env: this._env, + }; + } +} diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc index ee793a6..4a9a609 100644 --- a/src/system/avro/schema.avsc +++ b/src/system/avro/schema.avsc @@ -34,28 +34,31 @@ }, { "name": "step", - "type": { - "type": "record", - "name": "EventStep", - "fields": [ - { - "name": "transition", - "type": { - "type": "enum", - "name": "EventStepTransition", - "symbols": [ "BACK", "FORWARD", "END" ] + "type":[ + "null", + { + "type": "record", + "name": "EventStep", + "fields": [ + { + "name": "transition", + "type": { + "type": "enum", + "name": "EventStepTransition", + "symbols": [ "BACK", "FORWARD", "END" ] + } + }, + { + "name": "src", + "type": "string" + }, + { + "name": "dest", + "type": "string" } - }, - { - "name": "src", - "type": "string" - }, - { - "name": "dest", - "type": "string" - } - ] - } + ] + } + ] } ] } @@ -70,20 +73,6 @@ { "name": "id", "type": "int" - }, - { - "name": "created", - "type": "long", - "logicalType": "timestamp-millis" - }, - { - "name": "modified", - "type": "long", - "logicalType": "timestamp-millis" - }, - { - "name": "top_visited_step", - "type": "string" } ] } @@ -115,12 +104,56 @@ "fields": [ { "name": "bucket", - "type": { + "type":{ "type": "map", - "values": { - "type" : "array", - "items" : "string" - } + "values": [ + "null", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string" + ] + } + ] + }, + { + "type": "map", + "values": [ + "null", + "boolean", + "double", + "string", + { + "type": "map", + "values": [ + "null", + "boolean", + "double", + "string" + ] + } + ] + } + ] + } + ] } } ] diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts index 53cd8f5..64e1f0f 100644 --- a/src/system/db/DeltaDao.ts +++ b/src/system/db/DeltaDao.ts @@ -28,7 +28,6 @@ */ import { DocumentId } from "../../document/Document"; -import { PositiveInteger } from "../../numeric"; /** Manage deltas */ @@ -39,23 +38,21 @@ export interface DeltaDao * * @return documents in need of processing */ - getUnprocessedDocuments( - callback: ( data: Record<string, any>[] ) => void, - ): this; + getUnprocessedDocuments(): Promise<Record<string, any>[]> /** * Set the document's processed index * - * @param doc_id - The document whose index will be set - * @param index - The index to set + * @param doc_id - Document whose index will be set + * @param type - Delta type + * + * @return any errors that occured */ - advanceDeltaIndexByType( + advanceDeltaIndex( doc_id: DocumentId, type: string, - index: PositiveInteger, - callback: ( err: NullableError, indexHasAdvanced: boolean ) => void, - ): this; + ): Promise<NullableError> /** @@ -68,9 +65,8 @@ export interface DeltaDao * @return true if the document was successfully marked as processed */ markDocumentAsProcessed( - doc_id: DocumentId, - last_update_ts: UnixTimestamp, - callback: ( err: NullableError, markedSuccessfully: boolean ) => void, - ): this; + doc_id: DocumentId, + last_update_ts: UnixTimestamp, + ): Promise<NullableError> } diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index cebf453..443ae85 100644 --- a/src/system/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -22,10 +22,8 @@ */ import { DocumentId } from "../../document/Document"; -import { PositiveInteger } from "../../numeric"; -import { MongoCollection } from "mongodb"; import { DeltaDao } from "./DeltaDao"; - +import { MongoCollection } from "mongodb"; export type MongoDeltaType = 'ratedata' | 'data'; @@ -33,56 +31,134 @@ export type MongoDeltaType = 'ratedata' | 'data'; /** Manage deltas */ export class MongoDeltaDao implements DeltaDao { + /** Collection used to store quotes */ + readonly COLLECTION: string = 'quotes'; + /** The ratedata delta type */ static readonly DELTA_RATEDATA: string = 'ratedata'; /** The data delta type */ static readonly DELTA_DATA: string = 'data'; + /** The mongo quotes collection */ + private _collection?: MongoCollection | null; + /** * Initialize connection * - * @param _collection Mongo collection + * @param _db Mongo db */ constructor( - private readonly _collection: MongoCollection, + private readonly _db: any, ) {} /** - * Get documents in need of processing + * Attempts to connect to the database * - * @return documents in need of processing + * connectError event will be emitted on failure. + * + * @return any errors that occured */ - getUnprocessedDocuments( - callback: ( data: Record<string, any>[] ) => void, - ): this + init(): Promise<NullableError> { - var self = this; + var dao = this; - this._collection.find( - { published: false }, - {}, - function( _err, cursor ) + return new Promise( ( resolve, reject ) => + { + // attempt to connect to the database + this._db.open( function( err: any, db: any ) { - cursor.toArray( function( _err: NullableError, data: any[] ) + // if there was an error, don't bother with anything else + if ( err ) { - // was the quote found? - if ( data.length == 0 ) + // in some circumstances, it may just be telling us that we're + // already connected (even though the connection may have been + // broken) + if ( err.errno !== undefined ) { - callback.call( self, [] ); - + reject( 'Error opening mongo connection: ' + err ); return; } + } - // return the quote data - callback.call( self, data ); - }); + // quotes collection + db.collection( + dao.COLLECTION, + function( + _err: any, + collection: MongoCollection, + ) { + // for some reason this gets called more than once + if ( collection == null ) + { + return; + } + + // initialize indexes + collection.createIndex( + [ ['id', 1] ], + true, + function( err: any, _index: { [P: string]: any } ) + { + if ( err ) + { + reject( 'Error creating index: ' + err ); + return; + } + + // mark the DAO as ready to be used + dao._collection = collection; + resolve(); + return; + } + ); + } + ); + }); + } ); + } + + + /** + * Get documents in need of processing + * + * @return documents in need of processing + */ + getUnprocessedDocuments(): Promise<Record<string, any>[]> + { + var self = this; + + return new Promise( ( resolve, reject ) => + { + if ( !self._collection ) + { + reject( 'Database not ready' ); + return; } - ) - return this; + + this._collection!.find( + { published: false }, + {}, + function( _err, cursor ) + { + cursor.toArray( function( _err: NullableError, data: any[] ) + { + // was the quote found? + if ( data.length == 0 ) + { + resolve( [] ); + return; + } + + // return the quote data + resolve( data ); + }); + } + ) + } ); } @@ -91,42 +167,35 @@ export class MongoDeltaDao implements DeltaDao * * @param doc_id - Document whose index will be set * @param type - Delta type - * @param index - Index to set - * @param callback - Callback function */ - advanceDeltaIndexByType( + advanceDeltaIndex( doc_id: DocumentId, type: MongoDeltaType, - index: PositiveInteger, - callback: ( err: NullableError, indexAdvanced: boolean ) => void, - ): this + ): Promise<NullableError> { - var self = this; + return new Promise( ( resolve, reject ) => + { + const inc_data: Record<string, any> = {}; - const set_data: Record<string, any> = {}; + inc_data[ 'lastPublishDelta.' + type ] = 1; - set_data[ 'lastPublishDelta.' + type ] = index; - - this._collection.update( - { id: doc_id }, - { $set: set_data }, - { upsert: true }, - function( err ) - { - if ( err ) + this._collection!.update( + { id: doc_id }, + { $inc: inc_data }, + { upsert: false }, + function( err ) { - callback.call( self, err, false ); + if ( err ) + { + reject( 'Error advancing delta index: ' + err ) + return; + } + resolve(); return; } - - callback.call( self, null, true ); - - return; - } - ); - - return this; + ); + } ); } @@ -140,35 +209,30 @@ export class MongoDeltaDao implements DeltaDao * @return true if the document was successfully marked as processed */ markDocumentAsProcessed( - doc_id: DocumentId, - last_update_ts: UnixTimestamp, - callback: ( err: NullableError, indexAdvanced: boolean ) => void, - ): this + doc_id: DocumentId, + last_update_ts: UnixTimestamp, + ): Promise<NullableError> { - var self = this; - - this._collection.update( - { id: doc_id, lastUpdate: { $gt: last_update_ts } }, - { $set: { processed: true } }, - { upsert: false }, - function( err, result ) - { - if ( err ) + return new Promise( ( resolve, reject ) => + { + this._collection!.update( + { id: doc_id, lastUpdate: { $lte: last_update_ts } }, + { $set: { published: true } }, + { upsert: false }, + function( err ) { - callback.call( self, err, false ); + if ( err ) + { + reject( "Error marking document as processed: " + err ); + return; + } + resolve(); return; } + ); - console.log( '-------', result ); - - callback.call( self, null, true ); - - return; - } - ); - - return this; + } ); } } diff --git a/src/system/event/EventDispatcher.ts b/src/system/event/EventDispatcher.ts new file mode 100644 index 0000000..45a15b8 --- /dev/null +++ b/src/system/event/EventDispatcher.ts @@ -0,0 +1,44 @@ +/** + * Event Dispatcher + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Dispatch events + */ + +import { EventEmitter } from "events"; + +export class EventDispatcher extends EventEmitter +{ + /** + * Initialize dispatcher + * + * @param _emitter - the event emitter + */ + constructor( + private readonly _emitter: EventEmitter + ) { + super(); + } + + + dispatch( event_id: string, arg: any ): void + { + this._emitter.emit( event_id, arg ); + } +} diff --git a/src/system/event/EventSubscriber.ts b/src/system/event/EventSubscriber.ts new file mode 100644 index 0000000..2950460 --- /dev/null +++ b/src/system/event/EventSubscriber.ts @@ -0,0 +1,44 @@ +/** + * Event Subscriber + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Subscribe to events + */ + +import { EventEmitter } from "events"; + +export class EventSubscriber extends EventEmitter +{ + /** + * Initialize subscriber + * + * @param _emitter - the event emitter + */ + constructor( + private readonly _emitter: EventEmitter + ) { + super(); + } + + + subscribe( event_id: string, callback: ( arg: any ) => void ): void + { + this._emitter.on( event_id, callback ); + } +} diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts index 6cc221f..61d764e 100644 --- a/src/types/mongodb.d.ts +++ b/src/types/mongodb.d.ts @@ -28,6 +28,32 @@ import { PositiveInteger } from "../numeric"; declare module "mongodb"; +export interface MongoDbConfig extends Record<string, any> { + /** Host */ + host?: string; + + /** Port number */ + port?: number; + + /** High availability */ + ha: boolean; +} + + +/** + * Interface for the mongo database + */ +export interface MongoDb +{ + /** + * Initialize the database connection + * + * @param callback continuation on completion + */ + open( callback: MongoCallback ): void; +} + + /** * Node-style callback for queries */ @@ -139,8 +165,6 @@ declare interface MongoCollection * @param data update data * @param options query options * @param callback continuation on completion - * - * @return callback return value */ update( selector: MongoSelector, |