Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-12 13:24:41 -0500
committerMike Gerwitz <mike.gerwitz@ryansg.com>2019-11-12 16:21:01 -0500
commit91a7cf94b2266b5f942378c68e1251ff31438db0 (patch)
tree501cea5b5476f0a01f66d861324f8e38d86b7445 /src
parentd0b2a4ce73ce47030e1bc49e9b68a2bad9069ac3 (diff)
downloadliza-91a7cf94b2266b5f942378c68e1251ff31438db0.tar.gz
liza-91a7cf94b2266b5f942378c68e1251ff31438db0.tar.bz2
liza-91a7cf94b2266b5f942378c68e1251ff31438db0.zip
[DEV-5312] Add dao for deltas
Diffstat (limited to 'src')
-rw-r--r--src/server/db/DeltaDao.ts76
-rw-r--r--src/server/db/MongoDeltaDao.ts174
-rw-r--r--src/types/mongodb.d.ts5
3 files changed, 254 insertions, 1 deletions
diff --git a/src/server/db/DeltaDao.ts b/src/server/db/DeltaDao.ts
new file mode 100644
index 0000000..4728d99
--- /dev/null
+++ b/src/server/db/DeltaDao.ts
@@ -0,0 +1,76 @@
+/**
+ * Delta data access
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * These types are used to describe the structure of the token data as it
+ * is stored in Mongo. It has a number of undesirable properties and
+ * duplicates data---this was intended to make querying easier and work
+ * around Mongo limitations.
+ *
+ * This structure can be changed in the future, but we'll need to maintain
+ * compatibility with the existing data.
+ */
+
+import { DocumentId } from "../../document/Document";
+import { PositiveInteger } from "../../numeric";
+
+
+/** Manage deltas */
+export interface DeltaDao
+{
+ /**
+ * Get documents in need of processing
+ *
+ * @return documents in need of processing
+ */
+ getUnprocessedDocuments(
+ callback: ( data: Record<string, any> | null ) => void
+ ): this;
+
+
+ /**
+ * Set the document's processed index
+ *
+ * @param doc_id - The document whose index will be set
+ * @param index - The index to set
+ */
+ advanceDeltaIndexByType(
+ doc_id: DocumentId,
+ type: string,
+ index: PositiveInteger,
+ callback: ( err: NullableError, indexHasAdvanced: boolean ) => void,
+ ): this;
+
+
+ /**
+ * Mark a given document as processed. First does a check to make sure that
+ * the document does not have a newer update timestamp than the provided one
+ *
+ * @param doc_id - The document to mark
+ * @param last_update_ts - The last time this document was updated
+ *
+ * @return true if the document was successfully marked as processed
+ */
+ markDocumentAsProcessed(
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ callback: ( err: NullableError, markedSuccessfully: boolean ) => void,
+ ): this;
+}
+
diff --git a/src/server/db/MongoDeltaDao.ts b/src/server/db/MongoDeltaDao.ts
new file mode 100644
index 0000000..4b98ad6
--- /dev/null
+++ b/src/server/db/MongoDeltaDao.ts
@@ -0,0 +1,174 @@
+/**
+ * Delta data access
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Get deltas from the mongo document in order to process and publish them
+ */
+
+import { DocumentId } from "../../document/Document";
+import { PositiveInteger } from "../../numeric";
+import { MongoCollection } from "mongodb";
+import { DeltaDao } from "./DeltaDao";
+
+
+export type MongoDeltaType = 'ratedata' | 'data';
+
+
+/** Manage deltas */
+export class MongoDeltaDao implements DeltaDao
+{
+ /** The ratedata delta type */
+ static readonly DELTA_RATEDATA: string = 'ratedata';
+
+ /** The data delta type */
+ static readonly DELTA_DATA: string = 'data';
+
+
+ /**
+ * Initialize connection
+ *
+ * @param _collection Mongo collection
+ */
+ constructor(
+ private readonly _collection: MongoCollection,
+ ) {}
+
+
+ /**
+ * Get documents in need of processing
+ *
+ * @return documents in need of processing
+ */
+ getUnprocessedDocuments(
+ callback: ( data: Record<string, any> | null ) => void,
+ ): this
+ {
+ var self = this;
+
+ this._collection.find(
+ { published: false },
+ {},
+ function( _err, cursor )
+ {
+ cursor.toArray( function( _err: NullableError, data: any[] )
+ {
+ // was the quote found?
+ if ( data.length == 0 )
+ {
+ callback.call( self, null );
+
+ return;
+ }
+
+ // return the quote data
+ callback.call( self, data );
+ });
+ }
+ )
+
+ return this;
+ }
+
+
+ /**
+ * Set the document's processed index
+ *
+ * @param doc_id - Document whose index will be set
+ * @param type - Delta type
+ * @param index - Index to set
+ * @param callback - Callback function
+ */
+ advanceDeltaIndexByType(
+ doc_id: DocumentId,
+ type: MongoDeltaType,
+ index: PositiveInteger,
+ callback: ( err: NullableError, indexAdvanced: boolean ) => void,
+ ): this
+ {
+ var self = this;
+
+ const set_data: Record<string, any> = {};
+
+ set_data[ 'lastPublishDelta.' + type ] = index;
+
+ this._collection.update(
+ { id: doc_id },
+ { $set: set_data },
+ { upsert: true },
+ function( err )
+ {
+ if ( err )
+ {
+ callback.call( self, err, false );
+
+ return;
+ }
+
+ callback.call( self, null, true );
+
+ return;
+ }
+ );
+
+ return this;
+ }
+
+
+ /**
+ * Mark a given document as processed. First does a check to make sure that
+ * the document does not have a newer update timestamp than the provided one
+ *
+ * @param doc_id - The document to mark
+ * @param last_update_ts - The last time this document was updated
+ *
+ * @return true if the document was successfully marked as processed
+ */
+ markDocumentAsProcessed(
+ doc_id: DocumentId,
+ last_update_ts: UnixTimestamp,
+ callback: ( err: NullableError, indexAdvanced: boolean ) => void,
+ ): this
+ {
+ var self = this;
+
+ this._collection.update(
+ { id: doc_id, lastUpdate: { $gt: last_update_ts } },
+ { $set: { processed: true } },
+ { upsert: false },
+ function( err, result )
+ {
+ if ( err )
+ {
+ callback.call( self, err, false );
+
+ return;
+ }
+
+ console.log( '-------', result );
+
+ callback.call( self, null, true );
+
+ return;
+ }
+ );
+
+ return this;
+ }
+}
+
diff --git a/src/types/mongodb.d.ts b/src/types/mongodb.d.ts
index 4b0e21f..6cc221f 100644
--- a/src/types/mongodb.d.ts
+++ b/src/types/mongodb.d.ts
@@ -69,6 +69,9 @@ interface MongoFindOptions
{
/** Limit results returned */
limit?: PositiveInteger,
+
+ /** Whether to project only id's */
+ id?: number,
}
@@ -148,7 +151,7 @@ declare interface MongoCollection
/**
- * Execute a query and return the first result
+ * Execute a query and return the results
*
* Unlike `update`, the callback return value is not propagated, and so
* the callback ought not return anything.