diff options
author | Austin Schaffer <austin.schaffer@ryansg.com> | 2019-11-25 17:20:56 -0500 |
---|---|---|
committer | Austin Schaffer <austin.schaffer@ryansg.com> | 2019-12-12 10:10:45 -0500 |
commit | 1b96cd91470bc01fb87ee0ee3e92f2e3c10f20f4 (patch) | |
tree | f108e6e51aeba2ff8cde08343c25dff3b7a026cb /bin | |
parent | 5ee9a5d3409c07e59644a5680a818c2c301eba77 (diff) | |
download | liza-1b96cd91470bc01fb87ee0ee3e92f2e3c10f20f4.tar.gz liza-1b96cd91470bc01fb87ee0ee3e92f2e3c10f20f4.tar.bz2 liza-1b96cd91470bc01fb87ee0ee3e92f2e3c10f20f4.zip |
[DEV-5312] Refactor references from 'self' to 'this', pass console into event logger, and add factory for prometheus
Diffstat (limited to 'bin')
-rw-r--r-- | bin/delta-processor.ts | 228 |
1 files changed, 144 insertions, 84 deletions
diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts index 9813169..ee42cc6 100644 --- a/bin/delta-processor.ts +++ b/bin/delta-processor.ts @@ -18,19 +18,20 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -import fs = require( 'fs' ); - -import { AmqpConfig } from "../src/system/AmqpPublisher"; -import { MongoDeltaDao } from "../src/system/db/MongoDeltaDao"; -import { DeltaProcessor } from "../src/system/DeltaProcessor"; -import { DeltaPublisher } from "../src/system/DeltaPublisher"; -import { MongoDb, MongoDbConfig } from "../src/types/mongodb"; -import { DeltaLogger } from "../src/system/DeltaLogger"; -import { EventEmitter } from "events"; +import fs = require( 'fs' ); + +import { AmqpConfig } from '../src/system/AmqpPublisher'; +import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao'; +import { DeltaProcessor } from '../src/system/DeltaProcessor'; +import { DeltaPublisher } from '../src/system/DeltaPublisher'; +import { MongoDb, MongoDbConfig, MongoCollection } from '../src/types/mongodb'; +import { EventLogger } from '../src/system/EventLogger'; +import { EventEmitter } from 'events'; +import { PrometheusFactory } from '../src/system/PrometheusFactory'; import { MetricsCollector, PrometheusConfig, -} from "../src/system/MetricsCollector"; +} from '../src/system/MetricsCollector'; const { Db: MongoDb, @@ -38,53 +39,40 @@ const { ReplServers: ReplSetServers, } = require( 'mongodb' ); -// TODO: fix this -process.env.NODE_ENV = 'dev'; -process.env.amqp_hostname = 'localhost'; -process.env.amqp_port = '5672'; -process.env.amqp_username = 'quote_referral'; -process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E'; -process.env.amqp_frameMax = '0'; -process.env.amqp_heartbeat = '2'; -process.env.amqp_vhost = 'quote'; -process.env.amqp_exchange = 'quoteupdate'; -process.env.amqp_retries = '30'; -process.env.amqp_retry_wait = '1'; -process.env.prom_hostname = 'dmz2docker01.rsgcorp.local'; -process.env.prom_port = '9091'; - -// Environment variables -const amqp_conf = _getAmqpConfig( process.env ); -const db_conf = _getMongoConfig( process.env ); -const prom_conf = _getPrometheusConfig( process.env ); -const env = process.env.NODE_ENV || 'Unknown Environment'; - -// Event handling -const emitter = new EventEmitter(); - -// Event subscribers -new DeltaLogger( env, emitter, ts_ctr ); -const metrics = new MetricsCollector( prom_conf, emitter ); - -// Instantiate classes for processor -const db = _createDB( db_conf ); -const dao = new MongoDeltaDao( db ); -const publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr ); -const processor = new DeltaProcessor( dao, publisher, emitter ); - -// If the dao intializes successfully then process on a two second interval -const interval_ms = 2000; +const amqp_conf = _getAmqpConfig( process.env ); +const db_conf = _getMongoConfig( process.env ); +const prom_conf = _getPrometheusConfig( process.env ); +const env = process.env.NODE_ENV || 'Unknown Environment'; +const process_interval_ms = +(process.env.process_interval_ms || 2000); +const emitter = new EventEmitter(); +const db = _createDB( db_conf ); + +// Prometheus Metrics +const prom_factory = new PrometheusFactory(); +const metrics = new MetricsCollector( prom_factory, prom_conf, emitter ); + +// Structured logging +new EventLogger( console, env, emitter, ts_ctr ); let process_interval: NodeJS.Timer; +let dao: MongoDeltaDao; +let publisher: DeltaPublisher; +let processor: DeltaProcessor; -dao.init() +_getMongoCollection( db, db_conf ) +.then( ( conn: MongoCollection ) => +{ + return new MongoDeltaDao( conn ); +} ) +.then( ( mongoDao: MongoDeltaDao ) => +{ + dao = mongoDao; + publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr ); + processor = new DeltaProcessor( mongoDao, publisher, emitter ); +} ) .then( _ => { - publisher.connect() - .catch( e => - { - console.error( 'AMQP connection error: ' + e ); - } ); + publisher.connect(); } ) .then( _ => { @@ -99,10 +87,10 @@ dao.init() processor.process(); metrics.checkForErrors( dao ); }, - interval_ms, + process_interval_ms, ); } ) -.catch( err => { console.error( 'Error: ' + err ); } ); +.catch( e => { console.error( 'Error: ' + e ); } ); /** @@ -151,27 +139,26 @@ function writePidFile( pid_path: string ): void */ function shutdown( signal: string ): void { - console.log( "Received " + signal + ". Beginning graceful shutdown:" ); - - console.log( "...Stopping processing interval" ); + console.log( 'Received ' + signal + '. Beginning graceful shutdown:' ); + console.log( '...Stopping processing interval' ); clearInterval( process_interval ); - console.log( "...Closing MongoDb connection" ); + console.log( '...Closing MongoDb connection' ); db.close( ( err, _data ) => { if ( err ) { - console.error( " Error closing connection: " + err ); + console.error( ' Error closing connection: ' + err ); } } ); - console.log( "...Closing AMQP connection..." ); + console.log( '...Closing AMQP connection...' ); publisher.close(); - console.log( "Shutdown complete. Exiting." ); + console.log( 'Shutdown complete. Exiting.' ); process.exit(); } @@ -226,6 +213,77 @@ function _createDB( conf: MongoDbConfig ): MongoDb /** + * Attempts to connect to the database + * + * connectError event will be emitted on failure. + * + * @return any errors that occurred + */ +function _getMongoCollection( + db: MongoDb, + conf: MongoDbConfig +): Promise<MongoCollection> +{ + return new Promise( ( resolve, reject ) => + { + // attempt to connect to the database + db.open( ( e: any, db: any ) => + { + // if there was an error, don't bother with anything else + if ( e ) + { + // in some circumstances, it may just be telling us that + // we're already connected (even though the connection may + // have been broken) + if ( e.errno !== undefined ) + { + reject( 'Error opening mongo connection: ' + e ); + return; + } + } else if ( db == null ) + { + reject( 'No database connection' ); + return; + } + + // quotes collection + db.collection( + conf.collection, + ( e: any, collection: MongoCollection ) => + { + if ( e ) + { + reject( 'Error creating collection: ' + e ); + return; + } + + // initialize indexes + collection.createIndex( + [ + ['published', 1], + ['deltaError', 1], + ], + true, + ( e: any, _index: { [P: string]: any } ) => + { + if ( e ) + { + reject( 'Error creating index: ' + e ); + return; + } + + resolve( collection ); + return; + } + ); + } + ); + } ); + } ); +} + + +/** * Create a mongodb configuration from the environment * * @param env - the environment variables @@ -235,14 +293,15 @@ function _createDB( conf: MongoDbConfig ): MongoDb function _getMongoConfig( env: any ): MongoDbConfig { return <MongoDbConfig>{ - "port": +( env.MONGODB_PORT || 0 ), - "ha": +( env.LIZA_MONGODB_HA || 0 ) == 1, - "replset": env.LIZA_MONGODB_REPLSET, - "host": env.MONGODB_HOST, - "host_a": env.LIZA_MONGODB_HOST_A, - "port_a": +( env.LIZA_MONGODB_PORT_A || 0 ), - "host_b": env.LIZA_MONGODB_HOST_B, - "port_b": +( env.LIZA_MONGODB_PORT_B || 0 ), + 'port': +( env.MONGODB_PORT || 0 ), + 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1, + 'replset': env.LIZA_MONGODB_REPLSET, + 'host': env.MONGODB_HOST, + 'host_a': env.LIZA_MONGODB_HOST_A, + 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ), + 'host_b': env.LIZA_MONGODB_HOST_B, + 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ), + 'collection': 'quotes', }; } @@ -257,18 +316,18 @@ function _getMongoConfig( env: any ): MongoDbConfig function _getAmqpConfig( env: any ): AmqpConfig { return <AmqpConfig>{ - "protocol": "amqp", - "hostname": env.amqp_hostname, - "port": +( env.amqp_port || 0 ), - "username": env.amqp_username, - "password": env.amqp_password, - "locale": "en_US", - "frameMax": env.amqp_frameMax, - "heartbeat": env.amqp_heartbeat, - "vhost": env.amqp_vhost, - "exchange": env.amqp_exchange, - "retries": env.amqp_retries || 30, - "retry_wait": env.amqp_retry_wait || 1, + 'protocol': 'amqp', + 'hostname': env.amqp_hostname, + 'port': +( env.amqp_port || 0 ), + 'username': env.amqp_username, + 'password': env.amqp_password, + 'locale': 'en_US', + 'frameMax': env.amqp_frameMax, + 'heartbeat': env.amqp_heartbeat, + 'vhost': env.amqp_vhost, + 'exchange': env.amqp_exchange, + 'retries': env.amqp_retries || 30, + 'retry_wait': env.amqp_retry_wait || 1, }; } @@ -283,8 +342,9 @@ function _getAmqpConfig( env: any ): AmqpConfig function _getPrometheusConfig( env: any ): PrometheusConfig { return <PrometheusConfig>{ - "hostname": env.prom_hostname, - "port": +( env.prom_port || 0 ), - "env": process.env.NODE_ENV, + 'hostname': env.prom_hostname, + 'port': +( env.prom_port || 0 ), + 'env': process.env.NODE_ENV, + 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ), }; }
\ No newline at end of file |