Mike Gerwitz

Activist for User Freedom

aboutsummaryrefslogtreecommitdiffstats
blob: 57a74b6909b2df817aa5c3e6bcfaa0eecdacfa1e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/**
 * Delta Publisher
 *
 *  Copyright (C) 2010-2019 R-T Specialty, LLC.
 *
 *  This file is part of liza.
 *
 *  liza is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Publish delta message to a queue
 */

import { AmqpPublisher } from "./AmqpPublisher";
import { DeltaResult } from "../bucket/delta";
import {
    connect as amqpConnect,
    Options,
    Channel
} from 'amqplib';

const avro = require( 'avro-js' );


export interface AmqpConfig extends Options.Connect {
    /** The name of a queue or exchange to publish to */
    exchange: string;
}


export class DeltaPublisher implements AmqpPublisher
{
    /** The path to the avro schema */
    readonly SCHEMA_PATH = './avro/schema.avsc';

    /** A mapping of which delta type translated to which avro event */
    readonly DELTA_MAP: Record<string, string> = {
        data:     'rate',
        ratedata: 'update',
    };


    /**
     * Initialize trait
     *
     * @param _conf   - amqp configuration
     * @param _logger - logger instance
     */
    constructor(
        private readonly _conf:   AmqpConfig,
        private readonly _logger: any
    ) {}


    /**
     * Publish quote message to exchange post-rating
     *
     * @param delta - The delta to publish
    */
    publish( delta: DeltaResult<any> ): void
    {
        // check both as we transition from one to the other
        const exchange = this._conf.exchange;

        amqpConnect( this._conf )
            .then( conn =>
            {
                setTimeout( () => conn.close(), 10000 );
                return conn.createChannel();
            } )
            .then( ch => {
                ch.assertExchange( exchange, 'fanout', { durable: true } );

                return this._sendMessage( ch, exchange, delta );
            } )
            .then( () => this._logger.log(
                this._logger.PRIORITY_INFO,
                "Published " + delta.type + " delta with timestamp '" +
                    delta.timestamp + "' to quote-update exchange '"+
                    exchange + "'"
            ) )
            .catch( e => this._logger.log(
                this._logger.PRIORITY_ERROR,
                "Error publishing " + delta.type + " delta with timestamp '" +
                    delta.timestamp + "' to quote-update exchange '"+
                    exchange + "'" + ": " + e
            ) );
    }


    /**
     * Send message to exchange
     *
     * @param channel  - AMQP channel
     * @param exchange - exchange name
     * @param delta    - The delta to publish
     *
     * @return whether publish was successful
     */
    _sendMessage(
        channel:  Channel,
        exchange: string,
        delta:    DeltaResult<any>,
    ): boolean
    {
        const headers = {
            version: 1,
            created: Date.now(),
        };

        const event_id = this.DELTA_MAP[ delta.type ];

        const data = {
            delta: delta,
            event: event_id,
        };

        const avro_buffer = this._avroEncode( data );

        // we don't use a routing key; fanout exchange
        const routing_key = '';

        return channel.publish(
            exchange,
            routing_key,
            avro_buffer,
            { headers: headers },
        );
    }


    /**
     * Encode the data in an avro buffer
     *
     * @param data - the data to encode
     *
     * @return the avro buffer
     */
    _avroEncode( data: Record<string, any> ): Buffer
    {
        const type = avro.parse( this.SCHEMA_PATH );

        const buffer = type.toBuffer( data );

        return buffer;
    }
}