Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/bin
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-25 17:20:56 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-12-12 10:10:45 -0500
commit1b96cd91470bc01fb87ee0ee3e92f2e3c10f20f4 (patch)
treef108e6e51aeba2ff8cde08343c25dff3b7a026cb /bin
parent5ee9a5d3409c07e59644a5680a818c2c301eba77 (diff)
downloadliza-1b96cd91470bc01fb87ee0ee3e92f2e3c10f20f4.tar.gz
liza-1b96cd91470bc01fb87ee0ee3e92f2e3c10f20f4.tar.bz2
liza-1b96cd91470bc01fb87ee0ee3e92f2e3c10f20f4.zip
[DEV-5312] Refactor references from 'self' to 'this', pass console into event logger, and add factory for prometheus
Diffstat (limited to 'bin')
-rw-r--r--bin/delta-processor.ts228
1 files changed, 144 insertions, 84 deletions
diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts
index 9813169..ee42cc6 100644
--- a/bin/delta-processor.ts
+++ b/bin/delta-processor.ts
@@ -18,19 +18,20 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-import fs = require( 'fs' );
-
-import { AmqpConfig } from "../src/system/AmqpPublisher";
-import { MongoDeltaDao } from "../src/system/db/MongoDeltaDao";
-import { DeltaProcessor } from "../src/system/DeltaProcessor";
-import { DeltaPublisher } from "../src/system/DeltaPublisher";
-import { MongoDb, MongoDbConfig } from "../src/types/mongodb";
-import { DeltaLogger } from "../src/system/DeltaLogger";
-import { EventEmitter } from "events";
+import fs = require( 'fs' );
+
+import { AmqpConfig } from '../src/system/AmqpPublisher';
+import { MongoDeltaDao } from '../src/system/db/MongoDeltaDao';
+import { DeltaProcessor } from '../src/system/DeltaProcessor';
+import { DeltaPublisher } from '../src/system/DeltaPublisher';
+import { MongoDb, MongoDbConfig, MongoCollection } from '../src/types/mongodb';
+import { EventLogger } from '../src/system/EventLogger';
+import { EventEmitter } from 'events';
+import { PrometheusFactory } from '../src/system/PrometheusFactory';
import {
MetricsCollector,
PrometheusConfig,
-} from "../src/system/MetricsCollector";
+} from '../src/system/MetricsCollector';
const {
Db: MongoDb,
@@ -38,53 +39,40 @@ const {
ReplServers: ReplSetServers,
} = require( 'mongodb' );
-// TODO: fix this
-process.env.NODE_ENV = 'dev';
-process.env.amqp_hostname = 'localhost';
-process.env.amqp_port = '5672';
-process.env.amqp_username = 'quote_referral';
-process.env.amqp_password = 'Et7iojahwo4aePie9Cahng7Chu5eim4E';
-process.env.amqp_frameMax = '0';
-process.env.amqp_heartbeat = '2';
-process.env.amqp_vhost = 'quote';
-process.env.amqp_exchange = 'quoteupdate';
-process.env.amqp_retries = '30';
-process.env.amqp_retry_wait = '1';
-process.env.prom_hostname = 'dmz2docker01.rsgcorp.local';
-process.env.prom_port = '9091';
-
-// Environment variables
-const amqp_conf = _getAmqpConfig( process.env );
-const db_conf = _getMongoConfig( process.env );
-const prom_conf = _getPrometheusConfig( process.env );
-const env = process.env.NODE_ENV || 'Unknown Environment';
-
-// Event handling
-const emitter = new EventEmitter();
-
-// Event subscribers
-new DeltaLogger( env, emitter, ts_ctr );
-const metrics = new MetricsCollector( prom_conf, emitter );
-
-// Instantiate classes for processor
-const db = _createDB( db_conf );
-const dao = new MongoDeltaDao( db );
-const publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr );
-const processor = new DeltaProcessor( dao, publisher, emitter );
-
-// If the dao intializes successfully then process on a two second interval
-const interval_ms = 2000;
+const amqp_conf = _getAmqpConfig( process.env );
+const db_conf = _getMongoConfig( process.env );
+const prom_conf = _getPrometheusConfig( process.env );
+const env = process.env.NODE_ENV || 'Unknown Environment';
+const process_interval_ms = +(process.env.process_interval_ms || 2000);
+const emitter = new EventEmitter();
+const db = _createDB( db_conf );
+
+// Prometheus Metrics
+const prom_factory = new PrometheusFactory();
+const metrics = new MetricsCollector( prom_factory, prom_conf, emitter );
+
+// Structured logging
+new EventLogger( console, env, emitter, ts_ctr );
let process_interval: NodeJS.Timer;
+let dao: MongoDeltaDao;
+let publisher: DeltaPublisher;
+let processor: DeltaProcessor;
-dao.init()
+_getMongoCollection( db, db_conf )
+.then( ( conn: MongoCollection ) =>
+{
+ return new MongoDeltaDao( conn );
+} )
+.then( ( mongoDao: MongoDeltaDao ) =>
+{
+ dao = mongoDao;
+ publisher = new DeltaPublisher( amqp_conf, emitter, ts_ctr );
+ processor = new DeltaProcessor( mongoDao, publisher, emitter );
+} )
.then( _ =>
{
- publisher.connect()
- .catch( e =>
- {
- console.error( 'AMQP connection error: ' + e );
- } );
+ publisher.connect();
} )
.then( _ =>
{
@@ -99,10 +87,10 @@ dao.init()
processor.process();
metrics.checkForErrors( dao );
},
- interval_ms,
+ process_interval_ms,
);
} )
-.catch( err => { console.error( 'Error: ' + err ); } );
+.catch( e => { console.error( 'Error: ' + e ); } );
/**
@@ -151,27 +139,26 @@ function writePidFile( pid_path: string ): void
*/
function shutdown( signal: string ): void
{
- console.log( "Received " + signal + ". Beginning graceful shutdown:" );
-
- console.log( "...Stopping processing interval" );
+ console.log( 'Received ' + signal + '. Beginning graceful shutdown:' );
+ console.log( '...Stopping processing interval' );
clearInterval( process_interval );
- console.log( "...Closing MongoDb connection" );
+ console.log( '...Closing MongoDb connection' );
db.close( ( err, _data ) =>
{
if ( err )
{
- console.error( " Error closing connection: " + err );
+ console.error( ' Error closing connection: ' + err );
}
} );
- console.log( "...Closing AMQP connection..." );
+ console.log( '...Closing AMQP connection...' );
publisher.close();
- console.log( "Shutdown complete. Exiting." );
+ console.log( 'Shutdown complete. Exiting.' );
process.exit();
}
@@ -226,6 +213,77 @@ function _createDB( conf: MongoDbConfig ): MongoDb
/**
+ * Attempts to connect to the database
+ *
+ * connectError event will be emitted on failure.
+ *
+ * @return any errors that occurred
+ */
+function _getMongoCollection(
+ db: MongoDb,
+ conf: MongoDbConfig
+): Promise<MongoCollection>
+{
+ return new Promise( ( resolve, reject ) =>
+ {
+ // attempt to connect to the database
+ db.open( ( e: any, db: any ) =>
+ {
+ // if there was an error, don't bother with anything else
+ if ( e )
+ {
+ // in some circumstances, it may just be telling us that
+ // we're already connected (even though the connection may
+ // have been broken)
+ if ( e.errno !== undefined )
+ {
+ reject( 'Error opening mongo connection: ' + e );
+ return;
+ }
+ } else if ( db == null )
+ {
+ reject( 'No database connection' );
+ return;
+ }
+
+ // quotes collection
+ db.collection(
+ conf.collection,
+ ( e: any, collection: MongoCollection ) =>
+ {
+ if ( e )
+ {
+ reject( 'Error creating collection: ' + e );
+ return;
+ }
+
+ // initialize indexes
+ collection.createIndex(
+ [
+ ['published', 1],
+ ['deltaError', 1],
+ ],
+ true,
+ ( e: any, _index: { [P: string]: any } ) =>
+ {
+ if ( e )
+ {
+ reject( 'Error creating index: ' + e );
+ return;
+ }
+
+ resolve( collection );
+ return;
+ }
+ );
+ }
+ );
+ } );
+ } );
+}
+
+
+/**
* Create a mongodb configuration from the environment
*
* @param env - the environment variables
@@ -235,14 +293,15 @@ function _createDB( conf: MongoDbConfig ): MongoDb
function _getMongoConfig( env: any ): MongoDbConfig
{
return <MongoDbConfig>{
- "port": +( env.MONGODB_PORT || 0 ),
- "ha": +( env.LIZA_MONGODB_HA || 0 ) == 1,
- "replset": env.LIZA_MONGODB_REPLSET,
- "host": env.MONGODB_HOST,
- "host_a": env.LIZA_MONGODB_HOST_A,
- "port_a": +( env.LIZA_MONGODB_PORT_A || 0 ),
- "host_b": env.LIZA_MONGODB_HOST_B,
- "port_b": +( env.LIZA_MONGODB_PORT_B || 0 ),
+ 'port': +( env.MONGODB_PORT || 0 ),
+ 'ha': +( env.LIZA_MONGODB_HA || 0 ) == 1,
+ 'replset': env.LIZA_MONGODB_REPLSET,
+ 'host': env.MONGODB_HOST,
+ 'host_a': env.LIZA_MONGODB_HOST_A,
+ 'port_a': +( env.LIZA_MONGODB_PORT_A || 0 ),
+ 'host_b': env.LIZA_MONGODB_HOST_B,
+ 'port_b': +( env.LIZA_MONGODB_PORT_B || 0 ),
+ 'collection': 'quotes',
};
}
@@ -257,18 +316,18 @@ function _getMongoConfig( env: any ): MongoDbConfig
function _getAmqpConfig( env: any ): AmqpConfig
{
return <AmqpConfig>{
- "protocol": "amqp",
- "hostname": env.amqp_hostname,
- "port": +( env.amqp_port || 0 ),
- "username": env.amqp_username,
- "password": env.amqp_password,
- "locale": "en_US",
- "frameMax": env.amqp_frameMax,
- "heartbeat": env.amqp_heartbeat,
- "vhost": env.amqp_vhost,
- "exchange": env.amqp_exchange,
- "retries": env.amqp_retries || 30,
- "retry_wait": env.amqp_retry_wait || 1,
+ 'protocol': 'amqp',
+ 'hostname': env.amqp_hostname,
+ 'port': +( env.amqp_port || 0 ),
+ 'username': env.amqp_username,
+ 'password': env.amqp_password,
+ 'locale': 'en_US',
+ 'frameMax': env.amqp_frameMax,
+ 'heartbeat': env.amqp_heartbeat,
+ 'vhost': env.amqp_vhost,
+ 'exchange': env.amqp_exchange,
+ 'retries': env.amqp_retries || 30,
+ 'retry_wait': env.amqp_retry_wait || 1,
};
}
@@ -283,8 +342,9 @@ function _getAmqpConfig( env: any ): AmqpConfig
function _getPrometheusConfig( env: any ): PrometheusConfig
{
return <PrometheusConfig>{
- "hostname": env.prom_hostname,
- "port": +( env.prom_port || 0 ),
- "env": process.env.NODE_ENV,
+ 'hostname': env.prom_hostname,
+ 'port': +( env.prom_port || 0 ),
+ 'env': process.env.NODE_ENV,
+ 'push_interval_ms': +( process.env.prom_push_interval_ms || 5000 ),
};
} \ No newline at end of file