Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-21 15:59:17 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-11-22 16:37:57 -0500
commitfaa7e15760d1136ef44b15a914c5dd582f86a903 (patch)
tree789e48bf79e12accb31712791de7c65a87a5eb86 /src/system
parent9b5cd4e89f8d8778994a01eca641d4b78296c4d9 (diff)
downloadliza-faa7e15760d1136ef44b15a914c5dd582f86a903.tar.gz
liza-faa7e15760d1136ef44b15a914c5dd582f86a903.tar.bz2
liza-faa7e15760d1136ef44b15a914c5dd582f86a903.zip
[DEV-5312] Add signal handling and prometheus monitoring
Diffstat (limited to 'src/system')
-rw-r--r--src/system/AmqpPublisher.ts2
-rw-r--r--src/system/DeltaLogger.ts6
-rw-r--r--src/system/DeltaProcessor.ts54
-rw-r--r--src/system/DeltaPublisher.ts279
-rw-r--r--src/system/MetricsCollector.ts210
-rw-r--r--src/system/db/DeltaDao.ts22
-rw-r--r--src/system/db/MongoDeltaDao.ts91
7 files changed, 506 insertions, 158 deletions
diff --git a/src/system/AmqpPublisher.ts b/src/system/AmqpPublisher.ts
index 019ea0d..622d659 100644
--- a/src/system/AmqpPublisher.ts
+++ b/src/system/AmqpPublisher.ts
@@ -38,5 +38,5 @@ export interface AmqpPublisher
*
* @param delta - The delta to publish
*/
- publish( delta: DeltaResult<any> ): Promise<null>;
+ publish( delta: DeltaResult<any> ): Promise<NullableError>;
}
diff --git a/src/system/DeltaLogger.ts b/src/system/DeltaLogger.ts
index 50e616e..f286387 100644
--- a/src/system/DeltaLogger.ts
+++ b/src/system/DeltaLogger.ts
@@ -55,7 +55,9 @@ export class DeltaLogger
private readonly _env: string,
private readonly _subscriber: EventSubscriber,
private readonly _ts_ctr : () => UnixTimestamp,
- ) {}
+ ) {
+ this.init();
+ }
/**
@@ -65,7 +67,7 @@ export class DeltaLogger
{
this._registerEvent( 'document-processed', LogLevel.NOTICE );
this._registerEvent( 'delta-publish', LogLevel.NOTICE );
- this._registerEvent( 'avro-parse-err', LogLevel.ERROR );
+ this._registerEvent( 'avro-err', LogLevel.ERROR );
this._registerEvent( 'mongodb-err', LogLevel.ERROR );
this._registerEvent( 'publish-err', LogLevel.ERROR );
}
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index 39cdf3d..db5be5b 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -59,7 +59,7 @@ export class DeltaProcessor
{
let self = this;
- this._dao.getUnprocessedDocuments()
+ self._dao.getUnprocessedDocuments()
.then( docs =>
{
docs.forEach( doc =>
@@ -68,23 +68,50 @@ export class DeltaProcessor
const doc_id: DocumentId = doc.id;
const last_updated_ts = doc.lastUpdate;
- deltas.forEach( delta =>
+ for ( let i = 0; i < deltas.length; i++ )
{
+ const delta = deltas[ i ];
+ const startTime = process.hrtime();
+ let error = null;
+
self._publisher.publish( delta )
.then( _ =>
{
self._dao.advanceDeltaIndex( doc_id, delta.type );
} )
- .catch( _ =>
+ .catch( err =>
{
- // TODO: blow up?
+ self._dao.setErrorFlag( doc_id );
+
+ error = err;
} );
- });
+
+ // Do not process any more deltas for
+ // this document if there was an error
+ if ( error )
+ {
+ self._dispatcher.dispatch(
+ 'delta-process-error',
+ error
+ );
+
+ return;
+ }
+ else
+ {
+ const elapsedTime = process.hrtime( startTime );
+
+ self._dispatcher.dispatch(
+ 'delta-process-complete',
+ elapsedTime[ 1 ] / 10000
+ );
+ }
+ };
self._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
.then( _ =>
{
- this._dispatcher.dispatch(
+ self._dispatcher.dispatch(
'document-processed',
'Deltas on document ' + doc_id + ' processed '
+ 'successfully. Document has been marked as '
@@ -93,13 +120,13 @@ export class DeltaProcessor
} )
.catch( err =>
{
- this._dispatcher.dispatch( 'mongodb-err', err );
+ self._dispatcher.dispatch( 'mongodb-err', err );
} );
- });
+ } );
} )
.catch( err =>
{
- this._dispatcher.dispatch( 'mongodb-err', err );
+ self._dispatcher.dispatch( 'mongodb-err', err );
} );
}
@@ -137,14 +164,15 @@ export class DeltaProcessor
const deltas: DeltaResult<any>[] = deltas_obj[ type ] || [];
// Get type specific delta index
- let last_published_index = 0;
- if ( doc.lastPublishDelta )
+ let published_count = 0;
+ if ( doc.totalPublishDelta )
{
- last_published_index = doc.lastPublishDelta[ type ] || 0;
+ published_count = doc.totalPublishDelta[ type ] || 0;
}
// Only return the unprocessed deltas
- const deltas_trimmed = deltas.slice( last_published_index );
+ console.log( published_count );
+ const deltas_trimmed = deltas.slice( published_count );
// Mark each delta with its type
deltas_trimmed.forEach( delta =>
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index c67cde4..c718756 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -27,7 +27,8 @@ import { EventDispatcher } from './event/EventDispatcher';
import {
connect as amqpConnect,
Options,
- Channel
+ Channel,
+ Connection,
} from 'amqplib';
const avro = require( 'avro-js' );
@@ -38,8 +39,23 @@ export interface AmqpConfig extends Options.Connect {
}
+export interface AvroSchema {
+ /** Write data to a buffer */
+ toBuffer( data: Record<string, any> ): Buffer | null;
+}
+
+
export class DeltaPublisher implements AmqpPublisher
{
+ /** The amqp connection */
+ private _conn?: Connection;
+
+ /** The amqp channel */
+ private _channel?: Channel;
+
+ /** The avro schema */
+ private _type?: AvroSchema;
+
/** The path to the avro schema */
readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc';
@@ -51,7 +67,7 @@ export class DeltaPublisher implements AmqpPublisher
/**
- * Initialize publisher
+ * Delta publisher
*
* @param _conf - amqp configuration
* @param _emitter - event emitter instance
@@ -61,67 +77,94 @@ export class DeltaPublisher implements AmqpPublisher
private readonly _conf: AmqpConfig,
private readonly _dispatcher: EventDispatcher,
private readonly _ts_ctr : () => UnixTimestamp,
- ) {}
+ ) {
+ this._type = avro.parse( this.SCHEMA_PATH );
+ }
/**
- * Publish quote message to exchange post-rating
- *
- * @param delta - The delta to publish
- *
- * @return whether the message was published successfully
- */
- publish( delta: DeltaResult<any> ): Promise<null>
+ * Initialize connection
+ */
+ connect(): Promise<NullableError>
{
- const exchange = this._conf.exchange;
-
return new Promise<null>( ( resolve, reject ) =>
{
amqpConnect( this._conf )
.then( conn =>
{
- setTimeout( () => conn.close(), 10000 );
- return conn.createChannel();
+ this._conn = conn;
+
+ return this._conn.createChannel();
} )
- .then( ch =>
+ .then( ( ch: Channel ) =>
{
- ch.assertExchange( exchange, 'fanout', { durable: true } );
+ this._channel = ch;
+
+ this._channel.assertExchange(
+ this._conf.exchange,
+ 'fanout',
+ { durable: true }
+ );
- return this.sendMessage( ch, exchange, delta );
+ resolve();
+ return;
} )
- .then( sentSuccessfully =>
+ .catch( e =>
{
- console.log('sentSuccessfully', sentSuccessfully);
- if ( sentSuccessfully )
- {
- this._dispatcher.dispatch(
- 'delta-publish',
- "Published " + delta.type + " delta with ts '"
- + delta.timestamp + "' to '" + exchange
- + '" exchange',
- );
-
- resolve();
- }
- else
- {
- this._dispatcher.dispatch(
- 'publish-err',
- "Error publishing " + delta.type + " delta with ts '"
- + delta.timestamp + "' to '" + exchange
- + "' exchange",
- );
-
- reject();
- }
+ reject( e );
+ return;
+ } );
+ } );
+ }
+
+
+ /**
+ * Close the amqp conenction
+ */
+ close(): void
+ {
+ if ( this._conn )
+ {
+ this._conn.close.bind(this._conn);
+ }
+ }
+
+
+ /**
+ * Publish quote message to exchange post-rating
+ *
+ * @param delta - The delta to publish
+ *
+ * @return whether the message was published successfully
+ */
+ publish( delta: DeltaResult<any> ): Promise<NullableError>
+ {
+ return new Promise<NullableError>( ( resolve, reject ) =>
+ {
+ const startTime = process.hrtime();
+
+ this.sendMessage( delta )
+ .then( _ =>
+ {
+ this._dispatcher.dispatch(
+ 'delta-publish',
+ "Published " + delta.type + " delta with ts '"
+ + delta.timestamp + "' to '" + this._conf.exchange
+ + '" exchange',
+ );
+
+ console.log('#publish: '
+ + process.hrtime( startTime )[0] / 10000 );
+ resolve();
+ return;
} )
.catch( e =>
{
this._dispatcher.dispatch(
'publish-err',
"Error publishing " + delta.type + " delta with ts '"
- + delta.timestamp + '" to "' + exchange + "' exchange '"
- + e,
+ + delta.timestamp + '" to "' + this._conf.exchange
+ + "' exchange: '" + e,
)
reject();
@@ -133,75 +176,92 @@ export class DeltaPublisher implements AmqpPublisher
/**
* Send message to exchange
*
- * @param channel - AMQP channel
- * @param exchange - exchange name
- * @param delta - The delta to publish
+ * @param delta - The delta to publish
*
* @return whether publish was successful
*/
- sendMessage(
- channel: Channel,
- exchange: string,
- delta: DeltaResult<any>,
- ): boolean
+ sendMessage( delta: DeltaResult<any> ): Promise<NullableError>
{
- const headers = {
- version: 1,
- created: Date.now(),
- };
-
- // Convert all delta datums to string for avro
- const delta_data = this.avroFormat( delta.data );
- const event_id = this.DELTA_MAP[ delta.type ];
-
- const data = {
- event: {
- id: event_id,
- ts: this._ts_ctr(),
- actor: 'SERVER',
- step: null,
- },
- document: {
- id: 123123, // Fix
- },
- session: {
- entity_name: 'Foobar', // Fix
- entity_id: 123123, // Fix
- },
- data: {
- Data: {
- bucket: delta_data,
+ return new Promise<NullableError>( ( resolve, reject ) =>
+ {
+ const startTime = process.hrtime();
+
+ const ts = this._ts_ctr();
+ const headers = { version: 1, created: ts };
+ const delta_data = this.avroFormat( delta.data );
+ console.log('#sendmessage 1: '
+ + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
+ const event_id = this.DELTA_MAP[ delta.type ];
+ const avro_buffer = this.avroEncode( {
+ event: {
+ id: event_id,
+ ts: ts,
+ actor: 'SERVER',
+ step: null,
},
- },
- delta: {
- Data: {
- bucket: delta_data,
+ document: {
+ id: 123123, // Fix
},
- },
- program: {
- Program: {
- id: 'quote_server',
- version: 'dadaddwafdwa', // Fix
+ session: {
+ entity_name: 'Foobar', // Fix
+ entity_id: 123123, // Fix
},
- },
- };
-
- const avro_buffer = this.avroEncode( data );
+ data: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ delta: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: 'dadaddwafdwa', // Fix
+ },
+ },
+ } );
+ console.log('#sendmessage 2: '
+ + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
- if ( !avro_buffer )
- {
- return false;
- }
+ if ( !this._conn )
+ {
+ reject( 'Error sending message: No connection' );
+ return;
+ }
+ else if ( !this._channel )
+ {
+ reject( 'Error sending message: No channel' );
+ return;
+ }
+ else if ( !avro_buffer )
+ {
+ reject( 'Error sending message: No avro buffer' );
+ return;
+ }
+ console.log('#sendmessage 3: '
+ + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
+
+ // we don't use a routing key; fanout exchange
+ const published_successfully = this._channel.publish(
+ this._conf.exchange,
+ '',
+ avro_buffer,
+ { headers: headers },
+ );
- // we don't use a routing key; fanout exchange
- const routing_key = '';
+ if ( published_successfully )
+ {
+ console.log('#sendmessage 4: '
+ + (process.hrtime( startTime )[ 1 ] / 10000) + 'ms');
+ resolve();
+ return;
+ }
- return channel.publish(
- exchange,
- routing_key,
- avro_buffer,
- { headers: headers },
- );
+ reject( 'Error sending message: publishing failed' );
+ } );
}
@@ -218,13 +278,22 @@ export class DeltaPublisher implements AmqpPublisher
try
{
- const type = avro.parse( this.SCHEMA_PATH );
- buffer = type.toBuffer( data );
+ if ( !this._type )
+ {
+ this._dispatcher.dispatch(
+ 'avro-err',
+ 'No avro scheama found',
+ );
+
+ return null;
+ }
+
+ buffer = this._type.toBuffer( data );
}
catch( e )
{
this._dispatcher.dispatch(
- 'avro-parse-err',
+ 'avro-err',
'Error encoding data to avro: ' + e,
);
}
diff --git a/src/system/MetricsCollector.ts b/src/system/MetricsCollector.ts
index 583b5a6..2d7ca24 100644
--- a/src/system/MetricsCollector.ts
+++ b/src/system/MetricsCollector.ts
@@ -22,41 +22,177 @@
*/
import { EventSubscriber } from "./event/EventSubscriber";
+import { DeltaDao } from "./db/DeltaDao";
+import { PositiveInteger } from "../numeric";
+import { Histogram, Pushgateway, Counter, Gauge } from 'prom-client';
-const client = require('prom-client');
+const client = require( 'prom-client' );
-declare type MetricStructure = {
- path: string;
- code: number;
- service: string;
- env: string;
+
+// declare type MetricStructure = {
+// path: string;
+// code: number;
+// service: string;
+// env: string;
+// }
+
+
+export declare type PrometheusConfig = {
+ /** The hostname to connect to */
+ hostname: string;
+
+ /** The port to connect to */
+ port: number;
+
+ /** The environment ( dev, test, demo, live ) */
+ env: string;
}
+
export class MetricsCollector
{
+ /** The prometheus PushGateway */
+ private _gateway: Pushgateway;
+
+ /** Metric push interval */
+ private _push_interval_ms: PositiveInteger = <PositiveInteger>5000;
+
+ /** Delta processed time histogram */
+ private _process_time_hist: Histogram;
+ private _process_time_params: Pushgateway.Parameters = {
+ jobName: 'liza_delta_process_time'
+ };
+
+ /** Delta error counter */
+ private _process_error_count: Counter;
+ private _process_error_params: Pushgateway.Parameters = {
+ jobName: 'liza_delta_error'
+ };
+
+ /** Delta current error gauge */
+ private _current_error_gauge: Gauge;
+ private _current_error_params: Pushgateway.Parameters = {
+ jobName: 'liza_delta_current_error'
+ };
+
+ /** Delta error counter */
+ private _process_delta_count: Counter;
+ private _process_delta_params: Pushgateway.Parameters = {
+ jobName: 'liza_delta_success'
+ };
+
/**
* Initialize delta logger
+ *
+ * @param _conf - the prometheus configuration
+ * @param _subscriber - the event subscriber
*/
constructor(
- private readonly _env: string,
+ private readonly _conf: PrometheusConfig,
private readonly _subscriber: EventSubscriber,
- ) {}
+ ) {
+ // Set labels
+ const default_labels = {
+ env: this._conf.env,
+ service: 'delta_processor',
+ };
+
+ client.register.setDefaultLabels( default_labels );
+
+ // Create gateway
+ const url = 'http://' + this._conf.hostname + ':' + this._conf.port;
+ this._gateway = new client.Pushgateway( url );
+
+ // Create metrics
+ this._process_time_hist = new client.Histogram( {
+ name: this._process_time_params.jobName,
+ help: 'Time in ms for deltas to be processed',
+ labelNames: [ 'env', 'service' ],
+ buckets: client.linearBuckets(0, 10, 10),
+ } );
+
+ this._process_error_count = new client.Counter( {
+ name: this._process_error_params.jobName,
+ help: 'Error count for deltas being processed',
+ labelNames: [ 'env', 'service' ],
+ } );
+
+ this._current_error_gauge = new client.Gauge( {
+ name: this._current_error_params.jobName,
+ help: 'The current number of documents in an error state',
+ labelNames: [ 'env', 'service' ],
+ } );
+
+ this._process_delta_count = new client.Counter( {
+ name: this._process_delta_params.jobName,
+ help: 'Count of deltas successfully processed',
+ labelNames: [ 'env', 'service' ],
+ } );
+
+ // Push metrics on a specific intervals
+ setInterval( () => { this.pushMetrics(); }, this._push_interval_ms );
+
+ // Subsribe metrics to events
+ this.subscribeMetrics();
+ }
/**
- * Initialize the logger to look for specific events
+ * Subscribe metrics
*/
- init(): void
+ private subscribeMetrics()
{
- const collectDefaultMetrics = client.collectDefaultMetrics;
-
- console.log( this._subscriber, collectDefaultMetrics)
- this._formatLog( '', 123 );
- // this._registerEvent( 'document-processed', LogLevel.NOTICE );
- // this._registerEvent( 'delta-publish', LogLevel.NOTICE );
- // this._registerEvent( 'avro-parse-err', LogLevel.ERROR );
- // this._registerEvent( 'mongodb-err', LogLevel.ERROR );
- // this._registerEvent( 'publish-err', LogLevel.ERROR );
+ this._subscriber.subscribe(
+ 'delta-process-complete',
+ ( val ) =>
+ {
+ console.log( 'Got time: ' + val + 'ms' );
+ this._process_time_hist.observe( val );
+ this._process_delta_count.inc();
+ }
+ );
+
+ this._subscriber.subscribe(
+ 'delta-process-error',
+ ( _ ) => this._process_error_count.inc()
+ );
+ }
+
+
+ /**
+ * Push metrics to Prometheus PushGateway
+ */
+ private pushMetrics(): void
+ {
+ // this._gateway.pushAdd( this._process_time_params, this.pushCallback );
+ // this._gateway.pushAdd( this._process_error_params, this.pushCallback );
+ // this._gateway.pushAdd( this._current_error_params, this.pushCallback );
+ // this._gateway.pushAdd( this._process_delta_params, this.pushCallback );
+
+ this._gateway.pushAdd(
+ {
+ jobName: 'liza_delta_metrics'
+ },
+ this.pushCallback
+ );
+ }
+
+
+ /**
+ * Handle push error
+ *
+ * @param error - Any errors that occurred
+ * @param response - The http response
+ * @param body - The resposne body
+ */
+ private pushCallback(
+ _error?: Error | undefined,
+ _response?: any,
+ _body?: any
+ ): void
+ {
+ // console.log( 'Push callback' );
+ // console.error( error, response, body );
}
@@ -68,13 +204,35 @@ export class MetricsCollector
*
* @returns a structured logging object
*/
- private _formatLog( path: string, code: number ): MetricStructure
+ // private _formatMetricVal( label: string, val: any ): MetricStructure
+ // {
+ // return <MetricStructure>{
+ // path: path,
+ // code: code,
+ // service: 'quote-server',
+ // env: this._conf.env,
+ // };
+ // }
+
+
+ /**
+ * Look for mongodb delta errors and update metrics if found
+ *
+ * @return any errors the occurred
+ */
+ checkForErrors( dao: DeltaDao ): NullableError
{
- return <MetricStructure>{
- path: path,
- code: code,
- service: 'quote-server',
- env: this._env,
- };
+ dao.getErrorCount()
+ .then( count =>
+ {
+ // console.log( 'Error count: ', count );
+ this._current_error_gauge.set( +count );
+ } )
+ .catch( err =>
+ {
+ return err;
+ } );
+
+ return null;
}
}
diff --git a/src/system/db/DeltaDao.ts b/src/system/db/DeltaDao.ts
index 64e1f0f..173fcee 100644
--- a/src/system/db/DeltaDao.ts
+++ b/src/system/db/DeltaDao.ts
@@ -47,7 +47,7 @@ export interface DeltaDao
* @param doc_id - Document whose index will be set
* @param type - Delta type
*
- * @return any errors that occured
+ * @return any errors that occurred
*/
advanceDeltaIndex(
doc_id: DocumentId,
@@ -62,11 +62,29 @@ export interface DeltaDao
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
*
- * @return true if the document was successfully marked as processed
+ * @return any errors that occurred
*/
markDocumentAsProcessed(
doc_id: DocumentId,
last_update_ts: UnixTimestamp,
): Promise<NullableError>
+
+
+ /**
+ * Flag the document as being in an error state
+ *
+ * @param doc_id - The document to flag
+ *
+ * @return any errors that occurred
+ */
+ setErrorFlag( doc_id: DocumentId ): Promise<NullableError>
+
+
+ /**
+ * Get a count of documents in an error state
+ *
+ * @return a count of the documents in an error state
+ */
+ getErrorCount(): Promise<number | Error>
}
diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
index 443ae85..81816e0 100644
--- a/src/system/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -59,7 +59,7 @@ export class MongoDeltaDao implements DeltaDao
*
* connectError event will be emitted on failure.
*
- * @return any errors that occured
+ * @return any errors that occurred
*/
init(): Promise<NullableError>
{
@@ -73,14 +73,18 @@ export class MongoDeltaDao implements DeltaDao
// if there was an error, don't bother with anything else
if ( err )
{
- // in some circumstances, it may just be telling us that we're
- // already connected (even though the connection may have been
- // broken)
+ // in some circumstances, it may just be telling us that
+ // we're already connected (even though the connection may
+ // have been broken)
if ( err.errno !== undefined )
{
reject( 'Error opening mongo connection: ' + err );
return;
}
+ } else if ( db == null )
+ {
+ reject( 'No database connection' );
+ return;
}
// quotes collection
@@ -116,7 +120,7 @@ export class MongoDeltaDao implements DeltaDao
);
}
);
- });
+ } );
} );
}
@@ -165,8 +169,10 @@ export class MongoDeltaDao implements DeltaDao
/**
* Set the document's processed index
*
- * @param doc_id - Document whose index will be set
- * @param type - Delta type
+ * @param doc_id - Document whose index will be set
+ * @param type - Delta type
+ *
+ * @return any errors that occurred
*/
advanceDeltaIndex(
doc_id: DocumentId,
@@ -177,7 +183,7 @@ export class MongoDeltaDao implements DeltaDao
{
const inc_data: Record<string, any> = {};
- inc_data[ 'lastPublishDelta.' + type ] = 1;
+ inc_data[ 'totalPublishDelta.' + type ] = 1;
this._collection!.update(
{ id: doc_id },
@@ -206,7 +212,7 @@ export class MongoDeltaDao implements DeltaDao
* @param doc_id - The document to mark
* @param last_update_ts - The last time this document was updated
*
- * @return true if the document was successfully marked as processed
+ * @return any errors that occurred
*/
markDocumentAsProcessed(
doc_id: DocumentId,
@@ -231,7 +237,74 @@ export class MongoDeltaDao implements DeltaDao
return;
}
);
+ } );
+ }
+
+ /**
+ * Flag the document as being in an error state
+ *
+ * @param doc_id - The document to flag
+ *
+ * @return any errors that occurred
+ */
+ setErrorFlag( doc_id: DocumentId ): Promise<NullableError>
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection!.update(
+ { id: doc_id },
+ { $set: { deltaError: true } },
+ { upsert: false },
+ function( err )
+ {
+ if ( err )
+ {
+ reject( "Failed setting error flag: " + err );
+ return;
+ }
+
+ resolve();
+ return;
+ }
+ );
+ } );
+ }
+
+
+ /**
+ * Get a count of documents in an error state
+ *
+ * @return a count of the documents in an error state
+ */
+ getErrorCount(): Promise<number | Error>
+ {
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection!.find(
+ { deltaError: true },
+ {},
+ function( err, cursor )
+ {
+ if ( err )
+ {
+ reject( err );
+ return;
+ }
+
+ cursor.toArray( function( err: NullableError, data: any[] )
+ {
+ if ( err )
+ {
+ reject( err );
+ return;
+ }
+
+ // return the count
+ resolve( data.length );
+ });
+ }
+ )
} );
}
}