diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/system/DeltaProcessorTest.ts | 226 | ||||
-rw-r--r-- | test/system/DeltaPublisherTest.ts | 443 | ||||
-rw-r--r-- | test/system/EventMediatorTest.ts | 4 | ||||
-rw-r--r-- | test/system/MetricsCollectorTest.ts | 2 | ||||
-rw-r--r-- | test/system/V1MessageWriterTest.ts | 532 | ||||
-rw-r--r-- | test/system/amqp/AmqpConnectionTest.ts | 49 |
6 files changed, 779 insertions, 477 deletions
diff --git a/test/system/DeltaProcessorTest.ts b/test/system/DeltaProcessorTest.ts index 2cd68e2..1587bb2 100644 --- a/test/system/DeltaProcessorTest.ts +++ b/test/system/DeltaProcessorTest.ts @@ -22,7 +22,7 @@ import { DeltaProcessor as Sut } from '../../src/system/DeltaProcessor'; import { AmqpPublisher } from '../../src/system/AmqpPublisher'; import { DeltaDao } from '../../src/system/db/DeltaDao'; -import { DeltaDocument } from "../../src/bucket/delta"; +import { DeltaDocument } from '../../src/bucket/delta'; import { DocumentId } from '../../src/document/Document'; import { EventEmitter } from 'events'; @@ -40,7 +40,7 @@ describe( 'system.DeltaProcessor', () => expected: any }[]>[ { - label: "No deltas are processed", + label: 'No deltas are processed', given: [ { id: 123, @@ -53,9 +53,9 @@ describe( 'system.DeltaProcessor', () => expected: [], }, - // when quote is initialized: { foo: [ "" ], state: [ "a" ] } + // when quote is initialized: { foo: [ '' ], state: [ 'a' ] } { - label: "Publishes deltas in order", + label: 'Publishes deltas in order', given: [ { @@ -63,13 +63,13 @@ describe( 'system.DeltaProcessor', () => lastUpdate: 123123123, data: { - foo: [ "third" ], - state: [ "a", "b", "c", "d" ], + foo: [ 'third' ], + state: [ 'a', 'b', 'c', 'd' ], }, ratedata: { - prem: [ "rate_second" ], - state: [ "i", "ii", "iii" ], + prem: [ 'rate_second' ], + state: [ 'i', 'ii', 'iii' ], }, rdelta: { @@ -77,21 +77,21 @@ describe( 'system.DeltaProcessor', () => { timestamp: 1, data: { - foo: [ "" ], + foo: [ '' ], state: [ undefined, null ], }, }, { timestamp: 3, data: { - foo: [ "first" ], + foo: [ 'first' ], state: [ undefined, undefined, null ], }, }, { timestamp: 5, data: { - foo: [ "second" ], + foo: [ 'second' ], state: [ undefined, undefined, undefined, null ], }, }, @@ -101,14 +101,14 @@ describe( 'system.DeltaProcessor', () => { timestamp: 2, data: { - prem: [ "" ], + prem: [ '' ], state: [ undefined, null ], }, }, { timestamp: 4, data: { - prem: [ "rate_first" ], + prem: [ 'rate_first' ], state: [ undefined, undefined, null ], }, }, @@ -122,12 +122,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - foo: [ "" ], + foo: [ '' ], state: [ undefined, null ], }, bucket: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: {}, }, @@ -136,16 +136,16 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - prem: [ "" ], + prem: [ '' ], state: [ undefined, null ], }, bucket: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem: [ "rate_first" ], - state: [ "i", "ii" ], + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, }, @@ -153,12 +153,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - foo: [ "first" ], + foo: [ 'first' ], state: [ undefined, undefined, null ], }, bucket: { - foo: [ "second" ], - state: [ "a", "b", "c" ], + foo: [ 'second' ], + state: [ 'a', 'b', 'c' ], }, ratedata: {}, }, @@ -167,16 +167,16 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - prem: [ "rate_first" ], + prem: [ 'rate_first' ], state: [ undefined, undefined, null ], }, bucket: { - foo: [ "second" ], - state: [ "a", "b", "c" ], + foo: [ 'second' ], + state: [ 'a', 'b', 'c' ], }, ratedata: { - prem: [ "rate_second" ], - state: [ "i", "ii", "iii" ], + prem: [ 'rate_second' ], + state: [ 'i', 'ii', 'iii' ], }, }, @@ -184,12 +184,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - foo: [ "second" ], + foo: [ 'second' ], state: [ undefined, undefined, undefined, null ], }, bucket: { - foo: [ "third" ], - state: [ "a", "b", "c", "d" ], + foo: [ 'third' ], + state: [ 'a', 'b', 'c', 'd' ], }, ratedata: {}, }, @@ -197,7 +197,7 @@ describe( 'system.DeltaProcessor', () => }, { - label: "Publishes deltas in order for multiple documents", + label: 'Publishes deltas in order for multiple documents', given: [ { @@ -205,13 +205,13 @@ describe( 'system.DeltaProcessor', () => lastUpdate: 123123123, data: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem: [ "rate_first" ], - state: [ "i", "ii" ], + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, rdelta: { @@ -219,7 +219,7 @@ describe( 'system.DeltaProcessor', () => { timestamp: 1, data: { - foo: [ "" ], + foo: [ '' ], state: [ undefined, null ], }, }, @@ -229,7 +229,7 @@ describe( 'system.DeltaProcessor', () => { timestamp: 4, data: { - prem: [ "" ], + prem: [ '' ], state: [ undefined, null ], }, }, @@ -245,13 +245,13 @@ describe( 'system.DeltaProcessor', () => lastUpdate: 121212123, data: { - foo2: [ "first" ], - state: [ "a", "b" ], + foo2: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem2: [ "rate_first" ], - state: [ "i", "ii" ], + prem2: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, rdelta: { @@ -259,7 +259,7 @@ describe( 'system.DeltaProcessor', () => { timestamp: 2, data: { - foo2: [ "" ], + foo2: [ '' ], state: [ undefined, null ], }, }, @@ -269,7 +269,7 @@ describe( 'system.DeltaProcessor', () => { timestamp: 3, data: { - prem2: [ "" ], + prem2: [ '' ], state: [ undefined, null ], }, }, @@ -283,12 +283,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - foo: [ "" ], + foo: [ '' ], state: [ undefined, null ], }, bucket: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: {}, }, @@ -297,16 +297,16 @@ describe( 'system.DeltaProcessor', () => { doc_id: 123, rdelta: { - prem: [ "" ], + prem: [ '' ], state: [ undefined, null ], }, bucket: { - foo: [ "first" ], - state: [ "a", "b" ], + foo: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem: [ "rate_first" ], - state: [ "i", "ii" ], + prem: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, }, @@ -314,12 +314,12 @@ describe( 'system.DeltaProcessor', () => { doc_id: 234, rdelta: { - foo2: [ "" ], + foo2: [ '' ], state: [ undefined, null ], }, bucket: { - foo2: [ "first" ], - state: [ "a", "b" ], + foo2: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: {}, }, @@ -328,37 +328,37 @@ describe( 'system.DeltaProcessor', () => { doc_id: 234, rdelta: { - prem2: [ "" ], + prem2: [ '' ], state: [ undefined, null ], }, bucket: { - foo2: [ "first" ], - state: [ "a", "b" ], + foo2: [ 'first' ], + state: [ 'a', 'b' ], }, ratedata: { - prem2: [ "rate_first" ], - state: [ "i", "ii" ], + prem2: [ 'rate_first' ], + state: [ 'i', 'ii' ], }, }, ], }, { - label: "trims delta array based on index", + label: 'trims delta array based on index', given: [ { id: 111, lastUpdate: 123123123, - data: { foo: [ "second" ] }, + data: { foo: [ 'second' ] }, ratedata: {}, rdelta: { data: [ { - data: { foo: [ "" ] }, + data: { foo: [ '' ] }, timestamp: 123, }, { - data: { foo: [ "first" ] }, + data: { foo: [ 'first' ] }, timestamp: 234, }, ], @@ -371,8 +371,8 @@ describe( 'system.DeltaProcessor', () => expected: [ { doc_id: 111, - rdelta: { foo: [ "first" ] }, - bucket: { foo: [ "second" ] }, + rdelta: { foo: [ 'first' ] }, + bucket: { foo: [ 'second' ] }, ratedata: {} }, ], @@ -390,14 +390,14 @@ describe( 'system.DeltaProcessor', () => } publisher.publish = ( - doc_id, + meta, delta, bucket, ratedata, ): Promise<void> => { published.push( { - doc_id: doc_id, + doc_id: meta.id, rdelta: delta.data, bucket: bucket, ratedata: ratedata, @@ -422,12 +422,19 @@ describe( 'system.DeltaProcessor', () => const dao = createMockDeltaDao(); const publisher = createMockDeltaPublisher(); const emitter = new EventEmitter(); + const entity_num = 'Some Agency'; + const entity_id = 4321; + const lastUpdate = <UnixTimestamp>123123123; + const createdData = <UnixTimestamp>234234234; const doc = <DeltaDocument[]>[ { - id: <DocumentId>123, - lastUpdate: <UnixTimestamp>123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, - rdelta: { + id: <DocumentId>123, + agentName: entity_num, + agentEntityId: entity_id, + startDate: createdData, + lastUpdate: lastUpdate, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { data: [ { data: { foo: [ 'first_bar' ] }, @@ -439,11 +446,14 @@ describe( 'system.DeltaProcessor', () => }, }, { - id: <DocumentId>234, - lastUpdate: <UnixTimestamp>123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, - rdelta: { + id: <DocumentId>234, + agentName: entity_num, + agentEntityId: entity_id, + startDate: createdData, + lastUpdate: <UnixTimestamp>123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { data: [ { data: { foo: [ 'first_bar' ] }, @@ -457,13 +467,25 @@ describe( 'system.DeltaProcessor', () => const expected_published = [ { - doc_id: 123, + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 123, + lastUpdate: 123123123, + startDate: 234234234, + }, delta: { foo: [ 'first_bar' ] }, bucket: { foo: [ 'start_bar' ] }, ratedata: {}, }, { - doc_id: 234, + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 234, + lastUpdate: 123123123, + startDate: 234234234, + }, delta: { foo: [ 'first_bar' ] }, bucket: { foo: [ 'start_bar' ] }, ratedata: {}, @@ -485,14 +507,14 @@ describe( 'system.DeltaProcessor', () => } publisher.publish = ( - doc_id, + meta, delta, bucket, ratedata, ): Promise<void> => { published.push( { - doc_id: doc_id, + meta: meta, delta: delta.data, bucket: bucket, ratedata: ratedata, @@ -525,11 +547,14 @@ describe( 'system.DeltaProcessor', () => const publisher = createMockDeltaPublisher(); const emitter = new EventEmitter(); const doc = <DeltaDocument[]>[ { - id: <DocumentId>123, - lastUpdate: <UnixTimestamp>123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, - rdelta: { + id: <DocumentId>123, + agentName: 'Some Agency', + agentEntityId: 4321, + startDate: <UnixTimestamp>234234234, + lastUpdate: <UnixTimestamp>123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { data: [ { data: { foo: [ 'first_bar' ] }, @@ -541,11 +566,14 @@ describe( 'system.DeltaProcessor', () => }, }, { - id: <DocumentId>234, - lastUpdate: <UnixTimestamp>123123123, - data: { foo: [ 'start_bar' ] }, - ratedata: {}, - rdelta: { + id: <DocumentId>234, + agentName: 'Some Agency', + agentEntityId: 4321, + startDate: <UnixTimestamp>234234234, + lastUpdate: <UnixTimestamp>123123123, + data: { foo: [ 'start_bar' ] }, + ratedata: {}, + rdelta: { data: [ { data: { foo: [ 'first_bar' ] }, @@ -559,7 +587,13 @@ describe( 'system.DeltaProcessor', () => // Only one is published const expected_published = [ { - doc_id: 123, + meta: { + entity_id: 4321, + entity_name: 'Some Agency', + id: 123, + lastUpdate: 123123123, + startDate: 234234234, + }, delta: { foo: [ 'first_bar' ] }, bucket: { foo: [ 'start_bar' ] }, ratedata: {}, @@ -577,14 +611,14 @@ describe( 'system.DeltaProcessor', () => Promise.reject( new Error( expected_error ) ); publisher.publish = ( - doc_id, + meta, delta, bucket, ratedata, ): Promise<void> => { published.push( { - doc_id: doc_id, + meta, delta: delta.data, bucket: bucket, ratedata: ratedata, diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts index 314c7da..f352b0c 100644 --- a/test/system/DeltaPublisherTest.ts +++ b/test/system/DeltaPublisherTest.ts @@ -22,20 +22,17 @@ import { AmqpConnection } from '../../src/system/amqp/AmqpConnection'; import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta'; import { DeltaPublisher as Sut } from '../../src/system/DeltaPublisher'; -import { DocumentId } from '../../src/document/Document'; -import { Duplex } from 'stream'; -import { EventEmitter } from "events"; +import { DocumentId, DocumentMeta } from '../../src/document/Document'; +import { EventEmitter } from 'events'; import { hasContext } from '../../src/error/ContextError'; import { AmqpError } from '../../src/error/AmqpError'; import { Channel } from 'amqplib'; -import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory'; +import { MessageWriter } from '../../src/system/MessageWriter'; -import { AvroSchema } from "avro-js"; import { expect, use as chai_use } from 'chai'; chai_use( require( 'chai-as-promised' ) ); -const sinon = require( 'sinon' ); describe( 'server.DeltaPublisher', () => { @@ -49,6 +46,15 @@ describe( 'server.DeltaPublisher', () => const ratedata = createMockBucketData(); const emitter = new EventEmitter(); const conn = createMockAmqpConnection(); + const writer = createMockWriter(); + const meta = <DocumentMeta>{ + id: <DocumentId>123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: <UnixTimestamp>345, + lastUpdate: <UnixTimestamp>456, + }; + conn.getAmqpChannel = () => { return <Channel>{ @@ -63,23 +69,10 @@ describe( 'server.DeltaPublisher', () => }; }; - const stub_schema = <AvroSchema>(<unknown>{ - isValid() - { - // TODO: test me - }, - } ); - - const sut = new Sut( - emitter, - ts_ctr, - createMockEncoderCtor( stub_schema ), - conn, - stub_schema, - ); + const sut = new Sut( emitter, ts_ctr, conn, writer ); return expect( - sut.publish( <DocumentId>123, delta, bucket, ratedata ) + sut.publish( meta, delta, bucket, ratedata ) ).to.eventually.deep.equal( undefined ) .then( _ => { @@ -119,29 +112,25 @@ describe( 'server.DeltaPublisher', () => const ratedata = createMockBucketData(); const emitter = new EventEmitter(); const conn = createMockAmqpConnection(); - const doc_id = <DocumentId>123; + const writer = createMockWriter(); + const meta = <DocumentMeta>{ + id: <DocumentId>123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: <UnixTimestamp>345, + lastUpdate: <UnixTimestamp>456, + }; + const expected = { - doc_id: doc_id, + doc_id: meta.id, delta_type: delta.type, delta_ts: delta.timestamp } conn.getAmqpChannel = getChannelF; - const stub_schema = <AvroSchema>(<unknown>{ - isValid() - { - // TODO: test me - }, - } ); - - const result = new Sut( - emitter, - ts_ctr, - createMockEncoderCtor( stub_schema ), - conn, - stub_schema, - ).publish( doc_id, delta, bucket, ratedata ); + const result = new Sut( emitter, ts_ctr, conn, writer ) + .publish( meta, delta, bucket, ratedata ); return Promise.all( [ expect( result ).to.eventually.be.rejectedWith( @@ -158,276 +147,47 @@ describe( 'server.DeltaPublisher', () => } ) ] ); } ) ); - } ); - - describe( '#avroEncode parses', () => - { - [ - { - label: 'Null value', - valid: true, - delta_data: { foo: null }, - }, - { - label: 'Null array', - valid: true, - delta_data: { foo: { "array": [ null ] } }, - }, - { - label: 'Boolean value', - valid: true, - delta_data: { foo: { "array": [ - { "boolean": true }, - ] } }, - }, - { - label: 'Simple string', - valid: true, - delta_data: { foo: { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - ] } }, - }, - { - label: 'Simple int', - valid: true, - delta_data: { foo: { "array": [ - { "double": 123 }, - ] } }, - }, - { - label: 'Nested array', - valid: true, - delta_data: { foo: { "array": [ - { "array": [ - { "string": 'bar' }, - ] }, - ] } }, - }, - { - label: 'Array with nulls', - valid: true, - delta_data: { foo: { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - null, - ] } }, - }, - { - label: 'Nested Array with mixed values', - valid: true, - delta_data: { foo: { "array": [ - { "array": [ - { "string": 'bar' }, - { "double": 123321 }, - null, - ] } - ] } }, - }, - { - label: 'Non-array', - valid: false, - delta_data: { foo: 'bar' }, - }, - { - label: 'Map objects', - valid: true, - delta_data: { "foo": { "array": [ - { "map": { - "bar": { "map": { - "baz": { "double": 1572903485000 }, - } } - } } - ] } }, - } - ].forEach( ( { label, delta_data, valid } ) => - { - it( label, () => - { - const emitter = createMockEventEmitter(); - const conn = createMockAmqpConnection(); - const data = createMockData( delta_data ); - - const stub_schema = <AvroSchema>(<unknown>{ - isValid() - { - // TODO: test me - }, - } ); - - const sut = new Sut( - emitter, - ts_ctr, - createMockEncoderCtor( stub_schema ), - conn, - stub_schema - ); - - sut.avroEncode( data ) - .then( b => - { - expect( typeof(b) ).to.equal( 'object' ); - expect( valid ).to.be.true; - } ) - .catch( _ => - { - expect( valid ).to.be.false; - } ); - } ); - } ); - } ); - describe( '#setDataTypes annotates', () => - { - [ - { - label: 'Null', - delta_data: null, - expected: null, - }, - { - label: 'Null Value', - delta_data: { foo: null }, - expected: { foo: null }, - }, - { - label: 'Boolean Value', - delta_data: { foo: [ true ] }, - expected: { foo: { "array": [ - { "boolean": true }, - ] } }, - }, - { - label: 'Simple string', - delta_data: { foo: [ - 'bar', - 'baz', - ] }, - expected: { foo: { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - ] } }, - }, - { - label: 'Simple int', - delta_data: { foo: [ - 123 - ] }, - expected: { foo: { "array": [ - { "double": 123 }, - ] } }, - }, - { - label: 'Nested array', - delta_data: { foo: [ - [ - 'bar', - 'baz', - ] - ] }, - expected: { foo: { "array": [ - { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - ] }, - ] } }, - }, - { - label: 'Double nested array', - delta_data: { foo: [ - [ - [ - 'bar', - 123, - null - ], - ], - ] }, - expected: { foo: { "array": [ - { "array": [ - { "array": [ - { "string": 'bar' }, - { "double": 123 }, - null, - ] }, - ] }, - ] } }, - }, - { - label: 'Array with nulls', - delta_data: { foo: [ - 'bar', - 'baz', - null - ] }, - expected: { foo: { "array": [ - { "string": 'bar' }, - { "string": 'baz' }, - null - ] } }, - }, - { - label: 'Nested Array with mixed values', - delta_data: { foo: [ - [ - 'bar', - 123321, - null, - ] - ] }, - expected: { foo: { "array": [ - { "array": [ - { "string": 'bar' }, - { "double": 123321 }, - null, - ] }, - ] } }, - }, - { - label: 'Nested Array with mixed values', - delta_data: { foo: [ - { - "bar": { - "wer": 'qaz', - "qwe": 1572903485000, - "asd": true, - "zxc": null, - }, - }, - ] }, - expected: { "foo": { "array": [ - { "map": { - "bar": { "map": { - "wer": { "string": 'qaz' }, - "qwe": { "double": 1572903485000 }, - "asd": { "boolean": true }, - "zxc": null, - } }, - } }, - ] } }, - }, - ].forEach( ( { label, delta_data, expected } ) => + it( 'writer#write rejects', () => { - it( label, () => + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const emitter = new EventEmitter(); + const conn = createMockAmqpConnection(); + const writer = createMockWriter(); + const error = new Error( 'Bad thing happened' ); + const meta = <DocumentMeta>{ + id: <DocumentId>123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: <UnixTimestamp>345, + lastUpdate: <UnixTimestamp>456, + }; + + writer.write = ( + _: any, + __: any, + ___: any, + ____: any, + _____: any + ): Promise<Buffer> => { - const encoded = 'FooBar'; - const emitter = createMockEventEmitter(); - const conn = createMockAmqpConnection(); - const avroEncoderCtr = createMockEncoder( encoded ); - const stub_schema = <AvroSchema>{}; - const sut = new Sut( - emitter, - ts_ctr, - avroEncoderCtr, - conn, - stub_schema, - ); - const actual = sut.setDataTypes( delta_data ); + return Promise.reject( error ); + }; - expect( actual ).to.deep.equal( expected ); - } ); - } ); + const result = new Sut( emitter, ts_ctr, conn, writer ) + .publish( meta, delta, bucket, ratedata ); + + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( error ), + result.catch( e => + { + return expect( e ).to.deep.equal( error ); + } ) + ] ); + } ) } ); } ); @@ -438,26 +198,6 @@ function ts_ctr(): UnixTimestamp } -function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr -{ - return ( _schema: AvroSchema ) => - { - const mock = sinon.mock( Duplex ); - - mock.on = ( _: string, __: any ) => {}; - mock.end = ( _: any ) => { return mock_encoded_data; }; - - return mock; - }; -} - - -function createMockEventEmitter(): EventEmitter -{ - return <EventEmitter>{}; -} - - function createMockAmqpConnection(): AmqpConnection { return <AmqpConnection>{ @@ -467,39 +207,6 @@ function createMockAmqpConnection(): AmqpConnection } -function createMockData( delta_data: any ): any -{ - - return { - event: { - id: 'RATE', - ts: 1573856916, - actor: 'SERVER', - step: null, - }, - document: { - id: 123123, - created: 1573856916, - modified: 1573856916, - top_visited_step: '2', - }, - data: null, - ratedata: null, - delta: { - Data: { - bucket: delta_data, - }, - }, - program: { - Program: { - id: 'quote_server', - version: 'dadaddwafdwa', - }, - }, - }; -} - - function createMockBucketData(): Record<string, any> { return { @@ -518,26 +225,12 @@ function createMockDelta(): Delta<any> } -function createMockEncoderCtor( stub_schema: AvroSchema ): - ( schema: AvroSchema ) => Duplex +function createMockWriter(): MessageWriter { - const events = <Record<string, () => void>>{}; - - const mock_duplex = <Duplex>(<unknown>{ - on( event_name: string, callback: () => void ) + return <MessageWriter>{ + write( _: any, __:any, ___:any, ____:any, _____:any ): Promise<Buffer> { - events[ event_name ] = callback; - }, - - end() - { - events.end(); - }, - } ); - - return ( schema: AvroSchema ): Duplex => - { - expect( schema ).to.equal( stub_schema ); - return mock_duplex; + return Promise.resolve( Buffer.from( '' ) ); + } }; -} +}
\ No newline at end of file diff --git a/test/system/EventMediatorTest.ts b/test/system/EventMediatorTest.ts index caab191..abfbef8 100644 --- a/test/system/EventMediatorTest.ts +++ b/test/system/EventMediatorTest.ts @@ -62,11 +62,11 @@ describe( 'system.EventLogger captures and logs events', () => expect( method_called ).to.be.true; } ); - it( 'amqp-conn-error triggers log#warning', () => + it( 'amqp-conn-warn triggers log#warning', () => { let method_called = false; - const event_id = 'amqp-conn-error'; + const event_id = 'amqp-conn-warn'; const emitter = new EventEmitter(); const log = createMockLogger(); diff --git a/test/system/MetricsCollectorTest.ts b/test/system/MetricsCollectorTest.ts index eafc77d..9a36584 100644 --- a/test/system/MetricsCollectorTest.ts +++ b/test/system/MetricsCollectorTest.ts @@ -72,7 +72,7 @@ describe( 'system.MetricsCollector captures events and pushes metrics', () => const sut = new Sut( factory, conf, emitter, timer ); - emitter.emit( 'delta-process-error' ); + emitter.emit( 'error' ); expect( counter_called ).to.be.true; diff --git a/test/system/V1MessageWriterTest.ts b/test/system/V1MessageWriterTest.ts new file mode 100644 index 0000000..271d735 --- /dev/null +++ b/test/system/V1MessageWriterTest.ts @@ -0,0 +1,532 @@ +/** + * V1 Message Writer + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Tests for Version 1 of the avro message writer + */ + +import { V1MessageWriter as Sut } from '../../src/system/avro/V1MessageWriter'; +import { hasContext, context } from '../../src/error/ContextError'; +import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory'; +import { Delta, DeltaResult, DeltaType } from '../../src/bucket/delta'; +import { DocumentMeta, DocumentId } from '../../src/document/Document'; +import { Duplex } from 'stream'; +import { AvroSchema } from 'avro-js'; + +import { expect, use as chai_use } from 'chai'; +chai_use( require( 'chai-as-promised' ) ); + +const sinon = require( 'sinon' ); + +describe( 'system.V1MessageWriter', () => +{ + it( 'Rejects improperly formatted data', () => + { + const delta = createMockDelta(); + const bucket = createMockBucketData(); + const ratedata = createMockBucketData(); + const error = new Error( 'Oh no' ); + const schema = createMockAvroSchema(); + const ts = <UnixTimestamp>123; + const meta = <DocumentMeta>{ + id: <DocumentId>123, + entity_name: 'Some Agency', + entity_id: 234, + startDate: <UnixTimestamp>345, + lastUpdate: <UnixTimestamp>456, + }; + + const expected = { + invalid_paths: 'Foo', + invalid_data: 'Bar', + }; + + const error_context = context( error, expected ); + + schema.isValid = () => { throw error_context; }; + + const result = new Sut( + createMockEncoderCtor( schema ), + schema, + ).write( ts, meta, delta, bucket, ratedata ); + + return Promise.all( [ + expect( result ).to.eventually.be.rejectedWith( error ), + result.catch( e => + { + if ( !hasContext( e ) ) + { + return expect.fail(); + } + + return expect( e.context ).to.deep.equal( expected ); + } ) + ] ); + } ); + + + describe( '#avroEncode parses', () => + { + [ + { + label: 'Null value', + valid: true, + delta_data: { foo: null }, + }, + { + label: 'Null array', + valid: true, + delta_data: { foo: { 'array': [ null ] } }, + }, + { + label: 'Boolean value', + valid: true, + delta_data: { foo: { 'array': [ + { 'boolean': true }, + ] } }, + }, + { + label: 'Simple string', + valid: true, + delta_data: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } }, + }, + { + label: 'Simple int', + valid: true, + delta_data: { foo: { 'array': [ + { 'double': 123 }, + ] } }, + }, + { + label: 'Nested array', + valid: true, + delta_data: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + ] }, + ] } }, + }, + { + label: 'Array with nulls', + valid: true, + delta_data: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + null, + ] } }, + }, + { + label: 'Nested Array with mixed values', + valid: true, + delta_data: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123321 }, + null, + ] } + ] } }, + }, + { + label: 'Non-array', + valid: false, + delta_data: { foo: 'bar' }, + }, + { + label: 'Map objects', + valid: true, + delta_data: { 'foo': { 'array': [ + { 'map': { + 'bar': { 'map': { + 'baz': { 'double': 1572903485000 }, + } } + } } + ] } }, + } + ].forEach( ( { label, delta_data, valid } ) => + { + it( label, () => + { + const data = createMockData( delta_data ); + const schema = createMockAvroSchema(); + + const sut = new Sut( + createMockEncoderCtor( schema ), + schema + ); + + sut.avroEncode( data ) + .then( b => + { + expect( typeof(b) ).to.equal( 'object' ); + expect( valid ).to.be.true; + } ) + .catch( _ => + { + expect( valid ).to.be.false; + } ); + } ); + } ); + } ); + + + describe( '#setDataTypes annotates', () => + { + [ + { + label: 'Null', + delta_data: null, + expected: null, + }, + { + label: 'Null Value', + delta_data: { foo: null }, + expected: { foo: null }, + }, + { + label: 'Boolean Value', + delta_data: { foo: [ true ] }, + expected: { foo: { 'array': [ + { 'boolean': true }, + ] } }, + }, + { + label: 'Simple string', + delta_data: { foo: [ + 'bar', + 'baz', + ] }, + expected: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } }, + }, + { + label: 'Simple int', + delta_data: { foo: [ + 123 + ] }, + expected: { foo: { 'array': [ + { 'double': 123 }, + ] } }, + }, + { + label: 'Nested array', + delta_data: { foo: [ + [ + 'bar', + 'baz', + ] + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] }, + ] } }, + }, + { + label: 'Double nested array', + delta_data: { foo: [ + [ + [ + 'bar', + 123, + null + ], + ], + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123 }, + null, + ] }, + ] }, + ] } }, + }, + { + label: 'Array with nulls', + delta_data: { foo: [ + 'bar', + 'baz', + null + ] }, + expected: { foo: { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + null + ] } }, + }, + { + label: 'Nested Array with mixed values', + delta_data: { foo: [ + [ + 'bar', + 123321, + null, + ] + ] }, + expected: { foo: { 'array': [ + { 'array': [ + { 'string': 'bar' }, + { 'double': 123321 }, + null, + ] }, + ] } }, + }, + { + label: 'Nested Array with mixed values', + delta_data: { foo: [ + { + 'bar': { + 'wer': 'qaz', + 'qwe': 1572903485000, + 'asd': true, + 'zxc': null, + }, + }, + ] }, + expected: { 'foo': { 'array': [ + { 'map': { + 'bar': { 'map': { + 'wer': { 'string': 'qaz' }, + 'qwe': { 'double': 1572903485000 }, + 'asd': { 'boolean': true }, + 'zxc': null, + } }, + } }, + ] } }, + }, + ].forEach( ( { label, delta_data, expected } ) => + { + it( label, () => + { + const encoded = 'FooBar'; + const avroEncoderCtr = createMockEncoder( encoded ); + const stub_schema = <AvroSchema>{}; + const sut = new Sut( + avroEncoderCtr, + stub_schema, + ); + const actual = sut.setDataTypes( delta_data ); + + expect( actual ).to.deep.equal( expected ); + } ); + } ); + } ); + + + it( 'Message is formatted correctly', () => + { + const bucket = { foo: [ 'bar', 'baz' ] }; + const ratedata = {}; + const doc_id = <DocumentId>123; + const entity_name = 'Some Agency'; + const entity_id = 123; + const startDate = <UnixTimestamp>345; + const lastUpdate = <UnixTimestamp>456; + const schema = createMockAvroSchema(); + const ts = <UnixTimestamp>123; + const encoder = createMockEncoderCtor( schema ); + const meta = <DocumentMeta>{ + id: doc_id, + entity_name: entity_name, + entity_id: entity_id, + startDate: startDate, + lastUpdate: lastUpdate, + }; + + const delta = <Delta<any>>{ + type: <DeltaType>'data', + timestamp: <UnixTimestamp>123123123, + data: <DeltaResult<any>>{}, + }; + + const expected = { + event: { + id: 'STEP_SAVE', + ts: ts, + actor: 'SERVER', + step: null, + }, + document: { + id: doc_id, + created: startDate, + modified: lastUpdate, + }, + session: { + Session: { + entity_name: entity_name, + entity_id: entity_id, + }, + }, + data: { + Data: { + bucket: { + 'foo': { 'array': [ + { 'string': 'bar' }, + { 'string': 'baz' }, + ] } + }, + }, + }, + ratedata: { + Data: { + bucket: {}, + }, + }, + delta: { + Data: { + bucket: delta.data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: '', + }, + }, + }; + + let is_valid_called = false; + + schema.isValid = ( data: Record<string, any>, _:any ) => + { + expect( data ).to.deep.equal( expected ); + + is_valid_called = true; + + return null; + } + + return expect( new Sut( encoder, schema ) + .write( ts, meta, delta, bucket, ratedata ) ) + .to.eventually.deep.equal( Buffer.from( '' ) ) + .then( _ => + { + expect( is_valid_called ).to.be.true; + } ) + } ); +} ); + + +function createMockEncoder( mock_encoded_data: string ): AvroEncoderCtr +{ + return ( _schema: AvroSchema ) => + { + const mock = sinon.mock( Duplex ); + + mock.on = ( _: string, __: any ) => {}; + mock.end = ( _: any ) => { return mock_encoded_data; }; + + return mock; + }; +} + + +function createMockData( delta_data: any ): any +{ + + return { + event: { + id: 'RATE', + ts: 1573856916, + actor: 'SERVER', + step: null, + }, + document: { + id: 123123, + created: 1573856916, + modified: 1573856916, + top_visited_step: '2', + }, + data: null, + ratedata: null, + delta: { + Data: { + bucket: delta_data, + }, + }, + program: { + Program: { + id: 'quote_server', + version: 'dadaddwafdwa', + }, + }, + }; +} + + +function createMockDelta(): Delta<any> +{ + return <Delta<any>>{ + type: <DeltaType>'data', + timestamp: <UnixTimestamp>123123123, + data: <DeltaResult<any>>{}, + } +} + + +function createMockBucketData(): Record<string, any> +{ + return { + foo: [ 'bar', 'baz' ] + } +} + + +function createMockEncoderCtor( stub_schema: AvroSchema ): + ( schema: AvroSchema ) => Duplex +{ + const events = <Record<string, () => void>>{}; + + const mock_duplex = <Duplex>(<unknown>{ + on( event_name: string, callback: () => void ) + { + events[ event_name ] = callback; + }, + + end() + { + events.end(); + }, + } ); + + return ( schema: AvroSchema ): Duplex => + { + expect( schema ).to.equal( stub_schema ); + return mock_duplex; + }; +} + + +function createMockAvroSchema(): AvroSchema +{ + return <AvroSchema>{ + toBuffer() { return null }, + isValid() { return null }, + encode() {}, + toString() { return '' }, + fromBuffer() { return {} }, + }; +} diff --git a/test/system/amqp/AmqpConnectionTest.ts b/test/system/amqp/AmqpConnectionTest.ts index 36a332c..1e4237d 100644 --- a/test/system/amqp/AmqpConnectionTest.ts +++ b/test/system/amqp/AmqpConnectionTest.ts @@ -41,7 +41,7 @@ describe( 'AmqpConnection', () => assertExchange() { return Promise.reject( expected_err ); }, - }); + } ); const mock_connection = <amqplib.Connection>(<unknown>{ once() {}, @@ -49,13 +49,13 @@ describe( 'AmqpConnection', () => createChannel() { return Promise.resolve( mock_channel ); }, - }); + } ); const mock_amqp = <typeof amqplib>(<unknown>{ connect() { return Promise.resolve( mock_connection ); } - }); + } ); const emitter = new EventEmitter(); const conf = <AmqpConfig>{}; @@ -65,5 +65,48 @@ describe( 'AmqpConnection', () => .to.eventually.be.rejectedWith( expected_err ); } ); } ); + + + describe( '#reconnect', () => + { + it( "is called when there is an error with the connection", () => + { + let reconnect_called = false; + + const mock_channel = <amqplib.Channel>(<unknown>{ + assertExchange() { + return Promise.resolve(); + }, + } ); + + const mock_connection = <amqplib.Connection>Object.create( + new EventEmitter() + ); + + mock_connection.createChannel = (): any => { + return Promise.resolve( mock_channel ); + }; + + const mock_amqp = <typeof amqplib>(<unknown>{ + connect() { + return Promise.resolve( mock_connection ); + } + } ); + + const emitter = new EventEmitter(); + + emitter.on( 'amqp-reconnect', () => { reconnect_called = true } ); + + const conf = <AmqpConfig>{}; + const sut = new Sut( mock_amqp, conf, emitter ); + + const result = sut.connect() + .then( () => mock_connection.emit( 'error' ) ) + + return expect( result ) + .to.eventually.deep.equal( true ) + .then( _ => expect( reconnect_called ).to.be.true ); + } ); + } ); } ); |