Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMike Gerwitz <mike.gerwitz@ryansg.com>2019-12-09 14:03:47 -0500
committerAustin Schaffer <austin.schaffer@ryansg.com>2019-12-12 10:27:09 -0500
commite885026e0a1ecf1e21e821fb50fe5d1a46537664 (patch)
tree0003c2692ffdbbff0ea93e735c420852c175978a
parent4383d15c5a375b7e189888aa22f198557997db47 (diff)
downloadliza-e885026e0a1ecf1e21e821fb50fe5d1a46537664.tar.gz
liza-e885026e0a1ecf1e21e821fb50fe5d1a46537664.tar.bz2
liza-e885026e0a1ecf1e21e821fb50fe5d1a46537664.zip
DeltaPublisher: Remove parsing from constructor
-rw-r--r--bin/delta-processor.ts2
-rw-r--r--src/system/DeltaPublisher.ts17
-rw-r--r--test/system/DeltaPublisherTest.ts76
3 files changed, 74 insertions, 21 deletions
diff --git a/bin/delta-processor.ts b/bin/delta-processor.ts
index e8a4a92..11af02b 100644
--- a/bin/delta-processor.ts
+++ b/bin/delta-processor.ts
@@ -39,6 +39,7 @@ import {
createPrometheusConfig,
} from '../src/system/PrometheusFactory';
import { AmqpConnection } from '../src/system/amqp/AmqpConnection';
+import { parse as avro_parse } from "avro-js";
const amqp_conf = createAmqpConfig( process.env );
@@ -55,6 +56,7 @@ const publisher = new DeltaPublisher(
ts_ctr,
createAvroEncoder,
amqp_connection,
+ avro_parse( __dirname + '/../src/system/avro/schema.avsc' ),
);
// Prometheus Metrics
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index 5def3a9..9b05114 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -30,16 +30,10 @@ import { AmqpError } from '../error/AmqpError';
import { AvroEncoderCtr } from './avro/AvroFactory';
import { AmqpConnection } from './amqp/AmqpConnection';
-import { AvroSchema, parse } from "avro-js";
+import { AvroSchema } from "avro-js";
export class DeltaPublisher implements AmqpPublisher
{
- /** The avro schema */
- private _schema: AvroSchema;
-
- /** The path to the avro schema */
- readonly SCHEMA_PATH = __dirname + '/avro/schema.avsc';
-
/** A mapping of which delta type translated to which avro event */
readonly DELTA_MAP: Record<string, string> = {
data: 'STEP_SAVE',
@@ -58,11 +52,10 @@ export class DeltaPublisher implements AmqpPublisher
constructor(
private readonly _emitter: EventEmitter,
private readonly _ts_ctr: () => UnixTimestamp,
- private readonly _encoder_ctr: AvroEncoderCtr,
+ private readonly _encoder_ctor: AvroEncoderCtr,
private readonly _conn: AmqpConnection,
- ) {
- this._schema = parse( this.SCHEMA_PATH );
- }
+ private readonly _schema: AvroSchema,
+ ) {}
/**
@@ -263,7 +256,7 @@ export class DeltaPublisher implements AmqpPublisher
{
this._assertValidAvro( this._schema, data )
- const encoder = this._encoder_ctr( this._schema )
+ const encoder = this._encoder_ctor( this._schema )
encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } )
encoder.on('error', ( err: Error ) => { reject( err ); } )
diff --git a/test/system/DeltaPublisherTest.ts b/test/system/DeltaPublisherTest.ts
index b5dfed1..314c7da 100644
--- a/test/system/DeltaPublisherTest.ts
+++ b/test/system/DeltaPublisherTest.ts
@@ -28,10 +28,7 @@ import { EventEmitter } from "events";
import { hasContext } from '../../src/error/ContextError';
import { AmqpError } from '../../src/error/AmqpError';
import { Channel } from 'amqplib';
-import {
- createAvroEncoder,
- AvroEncoderCtr,
-} from '../../src/system/avro/AvroFactory';
+import { AvroEncoderCtr } from '../../src/system/avro/AvroFactory';
import { AvroSchema } from "avro-js";
@@ -66,7 +63,20 @@ describe( 'server.DeltaPublisher', () =>
};
};
- const sut = new Sut( emitter, ts_ctr, createAvroEncoder, conn );
+ const stub_schema = <AvroSchema>(<unknown>{
+ isValid()
+ {
+ // TODO: test me
+ },
+ } );
+
+ const sut = new Sut(
+ emitter,
+ ts_ctr,
+ createMockEncoderCtor( stub_schema ),
+ conn,
+ stub_schema,
+ );
return expect(
sut.publish( <DocumentId>123, delta, bucket, ratedata )
@@ -118,8 +128,20 @@ describe( 'server.DeltaPublisher', () =>
conn.getAmqpChannel = getChannelF;
- const result = new Sut( emitter, ts_ctr, createAvroEncoder, conn )
- .publish( doc_id, delta, bucket, ratedata );
+ const stub_schema = <AvroSchema>(<unknown>{
+ isValid()
+ {
+ // TODO: test me
+ },
+ } );
+
+ const result = new Sut(
+ emitter,
+ ts_ctr,
+ createMockEncoderCtor( stub_schema ),
+ conn,
+ stub_schema,
+ ).publish( doc_id, delta, bucket, ratedata );
return Promise.all( [
expect( result ).to.eventually.be.rejectedWith(
@@ -225,11 +247,20 @@ describe( 'server.DeltaPublisher', () =>
const emitter = createMockEventEmitter();
const conn = createMockAmqpConnection();
const data = createMockData( delta_data );
- const sut = new Sut(
+
+ const stub_schema = <AvroSchema>(<unknown>{
+ isValid()
+ {
+ // TODO: test me
+ },
+ } );
+
+ const sut = new Sut(
emitter,
ts_ctr,
- createAvroEncoder,
+ createMockEncoderCtor( stub_schema ),
conn,
+ stub_schema
);
sut.avroEncode( data )
@@ -384,11 +415,13 @@ describe( 'server.DeltaPublisher', () =>
const emitter = createMockEventEmitter();
const conn = createMockAmqpConnection();
const avroEncoderCtr = createMockEncoder( encoded );
+ const stub_schema = <AvroSchema>{};
const sut = new Sut(
emitter,
ts_ctr,
avroEncoderCtr,
conn,
+ stub_schema,
);
const actual = sut.setDataTypes( delta_data );
@@ -483,3 +516,28 @@ function createMockDelta(): Delta<any>
data: <DeltaResult<any>>{},
}
}
+
+
+function createMockEncoderCtor( stub_schema: AvroSchema ):
+ ( schema: AvroSchema ) => Duplex
+{
+ const events = <Record<string, () => void>>{};
+
+ const mock_duplex = <Duplex>(<unknown>{
+ on( event_name: string, callback: () => void )
+ {
+ events[ event_name ] = callback;
+ },
+
+ end()
+ {
+ events.end();
+ },
+ } );
+
+ return ( schema: AvroSchema ): Duplex =>
+ {
+ expect( schema ).to.equal( stub_schema );
+ return mock_duplex;
+ };
+}