Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'src/system/db/MongoDeltaDao.ts')
-rw-r--r--src/system/db/MongoDeltaDao.ts212
1 files changed, 138 insertions, 74 deletions
diff --git a/src/system/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
index cebf453..443ae85 100644
--- a/src/system/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -22,10 +22,8 @@
*/
import { DocumentId } from "../../document/Document";
-import { PositiveInteger } from "../../numeric";
-import { MongoCollection } from "mongodb";
import { DeltaDao } from "./DeltaDao";
-
+import { MongoCollection } from "mongodb";
export type MongoDeltaType = 'ratedata' | 'data';
@@ -33,56 +31,134 @@ export type MongoDeltaType = 'ratedata' | 'data';
/** Manage deltas */
export class MongoDeltaDao implements DeltaDao
{
+ /** Collection used to store quotes */
+ readonly COLLECTION: string = 'quotes';
+
/** The ratedata delta type */
static readonly DELTA_RATEDATA: string = 'ratedata';
/** The data delta type */
static readonly DELTA_DATA: string = 'data';
+ /** The mongo quotes collection */
+ private _collection?: MongoCollection | null;
+
/**
* Initialize connection
*
- * @param _collection Mongo collection
+ * @param _db Mongo db
*/
constructor(
- private readonly _collection: MongoCollection,
+ private readonly _db: any,
) {}
/**
- * Get documents in need of processing
+ * Attempts to connect to the database
*
- * @return documents in need of processing
+ * connectError event will be emitted on failure.
+ *
+ * @return any errors that occured
*/
- getUnprocessedDocuments(
- callback: ( data: Record<string, any>[] ) => void,
- ): this
+ init(): Promise<NullableError>
{
- var self = this;
+ var dao = this;
- this._collection.find(
- { published: false },
- {},
- function( _err, cursor )
+ return new Promise( ( resolve, reject ) =>
+ {
+ // attempt to connect to the database
+ this._db.open( function( err: any, db: any )
{
- cursor.toArray( function( _err: NullableError, data: any[] )
+ // if there was an error, don't bother with anything else
+ if ( err )
{
- // was the quote found?
- if ( data.length == 0 )
+ // in some circumstances, it may just be telling us that we're
+ // already connected (even though the connection may have been
+ // broken)
+ if ( err.errno !== undefined )
{
- callback.call( self, [] );
-
+ reject( 'Error opening mongo connection: ' + err );
return;
}
+ }
- // return the quote data
- callback.call( self, data );
- });
+ // quotes collection
+ db.collection(
+ dao.COLLECTION,
+ function(
+ _err: any,
+ collection: MongoCollection,
+ ) {
+ // for some reason this gets called more than once
+ if ( collection == null )
+ {
+ return;
+ }
+
+ // initialize indexes
+ collection.createIndex(
+ [ ['id', 1] ],
+ true,
+ function( err: any, _index: { [P: string]: any } )
+ {
+ if ( err )
+ {
+ reject( 'Error creating index: ' + err );
+ return;
+ }
+
+ // mark the DAO as ready to be used
+ dao._collection = collection;
+ resolve();
+ return;
+ }
+ );
+ }
+ );
+ });
+ } );
+ }
+
+
+ /**
+ * Get documents in need of processing
+ *
+ * @return documents in need of processing
+ */
+ getUnprocessedDocuments(): Promise<Record<string, any>[]>
+ {
+ var self = this;
+
+ return new Promise( ( resolve, reject ) =>
+ {
+ if ( !self._collection )
+ {
+ reject( 'Database not ready' );
+ return;
}
- )
- return this;
+
+ this._collection!.find(
+ { published: false },
+ {},
+ function( _err, cursor )
+ {
+ cursor.toArray( function( _err: NullableError, data: any[] )
+ {
+ // was the quote found?
+ if ( data.length == 0 )
+ {
+ resolve( [] );
+ return;
+ }
+
+ // return the quote data
+ resolve( data );
+ });
+ }
+ )
+ } );
}
@@ -91,42 +167,35 @@ export class MongoDeltaDao implements DeltaDao
*
* @param doc_id - Document whose index will be set
* @param type - Delta type
- * @param index - Index to set
- * @param callback - Callback function
*/
- advanceDeltaIndexByType(
+ advanceDeltaIndex(
doc_id: DocumentId,
type: MongoDeltaType,
- index: PositiveInteger,
- callback: ( err: NullableError, indexAdvanced: boolean ) => void,
- ): this
+ ): Promise<NullableError>
{
- var self = this;
+ return new Promise( ( resolve, reject ) =>
+ {
+ const inc_data: Record<string, any> = {};
- const set_data: Record<string, any> = {};
+ inc_data[ 'lastPublishDelta.' + type ] = 1;
- set_data[ 'lastPublishDelta.' + type ] = index;
-
- this._collection.update(
- { id: doc_id },
- { $set: set_data },
- { upsert: true },
- function( err )
- {
- if ( err )
+ this._collection!.update(
+ { id: doc_id },
+ { $inc: inc_data },
+ { upsert: false },
+ function( err )
{
- callback.call( self, err, false );
+ if ( err )
+ {
+ reject( 'Error advancing delta index: ' + err )
+ return;
+ }
+ resolve();
return;
}
-
- callback.call( self, null, true );
-
- return;
- }
- );
-
- return this;
+ );
+ } );
}
@@ -140,35 +209,30 @@ export class MongoDeltaDao implements DeltaDao
* @return true if the document was successfully marked as processed
*/
markDocumentAsProcessed(
- doc_id: DocumentId,
- last_update_ts: UnixTimestamp,
- callback: ( err: NullableError, indexAdvanced: boolean ) => void,
- ): this
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ ): Promise<NullableError>
{
- var self = this;
-
- this._collection.update(
- { id: doc_id, lastUpdate: { $gt: last_update_ts } },
- { $set: { processed: true } },
- { upsert: false },
- function( err, result )
- {
- if ( err )
+ return new Promise( ( resolve, reject ) =>
+ {
+ this._collection!.update(
+ { id: doc_id, lastUpdate: { $lte: last_update_ts } },
+ { $set: { published: true } },
+ { upsert: false },
+ function( err )
{
- callback.call( self, err, false );
+ if ( err )
+ {
+ reject( "Error marking document as processed: " + err );
+ return;
+ }
+ resolve();
return;
}
+ );
- console.log( '-------', result );
-
- callback.call( self, null, true );
-
- return;
- }
- );
-
- return this;
+ } );
}
}