diff options
Diffstat (limited to 'src/system/avro/V1MessageWriter.ts')
-rw-r--r-- | src/system/avro/V1MessageWriter.ts | 259 |
1 files changed, 259 insertions, 0 deletions
diff --git a/src/system/avro/V1MessageWriter.ts b/src/system/avro/V1MessageWriter.ts new file mode 100644 index 0000000..09ee64a --- /dev/null +++ b/src/system/avro/V1MessageWriter.ts @@ -0,0 +1,259 @@ +/** + * Message Writer + * + * Copyright (C) 2010-2019 R-T Specialty, LLC. + * + * This file is part of liza. + * + * liza is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Write a message to be published to a queue + */ +import { DocumentMeta } from '../../document/Document'; +import { Delta } from '../../bucket/delta'; +import { AvroEncoderCtr } from '../avro/AvroFactory'; +import { AvroSchema } from 'avro-js'; +import { MessageWriter } from '../MessageWriter'; +import { context } from '../../error/ContextError'; + + +export class V1MessageWriter implements MessageWriter +{ + /** A mapping of which delta type translated to which avro event */ + readonly DELTA_MAP: Record<string, string> = { + data: 'STEP_SAVE', + ratedata: 'RATE', + }; + + + /** + * Delta publisher + * + * @param _encoder_ctr - a factory function to create an avro encoder + * @param _conn - the amqp connection + */ + constructor( + private readonly _encoder_ctor: AvroEncoderCtr, + private readonly _schema: AvroSchema, + ) {} + + + /** + * Write the data to a message + * + * @param ts - timestamp + * @param meta - document meta data + * @param delta - current delta + * @param bucket - data bucket + * @param ratedata - ratedata bucket + */ + write( + ts: UnixTimestamp, + meta: DocumentMeta, + delta: Delta<any>, + bucket: Record<string, any>, + ratedata: Record<string, any>, + ): Promise<Buffer> + { + const avro_object = this._avroFormat( + ts, + meta, + delta, + bucket, + ratedata, + ); + + return this.avroEncode( avro_object ); + } + + + /** + * Format the avro data with data type labels + * + * @param ts - timestamp + * @param meta - document meta data + * @param delta - current delta + * @param bucket - data bucket + * @param ratedata - ratedata bucket + * + * @return the formatted data + */ + private _avroFormat( + ts: UnixTimestamp, + meta: DocumentMeta, + delta: Delta<any>, + bucket: Record<string, any>, + ratedata: Record<string, any>, + ): any + { + const delta_formatted = this.setDataTypes( delta.data ); + const bucket_formatted = this.setDataTypes( bucket ); + const ratedata_formatted = this.setDataTypes( ratedata ); + const event_id = this.DELTA_MAP[ delta.type ]; + + return { + event: { + id: event_id, + ts: ts, + actor: 'SERVER', + step: null, + }, + document: { + id: meta.id, + created: meta.startDate, + modified: meta.lastUpdate, + }, + session: { + Session: { + entity_id: meta.entity_id, + entity_name: meta.entity_name, + }, + }, + data: { + Data: { + bucket: bucket_formatted, + }, + }, + ratedata: { + Data: { + bucket: ratedata_formatted, + }, + }, + delta: { + Data: { + bucket: delta_formatted, + }, + }, + program: { + Program: { + id: 'quote_server', + version: '', + }, + }, + } + } + + + /** + * Encode the data in an avro buffer + * + * @param data - the data to encode + * + * @return the avro buffer or null if there is an error + */ + avroEncode( data: Record<string, any> ): Promise<Buffer> + { + return new Promise<Buffer>( ( resolve, reject ) => + { + const bufs: Buffer[] = []; + + try + { + this._schema.isValid( + data, + { + errorHook: ( keys: any, vals: any) => + { + throw context( + new Error( 'Invalid Avro Schema' ), + { + invalid_paths: keys, + invalid_data: vals, + } + ); + } + } + ); + + const encoder = this._encoder_ctor( this._schema ) + + encoder.on('data', ( buf: Buffer ) => { bufs.push( buf ) } ) + encoder.on('error', ( err: Error ) => { reject( err ); } ) + encoder.on('end', () => { resolve( Buffer.concat( bufs ) ) } ) + encoder.end( data ); + } + catch ( e ) + { + reject( e ); + } + } ); + } + + + /** + * Format the data for avro by add type specifications to the data + * + * @param data - the data to format + * @param top_level - whether we are at the top level of the recursion + * + * @return the formatted data + */ + setDataTypes( data: any, top_level: boolean = true ): any + { + let data_formatted: any = {}; + + switch( typeof( data ) ) + { + case 'object': + if ( data == null ) + { + return null; + } + else if ( Array.isArray( data ) ) + { + let arr: any[] = []; + + data.forEach( ( datum ) => + { + arr.push( this.setDataTypes( datum, false ) ); + } ); + + data_formatted = ( top_level ) + ? arr + : { 'array': arr }; + } + else + { + let datum_formatted: any = {}; + + Object.keys( data).forEach( ( key: string ) => + { + const datum = this.setDataTypes( data[ key ], false ); + + datum_formatted[ key ] = datum; + + } ); + + data_formatted = ( top_level ) + ? datum_formatted + : { 'map': datum_formatted }; + } + break; + + case 'boolean': + return { 'boolean': data }; + + case 'number': + return { 'double': data }; + + case 'string': + return { 'string': data }; + + case 'undefined': + return null; + } + + return data_formatted; + } +}
\ No newline at end of file |