Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'test/system/DeltaProcessorTest.ts')
-rw-r--r--test/system/DeltaProcessorTest.ts312
1 files changed, 265 insertions, 47 deletions
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts
index d0cd8f8..3998d68 100644
--- a/test/system/DeltaProcessorTest.ts
+++ b/test/system/DeltaProcessorTest.ts
@@ -22,7 +22,8 @@
import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
import { AmqpPublisher } from '../../src/system/AmqpPublisher';
import { DeltaDao } from '../../src/system/db/DeltaDao';
-import { DeltaType } from "../../src/bucket/delta";
+import { DeltaType, DeltaDocument } from "../../src/bucket/delta";
+import { DocumentId } from '../../src/document/Document';
import { EventEmitter } from 'events';
import { expect, use as chai_use } from 'chai';
@@ -308,11 +309,12 @@ describe( 'system.DeltaProcessor', () =>
}[]>[
{
label: 'No deltas are processed',
- docs: [
+ given: [
{
id: 123,
lastUpdate: 123123123,
- bucket: {},
+ data: {},
+ ratedata: {},
rdelta: {},
},
],
@@ -324,7 +326,8 @@ describe( 'system.DeltaProcessor', () =>
{
id: 123,
lastUpdate: 123123123,
- bucket: { foo: [ 'start_bar' ] },
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
rdelta: {
data: [
{
@@ -341,14 +344,16 @@ describe( 'system.DeltaProcessor', () =>
],
expected: [
{
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
- doc_id: 123,
+ doc_id: 123,
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'first_bar' ] },
+ ratedata: {},
},
{
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
- doc_id: 123,
+ doc_id: 123,
+ delta: { foo: [ 'second_bar' ] },
+ bucket: { foo: [ 'second_bar' ] },
+ ratedata: {},
},
],
},
@@ -358,17 +363,18 @@ describe( 'system.DeltaProcessor', () =>
{
id: 123,
lastUpdate: 123123123,
- bucket: { foo: 'start_bar' },
+ data: { foo: [ 'start_bar_123' ] },
+ ratedata: {},
rdelta: {
data: [
{
- data: { foo: [ 'second_bar' ] },
+ data: { foo: [ 'second_bar_123' ] },
timestamp: 234,
},
],
ratedata: [
{
- data: { foo: [ 'first_bar' ] },
+ data: { foo: [ 'first_bar_123' ] },
timestamp: 123,
},
],
@@ -377,19 +383,20 @@ describe( 'system.DeltaProcessor', () =>
{
id: 234,
lastUpdate: 123123123,
- bucket: { foo: 'start_bar' },
+ data: { foo: [ 'start_bar_234' ] },
+ ratedata: {},
rdelta: {
data: [
{
- data: { foo: [ 'first_bar' ] },
+ data: { foo: [ 'first_bar_234' ] },
timestamp: 123,
},
{
- data: { foo: [ 'second_bar' ] },
+ data: { foo: [ 'second_bar_234' ] },
timestamp: 234,
},
{
- data: { foo: [ 'third_bar' ] },
+ data: { foo: [ 'third_bar_234' ] },
timestamp: 345,
},
],
@@ -398,15 +405,16 @@ describe( 'system.DeltaProcessor', () =>
{
id: 345,
lastUpdate: 123123123,
- bucket: { foo: 'start_bar' },
+ data: { foo: [ 'start_bar_345' ] },
+ ratedata: {},
rdelta: {
ratedata: [
{
- data: { foo: [ 'first_bar' ] },
+ data: { foo: [ 'first_bar_345' ] },
timestamp: 123,
},
{
- data: { foo: [ 'second_bar' ] },
+ data: { foo: [ 'second_bar_345' ] },
timestamp: 234,
},
],
@@ -415,60 +423,73 @@ describe( 'system.DeltaProcessor', () =>
],
expected: [
{
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
- doc_id: 123,
+ doc_id: 123,
+ delta: { foo: [ 'first_bar_123' ] },
+ bucket: { foo: [ 'start_bar_123' ] },
+ ratedata: { foo: [ 'first_bar_123' ] },
},
{
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
- doc_id: 123,
+ doc_id: 123,
+ delta: { foo: [ 'second_bar_123' ] },
+ bucket: { foo: [ 'second_bar_123' ] },
+ ratedata: { foo: [ 'first_bar_123' ] },
},
{
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
- doc_id: 234,
+ doc_id: 234,
+ delta: { foo: [ 'first_bar_234' ] },
+ bucket: { foo: [ 'first_bar_234' ] },
+ ratedata: {},
},
{
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
- doc_id: 234,
+ doc_id: 234,
+ delta: { foo: [ 'second_bar_234' ] },
+ bucket: { foo: [ 'second_bar_234' ] },
+ ratedata: {},
},
{
- delta: { foo: [ 'third_bar' ] },
- bucket: { foo: [ 'third_bar' ] },
- doc_id: 234,
+ doc_id: 234,
+ delta: { foo: [ 'third_bar_234' ] },
+ bucket: { foo: [ 'third_bar_234' ] },
+ ratedata: {},
},
{
- delta: { foo: [ 'first_bar' ] },
- bucket: { foo: [ 'first_bar' ] },
- doc_id: 345,
+ doc_id: 345,
+ delta: { foo: [ 'first_bar_345' ] },
+ bucket: { foo: [ 'start_bar_345' ] },
+ ratedata: { foo: [ 'first_bar_345' ] },
},
{
- delta: { foo: [ 'second_bar' ] },
- bucket: { foo: [ 'second_bar' ] },
- doc_id: 345,
+ doc_id: 345,
+ delta: { foo: [ 'second_bar_345' ] },
+ bucket: { foo: [ 'start_bar_345' ] },
+ ratedata: { foo: [ 'second_bar_345' ] },
},
],
},
- ] ).forEach( ( { given, expected, label } ) => it( label, () =>
+ ] ).forEach( ( { label, given, expected } ) => it( label, () =>
{
let published: any = [];
const dao = createMockDeltaDao();
const publisher = createMockDeltaPublisher();
const emitter = new EventEmitter();
- dao.getUnprocessedDocuments = (): Promise<Record<string, any>[]> =>
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
{
return Promise.resolve( given );
}
- publisher.publish = ( delta, bucket, doc_id ): Promise<void> =>
+ publisher.publish = (
+ doc_id,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
{
published.push( {
- delta: delta.data,
- bucket: bucket,
- doc_id: doc_id,
+ doc_id: doc_id,
+ delta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
} );
return Promise.resolve();
@@ -479,6 +500,203 @@ describe( 'system.DeltaProcessor', () =>
.then( _ => expect( published ).to.deep.equal( expected ) );
} ) );
} );
+
+
+ describe( 'Error handling', () =>
+ {
+ it( 'Marks document in error state and continues', () =>
+ {
+ let published: any = [];
+ let error_flag_set = false;
+ const dao = createMockDeltaDao();
+ const publisher = createMockDeltaPublisher();
+ const emitter = new EventEmitter();
+ const doc = <DeltaDocument[]>[ {
+ id: <DocumentId>123,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ },
+ {
+ id: <DocumentId>234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ } ];
+
+ const expected_published = [
+ {
+ doc_id: 123,
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'first_bar' ] },
+ ratedata: {},
+ },
+ {
+ doc_id: 234,
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'first_bar' ] },
+ ratedata: {},
+ }
+ ];
+
+ const expected_error = 'Uh oh';
+
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
+ Promise.resolve( doc );
+
+ dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise<void> =>
+ Promise.reject( new Error( expected_error ) );
+
+ dao.setErrorFlag = (): Promise<void> =>
+ {
+ error_flag_set = true;
+ return Promise.resolve();
+ }
+
+ publisher.publish = (
+ doc_id,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
+ {
+ published.push( {
+ doc_id: doc_id,
+ delta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
+ } );
+
+ return Promise.resolve();
+ }
+
+ // Prevent node from converting an error event into an error
+ emitter.on( 'error', () => {} );
+
+ return expect( new Sut( dao, publisher, emitter ).process() )
+ .to.eventually.deep.equal( undefined )
+ .then( _ =>
+ {
+ expect( error_flag_set ).to.be.true;
+ expect( published ).to.deep.equal( expected_published );
+ } );
+ } );
+ } );
+
+
+ describe( 'Error handling', () =>
+ {
+ it( 'Failure to set document error state further processing', () =>
+ {
+ let published: any = [];
+ let caught_error = '';
+ const dao = createMockDeltaDao();
+ const publisher = createMockDeltaPublisher();
+ const emitter = new EventEmitter();
+ const doc = <DeltaDocument[]>[ {
+ id: <DocumentId>123,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ },
+ {
+ id: <DocumentId>234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
+ data: [
+ {
+ data: { foo: [ 'first_bar' ] },
+ timestamp: <UnixTimestamp>123123,
+ type: 'data',
+ }
+ ],
+ ratedata: [],
+ },
+ } ];
+
+ // Only one is published
+ const expected_published = [ {
+ doc_id: 123,
+ delta: { foo: [ 'first_bar' ] },
+ bucket: { foo: [ 'first_bar' ] },
+ ratedata: {},
+ } ];
+
+ const expected_error = 'Uh oh';
+
+ dao.getUnprocessedDocuments = (): Promise<DeltaDocument[]> =>
+ Promise.resolve( doc );
+
+ dao.markDocumentAsProcessed = ( _doc_id, _ts ): Promise<void> =>
+ Promise.reject( new Error( 'Couldn\'t mark document' ) );
+
+ dao.setErrorFlag = (): Promise<void> =>
+ Promise.reject( new Error( expected_error ) );
+
+ publisher.publish = (
+ doc_id,
+ delta,
+ bucket,
+ ratedata,
+ ): Promise<void> =>
+ {
+ published.push( {
+ doc_id: doc_id,
+ delta: delta.data,
+ bucket: bucket,
+ ratedata: ratedata,
+ } );
+
+ return Promise.resolve();
+ }
+
+ // Prevent node from converting an error event into an error
+ emitter.on( 'error', () => {} );
+
+ return expect(
+ new Sut( dao, publisher, emitter ).process()
+ .catch( e => { caught_error = e.message } )
+ )
+ .to.eventually.deep.equal( undefined )
+ .then( _ =>
+ {
+ expect( caught_error ).to.equal( expected_error );
+ expect( published ).to.deep.equal( expected_published );
+ } );
+ } );
+ } );
} );