Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.env7
-rw-r--r--conf/vanilla-server.json12
-rw-r--r--package.json.in3
-rw-r--r--src/quote/BaseQuote.d.ts28
-rw-r--r--src/system/DeltaProcessor.ts22
-rw-r--r--src/system/DeltaPublisher.ts34
-rw-r--r--src/system/avro/schema.avsc161
7 files changed, 235 insertions, 32 deletions
diff --git a/.env b/.env
new file mode 100644
index 0000000..983d285
--- /dev/null
+++ b/.env
@@ -0,0 +1,7 @@
+hostname=localhost
+port=5672
+username=
+password=
+vhost=dev
+exchange=quoteupdate
+
diff --git a/conf/vanilla-server.json b/conf/vanilla-server.json
index 4f79ad6..e924fb1 100644
--- a/conf/vanilla-server.json
+++ b/conf/vanilla-server.json
@@ -61,18 +61,6 @@
"heartbeat": 0,
"vhost": "/",
"exchange": "postrate"
- },
- "deltaPublish": {
- "protocol": "amqp",
- "hostname": "localhost",
- "port": 5672,
- "username": "",
- "password": "",
- "locale": "en_US",
- "frameMax": 0,
- "heartbeat": 0,
- "vhost": "/",
- "exchange": "quoteupdate"
}
},
"c1export": {
diff --git a/package.json.in b/package.json.in
index d81861d..0aea8e0 100644
--- a/package.json.in
+++ b/package.json.in
@@ -39,7 +39,8 @@
"@types/mocha": "5.2.0",
"sinon": ">=1.17.4",
"es6-promise": "~3",
- "@types/amqplib": "0.5.13"
+ "@types/amqplib": "0.5.13",
+ "avro-js": "1.9.1"
},
"licenses": [
diff --git a/src/quote/BaseQuote.d.ts b/src/quote/BaseQuote.d.ts
index 8273ad1..eb2204a 100644
--- a/src/quote/BaseQuote.d.ts
+++ b/src/quote/BaseQuote.d.ts
@@ -34,7 +34,7 @@ export declare class BaseQuote implements Quote
*
* @return quote program
*/
- getProgram(): Program
+ getProgram(): Program;
/**
@@ -42,7 +42,7 @@ export declare class BaseQuote implements Quote
*
* @return program id
*/
- getProgramId(): string
+ getProgramId(): string;
/**
@@ -54,7 +54,7 @@ export declare class BaseQuote implements Quote
*
* @return quote id
*/
- getId(): QuoteId
+ getId(): QuoteId;
/**
@@ -62,7 +62,7 @@ export declare class BaseQuote implements Quote
*
* @return id of current step
*/
- getCurrentStepId(): number
+ getCurrentStepId(): number;
/**
@@ -73,7 +73,7 @@ export declare class BaseQuote implements Quote
*
* @return self
*/
- setExplicitLock( reason: string, step: number ): this
+ setExplicitLock( reason: string, step: number ): this;
/**
@@ -83,7 +83,7 @@ export declare class BaseQuote implements Quote
*
* @return self
*/
- setLastPremiumDate( timestamp: UnixTimestamp ): this
+ setLastPremiumDate( timestamp: UnixTimestamp ): this;
/**
@@ -91,7 +91,7 @@ export declare class BaseQuote implements Quote
*
* @return last calculated time or 0
*/
- getLastPremiumDate(): UnixTimestamp
+ getLastPremiumDate(): UnixTimestamp;
/**
@@ -99,7 +99,7 @@ export declare class BaseQuote implements Quote
*
* @return the data bucket
*/
- getBucket(): QuoteDataBucket
+ getBucket(): QuoteDataBucket;
/**
@@ -107,7 +107,7 @@ export declare class BaseQuote implements Quote
*
* @return lock reason
*/
- getExplicitLockReason(): string
+ getExplicitLockReason(): string;
/**
@@ -117,7 +117,7 @@ export declare class BaseQuote implements Quote
*
* @return {number} locked max step or 0 if not applicable
*/
- getExplicitLockStep(): PositiveInteger
+ getExplicitLockStep(): PositiveInteger;
/**
@@ -125,7 +125,7 @@ export declare class BaseQuote implements Quote
*
* @return true if imported, otherwise false
*/
- isImported(): boolean
+ isImported(): boolean;
/**
@@ -133,7 +133,7 @@ export declare class BaseQuote implements Quote
*
* @return true if bound, otherwise false
*/
- isBound(): boolean
+ isBound(): boolean;
/**
@@ -141,7 +141,7 @@ export declare class BaseQuote implements Quote
*
* @return top visited step id
*/
- getTopVisitedStepId(): PositiveInteger
+ getTopVisitedStepId(): PositiveInteger;
/**
@@ -149,5 +149,5 @@ export declare class BaseQuote implements Quote
*
* @return top saved step id
*/
- getTopSavedStepId(): PositiveInteger
+ getTopSavedStepId(): PositiveInteger;
}
diff --git a/src/system/DeltaProcessor.ts b/src/system/DeltaProcessor.ts
index 67e372d..678e700 100644
--- a/src/system/DeltaProcessor.ts
+++ b/src/system/DeltaProcessor.ts
@@ -168,4 +168,26 @@ export class DeltaProcessor
return 0;
}
+
+
+ /**
+ * Generate amqp config from environment variables
+ *
+ * @returns the amqp configuration
+ */
+ // generateConfigFromEnv(): AmqpConfig
+ // {
+ // return <AmqpConfig>{
+ // "protocol": "amqp",
+ // "hostname": process.env.hostname,
+ // "port": process.env.port,
+ // "username": process.env.username,
+ // "password": process.env.password,
+ // "locale": "en_US",
+ // "frameMax": 0,
+ // "heartbeat": 0,
+ // "vhost": process.env.vhost,
+ // "exchange": process.env.exchange,
+ // };
+ // }
} \ No newline at end of file
diff --git a/src/system/DeltaPublisher.ts b/src/system/DeltaPublisher.ts
index 2606c56..57a74b6 100644
--- a/src/system/DeltaPublisher.ts
+++ b/src/system/DeltaPublisher.ts
@@ -29,6 +29,8 @@ import {
Channel
} from 'amqplib';
+const avro = require( 'avro-js' );
+
export interface AmqpConfig extends Options.Connect {
/** The name of a queue or exchange to publish to */
@@ -38,6 +40,9 @@ export interface AmqpConfig extends Options.Connect {
export class DeltaPublisher implements AmqpPublisher
{
+ /** The path to the avro schema */
+ readonly SCHEMA_PATH = './avro/schema.avsc';
+
/** A mapping of which delta type translated to which avro event */
readonly DELTA_MAP: Record<string, string> = {
data: 'rate',
@@ -48,8 +53,8 @@ export class DeltaPublisher implements AmqpPublisher
/**
* Initialize trait
*
- * @param {Object} conf AMQP configuration
- * @param {DebugLog} logger logger instance
+ * @param _conf - amqp configuration
+ * @param _logger - logger instance
*/
constructor(
private readonly _conf: AmqpConfig,
@@ -115,10 +120,12 @@ export class DeltaPublisher implements AmqpPublisher
const event_id = this.DELTA_MAP[ delta.type ];
- const data = new Buffer( JSON.stringify( {
+ const data = {
delta: delta,
event: event_id,
- } ) );
+ };
+
+ const avro_buffer = this._avroEncode( data );
// we don't use a routing key; fanout exchange
const routing_key = '';
@@ -126,8 +133,25 @@ export class DeltaPublisher implements AmqpPublisher
return channel.publish(
exchange,
routing_key,
- data,
+ avro_buffer,
{ headers: headers },
);
}
+
+
+ /**
+ * Encode the data in an avro buffer
+ *
+ * @param data - the data to encode
+ *
+ * @return the avro buffer
+ */
+ _avroEncode( data: Record<string, any> ): Buffer
+ {
+ const type = avro.parse( this.SCHEMA_PATH );
+
+ const buffer = type.toBuffer( data );
+
+ return buffer;
+ }
}
diff --git a/src/system/avro/schema.avsc b/src/system/avro/schema.avsc
new file mode 100644
index 0000000..ee793a6
--- /dev/null
+++ b/src/system/avro/schema.avsc
@@ -0,0 +1,161 @@
+{
+ "type": "record",
+ "name": "update",
+ "fields": [
+ {
+ "name": "event",
+ "type": {
+ "type": "record",
+ "name": "Event",
+ "fields": [
+ {
+ "name": "id",
+ "type": {
+ "name": "EventId",
+ "type": "enum",
+ "symbols": [
+ "STEP_SAVE",
+ "RATE"
+ ]
+ }
+ },
+ {
+ "name": "ts",
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ },
+ {
+ "name": "actor",
+ "type": {
+ "type": "enum",
+ "name": "EventActor",
+ "symbols": [ "USER", "CLIENT", "SERVER" ]
+ }
+ },
+ {
+ "name": "step",
+ "type": {
+ "type": "record",
+ "name": "EventStep",
+ "fields": [
+ {
+ "name": "transition",
+ "type": {
+ "type": "enum",
+ "name": "EventStepTransition",
+ "symbols": [ "BACK", "FORWARD", "END" ]
+ }
+ },
+ {
+ "name": "src",
+ "type": "string"
+ },
+ {
+ "name": "dest",
+ "type": "string"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "document",
+ "type": {
+ "type": "record",
+ "name": "Document",
+ "doc": "Source document (quote)",
+ "fields": [
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "created",
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ },
+ {
+ "name": "modified",
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ },
+ {
+ "name": "top_visited_step",
+ "type": "string"
+ }
+ ]
+ }
+ },
+ {
+ "name": "session",
+ "type": {
+ "type": "record",
+ "name": "Session",
+ "fields": [
+ {
+ "name": "entity_name",
+ "type": "string"
+ },
+ {
+ "name": "entity_id",
+ "type": "int"
+ }
+ ]
+ }
+ },
+ {
+ "name": "data",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "Data",
+ "fields": [
+ {
+ "name": "bucket",
+ "type": {
+ "type": "map",
+ "values": {
+ "type" : "array",
+ "items" : "string"
+ }
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "delta",
+ "type": [
+ "null",
+ "Data"
+ ]
+ },
+ {
+ "name": "program",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "Program",
+ "fields": [
+ {
+ "type": "string",
+ "name": "id",
+ "doc": "Program id"
+ },
+ {
+ "type": "string",
+ "name": "version",
+ "doc": "Program version"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
+