diff options
61 files changed, 6206 insertions, 475 deletions
@@ -0,0 +1,17 @@ +AMQP_HOST=localhost +AMQP_PORT=5672 +AMQP_USER= +AMQP_PASS= +AMQP_FRAMEMAX=0 +AMQP_HEARTBEAT=2 +AMQP_VHOST= +AMQP_EXCHANGE= +AMQP_RETRIES=30 +AMQP_RETRY_WAIT=1 +PROM_HOST= +PROM_PORT=9091 +PROM_PUSH_INTERVAL_MS=5000 +PROM_BUCKETS_START=0 +PROM_BUCKETS_WIDTH=10 +PROM_BUCKETS_COUNT=10 +PROCESS_INTERVAL_MS=2000 @@ -8,6 +8,7 @@ Makefile.in # generated by configure bin/server +bin/delta-processor src/version.js /config.* Makefile @@ -27,6 +28,7 @@ src/**/index.js # npm node_modules -# typescript +# typescript output +bin/*.js tsconfig.tsbuildinfo diff --git a/bin/delta-processor.in b/bin/delta-processor.in new file mode 100644 index 0000000..f0984e1 --- /dev/null +++ b/bin/delta-processor.in @@ -0,0 +1,33 @@ +#!/bin/sh +# Start Liza delta processor using Node.js executable determined at +# configure-time +# +# Copyright (C) 2010-2019 R-T Specialty, LLC. +# +# This file is part of liza. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# In addition to the configure-time NODE_FLAGS, the NODE_FLAGS environment +# variable can be used to add additional arguments to this script. +# WARNING: NODE_FLAGS arguments provided via environment varialbes are _not_ +# escaped, so be mindful of word expansion! +# +# @AUTOGENERATED@ +## + +cd "$( dirname $( readlink -f "$0" ) )" + +exec "@NODE@" @NODE_FLAGS@ $NODE_FLAGS delta-processor.js "$@" + diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts new file mode 100644 index 0000000..83e42e3 --- /dev/null +++ b/bin/delta-processor.ts @@ -0,0 +1,175 @@ +/** + * Start the Liza delta processor + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +import * as amqplib from 'amqplib'; +import { createAmqpConfig } from '../src/system/AmqpPublisher'; +import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao'; +import { DeltaProcessor } from '../src/system/DeltaProcessor'; +import { DeltaPublisher } from '../src/system/DeltaPublisher'; +import { MongoCollection } from '../src/types/mongodb'; +import { createAvroEncoder } from '../src/system/avro/AvroFactory'; +import { V1MessageWriter } from '../src/system/avro/V1MessageWriter'; +import { + createMongoConfig, + createMongoDB, + getMongoCollection, +} from '../src/system/db/MongoFactory'; +import { EventMediator } from '../src/system/EventMediator'; +import { EventEmitter } from 'events'; +import { StandardLogger } from '../src/system/StandardLogger'; +import { MetricsCollector } from '../src/system/MetricsCollector'; +import { + PrometheusFactory, + createPrometheusConfig, +} from '../src/system/PrometheusFactory'; +import { AmqpConnection } from '../src/system/amqp/AmqpConnection'; +import { parse as avro_parse } from 'avro-js'; + +require('dotenv-flow').config(); + +const amqp_conf = createAmqpConfig( process.env ); +const prom_conf = createPrometheusConfig( process.env ); +const db_conf = createMongoConfig( process.env ); +const db = createMongoDB( db_conf ); +const process_interval_ms = +( process.env.process_interval_ms || 2000 ); +const env = process.env.NODE_ENV || 'Unknown Environment'; +const emitter = new EventEmitter(); +const log = new StandardLogger( console, ts_ctr, env ); +const amqp_connection = new AmqpConnection( amqplib, amqp_conf, emitter ); + +const message_writer = new V1MessageWriter( + createAvroEncoder, + avro_parse( __dirname + '/../src/system/avro/schema.avsc' ), +); + +const publisher = new DeltaPublisher( + emitter, + ts_ctr, + amqp_connection, + message_writer, +); + +// Prometheus Metrics +const prom_factory = new PrometheusFactory(); +const metrics = new MetricsCollector( + prom_factory, + prom_conf, + emitter, + process.hrtime, +); + +// Structured logging +new EventMediator( log, emitter ); + +let process_interval: NodeJS.Timer; +let dao: MongoDeltaDao; + +getMongoCollection( db, db_conf ) + .then( ( conn: MongoCollection ) => { return new MongoDeltaDao( conn ); } ) + .then( ( mongoDao: MongoDeltaDao ) => { dao = mongoDao; } ) + .then( _ => amqp_connection.connect() ) + .then( _ => + { + log.info( 'Liza Delta Processor' ); + + handleShutdown(); + + const processor = new DeltaProcessor( dao, publisher, emitter ); + + return new Promise( ( _resolve, reject ) => + { + process_interval = setInterval( () => + { + try + { + processor.process() + .catch( err => reject( err ) ); + } + catch ( err ) + { + reject( err ); + } + + dao.getErrorCount() + .then( count => { metrics.updateErrorCount( count ) } ); + }, process_interval_ms ); + } ); + } ) + .catch( e => + { + emitter.emit( 'error', e ); + process.exit( 1 ); + } ); + + +/** + * Hook shutdown events + */ +function handleShutdown(): void +{ + process.on( 'SIGINT', () => { shutdown( 'SIGINT' ); } ) + .on( 'SIGTERM', () => { shutdown( 'SIGTERM' ); } ); +} + + +/** + * Perform a graceful shutdown + * + * @param signal - the signal that caused the shutdown + */ +function shutdown( signal: string ): void +{ + log.info( 'Received ' + signal + '. Beginning graceful shutdown:' ); + log.info( '...Stopping processing interval' ); + + clearInterval( process_interval ); + + log.info( '...Closing MongoDb connection' ); + + db.close( ( err, _data ) => + { + if ( err ) + { + console.error( ' Error closing connection: ' + err ); + } + } ); + + log.info( '...Closing AMQP connection...' ); + + amqp_connection.close(); + + log.info( '...Stopping the metrics collector...' ); + + metrics.stop(); + + log.info( 'Shutdown complete. Exiting.' ); + + process.exit(); +} + + +/** Timestamp constructor + * + * @return a timestamp + */ +function ts_ctr(): UnixTimestamp +{ + return <UnixTimestamp>Math.floor( new Date().getTime() / 1000 ); +} diff --git a/bin/server.js b/bin/server.ts index ec93fb7..b14a7c7 100644 --- a/bin/server.js +++ b/bin/server.ts @@ -1,7 +1,7 @@ /** * Start the Liza Server * - * Copyright (C) 2017 R-T Specialty, LLC. + * Copyright (C) 2010-2019 R-T Specialty, LLC. * * This file is part of the Liza Data Collection Framework. * @@ -19,19 +19,12 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -'use strict'; +import fs = require( 'fs' ); +import path = require( 'path' ); -const fs = require( 'fs' ); -const path = require( 'path' ); - -const { - conf: { - ConfLoader, - ConfStore, - }, - server, - version, -} = require( '../' ); +import { ConfLoader } from "../src/conf/ConfLoader"; +import { ConfStore } from "../src/conf/ConfStore"; +import * as version from "../src/version"; // kluge for now const conf_path = ( @@ -42,7 +35,7 @@ const conf_path = ( const conf_dir = path.dirname( conf_path ); -ConfLoader( fs, ConfStore ) +new ConfLoader( fs, ConfStore ) .fromFile( conf_path ) .then( conf => Promise.all( [ conf.get( 'name' ), @@ -70,12 +63,12 @@ ConfLoader( fs, ConfStore ) * Produce an absolute path if `path` is absolute, otherwise a path relative * to the configuration directory * - * @param {string} conf_path configuration path (for relative `path`) - * @param {string} path path to resolve + * @param conf_path - configuration path (for relative `path`) + * @param path - path to resolve * * @return resolved path */ -function _resolvePath( conf_path, path ) +function _resolvePath( conf_path: string, path: string ): string { return ( path[ 0 ] === '/' ) ? path @@ -83,15 +76,29 @@ function _resolvePath( conf_path, path ) } -function writePidFile( pid_path ) +/** + * Write process id (PID) file + * + * @param pid_path - path to pid file + */ +function writePidFile( pid_path: string ): void { - fs.writeFile( pid_path, process.pid ); + fs.writeFileSync( pid_path, process.pid ); - process.on( 'exit', () => fs.unlink( pid_path ) ); + process.on( 'exit', () => fs.unlink( pid_path, () => {} ) ); } -function greet( name, pid_path ) +/** + * Output greeting + * + * The greeting contains the program name, version, configuration path, + * and PID file path. + * + * @param name - program name + * @param pid_path - path to PID file + */ +function greet( name: string, pid_path: string ): void { console.log( `${name} (liza-${version})`); console.log( `Server configuration: ${conf_path}` ); diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json index da222bb..523bb7b 100644 --- a/conf/vanilla-server.json +++ b/conf/vanilla-server.json @@ -62,6 +62,7 @@ "vhost": "/", "queueName": "postrate" } + }, "c1export": { "host": "localhost", diff --git a/configure.ac b/configure.ac index 1f1675f..3a0fbc5 100644 --- a/configure.ac +++ b/configure.ac @@ -1,6 +1,6 @@ ## For use my automake and autoconf # -# Copyright (C) 2014--2017 R-T Specialty, LLC. +# Copyright (C) 2010-2019 R-T Specialty, LLC. # # This file is part of liza. # @@ -88,6 +88,8 @@ AC_CONFIG_FILES([Makefile package.json src/version.js]) AC_CONFIG_FILES([bin/server], [chmod +x bin/server]) +AC_CONFIG_FILES([bin/delta-processor], + [chmod +x bin/delta-processor]) AC_OUTPUT diff --git a/package.json.in b/package.json.in index c0db61e..f7bed5e 100644 --- a/package.json.in +++ b/package.json.in @@ -16,7 +16,8 @@ }, "bin": { - "liza-server": "bin/server" + "liza-server": "bin/server", + "delta-processor": "bin/delta-processor" }, "scripts": { @@ -24,13 +25,14 @@ }, "dependencies": { - "easejs": "0.2.x", - "mongodb": "1.2.14", - "amqplib": "0.5.3" + "easejs": "0.2.x", + "mongodb": "1.2.14", + "dotenv-flow": "3.1.0", + "amqplib": "0.5.3" }, "devDependencies": { "typescript": "~3.7", - "@types/node": "@TS_NODE_VERSION@", + "@types/node": "12.12.11", "chai": ">=1.9.1 < 4", "@types/chai": ">=1.9.1 < 4", "chai-as-promised": "7.1.0", @@ -38,7 +40,10 @@ "mocha": "5.2.0", "@types/mocha": "5.2.0", "sinon": ">=1.17.4", - "es6-promise": "~3" + "es6-promise": "~3", + "@types/amqplib": "0.5.13", + "avro-js": "1.9.1", + "prom-client": "11.0.0" }, "licenses": [ diff --git a/src/bucket/delta.ts b/src/bucket/delta.ts index 58c75ec..7c5dd6a 100644 --- a/src/bucket/delta.ts +++ b/src/bucket/delta.ts @@ -18,14 +18,21 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +import { DocumentId } from '../document/Document'; + /** The data structure expected for a document's internal key/value store */ export type Kv<T = any> = Record<string, T[]>; + /** Possible delta values for Kv array indexes */ export type DeltaDatum<T> = T | null | undefined; +/** Possible delta types */ +export type DeltaType = 'ratedata' | 'data'; + + /** * The constructor type for a delta generating function * @@ -44,7 +51,63 @@ export type DeltaConstructor<T = any, U extends Kv<T> = Kv<T>, V extends Kv<T> = export type DeltaResult<T> = { [K in keyof T]: DeltaDatum<T[K]> | null }; - /** +/** Complete delta type */ +export type Delta<T> = { + type: DeltaType, + timestamp: UnixTimestamp, + data: DeltaResult<T>, +} + + +/** Reverse delta type */ +export type ReverseDelta<T> = { + data: Delta<T>[], + ratedata: Delta<T>[], +} + + +/** Structure for Published delta count */ +export type PublishDeltaCount = { + data?: number, + ratedata?: number, +} + + +/** + * Document structure + */ +export interface DeltaDocument +{ + /** The document id */ + id: DocumentId, + + /** The entity name */ + agentName: string, + + /** The entity id */ + agentEntityId: number, + + /** The time the document was created */ + startDate: UnixTimestamp, + + /** The time the document was updated */ + lastUpdate: UnixTimestamp, + + /** The data bucket */ + data: Record<string, any>, + + /** The rate data bucket */ + ratedata?: Record<string, any>, + + /** The calculated reverse deltas */ + rdelta?: ReverseDelta<any>, + + /** A count of how many of each delta type have been processed */ + totalPublishDelta?: PublishDeltaCount, +}; + + +/** * Create delta to transform from src into dest * * @param src - the source data set @@ -98,12 +161,114 @@ export function createDelta<T, U extends Kv<T>, V extends Kv<T>>( /** + * Apply a delta to a bucket + * + * @param bucket - The bucket data + * @param delta - The delta to apply + * + * @return the bucket with the delta applied + */ +export function applyDelta<T, U extends Kv<T>, V extends Kv<T>>( + bucket: U = <U>{}, + delta: DeltaResult<U & V>, +): U +{ + const appliedDelta: DeltaResult<any> = {}; + + if( !delta ) + { + return bucket; + } + + // Loop through all keys + const key_set = new Set( + Object.keys( bucket ).concat( Object.keys( delta ) ) ); + + key_set.forEach( key => + { + const bucket_data = bucket[ key ]; + const delta_data = delta[ key ]; + + // If bucket does not contain the key, use entire delta data + if ( !bucket_data || !bucket_data.length ) + { + appliedDelta[ key ] = delta_data; + + return; + } + + // If delta does not contain the key then retain bucket data + if ( delta_data === null ) + { + return; + } + + // If delta does not contain the key then retain bucket data + if ( delta_data === undefined ) + { + appliedDelta[ key ] = bucket_data; + + return; + } + + // If neither condition above is true then create the key iteratively + appliedDelta[ key ] = _applyDeltaKey( bucket_data, delta_data ); + } ); + + return <U>appliedDelta; +} + + +/** + * Apply the delta key iteratively + * + * @param bucket - The bucket data array + * @param delta - The delta data array + * + * @return the applied delta + */ +function _applyDeltaKey<T>( + bucket: T[], + delta: T[], +): DeltaDatum<T>[] +{ + const data = []; + const max_size = Math.max( delta.length, bucket.length ); + + for ( let i = 0; i < max_size; i++ ) + { + const delta_datum = delta[ i ]; + const bucket_datum = bucket[ i ]; + + if ( delta_datum === null ) + { + break; + } + else if ( delta_datum === undefined ) + { + data[ i ] = bucket_datum; + } + else if ( _deepEqual( delta_datum, bucket_datum ) ) + { + data[ i ] = bucket_datum; + } + else + { + data[ i ] = delta_datum; + } + } + + return data; +} + + +/** * Build the delta key iteratively * * @param src - the source data array * @param dest - the destination data array * - * @return an object with an identical flag and a data array + * @return an object with an changed flag and a data array */ function _createDeltaKey<T>( src: T[], diff --git a/src/conf/ConfLoader.js b/src/conf/ConfLoader.ts index 17f26ed..7d556e5 100644 --- a/src/conf/ConfLoader.js +++ b/src/conf/ConfLoader.ts @@ -19,9 +19,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -'use strict'; - -const { Class } = require( 'easejs' ); +import { readFile } from "fs"; +import { Store } from "../store/Store"; /** @@ -35,36 +34,22 @@ const { Class } = require( 'easejs' ); * TODO: Merging multiple configuration files would be convenient for * modular configuration. */ -module.exports = Class( 'ConfLoader', +export class ConfLoader { /** - * Filesystem module - * @type {fs} - */ - 'private _fs': null, - - /** - * Store object constructor - * @type {function():Store} - */ - 'private _storeCtor': null, - - - /** * Initialize with provided filesystem module and Store constructor * * The module should implement `#readFile` compatible with * Node.js'. The Store constructor `store_ctor` is used to instantiate * new stores to be populated with configuration data. * - * @param {fs} fs filesystem module - * @param {function():Store} store_ctor Store object constructor + * @param fs - filesystem module + * @param store_ctor - Store object constructor */ - constructor( fs, store_ctor ) - { - this._fs = fs; - this._storeCtor = store_ctor; - }, + constructor( + private _fs: { readFile: typeof readFile }, + private _storeCtor: () => Store, + ) {} /** @@ -72,11 +57,11 @@ module.exports = Class( 'ConfLoader', * * A Store will be produced, populated with the configuration data. * - * @param {string} filename path to configuration JSON + * @param filename - path to configuration JSON * - * @return {Promise.<Store>} a promise of a populated Store + * @return a promise of a populated Store */ - 'public fromFile'( filename ) + fromFile( filename: string ): Promise<Store> { return new Promise( ( resolve, reject ) => { @@ -104,7 +89,7 @@ module.exports = Class( 'ConfLoader', } } ); } ); - }, + } /** @@ -112,12 +97,12 @@ module.exports = Class( 'ConfLoader', * * Parses configuration string as JSON. * - * @param {string} data raw configuration data + * @param data raw configuration data * - * @return {Promise.<Object>} `data` parsed as JSON + * @return `data` parsed as JSON */ - 'virtual protected parseConfData'( data ) + protected parseConfData( data: string ): Promise<any> { return Promise.resolve( JSON.parse( data ) ); - }, -} ); + } +} diff --git a/src/conf/ConfStore.d.ts b/src/conf/ConfStore.d.ts new file mode 100644 index 0000000..8140736 --- /dev/null +++ b/src/conf/ConfStore.d.ts @@ -0,0 +1,32 @@ +/** + * Ideal Store for system configuration + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +import { Store } from "../store/Store"; + +/** + * A store that recursively instantiates itself + * + * This store is ideal for nested configurations, and handles cases where + * configuration might be asynchronously retrieved. Nested values may be + * retrieved by delimiting the key with `.` (e.g. `foo.bar.baz`); see + * trait `DelimitedKey` for more information and examples. + */ +export declare function ConfStore(): Store; diff --git a/src/conf/ConfStore.js b/src/conf/ConfStore.js index 0c0569f..d51ad38 100644 --- a/src/conf/ConfStore.js +++ b/src/conf/ConfStore.js @@ -36,7 +36,7 @@ const { * retrieved by delimiting the key with `.` (e.g. `foo.bar.baz`); see * trait `DelimitedKey` for more information and examples. */ -module.exports = function ConfStore() +exports.ConfStore = function ConfStore() { return MemoryStore .use( AutoObjectStore( ConfStore ) ) diff --git a/src/document/Document.ts b/src/document/Document.ts index 0db893a..8f05bac 100644 --- a/src/document/Document.ts +++ b/src/document/Document.ts @@ -18,7 +18,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * - * The term "Quote" is synonymous with "Document"; this project is moving + * The term 'Quote' is synonymous with 'Document'; this project is moving * more toward the latter as it is further generalized. */ @@ -31,7 +31,29 @@ export type DocumentId = NominalType<number, 'DocumentId'>; /** * Quote (Document) id * - * Where the term "Quote" is still used, this will allow for type + * Where the term 'Quote' is still used, this will allow for type * compatibility and an easy transition. */ export type QuoteId = DocumentId; + + +/** + * Document meta data + */ +export type DocumentMeta = +{ + /** The document id */ + id: DocumentId, + + /** The entity name */ + entity_name: string, + + /** The entity id */ + entity_id: number, + + /** The time the document was created */ + startDate: UnixTimestamp, + + /** The time the document was updated */ + lastUpdate: UnixTimestamp, +}
\ No newline at end of file diff --git a/src/error/AmqpError.ts b/src/error/AmqpError.ts new file mode 100644 index 0000000..8bf308e --- /dev/null +++ b/src/error/AmqpError.ts @@ -0,0 +1,27 @@ +/** + * Amqp error + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * This still uses ease.js because it does a good job of transparently + * creating Error subtypes. + */ + +const { Class } = require( 'easejs' ); + +export const AmqpError = Class( 'AmqpError' ).extend( Error, {} ); diff --git a/src/error/DaoError.ts b/src/error/DaoError.ts new file mode 100644 index 0000000..2939b69 --- /dev/null +++ b/src/error/DaoError.ts @@ -0,0 +1,27 @@ +/** + * Dao error + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * This still uses ease.js because it does a good job of transparently + * creating Error subtypes. + */ + +const { Class } = require( 'easejs' ); + +export const DaoError = Class( 'DaoError' ).extend( Error, {} ); diff --git a/src/quote/BaseQuote.d.ts b/src/quote/BaseQuote.d.ts index f2271f4..eb2204a 100644 --- a/src/quote/BaseQuote.d.ts +++ b/src/quote/BaseQuote.d.ts @@ -24,6 +24,7 @@ import { Program } from "../program/Program"; import { Quote, QuoteId } from "./Quote"; import { QuoteDataBucket } from "../bucket/QuoteDataBucket"; +import { PositiveInteger } from "../numeric"; export declare class BaseQuote implements Quote @@ -98,5 +99,55 @@ export declare class BaseQuote implements Quote * * @return the data bucket */ - getBucket(): QuoteDataBucket + getBucket(): QuoteDataBucket; + + + /** + * Retrieves the reason for an explicit lock + * + * @return lock reason + */ + getExplicitLockReason(): string; + + + /** + * Returns the maximum step to which the explicit lock applies + * + * If no step restriction is set, then 0 will be returned. + * + * @return {number} locked max step or 0 if not applicable + */ + getExplicitLockStep(): PositiveInteger; + + + /** + * Returns whether the quote has been imported + * + * @return true if imported, otherwise false + */ + isImported(): boolean; + + + /** + * Returns whether the quote has been bound + * + * @return true if bound, otherwise false + */ + isBound(): boolean; + + + /** + * Returns the id of the highest step the quote has reached + * + * @return top visited step id + */ + getTopVisitedStepId(): PositiveInteger; + + + /** + * Returns the id of the highest step the quote has saved + * + * @return top saved step id + */ + getTopSavedStepId(): PositiveInteger; } diff --git a/src/server/daemon/controller.js b/src/server/daemon/controller.js index 06b7a5e..116ed39 100644 --- a/src/server/daemon/controller.js +++ b/src/server/daemon/controller.js @@ -69,7 +69,7 @@ const { DocumentServer, db: { - MongoServerDao, + MongoServerDao: { MongoServerDao }, }, lock: { @@ -126,8 +126,8 @@ exports.post_rate_publish = {}; exports.init = function( logger, enc_service, conf ) { - var db = _createDB( logger ); - const dao = MongoServerDao( db ); + var db = _createDB( logger ); + const dao = new MongoServerDao( db ); db.collection( 'quotes', function( err, collection ) { diff --git a/src/server/db/MongoServerDao.js b/src/server/db/MongoServerDao.ts index db6140e..3f8128c 100644 --- a/src/server/db/MongoServerDao.js +++ b/src/server/db/MongoServerDao.ts @@ -19,83 +19,56 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -var Class = require( 'easejs' ).Class, - EventEmitter = require( '../../events' ).EventEmitter, - ServerDao = require( './ServerDao' ).ServerDao; +import { ServerDao, Callback } from "./ServerDao"; +import { MongoCollection, MongoUpdate, MongoDb } from "mongodb"; +import { PositiveInteger } from "../../numeric"; +import { ServerSideQuote } from "../quote/ServerSideQuote"; +import { QuoteId } from "../../document/Document"; +import { WorksheetData } from "../rater/Rater"; +const EventEmitter = require( 'events' ).EventEmitter; + +type ErrorCallback = ( err: NullableError ) => void; /** * Uses MongoDB as a data store */ -module.exports = Class( 'MongoServerDao' ) - .implement( ServerDao ) - .extend( EventEmitter, +export class MongoServerDao extends EventEmitter implements ServerDao { - /** - * Collection used to store quotes - * @type String - */ - 'const COLLECTION': 'quotes', + /** Collection used to store quotes */ + readonly COLLECTION: string = 'quotes'; - /** - * Sequence (auto-increment) collection - * @type {string} - */ - 'const COLLECTION_SEQ': 'seq', + /** Sequence (auto-increment) collection */ + readonly COLLECTION_SEQ: string = 'seq'; - /** - * Sequence key for quote ids - * - * @type {string} - * @const - */ - 'const SEQ_QUOTE_ID': 'quoteId', + /** Sequence key for quote ids */ + readonly SEQ_QUOTE_ID: string = 'quoteId'; - /** - * Sequence quoteId default - * - * @type {number} - * @const - */ - 'const SEQ_QUOTE_ID_DEFAULT': 200000, + /** Sequence quoteId default */ + readonly SEQ_QUOTE_ID_DEFAULT: number = 200000; - /** - * Database instance - * @type Mongo.Db - */ - 'private _db': null, + /** Whether the DAO is initialized and ready to be used */ + private _ready: boolean = false; - /** - * Whether the DAO is initialized and ready to be used - * @type Boolean - */ - 'private _ready': false, + /** Collection to save data to */ + private _collection?: MongoCollection | null; - /** - * Collection to save data to - * @type null|Collection - */ - 'private _collection': null, - - /** - * Collection to read sequences (auto-increments) from - * @type {null|Collection} - */ - 'private _seqCollection': null, + /** Collection to read sequences (auto-increments) from */ + private _seqCollection?: MongoCollection | null; /** * Initializes DAO * * @param {Mongo.Db} db mongo database connection - * - * @return undefined */ - 'public __construct': function( db ) + constructor( + private readonly _db: MongoDb + ) { - this._db = db; - }, + super(); + } /** @@ -108,12 +81,12 @@ module.exports = Class( 'MongoServerDao' ) * * @return MongoServerDao self to allow for method chaining */ - 'public init': function( callback ) + init( callback: () => void ): this { var dao = this; // map db error event (on connection error) to our connectError event - this._db.on( 'error', function( err ) + this._db.on( 'error', function( err: Error ) { dao._ready = false; dao._collection = null; @@ -123,7 +96,7 @@ module.exports = Class( 'MongoServerDao' ) this.connect( callback ); return this; - }, + } /** @@ -136,12 +109,12 @@ module.exports = Class( 'MongoServerDao' ) * * @return MongoServerDao self to allow for method chaining */ - 'public connect': function( callback ) + connect( callback: () => void ): this { var dao = this; // attempt to connect to the database - this._db.open( function( err, db ) + this._db.open( function( err: any, db: any ) { // if there was an error, don't bother with anything else if ( err ) @@ -176,84 +149,97 @@ module.exports = Class( 'MongoServerDao' ) } // quotes collection - db.collection( dao.__self.$('COLLECTION'), function( err, collection ) - { - // for some reason this gets called more than once - if ( collection == null ) - { - return; - } - - // initialize indexes - collection.createIndex( - [ ['id', 1] ], - true, - function( err, index ) + db.collection( + dao.COLLECTION, + function( + _err: any, + collection: MongoCollection, + ) { + // for some reason this gets called more than once + if ( collection == null ) { - // mark the DAO as ready to be used - dao._collection = collection; - check_ready(); + return; } - ); - }); - - // seq collection - db.collection( dao.__self.$('COLLECTION_SEQ'), function( err, collection ) - { - if ( err ) - { - dao.emit( 'seqError', err ); - return; - } - if ( collection == null ) - { - return; + // initialize indexes + collection.createIndex( + [ ['id', 1] ], + true, + function( + _err: NullableError, + _index: { [P: string]: any, + } ) + { + // mark the DAO as ready to be used + dao._collection = collection; + check_ready(); + } + ); } + ); - dao._seqCollection = collection; + // seq collection + db.collection( + dao.COLLECTION_SEQ, + function( + err: Error, + collection: MongoCollection, + ) { + if ( err ) + { + dao.emit( 'seqError', err ); + return; + } - // has the sequence we'll be referencing been initialized? - collection.find( - { _id: dao.__self.$('SEQ_QUOTE_ID') }, - { limit: 1 }, - function( err, cursor ) + if ( collection == null ) { - if ( err ) - { - dao.initQuoteIdSeq( check_ready ) - return; - } + return; + } + + dao._seqCollection = collection; - cursor.toArray( function( err, data ) + // has the sequence we'll be referencing been initialized? + collection.find( + { _id: dao.SEQ_QUOTE_ID }, + { limit: <PositiveInteger>1 }, + function( err: NullableError, cursor ) { - if ( data.length == 0 ) + if ( err ) { - dao.initQuoteIdSeq( check_ready ); + dao._initQuoteIdSeq( check_ready ) return; } - check_ready(); - }); - } - ); - }); + cursor.toArray( function( _err: Error, data: any[] ) + { + if ( data.length == 0 ) + { + dao._initQuoteIdSeq( check_ready ); + return; + } + + check_ready(); + }); + } + ); + } + ); }); return this; - }, + } - 'public initQuoteIdSeq': function( callback ) + private _initQuoteIdSeq( callback: () => void ) { var dao = this; - this._seqCollection.insert( + this._seqCollection!.insert( { - _id: this.__self.$('SEQ_QUOTE_ID'), - val: this.__self.$('SEQ_QUOTE_ID_DEFAULT'), + _id: this.SEQ_QUOTE_ID, + val: this.SEQ_QUOTE_ID_DEFAULT, }, - function( err, docs ) + function( err: NullableError, _docs: any ) { if ( err ) { @@ -261,11 +247,11 @@ module.exports = Class( 'MongoServerDao' ) return; } - dao.emit( 'seqInit', this.__self.$('SEQ_QUOTE_ID') ); - callback.call( this ); + dao.emit( 'seqInit', dao.SEQ_QUOTE_ID ); + callback.call( dao ); } ); - }, + } /** @@ -281,15 +267,17 @@ module.exports = Class( 'MongoServerDao' ) * @param Function failure_callback function to call if save fails * @param Object save_data quote data to save (optional) * @param Object push_data quote data to push (optional) - * - * @return MongoServerDao self to allow for method chaining */ - 'public saveQuote': function( - quote, success_callback, failure_callback, save_data, push_data - ) + saveQuote( + quote: ServerSideQuote, + success_callback: Callback, + failure_callback: Callback, + save_data?: any, + push_data?: any, + ): this { - var dao = this; - var meta = {}; + var dao = this; + var meta: Record<string, any> = {}; // if we're not ready, then we can't save the quote! if ( this._ready === false ) @@ -301,7 +289,7 @@ module.exports = Class( 'MongoServerDao' ) ); failure_callback.call( this, quote ); - return; + return dao; } if ( save_data === undefined ) @@ -321,6 +309,7 @@ module.exports = Class( 'MongoServerDao' ) save_data.id = id; save_data.pver = quote.getProgramVersion(); save_data.importDirty = 1; + save_data.published = false; save_data.lastPremDate = quote.getLastPremiumDate(); save_data.initialRatedDate = quote.getRatedDate(); save_data.explicitLock = quote.getExplicitLockReason(); @@ -349,14 +338,14 @@ module.exports = Class( 'MongoServerDao' ) // update the quote data if it already exists (same id), otherwise // insert it - this._collection.update( { id: id }, + this._collection!.update( { id: id }, document, // create record if it does not yet exist { upsert: true }, // on complete - function( err, docs ) + function( err, _docs ) { // if an error occurred, then we cannot continue if ( err ) @@ -381,7 +370,7 @@ module.exports = Class( 'MongoServerDao' ) ); return this; - }, + } /** @@ -391,21 +380,24 @@ module.exports = Class( 'MongoServerDao' ) * @param {Object} data quote data * @param {Function} scallback successful callback * @param {Function} fcallback failure callback - * - * @return {MongoServerDao} self */ - 'public mergeData': function( quote, data, scallback, fcallback ) + mergeData( + quote: ServerSideQuote, + data: MongoUpdate, + scallback: Callback, + fcallback: Callback, + ): this { // we do not want to alter the original data; use it as a prototype var update = data; // save the stack so we can track this call via the oplog var _self = this; - this._collection.update( { id: quote.getId() }, + this._collection!.update( { id: quote.getId() }, { '$set': update }, {}, - function( err, docs ) + function( err, _docs ) { if ( err ) { @@ -427,7 +419,7 @@ module.exports = Class( 'MongoServerDao' ) ); return this; - }, + } /** @@ -441,9 +433,14 @@ module.exports = Class( 'MongoServerDao' ) * * @return {MongoServerDao} self */ - 'public mergeBucket': function( quote, data, scallback, fcallback ) + mergeBucket( + quote: ServerSideQuote, + data: MongoUpdate, + success: Callback, + failure: Callback, + ): this { - var update = {}; + var update: MongoUpdate = {}; for ( var field in data ) { @@ -455,8 +452,8 @@ module.exports = Class( 'MongoServerDao' ) update[ 'data.' + field ] = data[ field ]; } - return this.mergeData( quote, update, scallback, fcallback ); - }, + return this.mergeData( quote, update, success, failure ); + } /** @@ -471,8 +468,10 @@ module.exports = Class( 'MongoServerDao' ) * * @return MongoServerDao self */ - 'public saveQuoteState': function( - quote, success_callback, failure_callback + saveQuoteState( + quote: ServerSideQuote, + success_callback: Callback, + failure_callback: Callback, ) { var update = { @@ -484,10 +483,15 @@ module.exports = Class( 'MongoServerDao' ) return this.mergeData( quote, update, success_callback, failure_callback ); - }, + } - 'public saveQuoteClasses': function( quote, classes, success, failure ) + saveQuoteClasses( + quote: ServerSideQuote, + classes: any, + success: Callback, + failure: Callback, + ) { return this.mergeData( quote, @@ -495,7 +499,7 @@ module.exports = Class( 'MongoServerDao' ) success, failure ); - }, + } /** @@ -511,9 +515,14 @@ module.exports = Class( 'MongoServerDao' ) * * @return {undefined} */ - 'public saveQuoteMeta'( quote, new_meta, success, failure ) + saveQuoteMeta( + quote: ServerSideQuote, + new_meta: any, + success: Callback, + failure: Callback, + ): void { - const update = {}; + const update: MongoUpdate = {}; for ( var key in new_meta ) { @@ -521,13 +530,12 @@ module.exports = Class( 'MongoServerDao' ) for ( var i in meta ) { - update[ 'meta.' + key + '.' + i ] = - new_meta[ key ][ i ]; + update[ 'meta.' + key + '.' + i ] = new_meta[ key ][ i ]; } } this.mergeData( quote, update, success, failure ); - }, + } /** @@ -539,13 +547,20 @@ module.exports = Class( 'MongoServerDao' ) * * @return MongoServerDao self */ - 'public saveQuoteLockState': function( - quote, success_callback, failure_callback - ) + saveQuoteLockState( + quote: ServerSideQuote, + success_callback: Callback, + failure_callback: Callback, + ): this { // lock state is saved by default - return this.saveQuote( quote, success_callback, failure_callback, {} ); - }, + return this.saveQuote( + quote, + success_callback, + failure_callback, + {} + ); + } /** @@ -556,16 +571,19 @@ module.exports = Class( 'MongoServerDao' ) * * @return MongoServerDao self to allow for method chaining */ - 'public pullQuote': function( quote_id, callback ) + pullQuote( + quote_id: PositiveInteger, + callback: ( data: Record<string, any> | null ) => void + ): this { var dao = this; // XXX: TODO: Do not read whole of record into memory; filter out // revisions! - this._collection.find( { id: quote_id }, { limit: 1 }, - function( err, cursor ) + this._collection!.find( { id: quote_id }, { limit: <PositiveInteger>1 }, + function( _err, cursor ) { - cursor.toArray( function( err, data ) + cursor.toArray( function( _err: NullableError, data: any[] ) { // was the quote found? if ( data.length == 0 ) @@ -581,27 +599,28 @@ module.exports = Class( 'MongoServerDao' ) ); return this; - }, + } - 'public getMinQuoteId': function( callback ) + getMinQuoteId( callback: ( min_id: number ) => void ): this { // just in case it's asynchronous later on - callback.call( this, this.__self.$('SEQ_QUOTE_ID_DEFAULT') ); + callback.call( this, this.SEQ_QUOTE_ID_DEFAULT ); + return this; - }, + } - 'public getMaxQuoteId': function( callback ) + getMaxQuoteId( callback: ( min_id: number ) => void ): void { var dao = this; - this._seqCollection.find( - { _id: this.__self.$('SEQ_QUOTE_ID') }, - { limit: 1 }, - function( err, cursor ) + this._seqCollection!.find( + { _id: this.SEQ_QUOTE_ID }, + { limit: <PositiveInteger>1 }, + function( _err, cursor ) { - cursor.toArray( function( err, data ) + cursor.toArray( function( _err: NullableError, data: any[] ) { if ( data.length == 0 ) { @@ -614,15 +633,15 @@ module.exports = Class( 'MongoServerDao' ) }); } ); - }, + } - 'public getNextQuoteId': function( callback ) + getNextQuoteId( callback: ( quote_id: number ) => void ): this { var dao = this; - this._seqCollection.findAndModify( - { _id: this.__self.$('SEQ_QUOTE_ID') }, + this._seqCollection!.findAndModify( + { _id: this.SEQ_QUOTE_ID }, [ [ 'val', 'descending' ] ], { $inc: { val: 1 } }, { 'new': true }, @@ -643,7 +662,7 @@ module.exports = Class( 'MongoServerDao' ) ); return this; - }, + } /** @@ -654,13 +673,16 @@ module.exports = Class( 'MongoServerDao' ) * model of storing the deltas in previous revisions and the whole of the * bucket in the most recently created revision). */ - 'public createRevision': function( quote, callback ) + createRevision( + quote: ServerSideQuote, + callback: ErrorCallback, + ): void { var _self = this, qid = quote.getId(), data = quote.getBucket().getData(); - this._collection.update( { id: qid }, + this._collection!.update( { id: qid }, { '$push': { revisions: { data: data } } }, // create record if it does not yet exist @@ -678,20 +700,24 @@ module.exports = Class( 'MongoServerDao' ) return; } ); - }, + } - 'public getRevision': function( quote, revid, callback ) + getRevision( + quote: ServerSideQuote, + revid: PositiveInteger, + callback: ErrorCallback, + ): void { - revid = +revid; + revid = <PositiveInteger>+revid; // XXX: TODO: Filter out all but the revision we want - this._collection.find( + this._collection!.find( { id: quote.getId() }, - { limit: 1 }, - function( err, cursor ) + { limit: <PositiveInteger>1 }, + function( _err, cursor ) { - cursor.toArray( function( err, data ) + cursor.toArray( function( _err: NullableError, data: any[] ) { // was the quote found? if ( ( data.length === 0 ) @@ -707,12 +733,16 @@ module.exports = Class( 'MongoServerDao' ) }); } ); - }, + } - 'public setWorksheets': function( qid, data, callback ) + setWorksheets( + qid: QuoteId, + data: MongoUpdate, + callback: NodeCallback<void>, + ): void { - this._collection.update( { id: qid }, + this._collection!.update( { id: qid }, { '$set': { worksheets: { data: data } } }, // create record if it does not yet exist @@ -725,17 +755,22 @@ module.exports = Class( 'MongoServerDao' ) return; } ); - }, + } - 'public getWorksheet': function( qid, supplier, index, callback ) + getWorksheet( + qid: QuoteId, + supplier: string, + index: PositiveInteger, + callback: ( data: WorksheetData | null ) => void, + ): void { - this._collection.find( + this._collection!.find( { id: qid }, - { limit: 1 }, - function( err, cursor ) + { limit: <PositiveInteger>1 }, + function( _err, cursor ) { - cursor.toArray( function( err, data ) + cursor.toArray( function( _err: NullableError, data: any[] ) { // was the quote found? if ( ( data.length === 0 ) @@ -750,74 +785,8 @@ module.exports = Class( 'MongoServerDao' ) // return the quote data callback( data[ 0 ].worksheets.data[ supplier ][ index ] ); - }); - } - ); - }, - - - /** - * Set arbitrary data on a document - * - * @param {number} qid quote/document id - * @param {string} key field key - * @param {*} value field value - * @param {function(?Error)} callback completion callback - * - * @return {undefined} - */ - 'public setDocumentField'( qid, key, value, callback ) - { - this._collection.update( - { id: qid }, - { '$set': { [key]: value } }, - - // create record if it does not yet exist - { upsert: true }, - - // on complete - function( err ) - { - callback && callback( err ); - return; - } - ); - }, - - - /** - * Retrieve arbitrary data on a document - * - * @param {number} qid quote/document id - * @param {string} key field key - * @param {function(?Error)} callback completion callback - * - * @return {undefined} - */ - 'public getDocumentField'( qid, key, callback ) - { - this._collection.find( - { id: qid }, - { limit: 1 }, - function( err, cursor ) - { - if ( err !== null ) - { - callback( err, null ); - return; - } - - cursor.toArray( function( err, data ) - { - if ( err !== null ) - { - callback( err, null ); - return; - } - - callback( null, ( data[ 0 ] || {} )[ key ] ); } ); } ); - }, -} ); + } +}; diff --git a/src/server/db/ServerDao.d.ts b/src/server/db/ServerDao.d.ts index 6cc8025..59228c3 100644 --- a/src/server/db/ServerDao.d.ts +++ b/src/server/db/ServerDao.d.ts @@ -131,7 +131,7 @@ export interface ServerDao qid: QuoteId, data: WorksheetData, callback: NodeCallback<void>, - ): this; + ): void; /** @@ -147,5 +147,5 @@ export interface ServerDao supplier: string, index: PositiveInteger, callback: ( data: WorksheetData | null ) => void, - ): this; + ): void; } diff --git a/src/server/log/Log.js b/src/server/log/Log.js index 85a7f57..ffede67 100644 --- a/src/server/log/Log.js +++ b/src/server/log/Log.js @@ -116,7 +116,7 @@ module.exports = Class( 'Log', if ( this._fd !== null ) { var buffer = new Buffer( sprintf.apply( this, args ) + "\n" ); - fs.write( this._fd, buffer, 0, buffer.length, null ); + fs.writeSync( this._fd, buffer, 0, buffer.length, null ); } return this; diff --git a/src/server/quote/ServerSideQuote.d.ts b/src/server/quote/ServerSideQuote.d.ts index c8bdf2f..1e00d3f 100644 --- a/src/server/quote/ServerSideQuote.d.ts +++ b/src/server/quote/ServerSideQuote.d.ts @@ -68,4 +68,20 @@ export declare class ServerSideQuote extends BaseQuote * @return rating data */ getRatingData(): Record<string, any>; + + + /** + * Metadata bucket + * + * @return the metadata bucket + */ + getMetabucket(): QuoteDataBucket; + + + /** + * Get the program version + * + * @return program version + */ + getProgramVersion(): string; } diff --git a/src/server/rater/DslRaterContext.js b/src/server/rater/DslRaterContext.js index e5113b8..79dfaef 100644 --- a/src/server/rater/DslRaterContext.js +++ b/src/server/rater/DslRaterContext.js @@ -28,6 +28,12 @@ module.exports = Class( 'DslRaterContext' ) .extend( EventEmitter, { /** + * TODO: Remove workaround for bug extending class across + * multiple easejs instances + */ + 'public _events': {}, + + /** * Hash of classes that will result in a global submit * @type {Object} */ diff --git a/src/server/request/UserSession.d.ts b/src/server/request/UserSession.d.ts index 01937d7..f7d08ae 100644 --- a/src/server/request/UserSession.d.ts +++ b/src/server/request/UserSession.d.ts @@ -20,6 +20,9 @@ */ +import { PositiveInteger } from "../../numeric"; + + /** * Session management */ @@ -31,4 +34,28 @@ export declare class UserSession * @return true if internal user, otherwise false */ isInternal(): boolean; + + + /** + * Gets the agent id, if available + * + * @return agent id or undefined if unavailable + */ + agentId(): PositiveInteger | undefined + + + /** + * Gets the broker entity id, if available + * + * @return agent entity id or undefined if unavailable + */ + agentEntityId(): PositiveInteger | undefined + + + /** + * Gets the agent name, if available + * + * @return agent name or undefined if unavailable + */ + agentName(): string | undefined } diff --git a/src/server/service/RatingService.ts b/src/server/service/RatingService.ts index ea4667e..83cbd10 100644 --- a/src/server/service/RatingService.ts +++ b/src/server/service/RatingService.ts @@ -433,7 +433,7 @@ export class RatingService } } - this._dao.setWorksheets( qid, worksheets, ( err: Error | null ) => + this._dao.setWorksheets( qid, worksheets, ( err: NullableError ) => { if ( err ) { diff --git a/src/server/token/MongoTokenDao.ts b/src/server/token/MongoTokenDao.ts index ee90d7a..5d3dede 100644 --- a/src/server/token/MongoTokenDao.ts +++ b/src/server/token/MongoTokenDao.ts @@ -34,6 +34,8 @@ import { DocumentId } from "../../document/Document"; import { TokenId, TokenNamespace, TokenState } from "./Token"; import { UnknownTokenError } from "./UnknownTokenError"; import { context } from "../../error/ContextError"; +import { MongoCollection } from "mongodb"; + /** @@ -118,7 +120,7 @@ export class MongoTokenDao implements TokenDao }, }, - ( err: Error|null, prev_data ) => + ( err: NullableError, prev_data ) => { if ( err ) { @@ -250,7 +252,7 @@ export class MongoTokenDao implements TokenDao this._collection.findOne( { id: +doc_id }, { fields: fields }, - ( err: Error|null, data: TokenQueryResult ) => + ( err: NullableError, data: TokenQueryResult ) => { if ( err || !data ) { diff --git a/src/store/Store.d.ts b/src/store/Store.d.ts new file mode 100644 index 0000000..0b62417 --- /dev/null +++ b/src/store/Store.d.ts @@ -0,0 +1,114 @@ +/** + * Generic key/value store + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework + * + * Liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** Store key type */ +type K = string; + + +/** + * Generic key/value store with bulk clear + * + * @todo There's a lot of overlap between this concept and that of the + * Bucket. Maybe have the Bucket layer atop of simple Store + * interface as a step toward a new, simpler Bucket + * implementation. This was not implemented atop of the Bucket + * interface because its haphazard implementation would + * overcomplicate this. + */ +export interface Store<T = any> +{ + /** + * Add item to store under `key` with value `value` + * + * The promise will be fulfilled with an object containing the + * `key` and `value` added to the store; this is convenient for + * promises. + * + * @param key - store key + * @param value - value for key + * + * @return promise to add item to store, resolving to self (for + * chaining) + */ + add( key: K, value: T ): Promise<Store>; + + + /** + * Populate store with each element in object `obj` + * + * This is simply a convenient way to call `#add` for each element in an + * object. This does directly call `#add`, so overriding that method + * will also affect this one. + * + * If the intent is to change the behavior of what happens when an item + * is added to the store, override the `#add` method instead of this one + * so that it affects _all_ adds, not just calls to this method. + * + * @param obj - object with which to populate store + * + * @return array of #add promises + */ + populate( obj: Record<K, T> ): Promise<Store>[]; + + + /** + * Retrieve item from store under `key` + * + * The promise will be rejected if the key is unavailable. + * + * @param key - store key + * + * @return promise for the key value + */ + get( key: K ): Promise<T>; + + + /** + * Clear all items in store + * + * @return promise to clear store, resolving to self (for chaining) + */ + clear(): Promise<Store>; + + + /** + * Fold (reduce) all stored values + * + * This provides a way to iterate through all stored values and + * their keys while providing a useful functional result (folding). + * + * The order of folding is undefined. + * + * The ternary function `callback` is of the same form as + * {@link Array#fold}: the first argument is the value of the + * accumulator (initialized to the value of `initial`; the second + * is the stored item; and the third is the key of that item. + * + * @param callback - folding function + * @param initial - initial value for accumulator + * + * @return promise of a folded value (final accumulator value) + */ + reduce( + callback: ( accum: T, value: T, key: K ) => T, + initial: T, + ): Promise<T>; +} diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts new file mode 100644 index 0000000..f41ee63 --- /dev/null +++ b/src/system/AmqpPublisher.ts @@ -0,0 +1,111 @@ +/** + * Amqp Publisher + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Publish Amqp message to a queue + */ + +import { DeltaResult } from '../bucket/delta'; +import { DocumentMeta } from '../document/Document'; +import { Options } from 'amqplib'; + + +/** + * Create an amqp configuration from the environment + * + * @param env - the environment variables + * + * @return the amqp configuration + */ +export function createAmqpConfig( env: NodeJS.ProcessEnv ): AmqpConfig +{ + return <AmqpConfig>{ + protocol: 'amqp', + hostname: env.AMQP_HOST, + port: +( env.AMQP_PORT || 0 ), + username: env.AMQP_USER, + password: env.AMQP_PASS, + locale: 'en_US', + frameMax: +( env.AMQP_FRAMEMAX || 0 ), + heartbeat: +( env.AMQP_HEARTBEAT || 0 ), + vhost: env.AMQP_VHOST, + exchange: env.AMQP_EXCHANGE, + retries: env.AMQP_RETRIES || 30, + retry_wait: env.AMQP_RETRY_WAIT || 1000, + }; +} + + +export interface AmqpConfig extends Options.Connect +{ + /** The protocol to connect with (should always be 'amqp') */ + protocol: string; + + /** The hostname to connect to */ + hostname: string; + + /** The port to connect to */ + port: number; + + /** A username if one if required */ + username?: string; + + /** A password if one if required */ + password?: string; + + /** Locale (should always be 'en_US') */ + locale: string; + + /** The size in bytes of the maximum frame allowed */ + frameMax: number; + + /** How often to check for a live connection */ + heartbeat: number; + + /** The virtual host we are on (e.g. live, demo, test) */ + vhost?: string; + + /** The name of a queue or exchange to publish to */ + exchange: string; + + /** The number of times to retry connecting */ + retries: number; + + /** The time to wait in between retries */ + retry_wait: number; +} + + +export interface AmqpPublisher +{ + /** + * Publish quote message to exchange post-rating + * + * @param meta - document meta data + * @param delta - delta + * @param bucket - bucket + * @param ratedata - rate data bucket + */ + publish( + meta: DocumentMeta, + delta: DeltaResult<any>, + bucket: Record<string, any>, + ratedata?: Record<string, any>, + ): Promise<void> +} diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts new file mode 100644 index 0000000..4eba003 --- /dev/null +++ b/src/system/DeltaProcessor.ts @@ -0,0 +1,300 @@ +/** + * Delta Processor + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +import { DeltaDao } from '../system/db/DeltaDao'; +import { DocumentMeta } from '../document/Document'; +import { AmqpPublisher } from './AmqpPublisher'; +import { EventEmitter } from 'events'; +import { + DeltaType, + applyDelta, + DeltaDocument, + Delta, + ReverseDelta, +} from '../bucket/delta'; + +/** Deltas and state of data prior to their application */ +type DeltaState = [ + Delta<any>, + Record<string, any>, + Record<string, any>, +]; + + +/** + * Process deltas for a quote and publish to a queue + * + * TODO: Decouple from applyDelta + */ +export class DeltaProcessor +{ + /** The ratedata delta type */ + readonly DELTA_RATEDATA: DeltaType = 'ratedata'; + + /** The data delta type */ + readonly DELTA_DATA: DeltaType = 'data'; + + + /** + * Initialize processor + * + * @param _dao - Delta dao + * @param _publisher - Amqp Publisher + * @param _emitter - Event emiter instance + */ + constructor( + private readonly _dao: DeltaDao, + private readonly _publisher: AmqpPublisher, + private readonly _emitter: EventEmitter, + ) {} + + + /** + * Process unpublished deltas + */ + process(): Promise<void> + { + return this._dao.getUnprocessedDocuments() + .then( docs => this._processNext( docs ) ); + } + + + /** + * Process the next document + * + * @param docs - list of documents to process + */ + private _processNext( docs: DeltaDocument[] ): Promise<void> + { + const doc = docs.shift(); + + if ( !doc ) + { + return Promise.resolve(); + } + + return this._processDocument( doc ) + .then( _ => this._processNext( docs ) ) + } + + + /** + * Process an individual document + * + * @param doc - individual document to process + */ + private _processDocument( doc: DeltaDocument ): Promise<void> + { + const deltas = this._getTimestampSortedDeltas( doc ); + const bucket = doc.data; + const ratedata = doc.ratedata || {}; + const meta = { + id: doc.id, + entity_name: doc.agentName, + entity_id: +doc.agentEntityId, + startDate: doc.startDate, + lastUpdate: doc.lastUpdate, + }; + + const history = this._applyDeltas( deltas, bucket, ratedata ); + + return this._processNextDelta( meta, history ) + .then( _ => + this._dao.markDocumentAsProcessed( meta.id, meta.lastUpdate ) + ) + .then( _ => + { + this._emitter.emit( 'document-processed', { doc_id: meta.id } ); + } ) + .catch( ( e: Error ) => + { + this._emitter.emit( 'error', e ); + return this._dao.setErrorFlag( meta.id ); + } ); + } + + + /** + * Produce states of buckets at each point in history + * + * For bucket data, each tuple will contain the state of the bucket + * prior to the corresponding delta having been applied. For rate data, + * the tuple will also contain the state of the bucket at the point of + * rating. + * + * @param deltas - deltas to apply + * @param bucket - current state of bucket prior to deltas + * @param ratedata - current state of ratedata prior to deltas + * + * @return deltas paired with state prior to its application + */ + private _applyDeltas( + deltas: Delta<any>[], + bucket: Record<string, any>, + ratedata: Record<string, any>, + ): DeltaState[] + { + const pairs: DeltaState[] = []; + + let bucket_state = bucket; + let ratedata_state = ratedata; + let i = deltas.length; + + while ( i-- ) + { + let delta = deltas[ i ]; + + pairs[ i ] = [ + delta, + bucket_state, + ( delta.type === this.DELTA_RATEDATA ) ? ratedata_state : {}, + ]; + + // Don't apply the final delta, since we won't use it + if ( i === 0 ) + { + break; + } + + if ( delta.type === this.DELTA_DATA ) + { + bucket_state = applyDelta( + Object.create( bucket_state ), + deltas[ i ].data, + ); + } + else + { + ratedata_state = applyDelta( + Object.create( ratedata_state ), + deltas[ i ].data, + ); + } + } + + return pairs; + } + + + /** + * Process the next delta from the history + * + * @param meta - document meta data + * @param history - a history of deltas and their buckets (data, ratedata) + */ + private _processNextDelta( + meta: DocumentMeta, + history: DeltaState[], + ): Promise<void> + { + if ( history.length === 0 ) + { + return Promise.resolve(); + } + + const [ delta, bucket, ratedata ] = history[ 0 ]; + + const delta_uid = meta.id + '_' + delta.timestamp + '_' + delta.type; + + this._emitter.emit( 'delta-process-start', delta_uid ); + + return this._publisher.publish( meta, delta, bucket, ratedata ) + .then( _ => this._dao.advanceDeltaIndex( meta.id, delta.type ) ) + .then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) ) + .then( _ => this._processNextDelta( meta, history.slice( 1 ) ) ); + } + + + + /** + * Get sorted list of deltas + * + * @param doc - the document + * + * @return a list of deltas sorted by timestamp + */ + private _getTimestampSortedDeltas( doc: DeltaDocument ): Delta<any>[] + { + const data_deltas = this._getDeltas( doc, this.DELTA_RATEDATA ); + const ratedata_deltas = this._getDeltas( doc, this.DELTA_DATA ); + const deltas = data_deltas.concat( ratedata_deltas ); + + deltas.sort( this._sortByTimestamp ); + + return deltas; + } + + + /** + * Get trimmed delta list + * + * @param doc - the document + * @param type - the delta type to get + * + * @return a trimmed list of deltas + */ + private _getDeltas( doc: DeltaDocument, type: DeltaType ): Delta<any>[] + { + const deltas_obj = doc.rdelta || <ReverseDelta<any>>{}; + const deltas: Delta<any>[] = deltas_obj[ type ] || []; + + // Get type specific delta index + let published_count = 0; + if ( doc.totalPublishDelta ) + { + published_count = doc.totalPublishDelta[ type ] || 0; + } + + // Only return the unprocessed deltas + const deltas_trimmed = deltas.slice( published_count ); + + // Mark each delta with its type + deltas_trimmed.forEach( delta => + { + delta.type = type; + } ); + + return deltas_trimmed; + } + + + /** + * Sort an array of deltas by timestamp + * + * @param a - The first delta to compare + * @param b - The second delta to compare + * + * @return a sort value + */ + private _sortByTimestamp( a: Delta<any>, b: Delta<any> ): number + { + if ( a.timestamp < b.timestamp ) + { + return -1; + } + + if ( a.timestamp > b.timestamp ) { + return 1; + } + + return 0; + } +} diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts new file mode 100644 index 0000000..57a5747 --- /dev/null +++ b/src/system/DeltaPublisher.ts @@ -0,0 +1,124 @@ +/** + * Delta Publisher + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Publish delta message to a queue + */ + +import { AmqpPublisher } from './AmqpPublisher'; +import { Delta } from '../bucket/delta'; +import { EventEmitter } from 'events'; +import { DocumentMeta } from '../document/Document'; +import { context } from '../error/ContextError'; +import { AmqpError } from '../error/AmqpError'; +import { MessageWriter } from './MessageWriter'; + +import { AmqpConnection } from './amqp/AmqpConnection'; + + +export class DeltaPublisher implements AmqpPublisher +{ + /** + * Delta publisher + * + * @param _emitter - event emitter instance + * @param _ts_ctr - a timestamp constructor + * @param _conn - the amqp connection + * @param _writer - message writer + */ + constructor( + private readonly _emitter: EventEmitter, + private readonly _ts_ctr: () => UnixTimestamp, + private readonly _conn: AmqpConnection, + private readonly _writer: MessageWriter, + ) {} + + + /** + * Publish quote message to exchange post-rating + * + * @param meta - document meta data + * @param delta - delta + * @param bucket - bucket + * @param ratedata - rate data bucket + */ + publish( + meta: DocumentMeta, + delta: Delta<any>, + bucket: Record<string, any>, + ratedata: Record<string, any>, + ): Promise<void> + { + const ts = this._ts_ctr(); + const headers = { version: 1, created: ts }; + + return this._writer.write( + ts, + meta, + delta, + bucket, + ratedata + ).then( ( avro_buffer: Buffer ) => + { + const channel = this._conn.getAmqpChannel(); + + if ( !channel ) + { + throw context( + new AmqpError( 'Error sending message: No channel' ), + { + doc_id: meta.id, + delta_type: delta.type, + delta_ts: delta.timestamp, + }, + ); + } + + // we don't use a routing key; fanout exchange + const published_successfully = channel.publish( + this._conn.getExchangeName(), + '', + avro_buffer, + { headers: headers }, + ); + + if ( !published_successfully ) + { + throw context( + new Error ( 'Delta publish failed' ), + { + doc_id: meta.id, + delta_type: delta.type, + delta_ts: delta.timestamp, + } + ); + } + } ) + .then( ( _: any ) => + { + this._emitter.emit( + 'delta-publish', + { + delta: delta, + exchange: this._conn.getExchangeName(), + } + ); + } ); + } +} diff --git a/src/system/EventMediator.ts b/src/system/EventMediator.ts new file mode 100644 index 0000000..b95536c --- /dev/null +++ b/src/system/EventMediator.ts @@ -0,0 +1,95 @@ +/** + * Event Meditator + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Hook events and log them + */ + +import { EventEmitter } from 'events'; +import { PsrLogger } from './PsrLogger'; +import { hasContext } from '../error/ContextError'; + +export class EventMediator +{ + /** + * Initialize mediator + * + * @param _log - A PSR-3 style logger + * @param _emitter - An event emitter + */ + constructor( + private readonly _log: PsrLogger, + private readonly _emitter: EventEmitter, + ) { + this._emitter.on( 'delta-publish', ( msg ) => this._log.notice( + 'Published delta to exchange', + msg + ) ); + + this._emitter.on( 'document-processed', ( msg ) => this._log.notice( + 'Deltas on document processed successfully. Document has been ' + + 'marked as completely processed.', + msg + ) ); + + this._emitter.on( 'amqp-conn-warn', ( msg ) => + this._log.warning( 'AMQP Connection Error', msg ) ); + + this._emitter.on( 'amqp-reconnect', () => + this._log.warning( + '...attempting to re-establish AMQP connection' + ) + ); + + this._emitter.on( 'amqp-reconnected', () => + this._log.warning( + 'AMQP re-connected' + ) + ); + + this._emitter.on( 'error', ( arg ) => + this._handleError( arg ) ); + } + + + /** + * Handle an error event + * + * @param e - any + */ + private _handleError( e: any ): void + { + let msg: string = ''; + let context: Record<string, any> = {}; + + if ( e instanceof( Error ) ) + { + msg = e.message; + + if ( hasContext( e ) ) + { + context = e.context; + } + + context.stack = e.stack; + } + + this._log.error( msg, context ); + } +} diff --git a/src/system/MessageWriter.ts b/src/system/MessageWriter.ts new file mode 100644 index 0000000..8ee2b84 --- /dev/null +++ b/src/system/MessageWriter.ts @@ -0,0 +1,44 @@ +/** + * Message Writer + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Write a message to be published to a queue + */ +import { DocumentMeta } from '../document/Document'; +import { DeltaResult } from '../bucket/delta'; + +export interface MessageWriter +{ + /** + * Write the data to a message + * + * @param ts - timestamp + * @param meta - document meta data + * @param delta - current delta + * @param bucket - data bucket + * @param ratedata - ratedata bucket + */ + write( + ts: UnixTimestamp, + meta: DocumentMeta, + delta: DeltaResult<any>, + bucket: Record<string, any>, + ratedata: Record<string, any>, + ): Promise<Buffer> +}
\ No newline at end of file diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts new file mode 100644 index 0000000..9432a9d --- /dev/null +++ b/src/system/MetricsCollector.ts @@ -0,0 +1,205 @@ +/** + * Metrics Collector + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Collect Metrics for Prometheus + */ + +import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; +import { EventEmitter } from 'events'; +import { PrometheusFactory, PrometheusConfig } from './PrometheusFactory'; + +const client = require( 'prom-client' ) + + +export type MetricTimer = ( + _start_time?: [ number, number ] +) => [ number, number ]; + + +export class MetricsCollector +{ + /** The prometheus PushGateway */ + private _gateway: Pushgateway; + + /** Delta processed time histogram */ + private _process_time: Histogram; + private _process_time_name: string = 'liza_delta_process_time'; + private _process_time_help: string = 'Delta process time in ms'; + + /** Delta error counter */ + private _total_error: Counter; + private _total_error_name: string = 'liza_delta_error'; + private _total_error_help: string = 'Total errors from delta processing'; + + /** Delta current error gauge */ + private _current_error: Gauge; + private _current_error_name: string = 'liza_delta_current_error'; + private _current_error_help: string = + 'The current number of documents in an error state'; + + /** Delta error counter */ + private _total_processed: Counter; + private _total_processed_name: string = 'liza_delta_success'; + private _total_processed_help: string = + 'Total deltas successfully processed'; + + /** Timing map */ + private _timing_map: Record<string, [ number, number ]> = {}; + + private _push_interval: NodeJS.Timer; + + + /** + * Initialize delta logger + * + * @param _factory - A factory to create prometheus components + * @param _conf - Prometheus configuration + * @param _emitter - Event emitter + * @param _timer - A timer function to create a tuple timestamp + */ + constructor( + private readonly _factory: PrometheusFactory, + private readonly _conf: PrometheusConfig, + private readonly _emitter: EventEmitter, + private readonly _timer: MetricTimer, + ) { + // Set labels + client.register.setDefaultLabels( { + env: this._conf.env, + service: 'delta_processor', + } ); + + // Create metrics + this._gateway = this._factory.createGateway( + client, + this._conf.hostname, + this._conf.port, + ); + + this._process_time = this._factory.createHistogram( + client, + this._process_time_name, + this._process_time_help, + this._conf.buckets_start, + this._conf.buckets_width, + this._conf.buckets_count, + ); + + this._total_error = this._factory.createCounter( + client, + this._total_error_name, + this._total_error_help, + ); + + this._current_error = this._factory.createGauge( + client, + this._current_error_name, + this._current_error_help, + ); + + this._total_processed = this._factory.createCounter( + client, + this._total_processed_name, + this._total_processed_help, + ); + + // Push metrics on a specific interval + this._push_interval = setInterval( () => + { + this._gateway.pushAdd( + { jobName: 'liza_delta_metrics' }, + this.getPushCallback( this ) + ); + }, this._conf.push_interval_ms + ); + + // Subsribe metrics to events + this.hookMetrics(); + } + + + /** + * Stop the push interval + */ + stop(): void + { + clearInterval( this._push_interval ); + } + + + /** + * List to events to update metrics + */ + private hookMetrics(): void + { + this._emitter.on( + 'delta-process-start', + ( uid: string ) => { this._timing_map[ uid ] = this._timer(); } + ); + + this._emitter.on( + 'delta-process-end', + ( uid: string ) => + { + const start_time_ms = this._timing_map[ uid ] || [ -1, -1 ]; + const t = this._timer( start_time_ms ); + const total_time_ms = t[ 0 ] * 1000 + t[ 1 ] / 1000000; + + this._process_time.observe( total_time_ms ); + this._total_processed.inc(); + } + ); + + this._emitter.on( 'error', ( _ ) => this._total_error.inc() ); + } + + + /** + * Handle push error + * + * @param self - Metrics Collector object + * + * @return a function to handle the pushAdd callback + */ + private getPushCallback( self: MetricsCollector ): () => void + { + return ( + error?: Error | undefined, + _response?: any, + _body?: any + ): void => + { + if ( error ) + { + self._emitter.emit( 'error', error ); + } + } + } + + /** + * Update metrics with current error count + * + * @param count - the number of errors found + */ + updateErrorCount( count: number ): void + { + this._current_error.set( +count ); + } +} diff --git a/src/system/PrometheusFactory.ts b/src/system/PrometheusFactory.ts new file mode 100644 index 0000000..7330ea9 --- /dev/null +++ b/src/system/PrometheusFactory.ts @@ -0,0 +1,171 @@ +/** + * Prometheus Factory functions + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Prometheus Metrics + */ +import { Pushgateway, Histogram, Counter, Gauge } from 'prom-client'; + + +export declare type PrometheusConfig = { + /** The hostname to connect to */ + hostname: string; + + /** The port to connect to */ + port: number; + + /** The environment ( dev, test, demo, live ) */ + env: string; + + /** The rate (in milliseconds) at which metrics are pushed */ + push_interval_ms: number; + + /** The starting point for process time buckets */ + buckets_start: number; + + /** The width of process time buckets */ + buckets_width: number; + + /** The number of process time buckets */ + buckets_count: number; +} + + +/** + * Create a prometheus configuration from the environment + * + * @param env - the environment variables + * + * @return the prometheus configuration + */ +export function createPrometheusConfig( + env: NodeJS.ProcessEnv +): PrometheusConfig +{ + return <PrometheusConfig>{ + hostname: env.PROM_HOST, + port: +( env.PROM_PORT || 0 ), + env: process.env.NODE_ENV, + push_interval_ms: +( process.env.PROM_PUSH_INTERVAL_MS || 5000 ), + buckets_start: +( process.env.PROM_BUCKETS_START || 0 ), + buckets_width: +( process.env.PROM_BUCKETS_WIDTH || 10 ), + buckets_count: +( process.env.PROM_BUCKETS_COUNT || 10 ), + }; +} + + +export class PrometheusFactory +{ + /** + * Create a PushGateway + * + * @param client - prometheus client + * @param hostname - push gateway url + * @param port - push gateway port + * + * @return the gateway + */ + createGateway( + client: any, + hostname: string, + port: number, + ): Pushgateway + { + const url = 'http://' + hostname + ':' + port; + + return new client.Pushgateway( url ); + } + + + /** + * Create a histogram metric + * + * @param client - prometheus client + * @param name - metric name + * @param help - a description of the metric + * @param bucket_start - where to start the range of buckets + * @param bucket_width - the size of each bucket + * @param bucket_count - the total number of buckets + * + * @return the metric + */ + createHistogram( + client: any, + name: string, + help: string, + bucket_start: number, + bucket_width: number, + bucket_count: number, + ): Histogram + { + return new client.Histogram( { + name: name, + help: help, + buckets: client.linearBuckets( + bucket_start, + bucket_width, + bucket_count + ), + } ); + } + + + /** + * Create a counter metric + * + * @param client - prometheus client + * @param name - metric name + * @param help - a description of the metric + * + * @return the metric + */ + createCounter( + client: any, + name: string, + help: string, + ): Counter + { + return new client.Counter( { + name: name, + help: help, + } ); + } + + + /** + * Create a gauge metric + * + * @param client - prometheus client + * @param name - metric name + * @param help - a description of the metric + * + * @return the metric + */ + createGauge( + client: any, + name: string, + help: string, + ): Gauge + { + return new client.Gauge( { + name: name, + help: help, + } ); + } +}
\ No newline at end of file diff --git a/src/system/PsrLogger.ts b/src/system/PsrLogger.ts new file mode 100644 index 0000000..276c78e --- /dev/null +++ b/src/system/PsrLogger.ts @@ -0,0 +1,117 @@ +/** + * PSR logger + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * PSR-3 style logger + */ + +export enum LogLevel { + DEBUG, + INFO, + NOTICE, + WARNING, + ERROR, + CRITICAL, + ALERT, + EMERGENCY, +}; + + +export interface PsrLogger +{ + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + debug( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + info( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + notice( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + warning( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + error( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + critical( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + alert( msg: string | object, context?: object ): void + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + emergency( msg: string | object, context?: object ): void + + + /** + * Log a message + * + * @param msg - the message to log + * @param context - additional message context + */ + log( level: LogLevel, msg: string | object, context?: object ): void +} diff --git a/src/system/StandardLogger.ts b/src/system/StandardLogger.ts new file mode 100644 index 0000000..cdd062d --- /dev/null +++ b/src/system/StandardLogger.ts @@ -0,0 +1,199 @@ +/** + * Stdout logger + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Standard out logger implementing PSR-3 standards + */ +import { PsrLogger, LogLevel } from './PsrLogger'; + +declare type StructuredLog = { + message: string; + timestamp: UnixTimestamp; + service: string; + env: string; + severity: string; + context?: Record<string, any>; +} + +export class StandardLogger implements PsrLogger +{ + /** + * Initialize logger + * + * @param _console + * @param _ts_ctr - a timestamp constructor + * @param _env - The environment ( dev, test, demo, live ) + */ + constructor( + private readonly _console: Console, + private readonly _ts_ctr: () => UnixTimestamp, + private readonly _env: string, + ) {} + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + debug( msg: string | object, context?: object ): void + { + this._console.info( this._format( LogLevel.DEBUG, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + info( msg: string | object, context?: object ): void + { + this._console.info( this._format( LogLevel.INFO, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + notice( msg: string | object, context?: object ): void + { + this._console.log( this._format( LogLevel.NOTICE, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + warning( msg: string | object, context?: object ): void + { + this._console.warn( this._format( LogLevel.WARNING, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + error( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.ERROR, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + critical( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.CRITICAL, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + alert( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.ALERT, msg, context ) ); + } + + + /** + * Log at a debug level + * + * @param msg - the message to log + * @param context - additional message context + */ + emergency( msg: string | object, context?: object ): void + { + this._console.error( this._format( LogLevel.EMERGENCY, msg, context ) ); + } + + + /** + * Log a message + * + * @param msg - the message to log + * @param context - additional message context + */ + log( level: LogLevel, msg: string | object, context?: object ): void + { + this._console.error( this._format( level, msg, context ) ); + } + + + /** + * Get structured log object + * + * @param msg - the string or object to log + * @param level - the log level + * @param context - additional message context + * + * @returns a structured logging object + */ + private _format( + level: LogLevel, + msg: string | object, + context: object = {}, + ): StructuredLog + { + let str: string; + + if ( msg !== null && typeof( msg ) === 'object' ) + { + str = JSON.stringify( msg ); + } + else + { + str = msg; + } + + const structured_log = <StructuredLog>{ + message: str, + timestamp: this._ts_ctr(), + service: 'quote-server', + env: this._env, + severity: LogLevel[level], + }; + + if ( Object.keys( context ).length > 0 ) + { + structured_log[ "context" ] = context; + } + + return structured_log; + } +} diff --git a/src/system/amqp/AmqpConnection.ts b/src/system/amqp/AmqpConnection.ts new file mode 100644 index 0000000..410f808 --- /dev/null +++ b/src/system/amqp/AmqpConnection.ts @@ -0,0 +1,153 @@ +/** + * Amqp Connection + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +import { AmqpConfig } from '../AmqpPublisher'; +import { EventEmitter } from "events"; +import * as amqplib from "amqplib"; + +/** + * Connection to AMQP exchange + */ +export class AmqpConnection +{ + /** The amqp connection */ + private _conn?: amqplib.Connection; + + /** The amqp channel */ + private _channel?: amqplib.Channel; + + + /** + * Amqp Connection + * + * @param _conf - amqp library + * @param _conf - amqp configuration + * @param _emitter - event emitter instance + */ + constructor( + private readonly _amqp: typeof amqplib, + private readonly _conf: AmqpConfig, + private readonly _emitter: EventEmitter, + ) {} + + + /** + * Initialize connection + */ + connect(): Promise<void> + { + return this._amqp.connect( this._conf ) + .then( conn => + { + this._conn = conn; + + /** If there is an error, attempt to reconnect + * Only hook this once because it will be re-hooked on each + * successive successful connection + */ + this._conn.once( 'error', e => + { + this._emitter.emit( 'amqp-conn-warn', e ); + this._reconnect(); + } ); + + return this._conn.createChannel(); + } ) + .then( ( ch: amqplib.Channel ) => + { + this._channel = ch; + + return this._channel.assertExchange( + this._conf.exchange, + 'fanout', + { durable: true } + ); + } ) + .then( _ => {} ); + } + + + /** + * Attempt to re-establish the connection + * + * @param retry_count - the number of retries attempted + */ + private _reconnect( retry_count: number = 0 ): void + { + if ( retry_count >= this._conf.retries ) + { + this._emitter.emit( + 'error', + new Error( 'Could not re-establish AMQP connection.' ) + ); + + return; + } + + this._emitter.emit( 'amqp-reconnect' ); + + this.connect() + .then( _ => { this._emitter.emit( 'amqp-reconnected' ) } ) + .catch( _ => + { + const wait_ms = this._conf.retry_wait; + setTimeout( () => this._reconnect( ++retry_count ), wait_ms ); + } ); + } + + + /** + * Returns the exchange to publish to + * + * @return exchange name + */ + getExchangeName(): string + { + return this._conf.exchange; + } + + + /** + * Returns the amqp channel + * + * @return exchange name + */ + getAmqpChannel(): amqplib.Channel | undefined + { + if ( !this._channel ) + { + this._reconnect(); + } + + return this._channel; + } + + + /** + * Close the amqp conenction + */ + close(): void + { + if ( this._conn ) + { + this._conn.close.bind(this._conn); + } + } +} diff --git a/src/system/avro/AvroFactory.ts b/src/system/avro/AvroFactory.ts new file mode 100644 index 0000000..32ba1ec --- /dev/null +++ b/src/system/avro/AvroFactory.ts @@ -0,0 +1,32 @@ +/** + * Factory functions for avro + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +import { Duplex } from 'stream'; + +import * as avro from "avro-js"; + +/** The avro encoder constructor type */ +export type AvroEncoderCtr = ( type: avro.AvroSchema ) => Duplex; + +/** The avro encoder constructor */ +export function createAvroEncoder( schema: avro.AvroSchema ): Duplex +{ + return new avro.streams.BlockEncoder( schema ); +} diff --git a/src/system/avro/V1MessageWriter.ts b/src/system/avro/V1MessageWriter.ts new file mode 100644 index 0000000..09ee64a --- /dev/null +++ b/src/system/avro/V1MessageWriter.ts @@ -0,0 +1,259 @@ +/** + * Message Writer + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Write a message to be published to a queue + */ +import { DocumentMeta } from '../../document/Document'; +import { Delta } from '../../bucket/delta'; +import { AvroEncoderCtr } from '../avro/AvroFactory'; +import { AvroSchema } from 'avro-js'; +import { MessageWriter } from '../MessageWriter'; +import { context } from '../../error/ContextError'; + + +export class V1MessageWriter implements MessageWriter +{ + /** A mapping of which delta type translated to which avro event */ + readonly DELTA_MAP: Record<string, string> = { + data: 'STEP_SAVE', + ratedata: 'RATE', + }; + + + /** + * Delta publisher + * + * @param _encoder_ctr - a factory function to create an avro encoder + * @param _conn - the amqp connection + */ + constructor( + private readonly _encoder_ctor: AvroEncoderCtr, + private readonly _schema: AvroSchema, + ) {} + + + /** + * Write the data to a message + * + * @param ts - timestamp + * @param meta - document meta data + * @param delta - current delta + * @param bucket - data bucket + * @param ratedata - ratedata bucket + */ + write( + ts: UnixTimestamp, + meta: DocumentMeta, + delta: Delta<any>, + bucket: Record<string, any>, + ratedata: Record<string, any>, + ): Promise<Buffer> + { + const avro_object = this._avroFormat( + ts, + meta, + delta, + bucket, + ratedata, + ); + + return this.avroEncode( avro_object ); + } + + + /** + * Format the avro data with data type labels + * + * @param ts - timestamp + * @param meta - document meta data + * @param delta - current delta + * @param bucket - data bucket + * @param ratedata - ratedata bucket + * + * @return the formatted data + */ + private _avroFormat( + ts: UnixTimestamp, + meta: DocumentMeta, + delta: Delta<any>, + bucket: Record<string, any>, + ratedata: Record<string, any>, + ): any + { + const delta_formatted = this.setDataTypes( delta.data ); + const bucket_formatted = this.setDataTypes( bucket ); + const ratedata_formatted = this.setDataTypes( ratedata ); + const event_id = this.DELTA_MAP[ delta.type ]; + + return { + event: { + id: event_id, + ts: ts, + actor: 'SERVER', + step: null, + }, + document: { + id: meta.id, + created: meta.startDate, + modified: meta.lastUpdate, + }, + session: { + Session: { + entity_id: meta.entity_id, + entity_name: meta.entity_name, + }, + }, + data: { + Data: { + bucket: bucket_formatted, + }, + }, + ratedata: { + Data: { + bucket: ratedata_formatted, + }, + }, + delta: { + Data: { + bucket: delta_formatted, + }, + }, + program: { + Program: { + id: 'quote_server', + version: '', + }, + }, + } + } + + + /** + * Encode the data in an avro buffer + * + * @param data - the data to encode + * + * @return the avro buffer or null if there is an error + */ + avroEncode( data: Record<string, any> ): Promise<Buffer> + { + return new Promise<Buffer>( ( resolve, reject ) => + { + const bufs: Buffer[] = []; + + try + { + this._schema.isValid( + data, + { + errorHook: ( keys: any, vals: any) => + { + throw context( + new Error( 'Invalid Avro Schema' ), + { + invalid_paths: keys, + invalid_data: vals, + } + ); + } + } + ); + + const encoder = this._encoder_ctor( this._schema ) + + encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } ) + encoder.on('error', ( err: Error ) => { reject( err ); } ) + encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } ) + encoder.end( data ); + } + catch ( e ) + { + reject( e ); + } + } ); + } + + + /** + * Format the data for avro by add type specifications to the data + * + * @param data - the data to format + * @param top_level - whether we are at the top level of the recursion + * + * @return the formatted data + */ + setDataTypes( data: any, top_level: boolean = true ): any + { + let data_formatted: any = {}; + + switch( typeof( data ) ) + { + case 'object': + if ( data == null ) + { + return null; + } + else if ( Array.isArray( data ) ) + { + let arr: any[] = []; + + data.forEach( ( datum ) => + { + arr.push( this.setDataTypes( datum, false ) ); + } ); + + data_formatted = ( top_level ) + ? arr + : { 'array': arr }; + } + else + { + let datum_formatted: any = {}; + + Object.keys( data).forEach( ( key: string ) => + { + const datum = this.setDataTypes( data[ key ], false ); + + datum_formatted[ key ] = datum; + + } ); + + data_formatted = ( top_level ) + ? datum_formatted + : { 'map': datum_formatted }; + } + break; + + case 'boolean': + return { 'boolean': data }; + + case 'number': + return { 'double': data }; + + case 'string': + return { 'string': data }; + + case 'undefined': + return null; + } + + return data_formatted; + } +}
\ No newline at end of file diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc new file mode 100644 index 0000000..63bbc7d --- /dev/null +++ b/src/system/avro/schema.avsc @@ -0,0 +1,215 @@ +{ + "type": "record", + "name": "update", + "fields": [ + { + "name": "event", + "type": { + "type": "record", + "name": "Event", + "fields": [ + { + "name": "id", + "type": { + "name": "EventId", + "type": "enum", + "symbols": [ + "STEP_SAVE", + "RATE" + ] + } + }, + { + "name": "ts", + "type": "long", + "logicalType": "timestamp-millis" + }, + { + "name": "actor", + "type": { + "type": "enum", + "name": "EventActor", + "symbols": [ "USER", "CLIENT", "SERVER" ] + } + }, + { + "name": "step", + "type": [ + "null", + { + "type": "record", + "name": "EventStep", + "fields": [ + { + "name": "transition", + "type": { + "type": "enum", + "name": "EventStepTransition", + "symbols": [ "BACK", "FORWARD", "END" ] + } + }, + { + "name": "src", + "type": "string" + }, + { + "name": "dest", + "type": "string" + } + ] + } + ] + } + ] + } + }, + { + "name": "document", + "type": { + "type": "record", + "name": "Document", + "doc": "Source document (quote)", + "fields": [ + { + "name": "id", + "type": "int" + }, + { + "name": "created", + "type": ["null", "long"], + "logicalType": "timestamp-millis", + "default": null + }, + { + "name": "modified", + "type": ["null", "long"], + "logicalType": "timestamp-millis", + "default": null + } + ] + } + }, + { + "name": "session", + "type": [ + "null", + { + "type": "record", + "name": "Session", + "fields": [ + { + "name": "entity_name", + "type": "string" + }, + { + "name": "entity_id", + "type": "int" + } + ] + } + ] + }, + { + "name": "data", + "type": [ + "null", + { + "type": "record", + "name": "Data", + "fields": [ + { + "name": "bucket", + "type":{ + "type": "map", + "values": [ + "null", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string", + { + "type": "array", + "items": [ + "null", + "boolean", + "double", + "string" + ] + } + ] + }, + { + "type": "map", + "values": [ + "null", + "boolean", + "double", + "string", + { + "type": "map", + "values": [ + "null", + "boolean", + "double", + "string" + ] + } + ] + } + ] + } + ] + } + } + ] + } + ] + }, + { + "name": "ratedata", + "type": [ + "null", + "Data" + ] + }, + { + "name": "delta", + "type": [ + "null", + "Data" + ] + }, + { + "name": "program", + "type": [ + "null", + { + "type": "record", + "name": "Program", + "fields": [ + { + "type": "string", + "name": "id", + "doc": "Program id" + }, + { + "type": "string", + "name": "version", + "doc": "Program version" + } + ] + } + ] + } + ] +} diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts new file mode 100644 index 0000000..881bc79 --- /dev/null +++ b/src/system/db/DeltaDao.ts @@ -0,0 +1,87 @@ +/** + * Delta data access + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * These types are used to describe the structure of the token data as it + * is stored in Mongo. It has a number of undesirable properties and + * duplicates data---this was intended to make querying easier and work + * around Mongo limitations. + * + * This structure can be changed in the future, but we'll need to maintain + * compatibility with the existing data. + */ + +import { DocumentId } from "../../document/Document"; +import { DeltaDocument } from "../../bucket/delta"; + + +/** Manage deltas */ +export interface DeltaDao +{ + /** + * Get documents in need of processing + * + * @return documents in need of processing + */ + getUnprocessedDocuments(): Promise<DeltaDocument[]> + + + /** + * Set the document's processed index + * + * @param doc_id - Document whose index will be set + * @param type - Delta type + */ + advanceDeltaIndex( + doc_id: DocumentId, + type: string, + ): Promise<void> + + + /** + * Mark a given document as processed. First does a check to make sure that + * the document does not have a newer update timestamp than the provided one + * + * @param doc_id - The document to mark + * @param last_update_ts - The last time this document was updated + */ + markDocumentAsProcessed( + doc_id: DocumentId, + last_update_ts: UnixTimestamp, + ): Promise<void> + + + /** + * Flag the document as being in an error state + * + * @param doc_id - The document to flag + * + * @return any errors that occurred + */ + setErrorFlag( doc_id: DocumentId ): Promise<void> + + + /** + * Get a count of documents in an error state + * + * @return a count of the documents in an error state + */ + getErrorCount(): Promise<number> +} + diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts new file mode 100644 index 0000000..e058980 --- /dev/null +++ b/src/system/db/MongoDeltaDao.ts @@ -0,0 +1,278 @@ +/** + * Delta data access + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Get deltas from the mongo document in order to process and publish them + */ + +import { DocumentId } from '../../document/Document'; +import { DeltaDao } from './DeltaDao'; +import { MongoCollection } from 'mongodb'; +import { context } from '../../error/ContextError'; +import { DaoError } from '../../error/DaoError'; +import { DeltaType, DeltaDocument } from '../../bucket/delta'; + +/** Manage deltas */ +export class MongoDeltaDao implements DeltaDao +{ + /** The ratedata delta type */ + static readonly DELTA_RATEDATA: string = 'ratedata'; + + /** The data delta type */ + static readonly DELTA_DATA: string = 'data'; + + /** The document fields to read */ + readonly RESULT_FIELDS: Record<string, number> = { + id: 1, + agentName: 1, + agentEntityId: 1, + startDate: 1, + lastUpdate: 1, + data: 1, + ratedata: 1, + rdelta: 1, + totalPublishDelta: 1, + }; + + + /** + * Initialize connection + * + * @param _collection - Mongo db collection + */ + constructor( + private readonly _collection: MongoCollection, + ) {} + + + /** + * Get documents in need of processing + * + * @return documents in need of processing + */ + getUnprocessedDocuments(): Promise<DeltaDocument[]> + { + return new Promise( ( resolve, reject ) => + { + this._collection.find( + { + published: false, + deltaError: { $ne: true }, + }, + { fields: this.RESULT_FIELDS }, + ( e, cursor ) => + { + if ( e ) + { + reject( + new DaoError( + 'Error fetching unprocessed documents: ' + e + ) + ); + return + } + + cursor.toArray( ( e: Error, data: DeltaDocument[] ) => + { + if ( e ) + { + reject( + new DaoError( + 'Error fetching array from cursor: ' + e + ) + ); + return; + } + + resolve( data ); + } ); + } + ) + } ); + } + + + /** + * Set the document's processed index + * + * @param doc_id - Document whose index will be set + * @param type - Delta type + */ + advanceDeltaIndex( doc_id: DocumentId, type: DeltaType ): Promise<void> + { + return new Promise( ( resolve, reject ) => + { + const inc_data: Record<string, any> = {}; + + inc_data[ 'totalPublishDelta.' + type ] = 1; + + this._collection.update( + { id: doc_id }, + { $inc: inc_data }, + { upsert: false }, + e => + { + if ( e ) + { + reject( context( + new DaoError( 'Error advancing delta index: ' + e ), + { + doc_id: doc_id, + type: type, + } + ) ); + return; + } + + resolve(); + } + ); + } ); + } + + + /** + * Mark a given document as processed. + * + * First does a check to make sure that + * the document does not have a newer update timestamp than the provided one + * + * @param doc_id - The document to mark + * @param last_update_ts - The last time this document was updated + */ + markDocumentAsProcessed( + doc_id: DocumentId, + last_update_ts: UnixTimestamp, + ): Promise<void> + { + return new Promise( ( resolve, reject ) => + { + this._collection.update( + { id: doc_id, lastUpdate: { $lte: last_update_ts } }, + { $set: { published: true } }, + { upsert: false }, + e => + { + if ( e ) + { + reject( context( + new DaoError( + 'Error marking document as processed: ' + e + ), + { + doc_id: doc_id, + last_update_ts: last_update_ts, + } + ) ); + return; + } + + resolve(); + return; + } + ); + } ); + } + + + /** + * Flag the document as being in an error state + * + * @param doc_id - The document to flag + * + * @return any errors that occurred + */ + setErrorFlag( doc_id: DocumentId ): Promise<void> + { + return new Promise( ( resolve, reject ) => + { + this._collection.update( + { id: doc_id }, + { $set: { deltaError: true } }, + { upsert: false }, + e => + { + if ( e ) + { + reject( context( + new DaoError( + 'Failed setting error flag: ' + e + ), + { + doc_id: doc_id, + } + ) ); + return; + } + + resolve(); + return; + } + ); + } ); + } + + + /** + * Get a count of documents in an error state + * + * @return a count of the documents in an error state + */ + getErrorCount(): Promise<number> + { + return new Promise( ( resolve, reject ) => + { + this._collection.find( + { deltaError: true }, + {}, + ( e, cursor ) => + { + if ( e ) + { + reject( + new Error( + 'Failed getting error count: ' + e + ) + ); + return; + } + + cursor.toArray( ( e: NullableError, data: any[] ) => + { + if ( e ) + { + reject( context( + new DaoError( + 'Failed getting error count: ' + e + ), + { + cursor: cursor, + } + ) ); + return; + } + + resolve( data.length ); + }); + } + ) + } ); + } +} + diff --git a/src/system/db/MongoFactory.ts b/src/system/db/MongoFactory.ts new file mode 100644 index 0000000..5a3b03e --- /dev/null +++ b/src/system/db/MongoFactory.ts @@ -0,0 +1,176 @@ +/** + * Mongo Factory functions + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * These definitions are for a very old mongodb library, which will be + * once we get around to updating node. Quite a failure on the maintenance + * front. + * + * instantiate objects for MongoDb + */ +import { MongoDb, MongoDbConfig, MongoCollection } from '../../types/mongodb'; +import { DaoError } from '../../error/DaoError'; + + +const { + Db: MongoDb, + Server: MongoServer, + ReplServers: ReplSetServers, +} = require( 'mongodb' ); + + +/** + * Create a mongodb configuration from the environment + * + * @param env - the environment variables + * + * @return the mongo configuration + */ +export function createMongoConfig( env: NodeJS.ProcessEnv ): MongoDbConfig +{ + return <MongoDbConfig>{ + 'port': +( env.MONGODB_PORT || 0 ), + 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1, + 'replset': env.LIZA_MONGODB_REPLSET, + 'host': env.MONGODB_HOST, + 'host_a': env.LIZA_MONGODB_HOST_A, + 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ), + 'host_b': env.LIZA_MONGODB_HOST_B, + 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ), + 'collection': 'quotes', + }; +} + + +/** + * Create the database connection + * + * @param conf - the configuration from the environment + * + * @return the mongodb connection + */ +export function createMongoDB( conf: MongoDbConfig ): MongoDb +{ + if( conf.ha ) + { + var mongodbPort = conf.port || 27017; + var mongodbReplSet = conf.replset || 'rs0'; + var dbServers = new ReplSetServers( + [ + new MongoServer( conf.host_a, conf.port_a || mongodbPort), + new MongoServer( conf.host_b, conf.port_b || mongodbPort), + ], + {rs_name: mongodbReplSet, auto_reconnect: true} + ); + } + else + { + var dbServers = new MongoServer( + conf.host || '127.0.0.1', + conf.port || 27017, + {auto_reconnect: true} + ); + } + var db = new MongoDb( + 'program', + dbServers, + {native_parser: false, safe: false} + ); + return db; +} + + +/** + * Attempts to connect to the database and retrieve the collection + * + * connectError event will be emitted on failure. + * + * @param db - the mongo database + * @param conf - the mongo configuration + * + * @return the collection + */ +export function getMongoCollection( + db: MongoDb, + conf: MongoDbConfig +): Promise<MongoCollection> +{ + return new Promise( ( resolve, reject ) => + { + // attempt to connect to the database + db.open( ( e: any, db: any ) => + { + // if there was an error, don't bother with anything else + if ( e ) + { + // in some circumstances, it may just be telling us that + // we're already connected (even though the connection may + // have been broken) + if ( e.errno !== undefined ) + { + reject( new Error( + 'Error opening mongo connection: ' + e + ) ); + return; + } + } else if ( db == null ) + { + reject( new DaoError( 'No database connection' ) ); + return; + } + + // quotes collection + db.collection( + conf.collection, + ( e: any, collection: MongoCollection ) => + { + if ( e ) + { + reject( new DaoError( + 'Error creating collection: ' + e + ) ); + return; + } + + // initialize indexes + collection.createIndex( + [ + ['published', 1], + ['deltaError', 1], + ], + true, + ( e: any, _index: { [P: string]: any } ) => + { + if ( e ) + { + reject( new DaoError( + 'Error creating index: ' + e + ) ); + return; + } + + resolve( collection ); + return; + } + ); + } + ); + } ); + } ); +}
\ No newline at end of file diff --git a/src/types/avro-js.d.ts b/src/types/avro-js.d.ts new file mode 100644 index 0000000..a2eff67 --- /dev/null +++ b/src/types/avro-js.d.ts @@ -0,0 +1,89 @@ +/** + * avro-js type definitions + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +import { Duplex } from 'stream'; + +declare module "avro-js"; + +declare function parse( schema: string ): AvroSchema; + +export declare interface AvroSchema +{ + /** + * Write data to a buffer + * + * @param data - the data to write + * + * @return the buffer if successful + */ + toBuffer( data: Record<string, any> ): Buffer | null; + + + /** + * Check if data is valid against schema + * + * @param data - the data to validate + * @param opts - options specified as key/values + * + * @return the buffer if it is valid + */ + isValid( + data: Record<string, any>, + opts?: Record<string, any> + ): Buffer | null; + + + /** + * Write to a buffer + * + * @param data - the data to write + * @param buffer - the buffer that will be written to + */ + encode( data: Record<string, any>, buffer: Buffer ): void; + + + /** + * Output to a json string + * + * @param data - the data to format + * + * @return the formatted data + */ + toString( data: Record<string, any> ): string; + + + /** + * Deserialize from a buffer + * + * @param buffer - the buffer to read from + * + * @return the resulting data + */ + fromBuffer( buffer: Buffer ): any; +} + +declare class BlockEncoder extends Duplex +{ + constructor( schema: AvroSchema ); +} + +export declare const streams: { + BlockEncoder: typeof BlockEncoder, +}; diff --git a/src/types/misc.d.ts b/src/types/misc.d.ts index 5572739..b26d827 100644 --- a/src/types/misc.d.ts +++ b/src/types/misc.d.ts @@ -65,4 +65,7 @@ type UnixTimestampMillis = NominalType<Milliseconds, 'UnixTimestampMillis'>; * reduce the boilerplate of these function definitions, and to clearly * document that this pattern is something that used to be done frequently. */ -type NodeCallback<T, R = void> = ( e: Error | null, result: T | null ) => R; +type NodeCallback<T, R = void> = ( e: NullableError, result: T | null ) => R; + +/** Nullable error */ +type NullableError = Error | null; diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts index 808b458..bbbfa46 100644 --- a/src/types/mongodb.d.ts +++ b/src/types/mongodb.d.ts @@ -23,13 +23,59 @@ * front. */ +import { PositiveInteger } from "../numeric"; + declare module "mongodb"; +export interface MongoDbConfig extends Record<string, any> { + /** Host */ + host?: string; + + /** Port number */ + port?: number; + + /** High availability */ + ha: boolean; + + /** The mongodb collection to read from */ + collection: string; +} + + /** - * Node-style callback for queries + * Interface for the mongo database */ -type MongoCallback = ( err: Error|null, data: { [P: string]: any } ) => void; +export interface MongoDb +{ + /** + * Initialize the database connection + * + * @param callback continuation on completion + */ + open( callback: MongoCallback ): void; + + + /** + * Close the database connection + * + * @param callback continuation on completion + */ + close( callback: MongoCallback ): void; + + + /** + * Hook events + * + * @param event_id - the event to hook + * @param callback - a function to call in response to the event + */ + on( event_id: string, callback: ( err: Error ) => void ): void; +} + + +/** Node-style callback for queries */ +type MongoCallback = ( err: NullableError, data: { [P: string]: any } ) => void; /** @@ -52,11 +98,31 @@ interface MongoQueryUpdateOptions */ interface MongoFindOneOptions { + /** Fields to select */ fields?: MongoFieldSelector, } /** + * Options for `find` queries + * + * This is not at all comprehensive; it covers only the fields we actually + * make use of. + */ +interface MongoFindOptions +{ + /** Limit results returned */ + limit?: PositiveInteger, + + /** Whether to project only id's */ + id?: number, + + /** Which fields to include in the result set */ + fields?: Record<string, number>, +} + + +/** * Options for `findAndModify` queries * * This is not at all comprehensive; it covers only the fields we actually @@ -76,21 +142,26 @@ interface MongoFindAndModifyOptions /** Mongo query selector */ -type MongoSelector = { [P: string]: any }; - +export type MongoSelector = { [P: string]: any }; /** Field selector */ type MongoFieldSelector = { [P: string]: number }; +/** Mongo index specification */ +type MongoIndexSpecification = Array< Array < string | number >>; /** Mongo update clause */ -type MongoUpdate = MongoSelector; +export type MongoUpdate = MongoSelector; +/** Mongo object */ +type MongoObject = { [P: string]: any }; + +/** Mongo update clause */ +type MongoInsertSpecification = MongoObject | MongoObject[]; /** Sorting clause **/ type MongoSortClause = Array<string | [ string, MongoSortDirection ]>; - /** Sort direction */ type MongoSortDirection = -1 | 1 | 'ascending' | 'descending' | 'asc' | 'desc'; @@ -115,8 +186,6 @@ declare interface MongoCollection * @param data update data * @param options query options * @param callback continuation on completion - * - * @return callback return value */ update( selector: MongoSelector, @@ -127,6 +196,23 @@ declare interface MongoCollection /** + * Execute a query and return the results + * + * Unlike `update`, the callback return value is not propagated, and so + * the callback ought not return anything. + * + * @param selector document query + * @param fields fields to return + * @param callback continuation on completion + */ + find( + selector: MongoSelector, + fields: MongoFindOptions, + callback: MongoCallback + ): void; + + + /** * Execute a query and return the first result * * Unlike `update`, the callback return value is not propagated, and so @@ -158,4 +244,30 @@ declare interface MongoCollection options: MongoFindAndModifyOptions, callback: MongoCallback, ): void; + + + /** + * Creates an index on the collection + * + * @param fieldOrSpec - indexes to create + * @param options - mongo options + * @param callback - continuation on completion + */ + createIndex( + fieldOrSpec: MongoIndexSpecification, + options: boolean, + callback: MongoCallback, + ): void; + + + /** + * Creates an index on the collection + * + * @param docs - documents to insert + * @param callback - continuation on completion + */ + insert( + docs: MongoInsertSpecification, + callback: MongoCallback, + ): void; } diff --git a/src/version.d.ts b/src/version.d.ts new file mode 100644 index 0000000..f7a4b0e --- /dev/null +++ b/src/version.d.ts @@ -0,0 +1,39 @@ +/** + * Version information + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework + * + * Liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** Format of version array */ +type VersionTuple = [ number, number, number, string ]; + +/** Version information */ +declare interface Version extends VersionTuple +{ + major: number; + minor: number; + rev: number; + suffix: string; + + toString(): string; +} + +/** Exported version data */ +declare const version: Version; + +export = version; diff --git a/test/bucket/delta.ts b/test/bucket/delta.ts index ba1d192..cc9a790 100644 --- a/test/bucket/delta.ts +++ b/test/bucket/delta.ts @@ -19,12 +19,18 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * */ -import { createDelta as sut, Kv , DeltaResult} from "../../src/bucket/delta"; +import { + createDelta as sutCreate, + applyDelta as sutApply, + Kv, + DeltaResult, +} from "../../src/bucket/delta"; import { expect, use as chai_use } from 'chai'; chai_use( require( 'chai-as-promised' ) ); -interface SutTestCase<T> + +interface SutCreateTestCase<T> { label: string; src_data: T; @@ -32,68 +38,149 @@ interface SutTestCase<T> expected: DeltaResult<T>; } + +interface SutApplyTestCase<T> +{ + label: string; + bucket: T; + delta: DeltaResult<T>; + expected: T; +} + + describe( 'Delta', () => { - ( <SutTestCase<Kv<string>>[]>[ - { - label: "No changes are made, key is dropped", - src_data: { foo: [ 'bar', 'baz' ] }, - dest_data: { foo: [ 'bar', 'baz' ] }, - expected: {}, - }, - { - label: "Only the unchanged key is dropped", - src_data: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] }, - dest_data: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] }, - expected: { bar: [ 'asd' ] }, - }, - { - label: "Changed values are updated by index with old value", - src_data: { foo: [ "bar", "baz", "quux" ] }, - dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, - expected: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] }, - }, - { - label: "The keys are null when they don't exist in first set", - src_data: {}, - dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, - expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, - }, - { - label: "Removed keys in new set show up", - src_data: { foo: [ "bar" ] }, - dest_data: {}, - expected: { foo: null }, - }, - { - label: "Indexes after a null terminator aren't included", - src_data: { foo: [ "one", "two", "three", "four" ] }, - dest_data: { foo: [ "one", "done" ] }, - expected: { foo: [ undefined, "done", null ] }, - }, - { - label: "Consider nested arrays to be scalar values", - src_data: { foo: [ [ "one" ], [ "two", "three" ] ] }, - dest_data: { foo: [ [ "one" ], [ "two" ] ] }, - expected: { foo: [ undefined, [ "two" ] ] }, - }, - { - label: "Don't evaluate zeros as falsy", - src_data: { foo: [ 0 ] }, - dest_data: { foo: [ 0 ] }, - expected: {}, - }, + describe( '#createDelta', () => + { + ( <SutCreateTestCase<Kv<string>>[]>[ + { + label: "No changes are made, key is dropped", + src_data: { foo: [ 'bar', 'baz' ] }, + dest_data: { foo: [ 'bar', 'baz' ] }, + expected: {}, + }, + { + label: "Only the unchanged key is dropped", + src_data: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] }, + dest_data: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] }, + expected: { bar: [ 'asd' ] }, + }, + { + label: "Changed values are updated by index with old value", + src_data: { foo: [ "bar", "baz", "quux" ] }, + dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + expected: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] }, + }, + { + label: "The keys are null when they don't exist in first set", + src_data: {}, + dest_data: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + }, + { + label: "Removed keys in new set show up", + src_data: { foo: [ "bar" ] }, + dest_data: {}, + expected: { foo: null }, + }, + { + label: "Indexes after a null terminator aren't included", + src_data: { foo: [ "one", "two", "three", "four" ] }, + dest_data: { foo: [ "one", "done" ] }, + expected: { foo: [ undefined, "done", null ] }, + }, + { + label: "Consider nested arrays to be scalar values", + src_data: { foo: [ [ "one" ], [ "two", "three" ] ] }, + dest_data: { foo: [ [ "one" ], [ "two" ] ] }, + expected: { foo: [ undefined, [ "two" ] ] }, + }, + { + label: "Don't evaluate zeros as falsy", + src_data: { foo: [ 0 ] }, + dest_data: { foo: [ 0 ] }, + expected: {}, + }, + { + label: "Don't evaluate empty strings as falsy", + src_data: { foo: [ '' ] }, + dest_data: { foo: [ '' ] }, + expected: {}, + }, + ] ).forEach( ( { label, src_data, dest_data, expected } ) => { - label: "Don't evaluate empty strings as falsy", - src_data: { foo: [ '' ] }, - dest_data: { foo: [ '' ] }, - expected: {}, - }, - ] ).forEach( ( { label, src_data, dest_data, expected } ) => + it( label, () => + { + expect( sutCreate( src_data, dest_data ) ) + .to.deep.equal( expected ); + } ); + } ); + } ); + + + describe( '#applyDelta', () => { - it( label, () => + ( <SutApplyTestCase<Kv<string>>[]>[ + { + label: "Empty delta changes nothing", + bucket: { foo: [ 'bar', 'baz' ] }, + delta: {}, + expected: { foo: [ 'bar', 'baz' ] }, + }, + { + label: "Field not in delta is unchanged", + bucket: { foo: [ 'bar', 'baz' ], bar: [ 'qwe' ] }, + delta: { bar: [ 'asd' ] }, + expected: { foo: [ 'bar', 'baz' ], bar: [ 'asd' ] }, + }, + { + label: "Undefined doesn't affect its corresponding index", + bucket: { foo: [ "bar", "baz", "quux" ] }, + delta: { foo: [ undefined, "quuux", null ], moo: [ "cow" ] }, + expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + }, + { + label: "Delta applys correctly on empty bucket", + bucket: {}, + delta: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + expected: { foo: [ "bar", "quuux" ], moo: [ "cow" ] }, + }, + { + label: "Keys are removed properly", + bucket: { foo: [ "bar" ] }, + delta: { foo: null }, + expected: {}, + }, + { + label: "Indexes after a null terminator aren't included", + bucket: { foo: [ "one", "two", "three", "four" ] }, + delta: { foo: [ undefined, "done", null ] }, + expected: { foo: [ "one", "done" ] }, + }, + { + label: "Consider nested arrays to be scalar values", + bucket: { foo: [ [ "one" ], [ "two", "three" ] ] }, + delta: { foo: [ undefined, [ "two" ] ] }, + expected: { foo: [ [ "one" ], [ "two" ] ] }, + }, + { + label: "Don't evaluate zeros as falsy", + bucket: { foo: [ 0 ] }, + delta: {}, + expected: { foo: [ 0 ] }, + }, + { + label: "Don't evaluate empty strings as falsy", + bucket: { foo: [ '' ] }, + delta: {}, + expected: { foo: [ '' ] }, + }, + ] ).forEach( ( { label, bucket, delta, expected } ) => { - expect( sut( src_data, dest_data ) ).to.deep.equal( expected ); + it( label, () => + { + expect( sutApply( bucket, delta ) ).to.deep.equal( expected ); + } ); } ); } ); } ); diff --git a/test/conf/ConfLoaderTest.js b/test/conf/ConfLoaderTest.ts index b942216..4d71301 100644 --- a/test/conf/ConfLoaderTest.js +++ b/test/conf/ConfLoaderTest.ts @@ -1,15 +1,34 @@ /** * Tests ConfLoader + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -'use strict'; +const chai = require( 'chai' ); +const expect = chai.expect; + +import { readFile } from "fs"; + +import { ConfLoader as Sut } from "../../src/conf/ConfLoader"; + +type FsLike = { readFile: typeof readFile }; -const chai = require( 'chai' ); -const expect = chai.expect; const { - conf: { - ConfLoader: Sut, - }, store: { MemoryStore: Store, }, @@ -25,8 +44,8 @@ describe( 'ConfLoader', () => const expected_path = "/foo/bar/baz.json"; const expected_data = '{ "foo": "bar" }'; - const fs = { - readFile( path, encoding, callback ) + const fs = <FsLike>{ + readFile( path: string, encoding: string, callback: any ) { expect( path ).to.equal( expected_path ); expect( encoding ).to.equal( 'utf8' ); @@ -36,7 +55,7 @@ describe( 'ConfLoader', () => }; return expect( - Sut( fs, Store ) + new Sut( fs, Store ) .fromFile( expected_path ) .then( conf => conf.get( 'foo' ) ) ).to.eventually.deep.equal( JSON.parse( expected_data ).foo ); @@ -47,14 +66,14 @@ describe( 'ConfLoader', () => { const expected_err = Error( 'rejected' ); - const fs = { - readFile( _, __, callback ) + const fs = <FsLike>{ + readFile( _: any, __: any, callback: any ) { callback( expected_err, null ); }, }; - return expect( Sut( fs ).fromFile( '' ) ) + return expect( new Sut( fs, Store ).fromFile( '' ) ) .to.eventually.be.rejectedWith( expected_err ); } ); @@ -64,21 +83,21 @@ describe( 'ConfLoader', () => const result = { foo: {} }; const input = "foo"; - const fs = { - readFile( _, __, callback ) + const fs = <FsLike>{ + readFile( _: any, __: any, callback: any ) { callback( null, input ); }, }; - const sut = Sut.extend( + const sut = new class extends Sut { - 'override parseConfData'( given_input ) + parseConfData( given_input: string ) { expect( given_input ).to.equal( input ); return Promise.resolve( result ); - }, - } )( fs, Store ); + } + }( fs, Store ); return expect( sut.fromFile( '' ) @@ -91,8 +110,8 @@ describe( 'ConfLoader', () => { const expected_err = SyntaxError( 'test parsing error' ); - const fs = { - readFile( _, __, callback ) + const fs = <FsLike>{ + readFile( _: any, __: any, callback: any ) { // make async so that we clear the stack, and therefore // try/catch @@ -100,13 +119,13 @@ describe( 'ConfLoader', () => }, }; - const sut = Sut.extend( + const sut = new class extends Sut { - 'override parseConfData'( given_input ) + parseConfData( _given_input: string ): never { throw expected_err; - }, - } )( fs, Store ); + } + }( fs, Store ); return expect( sut.fromFile( '' ) ) .to.eventually.be.rejectedWith( expected_err ); @@ -117,20 +136,21 @@ describe( 'ConfLoader', () => { const expected_err = Error( 'test Store ctor error' ); - const fs = { - readFile: ( _, __, callback ) => callback( null, '' ), + const fs = <FsLike>{ + readFile: ( _: any, __: any, callback: any ) => + callback( null, '' ), }; const badstore = () => { throw expected_err }; - return expect( Sut( fs, badstore ).fromFile( '' ) ) + return expect( new Sut( fs, badstore ).fromFile( '' ) ) .to.eventually.be.rejectedWith( expected_err ); } ); it( "rejects promise on bad fs call", () => { - return expect( Sut( {}, Store ).fromFile( '' ) ) + return expect( new Sut( <FsLike>{}, Store ).fromFile( '' ) ) .to.eventually.be.rejected; } ); } ); diff --git a/test/server/dapi/TokenedDataApiTest.ts b/test/server/dapi/TokenedDataApiTest.ts index 27375fa..0d0a3ad 100644 --- a/test/server/dapi/TokenedDataApiTest.ts +++ b/test/server/dapi/TokenedDataApiTest.ts @@ -40,7 +40,7 @@ describe( 'TokenedDataApi', () => const expected_ns = 'foo_ns'; - ( <[string, boolean, ( e: Error|null ) => void][]>[ + ( <[string, boolean, ( e: NullableError ) => void][]>[ [ "creates token and returns data if last_created", true, diff --git a/test/server/db/MongoServerDaoTest.js b/test/server/db/MongoServerDaoTest.ts index 0fa5d30..58e6ab9 100644 --- a/test/server/db/MongoServerDaoTest.js +++ b/test/server/db/MongoServerDaoTest.ts @@ -21,9 +21,17 @@ 'use strict'; -const chai = require( 'chai' ); -const expect = chai.expect; -const { MongoServerDao: Sut } = require( '../../../' ).server.db; +import { MongoServerDao as Sut } from "../../../src/server/db/MongoServerDao"; +import { MongoSelector, MongoUpdate, MongoDb } from "mongodb"; +import { expect, use as chai_use } from 'chai'; +import { ServerSideQuote } from "../../../src/server/quote/ServerSideQuote"; +import { PositiveInteger } from "../../../src/numeric"; +import { Program } from "../../../src/program/Program"; +import { RateResult } from "../../../src/server/rater/Rater"; +import { QuoteDataBucket } from "../../../src/bucket/QuoteDataBucket"; +import { QuoteId } from "../../../src/quote/Quote"; + +chai_use( require( 'chai-as-promised' ) ); describe( 'MongoServerDao', () => @@ -41,9 +49,9 @@ describe( 'MongoServerDao', () => const quote = createStubQuote( metadata ); - const sut = Sut( createMockDb( + const sut = new Sut( createMockDb( // update - ( selector, data ) => + ( _selector: MongoSelector, data: MongoUpdate ) => { expect( data.$set[ 'meta.foo' ] ) .to.deep.equal( metadata.foo ); @@ -75,9 +83,9 @@ describe( 'MongoServerDao', () => const quote = createStubQuote( {} ); - const sut = Sut( createMockDb( + const sut = new Sut( createMockDb( // update - ( selector, data ) => + (_selector: MongoSelector, data: MongoUpdate ) => { expect( data.$push[ 'foo' ] ) .to.deep.equal( push_data.foo ); @@ -106,9 +114,9 @@ describe( 'MongoServerDao', () => const quote = createStubQuote( {} ); - const sut = Sut( createMockDb( + const sut = new Sut( createMockDb( // update - ( selector, data ) => + ( _selector: MongoSelector, data: MongoUpdate ) => { expect( data.$push ).to.equal( undefined ); @@ -131,24 +139,24 @@ describe( 'MongoServerDao', () => } ); -function createMockDb( on_update ) +function createMockDb( on_update: any ): MongoDb { const collection_quotes = { update: on_update, - createIndex: ( _, __, c ) => c(), + createIndex: ( _: any, __: any, c: any ) => c(), }; const collection_seq = { - find( _, __, c ) + find( _: any, __: any, c: any ) { c( null, { - toArray: c => c( null, { length: 5 } ), + toArray: ( c: any ) => c( null, { length: 5 } ), } ); }, }; const db = { - collection( id, c ) + collection( id: any, c: any ) { const coll = ( id === 'quotes' ) ? collection_quotes @@ -158,8 +166,9 @@ function createMockDb( on_update ) }, }; - const driver = { - open: c => c( null, db ), + const driver = <MongoDb>{ + open: ( c: any ) => c( null, db ), + close: () => {}, on: () => {}, }; @@ -167,24 +176,53 @@ function createMockDb( on_update ) } -function createStubQuote( metadata ) +function createStubQuote( metadata: Record<string, any> ) { - return { - getBucket: () => ( { + const program = <Program>{ + getId: () => '1', + ineligibleLockCount: 0, + apis: {}, + internal: {}, + meta: { + arefs: {}, + fields: {}, + groups: {}, + qdata: {}, + qtypes: {}, + }, + mapis: {}, + initQuote: () => {}, + }; + + const quote = <ServerSideQuote>{ + getBucket: () => <QuoteDataBucket>( { getData: () => {}, } ), - getMetabucket: () => ( { + getMetabucket: () => <QuoteDataBucket>( { getData: () => metadata, } ), - getId: () => 1, - getProgramVersion: () => 0, - getLastPremiumDate: () => 0, - getRatedDate: () => 0, + getId: () => <QuoteId>123, + getProgramVersion: () => 'Foo', + getLastPremiumDate: () => <UnixTimestamp>0, + getRatedDate: () => <UnixTimestamp>0, getExplicitLockReason: () => "", - getExplicitLockStep: () => 0, + getExplicitLockStep: () => <PositiveInteger>1, isImported: () => false, isBound: () => false, + getTopVisitedStepId: () => <PositiveInteger>1, + getTopSavedStepId: () => <PositiveInteger>1, + setRatedDate: () => quote, + setRateBucket: () => quote, + setRatingData: () => quote, + getRatingData: () => <RateResult>{ _unavailable_all: '0' }, + getProgram: () => program, + setExplicitLock: () => quote, + getProgramId: () => 'Foo', + getCurrentStepId: () => 0, + setLastPremiumDate: () => quote, }; + + return quote; } diff --git a/test/server/request/DataProcessorTest.ts b/test/server/request/DataProcessorTest.ts index d0e52d8..2be9d61 100644 --- a/test/server/request/DataProcessorTest.ts +++ b/test/server/request/DataProcessorTest.ts @@ -503,7 +503,7 @@ function createStubs( function createStubUserRequest( internal: boolean ) { - return { + return <UserRequest>{ getSession: () => ( { isInternal: () => internal } ) @@ -713,7 +713,44 @@ function createStubQuote() getBucket() { return new QuoteDataBucket(); - } + }, + + getMetabucket(){ + return new QuoteDataBucket(); + }, + + getProgramVersion(){ + return 'Foo'; + }, + + getExplicitLockReason(){ + return 'Reason'; + }, + + getExplicitLockStep() + { + return <PositiveInteger>1; + }, + + isImported() + { + return true; + }, + + isBound() + { + return true; + }, + + getTopVisitedStepId() + { + return <PositiveInteger>1; + }, + + getTopSavedStepId() + { + return <PositiveInteger>1; + }, }; } diff --git a/test/server/service/RatingServiceTest.ts b/test/server/service/RatingServiceTest.ts index 8896c49..546118c 100644 --- a/test/server/service/RatingServiceTest.ts +++ b/test/server/service/RatingServiceTest.ts @@ -36,6 +36,7 @@ import { UserRequest } from "../../../src/server/request/UserRequest"; import { UserResponse } from "../../../src/server/request/UserResponse"; import { UserSession } from "../../../src/server/request/UserSession"; import { QuoteDataBucket } from "../../../src/bucket/QuoteDataBucket"; +import { PositiveInteger } from "../../../src/numeric"; import { Kv } from "../../../src/bucket/delta"; import { @@ -573,19 +574,27 @@ function getStubs() const response = <UserResponse>{}; const quote = <ServerSideQuote>{ - getProgramId: () => program_id, - getProgram: () => program, - getId: () => <QuoteId>0, - setLastPremiumDate: () => quote, - setRatedDate: () => quote, - getRatedDate: () => <UnixTimestamp>0, - getLastPremiumDate: () => <UnixTimestamp>0, - getCurrentStepId: () => 0, - setExplicitLock: () => quote, - setRateBucket: () => quote, - setRatingData: () => quote, - getRatingData: () => stub_rate_data, - getBucket: () => new QuoteDataBucket(), + getProgramId: () => program_id, + getProgram: () => program, + getId: () => <QuoteId>0, + setLastPremiumDate: () => quote, + setRatedDate: () => quote, + getRatedDate: () => <UnixTimestamp>0, + getLastPremiumDate: () => <UnixTimestamp>0, + getCurrentStepId: () => 0, + setExplicitLock: () => quote, + setRateBucket: () => quote, + setRatingData: () => quote, + getRatingData: () => stub_rate_data, + getBucket: () => new QuoteDataBucket(), + getMetabucket: () => new QuoteDataBucket(), + getProgramVersion: () => 'Foo', + getExplicitLockReason: () => 'Reason', + getExplicitLockStep: () => <PositiveInteger>1, + isImported: () => true, + isBound: () => true, + getTopVisitedStepId: () => <PositiveInteger>1, + getTopSavedStepId: () => <PositiveInteger>1, }; return { diff --git a/test/server/token/MongoTokenDaoTest.ts b/test/server/token/MongoTokenDaoTest.ts index 167685a..23e403e 100644 --- a/test/server/token/MongoTokenDaoTest.ts +++ b/test/server/token/MongoTokenDaoTest.ts @@ -26,7 +26,7 @@ import { } from "../../../src/server/token/TokenDao"; import { MongoTokenDao as Sut } from "../../../src/server/token/MongoTokenDao"; - +import { MongoCollection } from "mongodb"; import { TokenId, TokenNamespace, @@ -248,6 +248,9 @@ describe( 'server.token.TokenDao', () => update() {}, findOne() {}, + find() {}, + createIndex() {}, + insert() {}, }; return expect( @@ -269,6 +272,9 @@ describe( 'server.token.TokenDao', () => update() {}, findOne() {}, + find() {}, + createIndex() {}, + insert() {}, }; return expect( @@ -477,6 +483,9 @@ describe( 'server.token.TokenDao', () => update() {}, findAndModify() {}, + find() {}, + createIndex() {}, + insert() {}, }; const result = new Sut( coll, field, () => <UnixTimestamp>0 ) @@ -520,6 +529,9 @@ describe( 'server.token.TokenDao', () => update() {}, findAndModify() {}, + find() {}, + createIndex() {}, + insert() {}, }; return expect( diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts new file mode 100644 index 0000000..1587bb2 --- /dev/null +++ b/test/system/DeltaProcessorTest.ts @@ -0,0 +1,665 @@ +/** + * Delta Processor test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; +import { AmqpPublisher } from '../../src/system/AmqpPublisher'; +import { DeltaDao } from '../../src/system/db/DeltaDao'; +import { DeltaDocument } from '../../src/bucket/delta'; +import { DocumentId } from '../../src/document/Document'; +import { EventEmitter } from 'events'; + +import { expect, use as chai_use } from 'chai'; +chai_use( require( 'chai-as-promised' ) ); + + +describe( 'system.DeltaProcessor', () => +{ + describe( '#process', () => + { + ( <{ + label: string, + given: any[], + expected: any + }[]>[ + { + label: 'No deltas are processed', + given: [ + { + id: 123, + lastUpdate: 123123123, + data: {}, + ratedata: {}, + rdelta: {}, + }, + ], + expected: [], + }, + + // when quote is initialized: { foo: [ '' ], state: [ 'a' ] } + { + label: 'Publishes deltas in order', + + given: [ + { + id: 123, + lastUpdate: 123123123, + + data: { + foo: [ 'third' ], + state: [ 'a', 'b', 'c', 'd' ], + }, + + ratedata: { + prem: [ 'rate_second' ], + state: [ 'i', 'ii', 'iii' ], + }, + + rdelta: { + data: [ + { + timestamp: 1, + data: { + foo: [ '' ], + state: [ undefined, null ], + }, + }, + { + timestamp: 3, + data: { + foo: [ 'first' ], + state: [ undefined, undefined, null ], + }, + }, + { + timestamp: 5, + data: { + foo: [ 'second' ], + state: [ undefined, undefined, undefined, null ], + }, + }, + ], + + ratedata: [ + { + timestamp: 2, + data: { + prem: [ '' ], + state: [ undefined, null ], + }, + }, + { + timestamp: 4, + data: { + prem: [ 'rate_first' ], + state: [ undefined, undefined, null ], + }, + }, + ], + }, + }, + ], + + expected: [ + // bucket + { + doc_id: 123, + rdelta: { + foo: [ '' ], + state: [ undefined, null ], + }, + bucket: { + foo: [ 'first' ], + state: [ 'a', 'b' ], + }, + ratedata: {}, + }, + + // rate + { + doc_id: 123, + rdelta: { + prem: [ '' ], + state: [ undefined, null ], + }, + bucket: { + foo: [ 'first' ], + state: [ 'a', 'b' ], + }, + ratedata: { + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], + }, + }, + + // bucket + { + doc_id: 123, + rdelta: { + foo: [ 'first' ], + state: [ undefined, undefined, null ], + }, + bucket: { + foo: [ 'second' ], + state: [ 'a', 'b', 'c' ], + }, + ratedata: {}, + }, + + // rate + { + doc_id: 123, + rdelta: { + prem: [ 'rate_first' ], + state: [ undefined, undefined, null ], + }, + bucket: { + foo: [ 'second' ], + state: [ 'a', 'b', 'c' ], + }, + ratedata: { + prem: [ 'rate_second' ], + state: [ 'i', 'ii', 'iii' ], + }, + }, + + // bucket + { + doc_id: 123, + rdelta: { + foo: [ 'second' ], + state: [ undefined, undefined, undefined, null ], + }, + bucket: { + foo: [ 'third' ], + state: [ 'a', 'b', 'c', 'd' ], + }, + ratedata: {}, + }, + ], + }, + + { + label: 'Publishes deltas in order for multiple documents', + + given: [ + { + id: 123, + lastUpdate: 123123123, + + data: { + foo: [ 'first' ], + state: [ 'a', 'b' ], + }, + + ratedata: { + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], + }, + + rdelta: { + data: [ + { + timestamp: 1, + data: { + foo: [ '' ], + state: [ undefined, null ], + }, + }, + ], + + ratedata: [ + { + timestamp: 4, + data: { + prem: [ '' ], + state: [ undefined, null ], + }, + }, + ], + }, + }, + + // timestamps of this document are sandwiched between + // the above to make sure documents are processed + // independently (without splicing their deltas together) + { + id: 234, + lastUpdate: 121212123, + + data: { + foo2: [ 'first' ], + state: [ 'a', 'b' ], + }, + + ratedata: { + prem2: [ 'rate_first' ], + state: [ 'i', 'ii' ], + }, + + rdelta: { + data: [ + { + timestamp: 2, + data: { + foo2: [ '' ], + state: [ undefined, null ], + }, + }, + ], + + ratedata: [ + { + timestamp: 3, + data: { + prem2: [ '' ], + state: [ undefined, null ], + }, + }, + ], + }, + }, + ], + + expected: [ + // bucket + { + doc_id: 123, + rdelta: { + foo: [ '' ], + state: [ undefined, null ], + }, + bucket: { + foo: [ 'first' ], + state: [ 'a', 'b' ], + }, + ratedata: {}, + }, + + // rate + { + doc_id: 123, + rdelta: { + prem: [ '' ], + state: [ undefined, null ], + }, + bucket: { + foo: [ 'first' ], + state: [ 'a', 'b' ], + }, + ratedata: { + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], + }, + }, + + // bucket + { + doc_id: 234, + rdelta: { + foo2: [ '' ], + state: [ undefined, null ], + }, + bucket: { + foo2: [ 'first' ], + state: [ 'a', 'b' ], + }, + ratedata: {}, + }, + + // rate + { + doc_id: 234, + rdelta: { + prem2: [ '' ], + state: [ undefined, null ], + }, + bucket: { + foo2: [ 'first' ], + state: [ 'a', 'b' ], + }, + ratedata: { + prem2: [ 'rate_first' ], + state: [ 'i', 'ii' ], + }, + }, + ], + }, + + { + label: 'trims delta array based on index', + given: [ + { + id: 111, + lastUpdate: 123123123, + data: { foo: [ 'second' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ '' ] }, + timestamp: 123, + }, + { + data: { foo: [ 'first' ] }, + timestamp: 234, + }, + ], + }, + totalPublishDelta: { + data: 1, + }, + }, + ], + expected: [ + { + doc_id: 111, + rdelta: { foo: [ 'first' ] }, + bucket: { foo: [ 'second' ] }, + ratedata: {} + }, + ], + }, + ] ).forEach( ( { label, given, expected } ) => it( label, () => + { + let published: any = []; + const dao = createMockDeltaDao(); + const publisher = createMockDeltaPublisher(); + const emitter = new EventEmitter(); + + dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> => + { + return Promise.resolve( given ); + } + + publisher.publish = ( + meta, + delta, + bucket, + ratedata, + ): Promise<void> => + { + published.push( { + doc_id: meta.id, + rdelta: delta.data, + bucket: bucket, + ratedata: ratedata, + } ); + + return Promise.resolve(); + } + + return expect( new Sut( dao, publisher, emitter ).process() ) + .to.eventually.deep.equal( undefined ) + .then( _ => expect( published ).to.deep.equal( expected ) ); + } ) ); + } ); + + + describe( 'Error handling', () => + { + it( 'Marks document in error state and continues', () => + { + let published: any = []; + let error_flag_set = false; + const dao = createMockDeltaDao(); + const publisher = createMockDeltaPublisher(); + const emitter = new EventEmitter(); + const entity_num = 'Some Agency'; + const entity_id = 4321; + const lastUpdate = <UnixTimestamp>123123123; + const createdData = <UnixTimestamp>234234234; + const doc = <DeltaDocument[]>[ { + id: <DocumentId>123, + agentName: entity_num, + agentEntityId: entity_id, + startDate: createdData, + lastUpdate: lastUpdate, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: <UnixTimestamp>123123, + type: 'data', + } + ], + ratedata: [], + }, + }, + { + id: <DocumentId>234, + agentName: entity_num, + agentEntityId: entity_id, + startDate: createdData, + lastUpdate: <UnixTimestamp>123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: <UnixTimestamp>123123, + type: 'data', + } + ], + ratedata: [], + }, + } ]; + + const expected_published = [ + { + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 123, + lastUpdate: 123123123, + startDate: 234234234, + }, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'start_bar' ] }, + ratedata: {}, + }, + { + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 234, + lastUpdate: 123123123, + startDate: 234234234, + }, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'start_bar' ] }, + ratedata: {}, + } + ]; + + const expected_error = 'Uh oh'; + + dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> => + Promise.resolve( doc ); + + dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise<void> => + Promise.reject( new Error( expected_error ) ); + + dao.setErrorFlag = (): Promise<void> => + { + error_flag_set = true; + return Promise.resolve(); + } + + publisher.publish = ( + meta, + delta, + bucket, + ratedata, + ): Promise<void> => + { + published.push( { + meta: meta, + delta: delta.data, + bucket: bucket, + ratedata: ratedata, + } ); + + return Promise.resolve(); + } + + // Prevent node from converting an error event into an error + emitter.on( 'error', () => {} ); + + return expect( new Sut( dao, publisher, emitter ).process() ) + .to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( error_flag_set ).to.be.true; + expect( published ).to.deep.equal( expected_published ); + } ); + } ); + } ); + + + describe( 'Error handling', () => + { + it( 'Failure to set document error state further processing', () => + { + let published: any = []; + let caught_error = ''; + const dao = createMockDeltaDao(); + const publisher = createMockDeltaPublisher(); + const emitter = new EventEmitter(); + const doc = <DeltaDocument[]>[ { + id: <DocumentId>123, + agentName: 'Some Agency', + agentEntityId: 4321, + startDate: <UnixTimestamp>234234234, + lastUpdate: <UnixTimestamp>123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: <UnixTimestamp>123123, + type: 'data', + } + ], + ratedata: [], + }, + }, + { + id: <DocumentId>234, + agentName: 'Some Agency', + agentEntityId: 4321, + startDate: <UnixTimestamp>234234234, + lastUpdate: <UnixTimestamp>123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { + data: [ + { + data: { foo: [ 'first_bar' ] }, + timestamp: <UnixTimestamp>123123, + type: 'data', + } + ], + ratedata: [], + }, + } ]; + + // Only one is published + const expected_published = [ { + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 123, + lastUpdate: 123123123, + startDate: 234234234, + }, + delta: { foo: [ 'first_bar' ] }, + bucket: { foo: [ 'start_bar' ] }, + ratedata: {}, + } ]; + + const expected_error = 'Uh oh'; + + dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> => + Promise.resolve( doc ); + + dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise<void> => + Promise.reject( new Error( 'Couldn\'t mark document' ) ); + + dao.setErrorFlag = (): Promise<void> => + Promise.reject( new Error( expected_error ) ); + + publisher.publish = ( + meta, + delta, + bucket, + ratedata, + ): Promise<void> => + { + published.push( { + meta, + delta: delta.data, + bucket: bucket, + ratedata: ratedata, + } ); + + return Promise.resolve(); + } + + // Prevent node from converting an error event into an error + emitter.on( 'error', () => {} ); + + return expect( + new Sut( dao, publisher, emitter ).process() + .catch( e => { caught_error = e.message } ) + ) + .to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( caught_error ).to.equal( expected_error ); + expect( published ).to.deep.equal( expected_published ); + } ); + } ); + } ); +} ); + + +function createMockDeltaDao(): DeltaDao +{ + return <DeltaDao>{ + getUnprocessedDocuments() { return Promise.resolve( [] ); }, + advanceDeltaIndex() { return Promise.resolve(); }, + markDocumentAsProcessed() { return Promise.resolve(); }, + setErrorFlag() { return Promise.resolve(); }, + getErrorCount() { return Promise.resolve( 0 ); }, + }; +} + + +function createMockDeltaPublisher(): AmqpPublisher +{ + return <AmqpPublisher>{ + publish() { return Promise.resolve(); }, + }; +} diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts new file mode 100644 index 0000000..f352b0c --- /dev/null +++ b/test/system/DeltaPublisherTest.ts @@ -0,0 +1,236 @@ +/** + * Delta publisher test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +import { AmqpConnection } from '../../src/system/amqp/AmqpConnection'; +import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta'; +import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher'; +import { DocumentId, DocumentMeta } from '../../src/document/Document'; +import { EventEmitter } from 'events'; +import { hasContext } from '../../src/error/ContextError'; +import { AmqpError } from '../../src/error/AmqpError'; +import { Channel } from 'amqplib'; +import { MessageWriter } from '../../src/system/MessageWriter'; + + +import { expect, use as chai_use } from 'chai'; +chai_use( require( 'chai-as-promised' ) ); + + +describe( 'server.DeltaPublisher', () => +{ + describe( '#publish', () => + { + it( 'sends a message', () => + { + let publish_called = false; + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + const writer = createMockWriter(); + const meta = <DocumentMeta>{ + id: <DocumentId>123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: <UnixTimestamp>345, + lastUpdate: <UnixTimestamp>456, + }; + + conn.getAmqpChannel = () => + { + return <Channel>{ + publish: ( _: any, __: any, buf: any, ___: any ) => + { + expect( buf instanceof Buffer ).to.be.true; + + publish_called = true; + + return true; + } + }; + }; + + const sut = new Sut( emitter, ts_ctr, conn, writer ); + + return expect( + sut.publish( meta, delta, bucket, ratedata ) + ).to.eventually.deep.equal( undefined ) + .then( _ => + { + expect( publish_called ).to.be.true; + } ); + } ); + + ( <[string, () => Channel | undefined, Error, string ][]>[ + [ + 'Throws an error when publishing was unsuccessful', + () => + { + return <Channel>{ + publish: ( _: any, __: any, _buf: any, ___: any ) => + { + return false; + } + }; + }, + Error, + 'Delta publish failed' + ], + [ + 'Throws an error when no amqp channel is found', + () => + { + return undefined; + }, + AmqpError, + 'Error sending message: No channel' + ] + ] ).forEach( ( [ label, getChannelF, error_type, err_msg ] ) => + it( label, () => + { + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + const writer = createMockWriter(); + const meta = <DocumentMeta>{ + id: <DocumentId>123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: <UnixTimestamp>345, + lastUpdate: <UnixTimestamp>456, + }; + + const expected = { + doc_id: meta.id, + delta_type: delta.type, + delta_ts: delta.timestamp + } + + conn.getAmqpChannel = getChannelF; + + const result = new Sut( emitter, ts_ctr, conn, writer ) + .publish( meta, delta, bucket, ratedata ); + + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( + error_type, err_msg + ), + result.catch( e => + { + if ( !hasContext( e ) ) + { + return expect.fail(); + } + + return expect( e.context ).to.deep.equal( expected ); + } ) + ] ); + } ) ); + + + it( 'writer#write rejects', () => + { + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + const writer = createMockWriter(); + const error = new Error( 'Bad thing happened' ); + const meta = <DocumentMeta>{ + id: <DocumentId>123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: <UnixTimestamp>345, + lastUpdate: <UnixTimestamp>456, + }; + + writer.write = ( + _: any, + __: any, + ___: any, + ____: any, + _____: any + ): Promise<Buffer> => + { + return Promise.reject( error ); + }; + + const result = new Sut( emitter, ts_ctr, conn, writer ) + .publish( meta, delta, bucket, ratedata ); + + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( error ), + result.catch( e => + { + return expect( e ).to.deep.equal( error ); + } ) + ] ); + } ) + } ); +} ); + + +function ts_ctr(): UnixTimestamp +{ + return <UnixTimestamp>Math.floor( new Date().getTime() / 1000 ); +} + + +function createMockAmqpConnection(): AmqpConnection +{ + return <AmqpConnection>{ + connect: () => {}, + getExchangeName: () => { 'Foo' }, + }; +} + + +function createMockBucketData(): Record<string, any> +{ + return { + foo: [ 'bar', 'baz' ] + } +} + + +function createMockDelta(): Delta<any> +{ + return <Delta<any>>{ + type: <DeltaType>'data', + timestamp: <UnixTimestamp>123123123, + data: <DeltaResult<any>>{}, + } +} + + +function createMockWriter(): MessageWriter +{ + return <MessageWriter>{ + write( _: any, __:any, ___:any, ____:any, _____:any ): Promise<Buffer> + { + return Promise.resolve( Buffer.from( '' ) ); + } + }; +}
\ No newline at end of file diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts new file mode 100644 index 0000000..abfbef8 --- /dev/null +++ b/test/system/EventMediatorTest.ts @@ -0,0 +1,145 @@ +/** + * Event logger test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +import { EventMediator as Sut } from '../../src/system/EventMediator'; +import { context } from '../../src/error/ContextError'; +import { EventEmitter } from "events"; +import { expect } from 'chai'; +import { PsrLogger } from '../../src/system/PsrLogger'; + + +describe( 'system.EventLogger captures and logs events', () => +{ + it( 'document-processed triggers log#notice', () => + { + let method_called = false; + + const event_id = 'document-processed'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.notice = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'delta-publish triggers log#notice', () => + { + let method_called = false; + + const event_id = 'delta-publish'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.notice = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'amqp-conn-warn triggers log#warning', () => + { + let method_called = false; + + const event_id = 'amqp-conn-warn'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.warning = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'amqp-reconnect triggers log#warning', () => + { + let method_called = false; + + const event_id = 'amqp-reconnect'; + const emitter = new EventEmitter(); + const log = createMockLogger(); + + log.warning = ( _str: string ) => { method_called = true; }; + + new Sut( log, emitter ); + + emitter.emit( event_id ); + + expect( method_called ).to.be.true; + } ); + + it( 'context and stack are retrieved from error', () => + { + let method_called = false; + + const event_id = 'error'; + const err_msg = 'Foo'; + const stub_err = new Error( err_msg ); + const emitter = new EventEmitter(); + const log = createMockLogger(); + const err_context = { bar: 'baz' }; + + const expected_context = { + bar: err_context.bar, + stack: stub_err.stack, + }; + + log.error = ( str: string, context: any ) => + { + expect( str ).to.equal( err_msg ); + expect( context ).to.deep.equal( expected_context ); + + method_called = true; + }; + + new Sut( log, emitter ); + + emitter.emit( event_id, context( stub_err, err_context ) ); + + expect( method_called ).to.be.true; + } ); +} ); + + +function createMockLogger(): PsrLogger +{ + return <PsrLogger>{ + debug( _msg: string | object, _context: object ){}, + info( _msg: string | object, _context: object ){}, + notice( _msg: string | object, _context: object ){ console.log( 'asdasd msg: ', _msg ); }, + warning( _msg: string | object, _context: object ){}, + error( _msg: string | object, _context: object ){}, + critical( _msg: string | object, _context: object ){}, + alert( _msg: string | object, _context: object ){}, + emergency( _msg: string | object, _context: object ){}, + log( _level: any, _msg: string | object, _context: object ){}, + }; +} diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts new file mode 100644 index 0000000..9a36584 --- /dev/null +++ b/test/system/MetricsCollectorTest.ts @@ -0,0 +1,165 @@ +/** + * Metrics collector test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +import { + PrometheusFactory, + PrometheusConfig, +} from '../../src/system/PrometheusFactory'; +import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client'; +import { EventEmitter } from 'events'; +import { expect } from 'chai'; +import { + MetricsCollector as Sut, + MetricTimer, +} from '../../src/system/MetricsCollector'; + +const sinon = require( 'sinon' ); + +describe( 'system.MetricsCollector captures events and pushes metrics', () => +{ + it( 'process-complete event is hooked', () => + { + let histogram_called = false; + let counter_called = false; + + const emitter = new EventEmitter(); + const conf = createMockConfig(); + const timer = createMockTimer(); + const factory = createMockFactory( { + histogram_cb: () => { histogram_called = true }, + counter_cb: () => { counter_called = true }, + } ); + + const sut = new Sut( factory, conf, emitter, timer ); + + emitter.emit( 'delta-process-end' ); + + expect( histogram_called ).to.be.true; + expect( counter_called ).to.be.true; + + sut.stop(); + } ); + + + it( 'process-error event is hooked', () => + { + let counter_called = false; + + const emitter = new EventEmitter(); + const conf = createMockConfig(); + const timer = createMockTimer(); + const factory = createMockFactory( { + counter_cb: () => { counter_called = true }, + } ); + + const sut = new Sut( factory, conf, emitter, timer ); + + emitter.emit( 'error' ); + + expect( counter_called ).to.be.true; + + sut.stop(); + } ); + + + it( 'process-complete is timed properly', () => + { + let actual_ms = 0; + const uid = 'foo'; + const start_time_ns = 1234; + const end_time_ns = 5678; + const expected_ms = ( end_time_ns - start_time_ns ) / 1000000; + const emitter = new EventEmitter(); + const conf = createMockConfig(); + const timer = createMockTimer( start_time_ns, end_time_ns ); + const factory = createMockFactory( { + histogram_cb: ( n: number ) => { actual_ms = n }, + } ); + + const sut = new Sut( factory, conf, emitter, timer ); + + emitter.emit( 'delta-process-start', uid ); + emitter.emit( 'delta-process-end', uid ); + + expect( actual_ms ).to.be.equal( expected_ms ); + + sut.stop(); + } ); +} ); + + +function createMockFactory( + { + gateway_cb = () => {}, + counter_cb = () => {}, + histogram_cb = ( _n: number = 0 ) => {}, + gauge_cb = ( _n: number = 0 ) => {}, + }: + { + gateway_cb ?: () => void; + counter_cb ?: () => void; + histogram_cb ?: ( _n: number ) => void; + gauge_cb ?: ( _n: number ) => void; + } +): PrometheusFactory +{ + const gateway = sinon.mock( Pushgateway ); + const counter = sinon.mock( Counter ); + const histogram = sinon.mock( Histogram ); + const gauge = sinon.mock( Gauge ); + + gateway.pushAdd = gateway_cb; + counter.inc = counter_cb; + histogram.observe = histogram_cb; + gauge.set = gauge_cb; + + return <PrometheusFactory>{ + createGateway() { return gateway }, + createCounter() { return counter }, + createHistogram(){ return histogram }, + createGauge() { return gauge }, + }; +} + + +function createMockConfig(): PrometheusConfig +{ + return <PrometheusConfig>{ + hostname: 'foo.com', + port: 123, + env: 'test', + push_interval_ms: 1000, + } +} + + +function createMockTimer( _start: number = 0, _end: number = 0 ): MetricTimer +{ + return ( _start_time?: [ number, number ] ) => + { + if ( !_start_time ) + { + return [ 0, _start ]; + } + + return [ 0, _end - _start_time[ 1 ] ]; + }; +}
\ No newline at end of file diff --git a/test/system/StandardLoggerTest.ts b/test/system/StandardLoggerTest.ts new file mode 100644 index 0000000..918bfd1 --- /dev/null +++ b/test/system/StandardLoggerTest.ts @@ -0,0 +1,178 @@ +/** + * Event logger test + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +import { StandardLogger as Sut } from '../../src/system/StandardLogger'; +import { LogLevel } from '../../src/system/PsrLogger'; +import { expect } from 'chai'; + +const sinon = require( 'sinon' ); + +declare interface MockConsole extends Console { + getLevel(): string, + getStr(): string, +} + +describe( 'system.EventLogger captures and logs events', () => +{ + it( 'debug triggers console output level: info', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.debug( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'info' ); + } ); + + it( 'info triggers console output level: info', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.info( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'info' ); + } ); + + it( 'notice triggers console output level: log', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.notice( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'log' ); + } ); + + it( 'warning triggers console output level: warn', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.warning( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'warn' ); + } ); + + it( 'error triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.error( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'critical triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.critical( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'alert triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.alert( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'emergency triggers console output level: error', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.emergency( 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'log triggers corresponding log level', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + + sut.log( LogLevel.ERROR, 'Foo' ); + + expect( con.getLevel() ).to.equal( 'error' ); + } ); + + it( 'Context is included in structured output', () => + { + const con = createMockConsole(); + const env = 'test'; + const sut = new Sut( con, ts_ctr, env ); + const context = { bar: 'baz' }; + const expected_output = { + message: 'Foo', + timestamp: 123123, + service: 'quote-server', + env: 'test', + severity: 'NOTICE', + context: { + bar: 'baz', + }, + }; + + sut.notice( 'Foo', context ); + + expect( con.getStr() ).to.deep.equal( expected_output ); + } ); +} ); + + +function ts_ctr(): UnixTimestamp +{ + return <UnixTimestamp>123123; +} + + +function createMockConsole(): MockConsole +{ + const mock = sinon.mock( console ); + + mock.lvl = ''; + mock.str = ''; + mock.info = ( str: string ) => { mock.str = str; mock.lvl = 'info'; }; + mock.log = ( str: string ) => { mock.str = str; mock.lvl = 'log'; }; + mock.warn = ( str: string ) => { mock.str = str; mock.lvl = 'warn'; }; + mock.error = ( str: string ) => { mock.str = str; mock.lvl = 'error'; }; + mock.getLevel = () => mock.lvl; + mock.getStr = () => mock.str; + + return mock; +} diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts new file mode 100644 index 0000000..271d735 --- /dev/null +++ b/test/system/V1MessageWriterTest.ts @@ -0,0 +1,532 @@ +/** + * V1 Message Writer + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Tests for Version 1 of the avro message writer + */ + +import { V1MessageWriter as Sut } from '../../src/system/avro/V1MessageWriter'; +import { hasContext, context } from '../../src/error/ContextError'; +import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory'; +import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta'; +import { DocumentMeta, DocumentId } from '../../src/document/Document'; +import { Duplex } from 'stream'; +import { AvroSchema } from 'avro-js'; + +import { expect, use as chai_use } from 'chai'; +chai_use( require( 'chai-as-promised' ) ); + +const sinon = require( 'sinon' ); + +describe( 'system.V1MessageWriter', () => +{ + it( 'Rejects improperly formatted data', () => + { + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const error = new Error( 'Oh no' ); + const schema = createMockAvroSchema(); + const ts = <UnixTimestamp>123; + const meta = <DocumentMeta>{ + id: <DocumentId>123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: <UnixTimestamp>345, + lastUpdate: <UnixTimestamp>456, + }; + + const expected = { + invalid_paths: 'Foo', + invalid_data: 'Bar', + }; + + const error_context = context( error, expected ); + + schema.isValid = () => { throw error_context; }; + + const result = new Sut( + createMockEncoderCtor( schema ), + schema, + ).write( ts, meta, delta, bucket, ratedata ); + + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( error ), + result.catch( e => + { + if ( !hasContext( e ) ) + { + return expect.fail(); + } + + return expect( e.context ).to.deep.equal( expected ); + } ) + ] ); + } ); + + + describe( '#avroEncode parses', () => + { + [ + { + label: 'Null value', + valid: true, + delta_data: { foo: null }, + }, + { + label: 'Null array', + valid: true, + delta_data: { foo: { 'array': [ null ] } }, + }, + { + label: 'Boolean value', + valid: true, + delta_data: { foo: { 'array': [ + { 'boolean': true }, + ] } }, + }, + { + label: 'Simple string', + valid: true, + delta_data: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } }, + }, + { + label: 'Simple int', + valid: true, + delta_data: { foo: { 'array': [ + { 'double': 123 }, + ] } }, + }, + { + label: 'Nested array', + valid: true, + delta_data: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + ] }, + ] } }, + }, + { + label: 'Array with nulls', + valid: true, + delta_data: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + null, + ] } }, + }, + { + label: 'Nested Array with mixed values', + valid: true, + delta_data: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123321 }, + null, + ] } + ] } }, + }, + { + label: 'Non-array', + valid: false, + delta_data: { foo: 'bar' }, + }, + { + label: 'Map objects', + valid: true, + delta_data: { 'foo': { 'array': [ + { 'map': { + 'bar': { 'map': { + 'baz': { 'double': 1572903485000 }, + } } + } } + ] } }, + } + ].forEach( ( { label, delta_data, valid } ) => + { + it( label, () => + { + const data = createMockData( delta_data ); + const schema = createMockAvroSchema(); + + const sut = new Sut( + createMockEncoderCtor( schema ), + schema + ); + + sut.avroEncode( data ) + .then( b => + { + expect( typeof(b) ).to.equal( 'object' ); + expect( valid ).to.be.true; + } ) + .catch( _ => + { + expect( valid ).to.be.false; + } ); + } ); + } ); + } ); + + + describe( '#setDataTypes annotates', () => + { + [ + { + label: 'Null', + delta_data: null, + expected: null, + }, + { + label: 'Null Value', + delta_data: { foo: null }, + expected: { foo: null }, + }, + { + label: 'Boolean Value', + delta_data: { foo: [ true ] }, + expected: { foo: { 'array': [ + { 'boolean': true }, + ] } }, + }, + { + label: 'Simple string', + delta_data: { foo: [ + 'bar', + 'baz', + ] }, + expected: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } }, + }, + { + label: 'Simple int', + delta_data: { foo: [ + 123 + ] }, + expected: { foo: { 'array': [ + { 'double': 123 }, + ] } }, + }, + { + label: 'Nested array', + delta_data: { foo: [ + [ + 'bar', + 'baz', + ] + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] }, + ] } }, + }, + { + label: 'Double nested array', + delta_data: { foo: [ + [ + [ + 'bar', + 123, + null + ], + ], + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123 }, + null, + ] }, + ] }, + ] } }, + }, + { + label: 'Array with nulls', + delta_data: { foo: [ + 'bar', + 'baz', + null + ] }, + expected: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + null + ] } }, + }, + { + label: 'Nested Array with mixed values', + delta_data: { foo: [ + [ + 'bar', + 123321, + null, + ] + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123321 }, + null, + ] }, + ] } }, + }, + { + label: 'Nested Array with mixed values', + delta_data: { foo: [ + { + 'bar': { + 'wer': 'qaz', + 'qwe': 1572903485000, + 'asd': true, + 'zxc': null, + }, + }, + ] }, + expected: { 'foo': { 'array': [ + { 'map': { + 'bar': { 'map': { + 'wer': { 'string': 'qaz' }, + 'qwe': { 'double': 1572903485000 }, + 'asd': { 'boolean': true }, + 'zxc': null, + } }, + } }, + ] } }, + }, + ].forEach( ( { label, delta_data, expected } ) => + { + it( label, () => + { + const encoded = 'FooBar'; + const avroEncoderCtr = createMockEncoder( encoded ); + const stub_schema = <AvroSchema>{}; + const sut = new Sut( + avroEncoderCtr, + stub_schema, + ); + const actual = sut.setDataTypes( delta_data ); + + expect( actual ).to.deep.equal( expected ); + } ); + } ); + } ); + + + it( 'Message is formatted correctly', () => + { + const bucket = { foo: [ 'bar', 'baz' ] }; + const ratedata = {}; + const doc_id = <DocumentId>123; + const entity_name = 'Some Agency'; + const entity_id = 123; + const startDate = <UnixTimestamp>345; + const lastUpdate = <UnixTimestamp>456; + const schema = createMockAvroSchema(); + const ts = <UnixTimestamp>123; + const encoder = createMockEncoderCtor( schema ); + const meta = <DocumentMeta>{ + id: doc_id, + entity_name: entity_name, + entity_id: entity_id, + startDate: startDate, + lastUpdate: lastUpdate, + }; + + const delta = <Delta<any>>{ + type: <DeltaType>'data', + timestamp: <UnixTimestamp>123123123, + data: <DeltaResult<any>>{}, + }; + + const expected = { + event: { + id: 'STEP_SAVE', + ts: ts, + actor: 'SERVER', + step: null, + }, + document: { + id: doc_id, + created: startDate, + modified: lastUpdate, + }, + session: { + Session: { + entity_name: entity_name, + entity_id: entity_id, + }, + }, + data: { + Data: { + bucket: { + 'foo': { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } + }, + }, + }, + ratedata: { + Data: { + bucket: {}, + }, + }, + delta: { + Data: { + bucket: delta.data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: '', + }, + }, + }; + + let is_valid_called = false; + + schema.isValid = ( data: Record<string, any>, _:any ) => + { + expect( data ).to.deep.equal( expected ); + + is_valid_called = true; + + return null; + } + + return expect( new Sut( encoder, schema ) + .write( ts, meta, delta, bucket, ratedata ) ) + .to.eventually.deep.equal( Buffer.from( '' ) ) + .then( _ => + { + expect( is_valid_called ).to.be.true; + } ) + } ); +} ); + + +function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr +{ + return ( _schema: AvroSchema ) => + { + const mock = sinon.mock( Duplex ); + + mock.on = ( _: string, __: any ) => {}; + mock.end = ( _: any ) => { return mock_encoded_data; }; + + return mock; + }; +} + + +function createMockData( delta_data: any ): any +{ + + return { + event: { + id: 'RATE', + ts: 1573856916, + actor: 'SERVER', + step: null, + }, + document: { + id: 123123, + created: 1573856916, + modified: 1573856916, + top_visited_step: '2', + }, + data: null, + ratedata: null, + delta: { + Data: { + bucket: delta_data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: 'dadaddwafdwa', + }, + }, + }; +} + + +function createMockDelta(): Delta<any> +{ + return <Delta<any>>{ + type: <DeltaType>'data', + timestamp: <UnixTimestamp>123123123, + data: <DeltaResult<any>>{}, + } +} + + +function createMockBucketData(): Record<string, any> +{ + return { + foo: [ 'bar', 'baz' ] + } +} + + +function createMockEncoderCtor( stub_schema: AvroSchema ): + ( schema: AvroSchema ) => Duplex +{ + const events = <Record<string, () => void>>{}; + + const mock_duplex = <Duplex>(<unknown>{ + on( event_name: string, callback: () => void ) + { + events[ event_name ] = callback; + }, + + end() + { + events.end(); + }, + } ); + + return ( schema: AvroSchema ): Duplex => + { + expect( schema ).to.equal( stub_schema ); + return mock_duplex; + }; +} + + +function createMockAvroSchema(): AvroSchema +{ + return <AvroSchema>{ + toBuffer() { return null }, + isValid() { return null }, + encode() {}, + toString() { return '' }, + fromBuffer() { return {} }, + }; +} diff --git a/test/system/amqp/AmqpConnectionTest.ts b/test/system/amqp/AmqpConnectionTest.ts new file mode 100644 index 0000000..1e4237d --- /dev/null +++ b/test/system/amqp/AmqpConnectionTest.ts @@ -0,0 +1,112 @@ +/** + * Tests AmqpConnection + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Amqp Connection + */ + +import { AmqpConnection as Sut } from "../../../src/system/amqp/AmqpConnection"; +import { AmqpConfig } from "../../../src/system/AmqpPublisher"; +import { EventEmitter } from "events"; +import * as amqplib from "amqplib"; + +import { expect, use as chai_use } from 'chai'; +chai_use( require( 'chai-as-promised' ) ); + +describe( 'AmqpConnection', () => +{ + describe( '#connect', () => + { + it( "fails when exchange cannot be asserted", () => + { + const expected_err = new Error( "test failure" ); + + const mock_channel = <amqplib.Channel>(<unknown>{ + assertExchange() { + return Promise.reject( expected_err ); + }, + } ); + + const mock_connection = <amqplib.Connection>(<unknown>{ + once() {}, + + createChannel() { + return Promise.resolve( mock_channel ); + }, + } ); + + const mock_amqp = <typeof amqplib>(<unknown>{ + connect() { + return Promise.resolve( mock_connection ); + } + } ); + + const emitter = new EventEmitter(); + const conf = <AmqpConfig>{}; + const sut = new Sut( mock_amqp, conf, emitter ); + + return expect( sut.connect() ) + .to.eventually.be.rejectedWith( expected_err ); + } ); + } ); + + + describe( '#reconnect', () => + { + it( "is called when there is an error with the connection", () => + { + let reconnect_called = false; + + const mock_channel = <amqplib.Channel>(<unknown>{ + assertExchange() { + return Promise.resolve(); + }, + } ); + + const mock_connection = <amqplib.Connection>Object.create( + new EventEmitter() + ); + + mock_connection.createChannel = (): any => { + return Promise.resolve( mock_channel ); + }; + + const mock_amqp = <typeof amqplib>(<unknown>{ + connect() { + return Promise.resolve( mock_connection ); + } + } ); + + const emitter = new EventEmitter(); + + emitter.on( 'amqp-reconnect', () => { reconnect_called = true } ); + + const conf = <AmqpConfig>{}; + const sut = new Sut( mock_amqp, conf, emitter ); + + const result = sut.connect() + .then( () => mock_connection.emit( 'error' ) ) + + return expect( result ) + .to.eventually.deep.equal( true ) + .then( _ => expect( reconnect_called ).to.be.true ); + } ); + } ); +} ); + diff --git a/tsconfig.json b/tsconfig.json index 83b8f5e..ee03891 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -18,6 +18,7 @@ } }, "include": [ + "bin/*", "src/**/*", "test/**/*" ] |