diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/system/DeltaProcessor.ts | 102 |
1 files changed, 81 insertions, 21 deletions
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts index d68615f..0e5201a 100644 --- a/src/system/DeltaProcessor.ts +++ b/src/system/DeltaProcessor.ts @@ -31,9 +31,18 @@ import { ReverseDelta, } from "../bucket/delta"; +/** Deltas and state of data prior to their application */ +type DeltaState = [ + Delta<any>, + Record<string, any>, + Record<string, any>, +]; + /** * Process deltas for a quote and publish to a queue + * + * TODO: Decouple from applyDelta */ export class DeltaProcessor { @@ -87,10 +96,12 @@ export class DeltaProcessor const deltas = this._getTimestampSortedDeltas( doc ); const doc_id = doc.id; const bucket = doc.data; - const ratedata = doc.ratedata; + const ratedata = doc.ratedata || {}; const last_updated_ts = doc.lastUpdate; - return this._processNextDelta( doc_id, deltas, bucket, ratedata ) + const history = this._applyDeltas( deltas, bucket, ratedata ); + + return this._processNextDelta( doc_id, history ) .then( _ => this._dao.markDocumentAsProcessed( doc_id, last_updated_ts ) ) @@ -106,41 +117,90 @@ export class DeltaProcessor } + /** + * Produce states of buckets at each point in history + * + * For bucket data, each tuple will contain the state of the bucket + * prior to the corresponding delta having been applied. For rate data, + * the tuple will also contain the state of the bucket at the point of + * rating. + * + * @param deltas - deltas to apply + * @param bucket - current state of bucket prior to deltas + * @param ratedata - current state of ratedata prior to deltas + * + * @return deltas paired with state prior to its application + */ + private _applyDeltas( + deltas: Delta<any>[], + bucket: Record<string, any>, + ratedata: Record<string, any>, + ): DeltaState[] + { + const pairs: DeltaState[] = []; + + let bucket_state = bucket; + let ratedata_state = ratedata; + let i = deltas.length; + + while ( i-- ) + { + let delta = deltas[ i ]; + + pairs[ i ] = [ + delta, + bucket_state, + ( delta.type === this.DELTA_RATEDATA ) ? ratedata_state : {}, + ]; + + // Don't apply the final delta, since we won't use it + if ( i === 0 ) + { + break; + } + + if ( delta.type === this.DELTA_DATA ) + { + bucket_state = applyDelta( + Object.create( bucket_state ), + deltas[ i ].data, + ); + } + else + { + ratedata_state = applyDelta( + Object.create( ratedata_state ), + deltas[ i ].data, + ); + } + } + + return pairs; + } + + private _processNextDelta( - doc_id: DocumentId, - deltas: Delta<any>[], - bucket: Record<string, any>, - ratedata?: Record<string, any>, + doc_id: DocumentId, + history: DeltaState[], ): Promise<void> { - const delta = deltas.shift(); - - if ( !delta ) + if ( history.length === 0 ) { return Promise.resolve(); } + const [ delta, bucket, ratedata ] = history[ 0 ]; + const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type; this._emitter.emit( 'delta-process-start', delta_uid ); - if ( delta.type == this.DELTA_DATA ) - { - bucket = applyDelta( bucket, delta.data ); - } - else - { - ratedata = applyDelta( ratedata, delta.data ); - } - return this._publisher.publish( doc_id, delta, bucket, ratedata ) .then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) ) .then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) ) .then( _ => this._processNextDelta( doc_id, - deltas, - bucket, - ratedata + history.slice( 1 ), ) ); } |