Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorAustin Schaffer <austin.schaffer@ryansg.com>2019-12-10 17:24:19 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-12-12 10:27:09 -0500
commite3dded760d18013e4f91248b0d12ec6c6a9dc368 (patch)
tree71b93936bbd19fe6d70800bbce4299fdb27d3b94 /test
parent9eb1f3afca4228fa9b69dde10336379094fe0bf9 (diff)
downloadliza-e3dded760d18013e4f91248b0d12ec6c6a9dc368.tar.gz
liza-e3dded760d18013e4f91248b0d12ec6c6a9dc368.tar.bz2
liza-e3dded760d18013e4f91248b0d12ec6c6a9dc368.zip
[DEV-5312] Define document meta data and restructure message writer.
Add better tests for message writer
Diffstat (limited to 'test')
-rw-r--r--test/system/DeltaProcessorTest.ts226
-rw-r--r--test/system/DeltaPublisherTest.ts443
-rw-r--r--test/system/EventMediatorTest.ts4
-rw-r--r--test/system/MetricsCollectorTest.ts2
-rw-r--r--test/system/V1MessageWriterTest.ts532
-rw-r--r--test/system/amqp/AmqpConnectionTest.ts49
6 files changed, 779 insertions, 477 deletions
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts
index 2cd68e2..1587bb2 100644
--- a/test/system/DeltaProcessorTest.ts
+++ b/test/system/DeltaProcessorTest.ts
@@ -22,7 +22,7 @@
import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor';
import { AmqpPublisher } from '../../src/system/AmqpPublisher';
import { DeltaDao } from '../../src/system/db/DeltaDao';
-import { DeltaDocument } from "../../src/bucket/delta";
+import { DeltaDocument } from '../../src/bucket/delta';
import { DocumentId } from '../../src/document/Document';
import { EventEmitter } from 'events';
@@ -40,7 +40,7 @@ describe( 'system.DeltaProcessor', () =>
expected: any
}[]>[
{
- label: "No deltas are processed",
+ label: 'No deltas are processed',
given: [
{
id: 123,
@@ -53,9 +53,9 @@ describe( 'system.DeltaProcessor', () =>
expected: [],
},
- // when quote is initialized: { foo: [ "" ], state: [ "a" ] }
+ // when quote is initialized: { foo: [ '' ], state: [ 'a' ] }
{
- label: "Publishes deltas in order",
+ label: 'Publishes deltas in order',
given: [
{
@@ -63,13 +63,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 123123123,
data: {
- foo: [ "third" ],
- state: [ "a", "b", "c", "d" ],
+ foo: [ 'third' ],
+ state: [ 'a', 'b', 'c', 'd' ],
},
ratedata: {
- prem: [ "rate_second" ],
- state: [ "i", "ii", "iii" ],
+ prem: [ 'rate_second' ],
+ state: [ 'i', 'ii', 'iii' ],
},
rdelta: {
@@ -77,21 +77,21 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 1,
data: {
- foo: [ "" ],
+ foo: [ '' ],
state: [ undefined, null ],
},
},
{
timestamp: 3,
data: {
- foo: [ "first" ],
+ foo: [ 'first' ],
state: [ undefined, undefined, null ],
},
},
{
timestamp: 5,
data: {
- foo: [ "second" ],
+ foo: [ 'second' ],
state: [ undefined, undefined, undefined, null ],
},
},
@@ -101,14 +101,14 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 2,
data: {
- prem: [ "" ],
+ prem: [ '' ],
state: [ undefined, null ],
},
},
{
timestamp: 4,
data: {
- prem: [ "rate_first" ],
+ prem: [ 'rate_first' ],
state: [ undefined, undefined, null ],
},
},
@@ -122,12 +122,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- foo: [ "" ],
+ foo: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {},
},
@@ -136,16 +136,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- prem: [ "" ],
+ prem: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
},
@@ -153,12 +153,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- foo: [ "first" ],
+ foo: [ 'first' ],
state: [ undefined, undefined, null ],
},
bucket: {
- foo: [ "second" ],
- state: [ "a", "b", "c" ],
+ foo: [ 'second' ],
+ state: [ 'a', 'b', 'c' ],
},
ratedata: {},
},
@@ -167,16 +167,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- prem: [ "rate_first" ],
+ prem: [ 'rate_first' ],
state: [ undefined, undefined, null ],
},
bucket: {
- foo: [ "second" ],
- state: [ "a", "b", "c" ],
+ foo: [ 'second' ],
+ state: [ 'a', 'b', 'c' ],
},
ratedata: {
- prem: [ "rate_second" ],
- state: [ "i", "ii", "iii" ],
+ prem: [ 'rate_second' ],
+ state: [ 'i', 'ii', 'iii' ],
},
},
@@ -184,12 +184,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- foo: [ "second" ],
+ foo: [ 'second' ],
state: [ undefined, undefined, undefined, null ],
},
bucket: {
- foo: [ "third" ],
- state: [ "a", "b", "c", "d" ],
+ foo: [ 'third' ],
+ state: [ 'a', 'b', 'c', 'd' ],
},
ratedata: {},
},
@@ -197,7 +197,7 @@ describe( 'system.DeltaProcessor', () =>
},
{
- label: "Publishes deltas in order for multiple documents",
+ label: 'Publishes deltas in order for multiple documents',
given: [
{
@@ -205,13 +205,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 123123123,
data: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
rdelta: {
@@ -219,7 +219,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 1,
data: {
- foo: [ "" ],
+ foo: [ '' ],
state: [ undefined, null ],
},
},
@@ -229,7 +229,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 4,
data: {
- prem: [ "" ],
+ prem: [ '' ],
state: [ undefined, null ],
},
},
@@ -245,13 +245,13 @@ describe( 'system.DeltaProcessor', () =>
lastUpdate: 121212123,
data: {
- foo2: [ "first" ],
- state: [ "a", "b" ],
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem2: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem2: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
rdelta: {
@@ -259,7 +259,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 2,
data: {
- foo2: [ "" ],
+ foo2: [ '' ],
state: [ undefined, null ],
},
},
@@ -269,7 +269,7 @@ describe( 'system.DeltaProcessor', () =>
{
timestamp: 3,
data: {
- prem2: [ "" ],
+ prem2: [ '' ],
state: [ undefined, null ],
},
},
@@ -283,12 +283,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- foo: [ "" ],
+ foo: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {},
},
@@ -297,16 +297,16 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 123,
rdelta: {
- prem: [ "" ],
+ prem: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo: [ "first" ],
- state: [ "a", "b" ],
+ foo: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
},
@@ -314,12 +314,12 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 234,
rdelta: {
- foo2: [ "" ],
+ foo2: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo2: [ "first" ],
- state: [ "a", "b" ],
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {},
},
@@ -328,37 +328,37 @@ describe( 'system.DeltaProcessor', () =>
{
doc_id: 234,
rdelta: {
- prem2: [ "" ],
+ prem2: [ '' ],
state: [ undefined, null ],
},
bucket: {
- foo2: [ "first" ],
- state: [ "a", "b" ],
+ foo2: [ 'first' ],
+ state: [ 'a', 'b' ],
},
ratedata: {
- prem2: [ "rate_first" ],
- state: [ "i", "ii" ],
+ prem2: [ 'rate_first' ],
+ state: [ 'i', 'ii' ],
},
},
],
},
{
- label: "trims delta array based on index",
+ label: 'trims delta array based on index',
given: [
{
id: 111,
lastUpdate: 123123123,
- data: { foo: [ "second" ] },
+ data: { foo: [ 'second' ] },
ratedata: {},
rdelta: {
data: [
{
- data: { foo: [ "" ] },
+ data: { foo: [ '' ] },
timestamp: 123,
},
{
- data: { foo: [ "first" ] },
+ data: { foo: [ 'first' ] },
timestamp: 234,
},
],
@@ -371,8 +371,8 @@ describe( 'system.DeltaProcessor', () =>
expected: [
{
doc_id: 111,
- rdelta: { foo: [ "first" ] },
- bucket: { foo: [ "second" ] },
+ rdelta: { foo: [ 'first' ] },
+ bucket: { foo: [ 'second' ] },
ratedata: {}
},
],
@@ -390,14 +390,14 @@ describe( 'system.DeltaProcessor', () =>
}
publisher.publish = (
- doc_id,
+ meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
- doc_id: doc_id,
+ doc_id: meta.id,
rdelta: delta.data,
bucket: bucket,
ratedata: ratedata,
@@ -422,12 +422,19 @@ describe( 'system.DeltaProcessor', () =>
const dao = createMockDeltaDao();
const publisher = createMockDeltaPublisher();
const emitter = new EventEmitter();
+ const entity_num = 'Some Agency';
+ const entity_id = 4321;
+ const lastUpdate = <UnixTimestamp>123123123;
+ const createdData = <UnixTimestamp>234234234;
const doc = <DeltaDocument[]>[ {
- id: <DocumentId>123,
- lastUpdate: <UnixTimestamp>123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
- rdelta: {
+ id: <DocumentId>123,
+ agentName: entity_num,
+ agentEntityId: entity_id,
+ startDate: createdData,
+ lastUpdate: lastUpdate,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@@ -439,11 +446,14 @@ describe( 'system.DeltaProcessor', () =>
},
},
{
- id: <DocumentId>234,
- lastUpdate: <UnixTimestamp>123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
- rdelta: {
+ id: <DocumentId>234,
+ agentName: entity_num,
+ agentEntityId: entity_id,
+ startDate: createdData,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@@ -457,13 +467,25 @@ describe( 'system.DeltaProcessor', () =>
const expected_published = [
{
- doc_id: 123,
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 123,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
},
{
- doc_id: 234,
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 234,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
@@ -485,14 +507,14 @@ describe( 'system.DeltaProcessor', () =>
}
publisher.publish = (
- doc_id,
+ meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
- doc_id: doc_id,
+ meta: meta,
delta: delta.data,
bucket: bucket,
ratedata: ratedata,
@@ -525,11 +547,14 @@ describe( 'system.DeltaProcessor', () =>
const publisher = createMockDeltaPublisher();
const emitter = new EventEmitter();
const doc = <DeltaDocument[]>[ {
- id: <DocumentId>123,
- lastUpdate: <UnixTimestamp>123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
- rdelta: {
+ id: <DocumentId>123,
+ agentName: 'Some Agency',
+ agentEntityId: 4321,
+ startDate: <UnixTimestamp>234234234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@@ -541,11 +566,14 @@ describe( 'system.DeltaProcessor', () =>
},
},
{
- id: <DocumentId>234,
- lastUpdate: <UnixTimestamp>123123123,
- data: { foo: [ 'start_bar' ] },
- ratedata: {},
- rdelta: {
+ id: <DocumentId>234,
+ agentName: 'Some Agency',
+ agentEntityId: 4321,
+ startDate: <UnixTimestamp>234234234,
+ lastUpdate: <UnixTimestamp>123123123,
+ data: { foo: [ 'start_bar' ] },
+ ratedata: {},
+ rdelta: {
data: [
{
data: { foo: [ 'first_bar' ] },
@@ -559,7 +587,13 @@ describe( 'system.DeltaProcessor', () =>
// Only one is published
const expected_published = [ {
- doc_id: 123,
+ meta: {
+ entity_id: 4321,
+ entity_name: 'Some Agency',
+ id: 123,
+ lastUpdate: 123123123,
+ startDate: 234234234,
+ },
delta: { foo: [ 'first_bar' ] },
bucket: { foo: [ 'start_bar' ] },
ratedata: {},
@@ -577,14 +611,14 @@ describe( 'system.DeltaProcessor', () =>
Promise.reject( new Error( expected_error ) );
publisher.publish = (
- doc_id,
+ meta,
delta,
bucket,
ratedata,
): Promise<void> =>
{
published.push( {
- doc_id: doc_id,
+ meta,
delta: delta.data,
bucket: bucket,
ratedata: ratedata,
diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts
index 314c7da..f352b0c 100644
--- a/test/system/DeltaPublisherTest.ts
+++ b/test/system/DeltaPublisherTest.ts
@@ -22,20 +22,17 @@
import { AmqpConnection } from '../../src/system/amqp/AmqpConnection';
import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher';
-import { DocumentId } from '../../src/document/Document';
-import { Duplex } from 'stream';
-import { EventEmitter } from "events";
+import { DocumentId, DocumentMeta } from '../../src/document/Document';
+import { EventEmitter } from 'events';
import { hasContext } from '../../src/error/ContextError';
import { AmqpError } from '../../src/error/AmqpError';
import { Channel } from 'amqplib';
-import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
+import { MessageWriter } from '../../src/system/MessageWriter';
-import { AvroSchema } from "avro-js";
import { expect, use as chai_use } from 'chai';
chai_use( require( 'chai-as-promised' ) );
-const sinon = require( 'sinon' );
describe( 'server.DeltaPublisher', () =>
{
@@ -49,6 +46,15 @@ describe( 'server.DeltaPublisher', () =>
const ratedata = createMockBucketData();
const emitter = new EventEmitter();
const conn = createMockAmqpConnection();
+ const writer = createMockWriter();
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
conn.getAmqpChannel = () =>
{
return <Channel>{
@@ -63,23 +69,10 @@ describe( 'server.DeltaPublisher', () =>
};
};
- const stub_schema = <AvroSchema>(<unknown>{
- isValid()
- {
- // TODO: test me
- },
- } );
-
- const sut = new Sut(
- emitter,
- ts_ctr,
- createMockEncoderCtor( stub_schema ),
- conn,
- stub_schema,
- );
+ const sut = new Sut( emitter, ts_ctr, conn, writer );
return expect(
- sut.publish( <DocumentId>123, delta, bucket, ratedata )
+ sut.publish( meta, delta, bucket, ratedata )
).to.eventually.deep.equal( undefined )
.then( _ =>
{
@@ -119,29 +112,25 @@ describe( 'server.DeltaPublisher', () =>
const ratedata = createMockBucketData();
const emitter = new EventEmitter();
const conn = createMockAmqpConnection();
- const doc_id = <DocumentId>123;
+ const writer = createMockWriter();
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
const expected = {
- doc_id: doc_id,
+ doc_id: meta.id,
delta_type: delta.type,
delta_ts: delta.timestamp
}
conn.getAmqpChannel = getChannelF;
- const stub_schema = <AvroSchema>(<unknown>{
- isValid()
- {
- // TODO: test me
- },
- } );
-
- const result = new Sut(
- emitter,
- ts_ctr,
- createMockEncoderCtor( stub_schema ),
- conn,
- stub_schema,
- ).publish( doc_id, delta, bucket, ratedata );
+ const result = new Sut( emitter, ts_ctr, conn, writer )
+ .publish( meta, delta, bucket, ratedata );
return Promise.all( [
expect( result ).to.eventually.be.rejectedWith(
@@ -158,276 +147,47 @@ describe( 'server.DeltaPublisher', () =>
} )
] );
} ) );
- } );
-
- describe( '#avroEncode parses', () =>
- {
- [
- {
- label: 'Null value',
- valid: true,
- delta_data: { foo: null },
- },
- {
- label: 'Null array',
- valid: true,
- delta_data: { foo: { "array": [ null ] } },
- },
- {
- label: 'Boolean value',
- valid: true,
- delta_data: { foo: { "array": [
- { "boolean": true },
- ] } },
- },
- {
- label: 'Simple string',
- valid: true,
- delta_data: { foo: { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- ] } },
- },
- {
- label: 'Simple int',
- valid: true,
- delta_data: { foo: { "array": [
- { "double": 123 },
- ] } },
- },
- {
- label: 'Nested array',
- valid: true,
- delta_data: { foo: { "array": [
- { "array": [
- { "string": 'bar' },
- ] },
- ] } },
- },
- {
- label: 'Array with nulls',
- valid: true,
- delta_data: { foo: { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- null,
- ] } },
- },
- {
- label: 'Nested Array with mixed values',
- valid: true,
- delta_data: { foo: { "array": [
- { "array": [
- { "string": 'bar' },
- { "double": 123321 },
- null,
- ] }
- ] } },
- },
- {
- label: 'Non-array',
- valid: false,
- delta_data: { foo: 'bar' },
- },
- {
- label: 'Map objects',
- valid: true,
- delta_data: { "foo": { "array": [
- { "map": {
- "bar": { "map": {
- "baz": { "double": 1572903485000 },
- } }
- } }
- ] } },
- }
- ].forEach( ( { label, delta_data, valid } ) =>
- {
- it( label, () =>
- {
- const emitter = createMockEventEmitter();
- const conn = createMockAmqpConnection();
- const data = createMockData( delta_data );
-
- const stub_schema = <AvroSchema>(<unknown>{
- isValid()
- {
- // TODO: test me
- },
- } );
-
- const sut = new Sut(
- emitter,
- ts_ctr,
- createMockEncoderCtor( stub_schema ),
- conn,
- stub_schema
- );
-
- sut.avroEncode( data )
- .then( b =>
- {
- expect( typeof(b) ).to.equal( 'object' );
- expect( valid ).to.be.true;
- } )
- .catch( _ =>
- {
- expect( valid ).to.be.false;
- } );
- } );
- } );
- } );
- describe( '#setDataTypes annotates', () =>
- {
- [
- {
- label: 'Null',
- delta_data: null,
- expected: null,
- },
- {
- label: 'Null Value',
- delta_data: { foo: null },
- expected: { foo: null },
- },
- {
- label: 'Boolean Value',
- delta_data: { foo: [ true ] },
- expected: { foo: { "array": [
- { "boolean": true },
- ] } },
- },
- {
- label: 'Simple string',
- delta_data: { foo: [
- 'bar',
- 'baz',
- ] },
- expected: { foo: { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- ] } },
- },
- {
- label: 'Simple int',
- delta_data: { foo: [
- 123
- ] },
- expected: { foo: { "array": [
- { "double": 123 },
- ] } },
- },
- {
- label: 'Nested array',
- delta_data: { foo: [
- [
- 'bar',
- 'baz',
- ]
- ] },
- expected: { foo: { "array": [
- { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- ] },
- ] } },
- },
- {
- label: 'Double nested array',
- delta_data: { foo: [
- [
- [
- 'bar',
- 123,
- null
- ],
- ],
- ] },
- expected: { foo: { "array": [
- { "array": [
- { "array": [
- { "string": 'bar' },
- { "double": 123 },
- null,
- ] },
- ] },
- ] } },
- },
- {
- label: 'Array with nulls',
- delta_data: { foo: [
- 'bar',
- 'baz',
- null
- ] },
- expected: { foo: { "array": [
- { "string": 'bar' },
- { "string": 'baz' },
- null
- ] } },
- },
- {
- label: 'Nested Array with mixed values',
- delta_data: { foo: [
- [
- 'bar',
- 123321,
- null,
- ]
- ] },
- expected: { foo: { "array": [
- { "array": [
- { "string": 'bar' },
- { "double": 123321 },
- null,
- ] },
- ] } },
- },
- {
- label: 'Nested Array with mixed values',
- delta_data: { foo: [
- {
- "bar": {
- "wer": 'qaz',
- "qwe": 1572903485000,
- "asd": true,
- "zxc": null,
- },
- },
- ] },
- expected: { "foo": { "array": [
- { "map": {
- "bar": { "map": {
- "wer": { "string": 'qaz' },
- "qwe": { "double": 1572903485000 },
- "asd": { "boolean": true },
- "zxc": null,
- } },
- } },
- ] } },
- },
- ].forEach( ( { label, delta_data, expected } ) =>
+ it( 'writer#write rejects', () =>
{
- it( label, () =>
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const emitter = new EventEmitter();
+ const conn = createMockAmqpConnection();
+ const writer = createMockWriter();
+ const error = new Error( 'Bad thing happened' );
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
+ writer.write = (
+ _: any,
+ __: any,
+ ___: any,
+ ____: any,
+ _____: any
+ ): Promise<Buffer> =>
{
- const encoded = 'FooBar';
- const emitter = createMockEventEmitter();
- const conn = createMockAmqpConnection();
- const avroEncoderCtr = createMockEncoder( encoded );
- const stub_schema = <AvroSchema>{};
- const sut = new Sut(
- emitter,
- ts_ctr,
- avroEncoderCtr,
- conn,
- stub_schema,
- );
- const actual = sut.setDataTypes( delta_data );
+ return Promise.reject( error );
+ };
- expect( actual ).to.deep.equal( expected );
- } );
- } );
+ const result = new Sut( emitter, ts_ctr, conn, writer )
+ .publish( meta, delta, bucket, ratedata );
+
+ return Promise.all( [
+ expect( result ).to.eventually.be.rejectedWith( error ),
+ result.catch( e =>
+ {
+ return expect( e ).to.deep.equal( error );
+ } )
+ ] );
+ } )
} );
} );
@@ -438,26 +198,6 @@ function ts_ctr(): UnixTimestamp
}
-function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr
-{
- return ( _schema: AvroSchema ) =>
- {
- const mock = sinon.mock( Duplex );
-
- mock.on = ( _: string, __: any ) => {};
- mock.end = ( _: any ) => { return mock_encoded_data; };
-
- return mock;
- };
-}
-
-
-function createMockEventEmitter(): EventEmitter
-{
- return <EventEmitter>{};
-}
-
-
function createMockAmqpConnection(): AmqpConnection
{
return <AmqpConnection>{
@@ -467,39 +207,6 @@ function createMockAmqpConnection(): AmqpConnection
}
-function createMockData( delta_data: any ): any
-{
-
- return {
- event: {
- id: 'RATE',
- ts: 1573856916,
- actor: 'SERVER',
- step: null,
- },
- document: {
- id: 123123,
- created: 1573856916,
- modified: 1573856916,
- top_visited_step: '2',
- },
- data: null,
- ratedata: null,
- delta: {
- Data: {
- bucket: delta_data,
- },
- },
- program: {
- Program: {
- id: 'quote_server',
- version: 'dadaddwafdwa',
- },
- },
- };
-}
-
-
function createMockBucketData(): Record<string, any>
{
return {
@@ -518,26 +225,12 @@ function createMockDelta(): Delta<any>
}
-function createMockEncoderCtor( stub_schema: AvroSchema ):
- ( schema: AvroSchema ) => Duplex
+function createMockWriter(): MessageWriter
{
- const events = <Record<string, () => void>>{};
-
- const mock_duplex = <Duplex>(<unknown>{
- on( event_name: string, callback: () => void )
+ return <MessageWriter>{
+ write( _: any, __:any, ___:any, ____:any, _____:any ): Promise<Buffer>
{
- events[ event_name ] = callback;
- },
-
- end()
- {
- events.end();
- },
- } );
-
- return ( schema: AvroSchema ): Duplex =>
- {
- expect( schema ).to.equal( stub_schema );
- return mock_duplex;
+ return Promise.resolve( Buffer.from( '' ) );
+ }
};
-}
+} \ No newline at end of file
diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts
index caab191..abfbef8 100644
--- a/test/system/EventMediatorTest.ts
+++ b/test/system/EventMediatorTest.ts
@@ -62,11 +62,11 @@ describe( 'system.EventLogger captures and logs events', () =>
expect( method_called ).to.be.true;
} );
- it( 'amqp-conn-error triggers log#warning', () =>
+ it( 'amqp-conn-warn triggers log#warning', () =>
{
let method_called = false;
- const event_id = 'amqp-conn-error';
+ const event_id = 'amqp-conn-warn';
const emitter = new EventEmitter();
const log = createMockLogger();
diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts
index eafc77d..9a36584 100644
--- a/test/system/MetricsCollectorTest.ts
+++ b/test/system/MetricsCollectorTest.ts
@@ -72,7 +72,7 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () =>
const sut = new Sut( factory, conf, emitter, timer );
- emitter.emit( 'delta-process-error' );
+ emitter.emit( 'error' );
expect( counter_called ).to.be.true;
diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts
new file mode 100644
index 0000000..271d735
--- /dev/null
+++ b/test/system/V1MessageWriterTest.ts
@@ -0,0 +1,532 @@
+/**
+ * V1 Message Writer
+ *
+ * Copyright (C) 2010-2019 R-T Specialty, LLC.
+ *
+ * This file is part of liza.
+ *
+ * liza is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Tests for Version 1 of the avro message writer
+ */
+
+import { V1MessageWriter as Sut } from '../../src/system/avro/V1MessageWriter';
+import { hasContext, context } from '../../src/error/ContextError';
+import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
+import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta';
+import { DocumentMeta, DocumentId } from '../../src/document/Document';
+import { Duplex } from 'stream';
+import { AvroSchema } from 'avro-js';
+
+import { expect, use as chai_use } from 'chai';
+chai_use( require( 'chai-as-promised' ) );
+
+const sinon = require( 'sinon' );
+
+describe( 'system.V1MessageWriter', () =>
+{
+ it( 'Rejects improperly formatted data', () =>
+ {
+ const delta = createMockDelta();
+ const bucket = createMockBucketData();
+ const ratedata = createMockBucketData();
+ const error = new Error( 'Oh no' );
+ const schema = createMockAvroSchema();
+ const ts = <UnixTimestamp>123;
+ const meta = <DocumentMeta>{
+ id: <DocumentId>123,
+ entity_name: 'Some Agency',
+ entity_id: 234,
+ startDate: <UnixTimestamp>345,
+ lastUpdate: <UnixTimestamp>456,
+ };
+
+ const expected = {
+ invalid_paths: 'Foo',
+ invalid_data: 'Bar',
+ };
+
+ const error_context = context( error, expected );
+
+ schema.isValid = () => { throw error_context; };
+
+ const result = new Sut(
+ createMockEncoderCtor( schema ),
+ schema,
+ ).write( ts, meta, delta, bucket, ratedata );
+
+ return Promise.all( [
+ expect( result ).to.eventually.be.rejectedWith( error ),
+ result.catch( e =>
+ {
+ if ( !hasContext( e ) )
+ {
+ return expect.fail();
+ }
+
+ return expect( e.context ).to.deep.equal( expected );
+ } )
+ ] );
+ } );
+
+
+ describe( '#avroEncode parses', () =>
+ {
+ [
+ {
+ label: 'Null value',
+ valid: true,
+ delta_data: { foo: null },
+ },
+ {
+ label: 'Null array',
+ valid: true,
+ delta_data: { foo: { 'array': [ null ] } },
+ },
+ {
+ label: 'Boolean value',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'boolean': true },
+ ] } },
+ },
+ {
+ label: 'Simple string',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] } },
+ },
+ {
+ label: 'Simple int',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'double': 123 },
+ ] } },
+ },
+ {
+ label: 'Nested array',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Array with nulls',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ null,
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ valid: true,
+ delta_data: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123321 },
+ null,
+ ] }
+ ] } },
+ },
+ {
+ label: 'Non-array',
+ valid: false,
+ delta_data: { foo: 'bar' },
+ },
+ {
+ label: 'Map objects',
+ valid: true,
+ delta_data: { 'foo': { 'array': [
+ { 'map': {
+ 'bar': { 'map': {
+ 'baz': { 'double': 1572903485000 },
+ } }
+ } }
+ ] } },
+ }
+ ].forEach( ( { label, delta_data, valid } ) =>
+ {
+ it( label, () =>
+ {
+ const data = createMockData( delta_data );
+ const schema = createMockAvroSchema();
+
+ const sut = new Sut(
+ createMockEncoderCtor( schema ),
+ schema
+ );
+
+ sut.avroEncode( data )
+ .then( b =>
+ {
+ expect( typeof(b) ).to.equal( 'object' );
+ expect( valid ).to.be.true;
+ } )
+ .catch( _ =>
+ {
+ expect( valid ).to.be.false;
+ } );
+ } );
+ } );
+ } );
+
+
+ describe( '#setDataTypes annotates', () =>
+ {
+ [
+ {
+ label: 'Null',
+ delta_data: null,
+ expected: null,
+ },
+ {
+ label: 'Null Value',
+ delta_data: { foo: null },
+ expected: { foo: null },
+ },
+ {
+ label: 'Boolean Value',
+ delta_data: { foo: [ true ] },
+ expected: { foo: { 'array': [
+ { 'boolean': true },
+ ] } },
+ },
+ {
+ label: 'Simple string',
+ delta_data: { foo: [
+ 'bar',
+ 'baz',
+ ] },
+ expected: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] } },
+ },
+ {
+ label: 'Simple int',
+ delta_data: { foo: [
+ 123
+ ] },
+ expected: { foo: { 'array': [
+ { 'double': 123 },
+ ] } },
+ },
+ {
+ label: 'Nested array',
+ delta_data: { foo: [
+ [
+ 'bar',
+ 'baz',
+ ]
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Double nested array',
+ delta_data: { foo: [
+ [
+ [
+ 'bar',
+ 123,
+ null
+ ],
+ ],
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123 },
+ null,
+ ] },
+ ] },
+ ] } },
+ },
+ {
+ label: 'Array with nulls',
+ delta_data: { foo: [
+ 'bar',
+ 'baz',
+ null
+ ] },
+ expected: { foo: { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ null
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ delta_data: { foo: [
+ [
+ 'bar',
+ 123321,
+ null,
+ ]
+ ] },
+ expected: { foo: { 'array': [
+ { 'array': [
+ { 'string': 'bar' },
+ { 'double': 123321 },
+ null,
+ ] },
+ ] } },
+ },
+ {
+ label: 'Nested Array with mixed values',
+ delta_data: { foo: [
+ {
+ 'bar': {
+ 'wer': 'qaz',
+ 'qwe': 1572903485000,
+ 'asd': true,
+ 'zxc': null,
+ },
+ },
+ ] },
+ expected: { 'foo': { 'array': [
+ { 'map': {
+ 'bar': { 'map': {
+ 'wer': { 'string': 'qaz' },
+ 'qwe': { 'double': 1572903485000 },
+ 'asd': { 'boolean': true },
+ 'zxc': null,
+ } },
+ } },
+ ] } },
+ },
+ ].forEach( ( { label, delta_data, expected } ) =>
+ {
+ it( label, () =>
+ {
+ const encoded = 'FooBar';
+ const avroEncoderCtr = createMockEncoder( encoded );
+ const stub_schema = <AvroSchema>{};
+ const sut = new Sut(
+ avroEncoderCtr,
+ stub_schema,
+ );
+ const actual = sut.setDataTypes( delta_data );
+
+ expect( actual ).to.deep.equal( expected );
+ } );
+ } );
+ } );
+
+
+ it( 'Message is formatted correctly', () =>
+ {
+ const bucket = { foo: [ 'bar', 'baz' ] };
+ const ratedata = {};
+ const doc_id = <DocumentId>123;
+ const entity_name = 'Some Agency';
+ const entity_id = 123;
+ const startDate = <UnixTimestamp>345;
+ const lastUpdate = <UnixTimestamp>456;
+ const schema = createMockAvroSchema();
+ const ts = <UnixTimestamp>123;
+ const encoder = createMockEncoderCtor( schema );
+ const meta = <DocumentMeta>{
+ id: doc_id,
+ entity_name: entity_name,
+ entity_id: entity_id,
+ startDate: startDate,
+ lastUpdate: lastUpdate,
+ };
+
+ const delta = <Delta<any>>{
+ type: <DeltaType>'data',
+ timestamp: <UnixTimestamp>123123123,
+ data: <DeltaResult<any>>{},
+ };
+
+ const expected = {
+ event: {
+ id: 'STEP_SAVE',
+ ts: ts,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: doc_id,
+ created: startDate,
+ modified: lastUpdate,
+ },
+ session: {
+ Session: {
+ entity_name: entity_name,
+ entity_id: entity_id,
+ },
+ },
+ data: {
+ Data: {
+ bucket: {
+ 'foo': { 'array': [
+ { 'string': 'bar' },
+ { 'string': 'baz' },
+ ] }
+ },
+ },
+ },
+ ratedata: {
+ Data: {
+ bucket: {},
+ },
+ },
+ delta: {
+ Data: {
+ bucket: delta.data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: '',
+ },
+ },
+ };
+
+ let is_valid_called = false;
+
+ schema.isValid = ( data: Record<string, any>, _:any ) =>
+ {
+ expect( data ).to.deep.equal( expected );
+
+ is_valid_called = true;
+
+ return null;
+ }
+
+ return expect( new Sut( encoder, schema )
+ .write( ts, meta, delta, bucket, ratedata ) )
+ .to.eventually.deep.equal( Buffer.from( '' ) )
+ .then( _ =>
+ {
+ expect( is_valid_called ).to.be.true;
+ } )
+ } );
+} );
+
+
+function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr
+{
+ return ( _schema: AvroSchema ) =>
+ {
+ const mock = sinon.mock( Duplex );
+
+ mock.on = ( _: string, __: any ) => {};
+ mock.end = ( _: any ) => { return mock_encoded_data; };
+
+ return mock;
+ };
+}
+
+
+function createMockData( delta_data: any ): any
+{
+
+ return {
+ event: {
+ id: 'RATE',
+ ts: 1573856916,
+ actor: 'SERVER',
+ step: null,
+ },
+ document: {
+ id: 123123,
+ created: 1573856916,
+ modified: 1573856916,
+ top_visited_step: '2',
+ },
+ data: null,
+ ratedata: null,
+ delta: {
+ Data: {
+ bucket: delta_data,
+ },
+ },
+ program: {
+ Program: {
+ id: 'quote_server',
+ version: 'dadaddwafdwa',
+ },
+ },
+ };
+}
+
+
+function createMockDelta(): Delta<any>
+{
+ return <Delta<any>>{
+ type: <DeltaType>'data',
+ timestamp: <UnixTimestamp>123123123,
+ data: <DeltaResult<any>>{},
+ }
+}
+
+
+function createMockBucketData(): Record<string, any>
+{
+ return {
+ foo: [ 'bar', 'baz' ]
+ }
+}
+
+
+function createMockEncoderCtor( stub_schema: AvroSchema ):
+ ( schema: AvroSchema ) => Duplex
+{
+ const events = <Record<string, () => void>>{};
+
+ const mock_duplex = <Duplex>(<unknown>{
+ on( event_name: string, callback: () => void )
+ {
+ events[ event_name ] = callback;
+ },
+
+ end()
+ {
+ events.end();
+ },
+ } );
+
+ return ( schema: AvroSchema ): Duplex =>
+ {
+ expect( schema ).to.equal( stub_schema );
+ return mock_duplex;
+ };
+}
+
+
+function createMockAvroSchema(): AvroSchema
+{
+ return <AvroSchema>{
+ toBuffer() { return null },
+ isValid() { return null },
+ encode() {},
+ toString() { return '' },
+ fromBuffer() { return {} },
+ };
+}
diff --git a/test/system/amqp/AmqpConnectionTest.ts b/test/system/amqp/AmqpConnectionTest.ts
index 36a332c..1e4237d 100644
--- a/test/system/amqp/AmqpConnectionTest.ts
+++ b/test/system/amqp/AmqpConnectionTest.ts
@@ -41,7 +41,7 @@ describe( 'AmqpConnection', () =>
assertExchange() {
return Promise.reject( expected_err );
},
- });
+ } );
const mock_connection = <amqplib.Connection>(<unknown>{
once() {},
@@ -49,13 +49,13 @@ describe( 'AmqpConnection', () =>
createChannel() {
return Promise.resolve( mock_channel );
},
- });
+ } );
const mock_amqp = <typeof amqplib>(<unknown>{
connect() {
return Promise.resolve( mock_connection );
}
- });
+ } );
const emitter = new EventEmitter();
const conf = <AmqpConfig>{};
@@ -65,5 +65,48 @@ describe( 'AmqpConnection', () =>
.to.eventually.be.rejectedWith( expected_err );
} );
} );
+
+
+ describe( '#reconnect', () =>
+ {
+ it( "is called when there is an error with the connection", () =>
+ {
+ let reconnect_called = false;
+
+ const mock_channel = <amqplib.Channel>(<unknown>{
+ assertExchange() {
+ return Promise.resolve();
+ },
+ } );
+
+ const mock_connection = <amqplib.Connection>Object.create(
+ new EventEmitter()
+ );
+
+ mock_connection.createChannel = (): any => {
+ return Promise.resolve( mock_channel );
+ };
+
+ const mock_amqp = <typeof amqplib>(<unknown>{
+ connect() {
+ return Promise.resolve( mock_connection );
+ }
+ } );
+
+ const emitter = new EventEmitter();
+
+ emitter.on( 'amqp-reconnect', () => { reconnect_called = true } );
+
+ const conf = <AmqpConfig>{};
+ const sut = new Sut( mock_amqp, conf, emitter );
+
+ const result = sut.connect()
+ .then( () => mock_connection.emit( 'error' ) )
+
+ return expect( result )
+ .to.eventually.deep.equal( true )
+ .then( _ => expect( reconnect_called ).to.be.true );
+ } );
+ } );
} );