Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-11-12 15:07:37 -0500
committerMike Gerwitz <mike.gerwitz@ryansg.com>2019-11-12 16:21:01 -0500
commit950ae8818bbb13fc96dcec620308277a179ffc5a (patch)
tree420f78bc330fc61033a86c0f84daa735a564b8ef /src
parent91a7cf94b2266b5f942378c68e1251ff31438db0 (diff)
downloadliza-950ae8818bbb13fc96dcec620308277a179ffc5a.tar.gz
liza-950ae8818bbb13fc96dcec620308277a179ffc5a.tar.bz2
liza-950ae8818bbb13fc96dcec620308277a179ffc5a.zip
[DEV-5312] Add preliminary processor
Diffstat (limited to 'src')
-rw-r--r--src/server/db/MongoServerDao.ts4
-rw-r--r--src/system/DeltaProcessor.ts177
-rw-r--r--src/system/db/DeltaDao.ts (renamed from src/server/db/DeltaDao.ts)2
-rw-r--r--src/system/db/MongoDeltaDao.ts (renamed from src/server/db/MongoDeltaDao.ts)4
4 files changed, 182 insertions, 5 deletions
diff --git a/src/server/db/MongoServerDao.ts b/src/server/db/MongoServerDao.ts
index 2cbce84..e2597cc 100644
--- a/src/server/db/MongoServerDao.ts
+++ b/src/server/db/MongoServerDao.ts
@@ -81,7 +81,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
*
* @return MongoServerDao self to allow for method chaining
*/
- init( callback: () => {} ): this
+ init( callback: () => void ): this
{
var dao = this;
@@ -109,7 +109,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao
*
* @return MongoServerDao self to allow for method chaining
*/
- connect( callback: () => {} ): this
+ connect( callback: () => void ): this
{
var dao = this;
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
new file mode 100644
index 0000000..6103f20
--- /dev/null
+++ b/src/system/DeltaProcessor.ts
@@ -0,0 +1,177 @@
+/**
+ * Delta Processor
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of the Liza Data Collection Framework.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import { DeltaDao } from "../system/db/DeltaDao";
+import { MongoDeltaType } from "../system/db/MongoDeltaDao";
+import { DeltaResult } from "../bucket/delta";
+import { DocumentId } from "../document/Document";
+
+
+/**
+ * Process deltas for a quote and publish to a queue
+ */
+export class DeltaProcessor
+{
+ /** The ratedata delta type */
+ readonly DELTA_RATEDATA: MongoDeltaType = 'ratedata';
+
+ /** The data delta type */
+ readonly DELTA_DATA: MongoDeltaType = 'data';
+
+ /** A mapping of which delta type translated to which avro event */
+ readonly DELTA_MAP: Record<string, string> = {
+ DELTA_RATEDATA: 'rate',
+ DELTA_DATA: 'update',
+ };
+
+
+ /**
+ * Initialize processor
+ *
+ * @param _collection Mongo collection
+ */
+ constructor(
+ private readonly _dao: DeltaDao,
+ ) {}
+
+
+ /**
+ * Process unpublished deltas
+ */
+ process(): void
+ {
+ let self = this;
+
+ this._dao.getUnprocessedDocuments( function( docs )
+ {
+ docs.forEach( doc => {
+
+ const deltas = self.getTimestampSortedDeltas( doc );
+
+ deltas.forEach( delta => {
+
+ // TODO: publish delta
+ // publisher.publish( delta, self.DELTA_MAP[ delta.type ] )
+ console.log( delta, self.DELTA_MAP[ delta.type ] );
+
+ });
+
+ const last_updated_ts = doc.lastUpdated;
+ const doc_id: DocumentId = doc.id;
+
+ self._dao.markDocumentAsProcessed(
+ doc_id,
+ last_updated_ts,
+ function( err, markedSuccessfully )
+ {
+ console.log( err, markedSuccessfully );
+ },
+ );
+ });
+ });
+ }
+
+
+ /**
+ * Get sorted list of deltas
+ *
+ * @param doc - the document
+ *
+ * @return a list of deltas sorted by timestamp
+ */
+ getTimestampSortedDeltas(
+ doc: any,
+ ): DeltaResult<any>[]
+ {
+ const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA );
+ const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA );
+ const deltas = data_deltas.concat( ratedata_deltas );
+
+ deltas.sort( this._sortByTimestamp );
+
+ return deltas;
+ }
+
+
+ /**
+ * Get trimmed delta list
+ *
+ * @param doc - the document
+ * @param type - the delta type to get
+ *
+ * @return a trimmed list of deltas
+ */
+ getDeltas(
+ doc: any,
+ type: MongoDeltaType,
+ ): DeltaResult<any>[]
+ {
+ // Get objects so we can get the index by type
+ const deltas_obj = doc.rdelta || {};
+
+ // Get type specific deltas
+ let last_published_index = 0;
+ if ( doc.lastPublishDelta )
+ {
+ const last_published_indexes = doc.lastPublishDelta;
+
+ last_published_index = last_published_indexes[ type ] || 0;
+ }
+
+ const deltas: DeltaResult<any>[] = deltas_obj[ type ] || [];
+
+ // Only return the unprocessed deltas
+ const deltas_trimmed = deltas.slice( last_published_index );
+
+ // Mark each delta with its type
+ deltas_trimmed.forEach( delta => {
+ delta.type = type;
+ });
+
+ return deltas_trimmed;
+ }
+
+
+ /**
+ * Sort an array of deltas by timestamp
+ *
+ * @param a - The first delta to compare
+ * @param a - The second delta to compare
+ *
+ * @return a sort value
+ */
+ private _sortByTimestamp(
+ a: DeltaResult<any>,
+ b: DeltaResult<any>,
+ ): number
+ {
+ if ( a.timestamp < b.timestamp )
+ {
+ return -1;
+ }
+
+ if ( a.timestamp > b.timestamp ) {
+ return 1;
+ }
+
+ return 0;
+ }
+} \ No newline at end of file
diff --git a/src/server/db/DeltaDao.ts b/src/system/db/DeltaDao.ts
index 4728d99..53cd8f5 100644
--- a/src/server/db/DeltaDao.ts
+++ b/src/system/db/DeltaDao.ts
@@ -40,7 +40,7 @@ export interface DeltaDao
* @return documents in need of processing
*/
getUnprocessedDocuments(
- callback: ( data: Record<string, any> | null ) => void
+ callback: ( data: Record<string, any>[] ) => void,
): this;
diff --git a/src/server/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts
index 4b98ad6..cebf453 100644
--- a/src/server/db/MongoDeltaDao.ts
+++ b/src/system/db/MongoDeltaDao.ts
@@ -56,7 +56,7 @@ export class MongoDeltaDao implements DeltaDao
* @return documents in need of processing
*/
getUnprocessedDocuments(
- callback: ( data: Record<string, any> | null ) => void,
+ callback: ( data: Record<string, any>[] ) => void,
): this
{
var self = this;
@@ -71,7 +71,7 @@ export class MongoDeltaDao implements DeltaDao
// was the quote found?
if ( data.length == 0 )
{
- callback.call( self, null );
+ callback.call( self, [] );
return;
}