1 | import { channelManager } from './ChannelManager'
|
2 | import { Channel } from "amqplib/callback_api"
|
3 | import {Options} from "amqplib/properties";
|
4 | import { AMQPEventEmitter } from './EventEmitter'
|
// ---- Exchange naming ------------------------------------------------------

/** Prepended to a caller-supplied exchange name to build the full exchange name. */
const EXCHANGE_PREFIX = 'nimbus:event:';

/** Exchange used when the listener is not restricted to one exchange. */
const EXCHANGE_ALL_EVENTS = 'nimbus:events';

/** Exchange used when the listener is scoped to a user id. */
const EXCHANGE_EVENTS_BY_USER = 'nimbus:eventsByUser';

// ---- Queue naming and declaration options ---------------------------------

/** Prepended to the runtime name when a listener queue name is derived from it. */
const QUEUE_PREFIX = 'nimbus:listener:';

/** Default queue options: non-durable, auto-deleted, exclusive. */
const QUEUE_OPTIONS = {
  durable: false,
  autoDelete: true,
  exclusive: true,
};

/** Options for persistent runtime queues: durable, kept, shareable. */
const PERSISTENT_QUEUE_OPTIONS = {
  durable: true,
  autoDelete: false,
  exclusive: false,
};

/** Options for non-persistent runtime queues: non-durable and auto-deleted. */
const QUEUE_RUNTIME_OPTIONS = {
  durable: false,
  autoDelete: true,
};

/** Options used when asserting exchanges: durable and never auto-deleted. */
const EXCHANGE_OPTIONS = {
  durable: true,
  autoDelete: false,
};
|
13 |
|
14 | import util = require("util");
|
15 |
|
// Section-scoped debug logger created with util.debuglog for the "amqptools" section.
const debug = util.debuglog('amqptools');
|
17 |
|
/**
 * Options accepted by the `EventListener` constructor. Every field is optional.
 */
export interface EventListenerConstructorOptions {
  // --- what to listen to ---

  /** Exchange (event source) name; the listener prefixes it to build the full exchange name. */
  exchange?: string;
  /** Topic used as the second segment of the routing key. */
  topic?: string;
  /** When set, the listener binds to the by-user exchange and appends this id to the routing key. */
  userId?: string;

  // --- how the queue is declared ---

  /** Runtime name; when set, the queue name is derived from it plus the exchange and topic. */
  runtime?: string;
  /** Requests the persistent queue options; only consulted when `runtime` is set. */
  persistent?: boolean;

  // --- how messages are consumed ---

  /** Acknowledge each message on receipt (listener default is `true`). */
  autoAck?: boolean;
  /** Prefetch count applied to the channel before consuming (listener falls back to 1). */
  prefetchCount?: number;
}
|
27 |
|
/**
 * Second argument handed to a listener alongside the decoded message.
 */
export interface MessageExtra {
  /**
   * Acknowledges the message. The listener class only populates this
   * when automatic acknowledgement is turned off.
   */
  ack?: () => void;
}
|
31 |
|
/**
 * Callback invoked for every decoded message.
 *
 * The handler may be synchronous or asynchronous. `Promise` is generic and
 * must be given a type argument to compile; `unknown` is used because the
 * resolved value is never inspected by the listener class.
 *
 * @param message Payload decoded from the message body.
 * @param extra   Per-message helpers (see `MessageExtra`).
 */
export interface ListenerFunc {
  (message: any, extra: MessageExtra): void | Promise<unknown>;
}
|
35 |
|
/**
 * Consumes JSON events from a topic exchange and hands them to a single
 * listener callback.
 *
 * The instance subscribes to the channel manager's "reconnect" and
 * "finalize" events on construction so that consuming is re-established after
 * a reconnect and cancelled during finalization.
 */
export class EventListener {
  exchange: string;
  topic: string;
  queue: string;
  userId: string;

  persistent: boolean = false;

  autoAck: boolean = true;
  prefetchCount: number;
  private listener: ListenerFunc;
  private queueOptions: Options.AssertQueue;
  /** Tag of the active consumer; unset until consuming starts and after it stops. */
  private consumerTag?: string;
  private eventEmitter: AMQPEventEmitter;

  /**
   * @param options      What to listen to and how the queue is declared.
   * @param eventEmitter Owning emitter; required.
   * @throws Error when `eventEmitter` is missing.
   */
  constructor(options: EventListenerConstructorOptions, eventEmitter: AMQPEventEmitter) {
    this.exchange = options.exchange;
    this.topic = options.topic;
    this.userId = options.userId;
    this.queueOptions = QUEUE_OPTIONS;
    // Any falsy value (including 0) falls back to a prefetch of 1.
    this.prefetchCount = options.prefetchCount || 1;
    if (!eventEmitter) {
      throw new Error('eventEmitter is required for EventListener');
    }
    this.eventEmitter = eventEmitter;
    // An explicit `undefined` must not override the field defaults.
    if (options.persistent !== undefined) {
      this.persistent = options.persistent;
    }
    if (options.autoAck !== undefined) {
      this.autoAck = options.autoAck;
    }
    if (options.runtime) {
      // Named queue: prefix + runtime, then optional exchange and topic segments.
      this.queue = QUEUE_PREFIX + options.runtime +
        (this.exchange ? ':' + this.exchange : '') +
        (this.topic ? ':' + this.topic : '');
      this.queueOptions = this.persistent ? PERSISTENT_QUEUE_OPTIONS : QUEUE_RUNTIME_OPTIONS;
    }

    channelManager.on("reconnect", this.onReconnect);
    channelManager.on("finalize", this.onFinalize);
  }

  /** Re-establishes consuming after the channel manager reconnects. */
  onReconnect = () => {
    // Nothing to restore until listen() has registered a callback; consuming
    // without one would make every delivery fail.
    if (!this.listener) {
      return;
    }
    debug("Trying to re establish consuming on event queue %s", this.queueName);
    this.consume().catch((err) => {
      console.error("Fail to re-establish consuming on queue " + this.queueName, err);
    });
  }

  /** Stops consuming when the channel manager finalizes. */
  onFinalize = () => {
    debug("Cancel consuming queue %s because of the finalization process", this.queueName);
    this.cancel().catch((err) => {
      console.error("Fail to cancel consuming on queue " + this.queueName, err);
    });
  }

  /** Exchange to bind to: by-user, per-exchange, or the catch-all exchange. */
  get fullExchangeName(): string {
    if (this.userId) {
      return EXCHANGE_EVENTS_BY_USER;
    }
    return this.exchange ? EXCHANGE_PREFIX + this.exchange : EXCHANGE_ALL_EVENTS;
  }

  /** Current queue name; replaced by the name returned when the queue is asserted. */
  get queueName(): string {
    return this.queue;
  }

  set queueName(val: string) {
    this.queue = val;
  }

  /**
   * Routing key used for the binding: `#` when nothing is filtered, otherwise
   * `<exchange|*>.<topic|*>` with `.<userId>` appended when a user id is set.
   */
  get routeKey(): string {
    if (!this.topic && !this.exchange && !this.userId) return '#';
    return [this.exchange, this.topic]
      .map(str => (str ? str : '*'))
      .join('.')
      .concat(this.userId ? '.' + this.userId : '');
  }

  /** Asserts the topic exchange and resolves with the channel. */
  private assertExchange() {
    return channelManager.getChannel().then((channel) => {
      return new Promise((resolve, reject) => {
        channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS,
          (err) => err ? reject(err) : resolve(channel));
      });
    });
  }

  /** Asserts the queue, stores the name returned by the broker, resolves with the channel. */
  private assertQueue() {
    return channelManager.getChannel().then((channel) => {
      return new Promise((resolve, reject) => {
        channel.assertQueue(this.queueName, this.queueOptions, (err, ok) => {
          if (err) return reject(err);
          this.queueName = ok.queue;
          resolve(channel);
        });
      });
    });
  }

  /** Binds the queue to the exchange with the computed routing key. */
  private bindQueue() {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.bindQueue(this.queueName, this.fullExchangeName, this.routeKey, {},
          (err) => err ? reject(err) : resolve(channel));
      });
    });
  }

  /** Acknowledges `msg` on the current channel. */
  private ack(msg) {
    return channelManager.getChannel().then((channel) => {
      channel.ack(msg);
    });
  }

  /**
   * Delivery callback passed to `channel.consume`.
   *
   * Decodes the JSON body and calls the listener. With `autoAck` the message
   * is acknowledged before the listener runs; otherwise the listener receives
   * an `ack` function in `extra`.
   */
  private onMessageReceived = (msg) => {
    // amqplib invokes the callback with null when the broker cancels the
    // consumer; there is no message to process and the tag is no longer valid.
    if (!msg) {
      debug("Consumer on queue %s was cancelled by the broker", this.queueName);
      this.consumerTag = undefined;
      return;
    }

    // Fire-and-forget acknowledgement; failures are logged instead of
    // surfacing as unhandled rejections.
    const acknowledge = () => {
      this.ack(msg).catch((err) => {
        console.error("Fail to ack message on queue " + this.queueName, err);
      });
    };

    let message: unknown;
    try {
      message = JSON.parse(msg.content.toString());
    } catch (err) {
      // Do not throw into amqplib's dispatch. In auto-ack mode the message is
      // still acknowledged (it would have been acked on receipt anyway); in
      // manual mode it stays unacknowledged because only the listener acks.
      console.error("Fail to parse message on queue " + this.queueName, err);
      if (this.autoAck) {
        acknowledge();
      }
      return;
    }

    const extra: MessageExtra = {};
    if (this.autoAck) {
      acknowledge();
    } else {
      extra.ack = acknowledge;
    }
    this.listener(message, extra);
  }

  /**
   * Asserts exchange and queue, binds them, sets the prefetch count and
   * starts consuming.
   *
   * @returns Promise resolved once the broker confirmed the consumer and the
   *          consumer tag is stored; rejected when any step fails.
   */
  private consume(): Promise<void> {
    return this.assertExchange()
      .then(() => this.assertQueue())
      .then(() => this.bindQueue())
      .then((channel) => new Promise<void>((resolve, reject) => {
        channel.prefetch(this.prefetchCount);
        channel.consume(this.queueName, this.onMessageReceived, undefined, (err, ok) => {
          if (err) {
            console.error("Fail to consume on queue " + this.queueName, err);
            // Throwing here would escape into the library callback; reject so
            // callers of listen() observe the failure.
            reject(err);
            return;
          }
          this.consumerTag = ok.consumerTag;
          resolve();
        });
      }));
  }

  /**
   * Registers the listener callback and starts consuming.
   *
   * @param listener Called with the decoded message and the per-message extras.
   * @returns Promise resolved once consuming has started.
   * @throws Error when a listener was already registered.
   */
  listen(listener: (message, extra?) => void) {
    if (this.listener) {
      throw new Error("Listener is already set");
    }
    this.listener = listener;
    return this.consume();
  }

  /**
   * Stops consuming and detaches from the channel manager events.
   *
   * @returns Promise resolved once the cancel request was issued (immediately
   *          when there is no active consumer).
   */
  cancel(): Promise<void> {
    // Detach first so a reconnect cannot restart consuming while the cancel
    // is in flight, and so the handlers are released even if no channel is
    // available.
    channelManager.removeListener("reconnect", this.onReconnect);
    channelManager.removeListener("finalize", this.onFinalize);

    const tag = this.consumerTag;
    if (!tag) {
      // Never started consuming (or already cancelled): nothing to cancel.
      return Promise.resolve();
    }
    return channelManager.getChannel().then((channel) => {
      channel.cancel(tag);
      this.consumerTag = undefined;
    });
  }
}
\ | No newline at end of file |