Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/system/DeltaProcessor.ts102
1 files changed, 81 insertions, 21 deletions
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index d68615f..0e5201a 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -31,9 +31,18 @@ import {
ReverseDelta,
} from "../bucket/delta";
+/** Deltas and state of data prior to their application */
+type DeltaState = [
+ Delta<any>,
+ Record<string, any>,
+ Record<string, any>,
+];
+
/**
* Process deltas for a quote and publish to a queue
+ *
+ * TODO: Decouple from applyDelta
*/
export class DeltaProcessor
{
@@ -87,10 +96,12 @@ export class DeltaProcessor
const deltas = this._getTimestampSortedDeltas( doc );
const doc_id = doc.id;
const bucket = doc.data;
- const ratedata = doc.ratedata;
+ const ratedata = doc.ratedata || {};
const last_updated_ts = doc.lastUpdate;
- return this._processNextDelta( doc_id, deltas, bucket, ratedata )
+ const history = this._applyDeltas( deltas, bucket, ratedata );
+
+ return this._processNextDelta( doc_id, history )
.then( _ =>
this._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
)
@@ -106,41 +117,90 @@ export class DeltaProcessor
}
+ /**
+ * Produce states of buckets at each point in history
+ *
+ * For bucket data, each tuple will contain the state of the bucket
+ * prior to the corresponding delta having been applied. For rate data,
+ * the tuple will also contain the state of the bucket at the point of
+ * rating.
+ *
+ * @param deltas - deltas to apply
+ * @param bucket - current state of bucket prior to deltas
+ * @param ratedata - current state of ratedata prior to deltas
+ *
+ * @return deltas paired with state prior to its application
+ */
+ private _applyDeltas(
+ deltas: Delta<any>[],
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): DeltaState[]
+ {
+ const pairs: DeltaState[] = [];
+
+ let bucket_state = bucket;
+ let ratedata_state = ratedata;
+ let i = deltas.length;
+
+ while ( i-- )
+ {
+ let delta = deltas[ i ];
+
+ pairs[ i ] = [
+ delta,
+ bucket_state,
+ ( delta.type === this.DELTA_RATEDATA ) ? ratedata_state : {},
+ ];
+
+ // Don't apply the final delta, since we won't use it
+ if ( i === 0 )
+ {
+ break;
+ }
+
+ if ( delta.type === this.DELTA_DATA )
+ {
+ bucket_state = applyDelta(
+ Object.create( bucket_state ),
+ deltas[ i ].data,
+ );
+ }
+ else
+ {
+ ratedata_state = applyDelta(
+ Object.create( ratedata_state ),
+ deltas[ i ].data,
+ );
+ }
+ }
+
+ return pairs;
+ }
+
+
private _processNextDelta(
- doc_id: DocumentId,
- deltas: Delta<any>[],
- bucket: Record<string, any>,
- ratedata?: Record<string, any>,
+ doc_id: DocumentId,
+ history: DeltaState[],
): Promise<void>
{
- const delta = deltas.shift();
-
- if ( !delta )
+ if ( history.length === 0 )
{
return Promise.resolve();
}
+ const [ delta, bucket, ratedata ] = history[ 0 ];
+
const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type;
this._emitter.emit( 'delta-process-start', delta_uid );
- if ( delta.type == this.DELTA_DATA )
- {
- bucket = applyDelta( bucket, delta.data );
- }
- else
- {
- ratedata = applyDelta( ratedata, delta.data );
- }
-
return this._publisher.publish( doc_id, delta, bucket, ratedata )
.then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) )
.then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) )
.then( _ => this._processNextDelta(
doc_id,
- deltas,
- bucket,
- ratedata
+ history.slice( 1 ),
) );
}