Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/system/DeltaProcessor.ts102
-rw-r--r--test/system/DeltaProcessorTest.ts315
2 files changed, 311 insertions, 106 deletions
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index d68615f..0e5201a 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -31,9 +31,18 @@ import {
ReverseDelta,
} from "../bucket/delta";
+/** Deltas and state of data prior to their application */
+type DeltaState = [
+ Delta<any>,
+ Record<string, any>,
+ Record<string, any>,
+];
+
/**
* Process deltas for a quote and publish to a queue
+ *
+ * TODO: Decouple from applyDelta
*/
export class DeltaProcessor
{
@@ -87,10 +96,12 @@ export class DeltaProcessor
const deltas = this._getTimestampSortedDeltas( doc );
const doc_id = doc.id;
const bucket = doc.data;
- const ratedata = doc.ratedata;
+ const ratedata = doc.ratedata || {};
const last_updated_ts = doc.lastUpdate;
- return this._processNextDelta( doc_id, deltas, bucket, ratedata )
+ const history = this._applyDeltas( deltas, bucket, ratedata );
+
+ return this._processNextDelta( doc_id, history )
.then( _ =>
this._dao.markDocumentAsProcessed( doc_id, last_updated_ts )
)
@@ -106,41 +117,90 @@ export class DeltaProcessor
}
+ /**
+ * Produce states of buckets at each point in history
+ *
+ * For bucket data, each tuple will contain the state of the bucket
+ * prior to the corresponding delta having been applied. For rate data,
+ * the tuple will also contain the state of the bucket at the point of
+ * rating.
+ *
+ * @param deltas - deltas to apply
+ * @param bucket - current state of bucket prior to deltas
+ * @param ratedata - current state of ratedata prior to deltas
+ *
+ * @return deltas paired with state prior to its application
+ */
+ private _applyDeltas(
+ deltas: Delta<any>[],
+ bucket: Record<string, any>,
+ ratedata: Record<string, any>,
+ ): DeltaState[]
+ {
+ const pairs: DeltaState[] = [];
+
+ let bucket_state = bucket;
+ let ratedata_state = ratedata;
+ let i = deltas.length;
+
+ while ( i-- )
+ {
+ let delta = deltas[ i ];
+
+ pairs[ i ] = [
+ delta,
+ bucket_state,
+ ( delta.type === this.DELTA_RATEDATA ) ? ratedata_state : {},
+ ];
+
+ // Don't apply the final delta, since we won't use it
+ if ( i === 0 )
+ {
+ break;
+ }
+
+ if ( delta.type === this.DELTA_DATA )
+ {
+ bucket_state = applyDelta(
+ Object.create( bucket_state ),
+ deltas[ i ].data,
+ );
+ }
+ else
+ {
+ ratedata_state = applyDelta(
+ Object.create( ratedata_state ),
+ deltas[ i ].data,
+ );
+ }
+ }
+
+ return pairs;
+ }
+
+
private _processNextDelta(
- doc_id: DocumentId,
- deltas: Delta<any>[],
- bucket: Record<string, any>,
- ratedata?: Record<string, any>,
+ doc_id: DocumentId,
+ history: DeltaState[],
): Promise<void>
{
- const delta = deltas.shift();
-
- if ( !delta )
+ if ( history.length === 0 )
{
return Promise.resolve();
}
+ const [ delta, bucket, ratedata ] = history[ 0 ];
+
const delta_uid = doc_id + '_' + delta.timestamp + '_' + delta.type;
this._emitter.emit( 'delta-process-start', delta_uid );
- if ( delta.type == this.DELTA_DATA )
- {
- bucket = applyDelta( bucket, delta.data );
- }
- else
- {
- ratedata = applyDelta( ratedata, delta.data );
- }
-
return this._publisher.publish( doc_id, delta, bucket, ratedata )
.then( _ => this._dao.advanceDeltaIndex( doc_id, delta.type ) )
.then( _ => this._emitter.emit( 'delta-process-end', delta_uid ) )
.then( _ => this._processNextDelta(
doc_id,
- deltas,
- bucket,
- ratedata
+ history.slice( 1 ),
) );
}
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts
index 0b7c35d..2cd68e2 100644
--- a/test/system/DeltaProcessorTest.ts
+++ b/test/system/DeltaProcessorTest.ts
@@ -40,7 +40,7 @@ describe( 'system.DeltaProcessor', () =>
expected: any
}[]>[
{
- label: 'No deltas are processed',
+ label: "No deltas are processed",
given: [
{
id: 123,
@@ -52,168 +52,313 @@ describe( 'system.DeltaProcessor', () =>
],
expected: [],
},
+
+ // when quote is initialized: { foo: [ "" ], state: [ "a" ] }
{
- label: 'Publishes deltas in order',
+ label: "Publishes deltas in order",
+
given: [
{
id: 123,
lastUpdate: 123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
+
+ data: {
+ foo: [ "third" ],
+ state: [ "a", "b", "c", "d" ],
+ },
+
+ ratedata: {
+ prem: [ "rate_second" ],
+ state: [ "i", "ii", "iii" ],
+ },
+
rdelta: {
data: [
{
- data: { foo: [ 'first_bar' ] },
- timestamp: 123123,
+ timestamp: 1,
+ data: {
+ foo: [ "" ],
+ state: [ undefined, null ],
+ },
+ },
+ {
+ timestamp: 3,
+ data: {
+ foo: [ "first" ],
+ state: [ undefined, undefined, null ],
+ },
},
{
- data: { foo: [ 'second_bar' ] },
- timestamp: 234123,
+ timestamp: 5,
+ data: {
+ foo: [ "second" ],
+ state: [ undefined, undefined, undefined, null ],
+ },
+ },
+ ],
+
+ ratedata: [
+ {
+ timestamp: 2,
+ data: {
+ prem: [ "" ],
+ state: [ undefined, null ],
+ },
+ },
+ {
+ timestamp: 4,
+ data: {
+ prem: [ "rate_first" ],
+ state: [ undefined, undefined, null ],
+ },
},
],
},
},
],
+
expected: [
+ // bucket
{
doc_id: 123,
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
+ rdelta: {
+ foo: [ "" ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo: [ "first" ],
+ state: [ "a", "b" ],
+ },
ratedata: {},
},
+
+ // rate
{
doc_id: 123,
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
+ rdelta: {
+ prem: [ "" ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo: [ "first" ],
+ state: [ "a", "b" ],
+ },
+ ratedata: {
+ prem: [ "rate_first" ],
+ state: [ "i", "ii" ],
+ },
+ },
+
+ // bucket
+ {
+ doc_id: 123,
+ rdelta: {
+ foo: [ "first" ],
+ state: [ undefined, undefined, null ],
+ },
+ bucket: {
+ foo: [ "second" ],
+ state: [ "a", "b", "c" ],
+ },
+ ratedata: {},
+ },
+
+ // rate
+ {
+ doc_id: 123,
+ rdelta: {
+ prem: [ "rate_first" ],
+ state: [ undefined, undefined, null ],
+ },
+ bucket: {
+ foo: [ "second" ],
+ state: [ "a", "b", "c" ],
+ },
+ ratedata: {
+ prem: [ "rate_second" ],
+ state: [ "i", "ii", "iii" ],
+ },
+ },
+
+ // bucket
+ {
+ doc_id: 123,
+ rdelta: {
+ foo: [ "second" ],
+ state: [ undefined, undefined, undefined, null ],
+ },
+ bucket: {
+ foo: [ "third" ],
+ state: [ "a", "b", "c", "d" ],
+ },
ratedata: {},
},
],
},
+
{
- label: 'Publishes deltas in order for multiple documents',
+ label: "Publishes deltas in order for multiple documents",
+
given: [
{
id: 123,
lastUpdate: 123123123,
- data: { foo: [ 'start_bar_123' ] },
- ratedata: {},
+
+ data: {
+ foo: [ "first" ],
+ state: [ "a", "b" ],
+ },
+
+ ratedata: {
+ prem: [ "rate_first" ],
+ state: [ "i", "ii" ],
+ },
+
rdelta: {
data: [
{
- data: { foo: [ 'second_bar_123' ] },
- timestamp: 234,
+ timestamp: 1,
+ data: {
+ foo: [ "" ],
+ state: [ undefined, null ],
+ },
},
],
+
ratedata: [
{
- data: { foo: [ 'first_bar_123' ] },
- timestamp: 123,
+ timestamp: 4,
+ data: {
+ prem: [ "" ],
+ state: [ undefined, null ],
+ },
},
],
},
},
+
+ // timestamps of this document are sandwiched between
+ // the above to make sure documents are processed
+ // independently (without splicing their deltas together)
{
id: 234,
- lastUpdate: 123123123,
- data: { foo: [ 'start_bar_234' ] },
- ratedata: {},
+ lastUpdate: 121212123,
+
+ data: {
+ foo2: [ "first" ],
+ state: [ "a", "b" ],
+ },
+
+ ratedata: {
+ prem2: [ "rate_first" ],
+ state: [ "i", "ii" ],
+ },
+
rdelta: {
data: [
{
- data: { foo: [ 'first_bar_234' ] },
- timestamp: 123,
- },
- {
- data: { foo: [ 'second_bar_234' ] },
- timestamp: 234,
- },
- {
- data: { foo: [ 'third_bar_234' ] },
- timestamp: 345,
+ timestamp: 2,
+ data: {
+ foo2: [ "" ],
+ state: [ undefined, null ],
+ },
},
],
- },
- },
- {
- id: 345,
- lastUpdate: 123123123,
- data: { foo: [ 'start_bar_345' ] },
- ratedata: {},
- rdelta: {
+
ratedata: [
{
- data: { foo: [ 'first_bar_345' ] },
- timestamp: 123,
- },
- {
- data: { foo: [ 'second_bar_345' ] },
- timestamp: 234,
+ timestamp: 3,
+ data: {
+ prem2: [ "" ],
+ state: [ undefined, null ],
+ },
},
],
},
},
],
+
expected: [
+ // bucket
{
doc_id: 123,
- delta: { foo: [ 'first_bar_123' ] },
- bucket: { foo: [ 'start_bar_123' ] },
- ratedata: { foo: [ 'first_bar_123' ] },
+ rdelta: {
+ foo: [ "" ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo: [ "first" ],
+ state: [ "a", "b" ],
+ },
+ ratedata: {},
},
+
+ // rate
{
doc_id: 123,
- delta: { foo: [ 'second_bar_123' ] },
- bucket: { foo: [ 'second_bar_123' ] },
- ratedata: { foo: [ 'first_bar_123' ] },
- },
- {
- doc_id: 234,
- delta: { foo: [ 'first_bar_234' ] },
- bucket: { foo: [ 'first_bar_234' ] },
- ratedata: {},
+ rdelta: {
+ prem: [ "" ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo: [ "first" ],
+ state: [ "a", "b" ],
+ },
+ ratedata: {
+ prem: [ "rate_first" ],
+ state: [ "i", "ii" ],
+ },
},
+
+ // bucket
{
doc_id: 234,
- delta: { foo: [ 'second_bar_234' ] },
- bucket: { foo: [ 'second_bar_234' ] },
+ rdelta: {
+ foo2: [ "" ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo2: [ "first" ],
+ state: [ "a", "b" ],
+ },
ratedata: {},
},
+
+ // rate
{
doc_id: 234,
- delta: { foo: [ 'third_bar_234' ] },
- bucket: { foo: [ 'third_bar_234' ] },
- ratedata: {},
- },
- {
- doc_id: 345,
- delta: { foo: [ 'first_bar_345' ] },
- bucket: { foo: [ 'start_bar_345' ] },
- ratedata: { foo: [ 'first_bar_345' ] },
- },
- {
- doc_id: 345,
- delta: { foo: [ 'second_bar_345' ] },
- bucket: { foo: [ 'start_bar_345' ] },
- ratedata: { foo: [ 'second_bar_345' ] },
+ rdelta: {
+ prem2: [ "" ],
+ state: [ undefined, null ],
+ },
+ bucket: {
+ foo2: [ "first" ],
+ state: [ "a", "b" ],
+ },
+ ratedata: {
+ prem2: [ "rate_first" ],
+ state: [ "i", "ii" ],
+ },
},
],
},
+
{
- label: 'trims delta array based on index',
+ label: "trims delta array based on index",
given: [
{
id: 111,
lastUpdate: 123123123,
- data: { foo: [ 'bar' ] },
+ data: { foo: [ "second" ] },
ratedata: {},
rdelta: {
data: [
{
- data: { foo: [ 'first_bar' ] },
+ data: { foo: [ "" ] },
timestamp: 123,
},
{
- data: { foo: [ 'second_bar' ] },
+ data: { foo: [ "first" ] },
timestamp: 234,
},
],
@@ -226,8 +371,8 @@ describe( 'system.DeltaProcessor', () =>
expected: [
{
doc_id: 111,
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
+ rdelta: { foo: [ "first" ] },
+ bucket: { foo: [ "second" ] },
ratedata: {}
},
],
@@ -253,7 +398,7 @@ describe( 'system.DeltaProcessor', () =>
{
published.push( {
doc_id: doc_id,
- delta: delta.data,
+ rdelta: delta.data,
bucket: bucket,
ratedata: ratedata,
} );
@@ -314,13 +459,13 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'start_bar' ] },
ratedata: {},
},
{
doc_id: 234,
delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'start_bar' ] },
ratedata: {},
}
];
@@ -416,7 +561,7 @@ describe( 'system.DeltaProcessor', () =>
const expected_published = [ {
doc_id: 123,
delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'start_bar' ] },
ratedata: {},
} ];