diff options
author | Austin Schaffer <austin.schaffer@ryansg.com> | 2019-11-12 15:07:37 -0500 |
---|---|---|
committer | Mike Gerwitz <mike.gerwitz@ryansg.com> | 2019-11-12 16:21:01 -0500 |
commit | 950ae8818bbb13fc96dcec620308277a179ffc5a (patch) | |
tree | 420f78bc330fc61033a86c0f84daa735a564b8ef /src | |
parent | 91a7cf94b2266b5f942378c68e1251ff31438db0 (diff) | |
download | liza-950ae8818bbb13fc96dcec620308277a179ffc5a.tar.gz liza-950ae8818bbb13fc96dcec620308277a179ffc5a.tar.bz2 liza-950ae8818bbb13fc96dcec620308277a179ffc5a.zip |
[DEV-5312] Add preliminary processor
Diffstat (limited to 'src')
-rw-r--r-- | src/server/db/MongoServerDao.ts | 4 | ||||
-rw-r--r-- | src/system/DeltaProcessor.ts | 177 | ||||
-rw-r--r-- | src/system/db/DeltaDao.ts (renamed from src/server/db/DeltaDao.ts) | 2 | ||||
-rw-r--r-- | src/system/db/MongoDeltaDao.ts (renamed from src/server/db/MongoDeltaDao.ts) | 4 |
4 files changed, 182 insertions, 5 deletions
diff --git a/src/server/db/MongoServerDao.ts b/src/server/db/MongoServerDao.ts index 2cbce84..e2597cc 100644 --- a/src/server/db/MongoServerDao.ts +++ b/src/server/db/MongoServerDao.ts @@ -81,7 +81,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao * * @return MongoServerDao self to allow for method chaining */ - init( callback: () => {} ): this + init( callback: () => void ): this { var dao = this; @@ -109,7 +109,7 @@ export class MongoServerDao extends EventEmitter implements ServerDao * * @return MongoServerDao self to allow for method chaining */ - connect( callback: () => {} ): this + connect( callback: () => void ): this { var dao = this; diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts new file mode 100644 index 0000000..6103f20 --- /dev/null +++ b/src/system/DeltaProcessor.ts @@ -0,0 +1,177 @@ +/** + * Delta Processor + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of the Liza Data Collection Framework. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +import { DeltaDao } from "../system/db/DeltaDao"; +import { MongoDeltaType } from "../system/db/MongoDeltaDao"; +import { DeltaResult } from "../bucket/delta"; +import { DocumentId } from "../document/Document"; + + +/** + * Process deltas for a quote and publish to a queue + */ +export class DeltaProcessor +{ + /** The ratedata delta type */ + readonly DELTA_RATEDATA: MongoDeltaType = 'ratedata'; + + /** The data delta type */ + readonly DELTA_DATA: MongoDeltaType = 'data'; + + /** A mapping of which delta type translated to which avro event */ + readonly DELTA_MAP: Record<string, string> = { + DELTA_RATEDATA: 'rate', + DELTA_DATA: 'update', + }; + + + /** + * Initialize processor + * + * @param _collection Mongo collection + */ + constructor( + private readonly _dao: DeltaDao, + ) {} + + + /** + * Process unpublished deltas + */ + process(): void + { + let self = this; + + this._dao.getUnprocessedDocuments( function( docs ) + { + docs.forEach( doc => { + + const deltas = self.getTimestampSortedDeltas( doc ); + + deltas.forEach( delta => { + + // TODO: publish delta + // publisher.publish( delta, self.DELTA_MAP[ delta.type ] ) + console.log( delta, self.DELTA_MAP[ delta.type ] ); + + }); + + const last_updated_ts = doc.lastUpdated; + const doc_id: DocumentId = doc.id; + + self._dao.markDocumentAsProcessed( + doc_id, + last_updated_ts, + function( err, markedSuccessfully ) + { + console.log( err, markedSuccessfully ); + }, + ); + }); + }); + } + + + /** + * Get sorted list of deltas + * + * @param doc - the document + * + * @return a list of deltas sorted by timestamp + */ + getTimestampSortedDeltas( + doc: any, + ): DeltaResult<any>[] + { + const data_deltas = this.getDeltas( doc, this.DELTA_RATEDATA ); + const ratedata_deltas = this.getDeltas( doc, this.DELTA_DATA ); + const deltas = data_deltas.concat( ratedata_deltas ); + + deltas.sort( this._sortByTimestamp ); + + return deltas; + } + + + /** + * Get trimmed delta list + * + * @param doc - the document + * @param type - the delta type to get + * + * @return a trimmed list of deltas + */ + getDeltas( + doc: any, + type: MongoDeltaType, + ): DeltaResult<any>[] + { + // Get objects so we can get the index by type + const deltas_obj = doc.rdelta || {}; + + // Get type specific deltas + let last_published_index = 0; + if ( doc.lastPublishDelta ) + { + const last_published_indexes = doc.lastPublishDelta; + + last_published_index = last_published_indexes[ type ] || 0; + } + + const deltas: DeltaResult<any>[] = deltas_obj[ type ] || []; + + // Only return the unprocessed deltas + const deltas_trimmed = deltas.slice( last_published_index ); + + // Mark each delta with its type + deltas_trimmed.forEach( delta => { + delta.type = type; + }); + + return deltas_trimmed; + } + + + /** + * Sort an array of deltas by timestamp + * + * @param a - The first delta to compare + * @param a - The second delta to compare + * + * @return a sort value + */ + private _sortByTimestamp( + a: DeltaResult<any>, + b: DeltaResult<any>, + ): number + { + if ( a.timestamp < b.timestamp ) + { + return -1; + } + + if ( a.timestamp > b.timestamp ) { + return 1; + } + + return 0; + } +}
\ No newline at end of file diff --git a/src/server/db/DeltaDao.ts b/src/system/db/DeltaDao.ts index 4728d99..53cd8f5 100644 --- a/src/server/db/DeltaDao.ts +++ b/src/system/db/DeltaDao.ts @@ -40,7 +40,7 @@ export interface DeltaDao * @return documents in need of processing */ getUnprocessedDocuments( - callback: ( data: Record<string, any> | null ) => void + callback: ( data: Record<string, any>[] ) => void, ): this; diff --git a/src/server/db/MongoDeltaDao.ts b/src/system/db/MongoDeltaDao.ts index 4b98ad6..cebf453 100644 --- a/src/server/db/MongoDeltaDao.ts +++ b/src/system/db/MongoDeltaDao.ts @@ -56,7 +56,7 @@ export class MongoDeltaDao implements DeltaDao * @return documents in need of processing */ getUnprocessedDocuments( - callback: ( data: Record<string, any> | null ) => void, + callback: ( data: Record<string, any>[] ) => void, ): this { var self = this; @@ -71,7 +71,7 @@ export class MongoDeltaDao implements DeltaDao // was the quote found? if ( data.length == 0 ) { - callback.call( self, null ); + callback.call( self, [] ); return; } |