1 | 'use strict';
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 | const EventEmitter = require('events');
|
11 | const amqp = require('amqplib');
|
12 | const log4js = require('log4js');
|
13 | const Promise = require('bluebird');
|
14 | const uuidv4 = require('uuid/v4');
|
15 | const defaults = require('./defaults');
|
16 |
|
17 | class PostmasterGeneral extends EventEmitter {
|
18 | |
19 |
|
20 |
|
21 |
|
22 | constructor(options) {
|
23 | super();
|
24 |
|
25 |
|
26 | this._connection = null;
|
27 | this._connecting = false;
|
28 | this._shuttingDown = false;
|
29 | this._channels = {};
|
30 | this._handlers = {};
|
31 | this._handlerTimingsTimeout = null;
|
32 | this._replyConsumerTag = null;
|
33 | this._replyHandlers = {};
|
34 | this._shouldConsume = false;
|
35 | this._topology = { exchanges: defaults.exchanges, queues: {}, bindings: {} };
|
36 | this._createChannel = null;
|
37 |
|
38 |
|
39 | options = options || {};
|
40 | this._connectRetryDelay = typeof options.connectRetryDelay === 'undefined' ? defaults.connectRetryDelay : options.connectRetryDelay;
|
41 | this._connectRetryLimit = typeof options.connectRetryLimit === 'undefined' ? defaults.connectRetryLimit : options.connectRetryLimit;
|
42 | this._consumerPrefetch = typeof options.consumerPrefetch === 'undefined' ? defaults.consumerPrefetch : options.consumerPrefetch;
|
43 | this._deadLetterExchange = options.deadLetterExchange || defaults.deadLetterExchange;
|
44 | this._defaultExchange = defaults.exchanges.topic;
|
45 | this._handlerTimingResetInterval = options.handlerTimingResetInterval || defaults.handlerTimingResetInterval;
|
46 | this._heartbeat = typeof options.heartbeat === 'undefined' ? defaults.heartbeat : options.heartbeat;
|
47 | this._publishRetryDelay = typeof options.publishRetryDelay === 'undefined' ? defaults.publishRetryDelay : options.publishRetryDelay;
|
48 | this._publishRetryLimit = typeof options.publishRetryLimit === 'undefined' ? defaults.publishRetryLimit : options.publishRetryLimit;
|
49 | this._removeListenerRetryDelay = typeof options.removeListenerRetryDelay === 'undefined' ? defaults.removeListenerRetryDelay : options.removeListenerRetryDelay;
|
50 | this._removeListenerRetryLimit = typeof options.removeListenerRetryLimit === 'undefined' ? defaults.removeListenerRetryLimit : options.removeListenerRetryLimit;
|
51 | this._replyTimeout = this._publishRetryDelay * this._publishRetryLimit * 2;
|
52 | this._queuePrefix = options.queuePrefix || defaults.queuePrefix;
|
53 | this._shutdownTimeout = options.shutdownTimeout || defaults.shutdownTimeout;
|
54 | this._url = options.url || defaults.url;
|
55 |
|
56 |
|
57 | log4js.configure({
|
58 | appenders: { out: { type: 'stdout' } },
|
59 | categories: { default: { appenders: ['out'], level: options.logLevel ? options.logLevel : defaults.logLevel } },
|
60 | pm2: options.pm2,
|
61 | pm2InstanceVar: options.pm2InstanceVar
|
62 | });
|
63 | this._logger = log4js.getLogger('postmaster-general');
|
64 | this._logger.level = options.logLevel ? options.logLevel : defaults.logLevel;
|
65 |
|
66 |
|
67 |
|
68 | const replyQueueName = `postmaster.reply.${this._queuePrefix}.${uuidv4()}`;
|
69 | const replyQueueExpiration = (this._connectRetryDelay * this._connectRetryLimit) + (60000 * this._connectRetryLimit);
|
70 | this._topology.queues = { reply: { name: replyQueueName, options: { noAck: true, expires: replyQueueExpiration } } };
|
71 | }
|
72 |
|
73 | |
74 |
|
75 |
|
76 | get outstandingMessageCount() {
|
77 | const listenerCount = Object.keys(this._handlers).reduce((sum, key) => {
|
78 | return sum + this._handlers[key].outstandingMessages.size;
|
79 | }, 0);
|
80 |
|
81 | return listenerCount + Object.keys(this._replyHandlers).length;
|
82 | }
|
83 |
|
84 | |
85 |
|
86 |
|
87 | get handlerTimings() {
|
88 | const handlerTimings = {};
|
89 | for (const key of Object.keys(this._handlers)) {
|
90 | const handler = this._handlers[key];
|
91 | if (handler.timings) {
|
92 | handlerTimings[key] = handler.timings;
|
93 | }
|
94 | }
|
95 | return handlerTimings;
|
96 | }
|
97 |
|
98 | |
99 |
|
100 |
|
101 |
|
102 | _resolveTopic(pattern) {
|
103 | return pattern.replace(/:/g, '.');
|
104 | }
|
105 |
|
106 | |
107 |
|
108 |
|
109 |
|
110 | async connect() {
|
111 | let connectionAttempts = 0;
|
112 |
|
113 | const attemptConnect = async () => {
|
114 | this._logger.debug('Attempting to connect to RabbitMQ...');
|
115 |
|
116 | connectionAttempts++;
|
117 | this._connecting = true;
|
118 |
|
119 |
|
120 |
|
121 | try {
|
122 | this._logger.debug('Closing any existing connections...');
|
123 | this._replyConsumerTag = null;
|
124 | for (const key of Object.keys(this._handlers)) {
|
125 | delete this._handlers[key].consumerTag;
|
126 | if (this._handlers[key].outstandingMessages) {
|
127 | this._handlers[key].outstandingMessages.clear();
|
128 | }
|
129 | }
|
130 | await this._connection.close();
|
131 | } catch (err) {}
|
132 |
|
133 | const reconnect = async (err) => {
|
134 | try {
|
135 | if (!this._connecting && !this._shuttingDown) {
|
136 | this._logger.warn(`Lost RabbitMQ connection and will try to reconnect! err: ${err.message}`);
|
137 | await attemptConnect();
|
138 | await this._assertTopology();
|
139 | if (this._shouldConsume) {
|
140 | await this.startConsuming();
|
141 | }
|
142 | this._logger.warn('Restored RabbitMQ connection successfully!');
|
143 | }
|
144 | } catch (err) {
|
145 | const errMessage = `Unable to re-establish RabbitMQ connection! err: ${err.message}`;
|
146 | this._logger.error(errMessage);
|
147 | this.emit('error', new Error(errMessage));
|
148 | }
|
149 | };
|
150 |
|
151 | try {
|
152 | this._logger.debug('Acquiring RabbitMQ connection...');
|
153 | this._connection = await amqp.connect(this._url, { heartbeat: this._heartbeat });
|
154 | this._connection.on('error', reconnect.bind(this));
|
155 |
|
156 | this._createChannel = async () => {
|
157 | const channel = await this._connection.createChannel();
|
158 | channel.on('error', reconnect.bind(this));
|
159 | return channel;
|
160 | };
|
161 |
|
162 | this._logger.debug('Initializing channels...');
|
163 | this._channels = await Promise.props({
|
164 | publish: this._createChannel(),
|
165 | replyPublish: this._createChannel(),
|
166 | topology: this._createChannel(),
|
167 | consumers: Promise.reduce(Object.keys(this._topology.queues), async (consumerMap, key) => {
|
168 | const queue = this._topology.queues[key];
|
169 | consumerMap[queue.name] = await this._createChannel();
|
170 | return consumerMap;
|
171 | }, {})
|
172 | });
|
173 |
|
174 | connectionAttempts = 0;
|
175 | this._connecting = false;
|
176 | this._logger.debug('Finished connecting to RabbitMQ!');
|
177 | } catch (err) {
|
178 | if (connectionAttempts < this._connectRetryLimit) {
|
179 | this._logger.warn('Failed to establish RabbitMQ connection! Retrying...');
|
180 | await Promise.delay(this._connectRetryDelay);
|
181 | return attemptConnect();
|
182 | }
|
183 | this._logger.error(`Failed to establish RabbitMQ connection after ${connectionAttempts} attempts! err: ${err.message}`);
|
184 | throw err;
|
185 | }
|
186 | };
|
187 |
|
188 | await attemptConnect();
|
189 | await this._assertTopology();
|
190 | }
|
191 |
|
192 | |
193 |
|
194 |
|
195 | async shutdown() {
|
196 | const shutdownRetryDelay = 1000;
|
197 | const retryLimit = this._shutdownTimeout / shutdownRetryDelay;
|
198 | let retryAttempts = 0;
|
199 |
|
200 | const attempt = async () => {
|
201 | retryAttempts++;
|
202 | this._shuttingDown = true;
|
203 |
|
204 | this._logger.debug('Attempting shutdown...');
|
205 |
|
206 | if ((this._connecting || this.outstandingMessageCount > 0) && retryAttempts < retryLimit) {
|
207 | if (this._connecting) {
|
208 | this._logger.debug('Unable to shutdown while attempting reconnect! Will retry...');
|
209 | } else {
|
210 | this._logger.debug(`Unable to shutdown while processing ${this.outstandingMessageCount} messages! Will retry...`);
|
211 | }
|
212 | await Promise.delay(shutdownRetryDelay);
|
213 | return attempt();
|
214 | }
|
215 |
|
216 | try {
|
217 | await this.stopConsuming();
|
218 | } catch (err) {}
|
219 |
|
220 | try {
|
221 | await this._connection.close();
|
222 | } catch (err) {}
|
223 |
|
224 | if (this._handlerTimingsTimeout) {
|
225 | clearTimeout(this._handlerTimingsTimeout);
|
226 | }
|
227 |
|
228 | this._logger.debug('Shutdown completed successfully!');
|
229 | };
|
230 |
|
231 | try {
|
232 | await this.stopConsuming(true);
|
233 | } catch (err) {}
|
234 |
|
235 | return attempt();
|
236 | }
|
237 |
|
238 | |
239 |
|
240 |
|
241 |
|
242 |
|
243 |
|
244 |
|
245 | async assertExchange(name, type, options) {
|
246 | this._logger.debug(`Asserting exchange name: ${name} type: ${type} options: ${JSON.stringify(options)}`);
|
247 | options = options || {};
|
248 | await this._channels.topology.assertExchange(name, type, options);
|
249 | this._topology.exchanges[name] = { name, type, options };
|
250 | }
|
251 |
|
252 | |
253 |
|
254 |
|
255 |
|
256 |
|
257 |
|
258 | async assertQueue(name, options) {
|
259 | this._logger.debug(`Asserting queue name: ${name} options: ${JSON.stringify(options)}`);
|
260 | options = options || {};
|
261 | await this._channels.topology.assertQueue(name, options);
|
262 | this._topology.queues[name] = { name, options };
|
263 | }
|
264 |
|
265 | |
266 |
|
267 |
|
268 |
|
269 |
|
270 |
|
271 |
|
272 |
|
273 | async assertBinding(queue, exchange, topic, options) {
|
274 | this._logger.debug(`Asserting binding queue: ${queue} exchange: ${exchange} topic: ${topic} options: ${JSON.stringify(options)}`);
|
275 | options = options || {};
|
276 | await this._channels.topology.bindQueue(queue, exchange, topic, options);
|
277 | this._topology.bindings[`${queue}_${exchange}`] = { queue, exchange, topic, options };
|
278 | }
|
279 |
|
280 | |
281 |
|
282 |
|
283 |
|
284 | async _assertTopology() {
|
285 | const topologyPromises = [];
|
286 |
|
287 | this._logger.debug('Asserting pre-defined topology...');
|
288 |
|
289 |
|
290 | for (const key of Object.keys(this._topology.exchanges)) {
|
291 | const exchange = this._topology.exchanges[key];
|
292 | this._logger.debug(`Asserting exchange name: ${exchange.name} type: ${exchange.type} options: ${JSON.stringify(exchange.options)}...`);
|
293 | topologyPromises.push(this._channels.topology.assertExchange(exchange.name, exchange.type, exchange.options));
|
294 | }
|
295 |
|
296 |
|
297 | for (const key of Object.keys(this._topology.queues)) {
|
298 | const queue = this._topology.queues[key];
|
299 | this._logger.debug(`Asserting queue name: ${queue.name} options: ${JSON.stringify(queue.options)}...`);
|
300 | topologyPromises.push(this._channels.topology.assertQueue(queue.name, queue.options));
|
301 | }
|
302 |
|
303 |
|
304 | await Promise.all(topologyPromises);
|
305 |
|
306 |
|
307 | await Promise.map(Object.keys(this._topology.bindings), (key) => {
|
308 | const binding = this._topology.bindings[key];
|
309 | this._logger.debug(`Asserting binding queue: ${binding.queue} exchange: ${binding.exchange} topic: ${binding.topic} options: ${JSON.stringify(binding.options)}...`);
|
310 | return this._channels.topology.bindQueue(binding.queue, binding.exchange, binding.topic, binding.options);
|
311 | });
|
312 |
|
313 | this._logger.debug('Finished asserting defined topology!');
|
314 | }
|
315 |
|
316 | |
317 |
|
318 |
|
319 |
|
320 | async startConsuming() {
|
321 | this._shouldConsume = true;
|
322 |
|
323 | this._logger.debug('Starting up consumers...');
|
324 |
|
325 | this._resetHandlerTimings();
|
326 |
|
327 |
|
328 | if (this._topology.queues.reply) {
|
329 | const replyQueue = this._topology.queues.reply;
|
330 | this._replyConsumerTag = await this._channels.consumers[replyQueue.name].consume(replyQueue.name, this._handleReply.bind(this), replyQueue.options);
|
331 | this._replyConsumerTag = this._replyConsumerTag.consumerTag;
|
332 | this._logger.debug(`Starting consuming from reply queue: ${replyQueue.name}...`);
|
333 | }
|
334 |
|
335 | await Promise.map(Object.keys(this._topology.bindings), async (key) => {
|
336 | const binding = this._topology.bindings[key];
|
337 | const consumerTag = await this._channels.consumers[binding.queue].consume(binding.queue, this._handlers[binding.topic].callback.bind(this), binding.options);
|
338 | this._handlers[binding.topic].consumerTag = consumerTag.consumerTag;
|
339 | this._logger.debug(`Starting consuming from queue: ${binding.queue}...`);
|
340 | });
|
341 |
|
342 | this._logger.debug('Finished starting all consumers!');
|
343 | }
|
344 |
|
345 | |
346 |
|
347 |
|
348 |
|
349 |
|
350 | async stopConsuming(awaitReplies) {
|
351 | this._shouldConsume = false;
|
352 |
|
353 | this._logger.debug('Starting to cancel all consumers...');
|
354 |
|
355 | this._resetHandlerTimings();
|
356 |
|
357 | if (this._replyConsumerTag && !awaitReplies) {
|
358 | await this._channels.consumers[this._topology.queues.reply.name].cancel(this._replyConsumerTag);
|
359 | this._replyConsumerTag = null;
|
360 | this._logger.debug(`Stopped consuming from queue ${this._topology.queues.reply.name}...`);
|
361 | }
|
362 |
|
363 | await Promise.map(Object.keys(this._topology.bindings), async (key) => {
|
364 | const binding = this._topology.bindings[key];
|
365 | if (this._handlers[binding.topic] && this._handlers[binding.topic].consumerTag) {
|
366 | const consumerTag = JSON.parse(JSON.stringify(this._handlers[binding.topic].consumerTag));
|
367 | delete this._handlers[binding.topic].consumerTag;
|
368 | await this._channels.consumers[binding.queue].cancel(consumerTag);
|
369 | this._logger.debug(`Stopped consuming from queue ${binding.queue}...`);
|
370 | }
|
371 | });
|
372 |
|
373 | this._logger.debug('Finished consumer shutdown!');
|
374 | }
|
375 |
|
376 | |
377 |
|
378 |
|
379 |
|
380 |
|
381 |
|
382 |
|
383 |
|
384 | async _ackMessageAndReply(queueName, msg, pattern, reply) {
|
385 | try {
|
386 | this._logger.debug(`Attempting to acknowledge message: ${pattern} messageId: ${msg.properties.messageId}`);
|
387 |
|
388 | const topic = this._resolveTopic(pattern);
|
389 | if (this._handlers[topic] && this._handlers[topic].outstandingMessages.has(`${pattern}_${msg.properties.messageId}`)) {
|
390 | this._channels.consumers[queueName].ack(msg);
|
391 | if (msg.properties.replyTo && msg.properties.correlationId) {
|
392 | reply = reply || {};
|
393 | const options = {
|
394 | contentType: 'application/json',
|
395 | contentEncoding: 'utf8',
|
396 | messageId: uuidv4(),
|
397 | correlationId: msg.properties.correlationId,
|
398 | timestamp: new Date().getTime(),
|
399 | replyTo: msg.properties.replyTo
|
400 | };
|
401 | this._channels.replyPublish.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(reply)), options);
|
402 | }
|
403 | this._handlers[topic].outstandingMessages.delete(`${pattern}_${msg.properties.messageId}`);
|
404 | } else {
|
405 | this._logger.warn(`Skipping message ack due to connection failure! message: ${pattern} messageId: ${msg.properties.messageId}`);
|
406 | }
|
407 | } catch (err) {
|
408 | this._logger.warn(`Failed to ack a message! message: ${pattern} messageId: ${msg.properties.messageId} err: ${err.message}`);
|
409 | }
|
410 | }
|
411 |
|
412 | |
413 |
|
414 |
|
415 |
|
416 |
|
417 |
|
418 |
|
419 |
|
420 |
|
421 | async _nackMessageAndReply(queueName, msg, pattern, reply) {
|
422 | try {
|
423 | this._logger.debug(`Attempting to acknowledge message: ${pattern} messageId: ${msg.properties.messageId}`);
|
424 |
|
425 | const topic = this._resolveTopic(pattern);
|
426 | if (this._channels.consumers[queueName] && this._handlers[topic] && this._handlers[topic].outstandingMessages.has(`${pattern}_${msg.properties.messageId}`)) {
|
427 | this._channels.consumers[queueName].nack(msg, false, false);
|
428 | if (msg.properties.replyTo && msg.properties.correlationId) {
|
429 | reply = reply || 'An unknown error occurred during processing!';
|
430 | const options = {
|
431 | contentType: 'application/json',
|
432 | contentEncoding: 'utf8',
|
433 | messageId: uuidv4(),
|
434 | correlationId: msg.properties.correlationId,
|
435 | timestamp: new Date().getTime(),
|
436 | replyTo: msg.properties.replyTo
|
437 | };
|
438 | this._channels.replyPublish.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify({ err: reply })), options);
|
439 | }
|
440 | this._handlers[topic].outstandingMessages.delete(`${pattern}_${msg.properties.messageId}`);
|
441 | } else {
|
442 | this._logger.warn(`Skipping message nack due to connection failure! message: ${pattern} messageId: ${msg.properties.messageId}`);
|
443 | }
|
444 | } catch (err) {
|
445 | this._logger.warn(`Failed to nack a message! message: ${pattern} messageId: ${msg.properties.messageId} err: ${err.message}`);
|
446 | }
|
447 | }
|
448 |
|
449 | |
450 |
|
451 |
|
452 |
|
453 |
|
454 |
|
455 |
|
456 |
|
457 | async _rejectMessage(queueName, msg, pattern) {
|
458 | try {
|
459 | this._logger.debug(`Attempting to acknowledge message: ${pattern} messageId: ${msg.properties.messageId}`);
|
460 |
|
461 | const topic = this._resolveTopic(pattern);
|
462 | if (this._channels.consumers[queueName] && this._handlers[topic] && this._handlers[topic].outstandingMessages.has(`${pattern}_${msg.properties.messageId}`)) {
|
463 | this._channels.consumers[queueName].reject(msg);
|
464 | this._handlers[topic].outstandingMessages.delete(`${pattern}_${msg.properties.messageId}`);
|
465 | } else {
|
466 | this._logger.warn(`Skipping message rejection due to connection failure! message: ${pattern} messageId: ${msg.properties.messageId}`);
|
467 | }
|
468 | } catch (err) {
|
469 | this._logger.warn(`Failed to reject a message! message: ${pattern} messageId: ${msg.properties.messageId} err: ${err.message}`);
|
470 | }
|
471 | }
|
472 |
|
473 | |
474 |
|
475 |
|
476 |
|
477 | _handleReply(msg) {
|
478 | msg = msg || {};
|
479 |
|
480 | let body;
|
481 | try {
|
482 | body = (msg.content || '{}').toString();
|
483 | body = JSON.parse(body);
|
484 | } catch (err) {
|
485 | body.err = 'Failed to parse message body due to invalid JSON!';
|
486 | }
|
487 |
|
488 | msg.properties = msg.properties || {};
|
489 | msg.properties.headers = msg.properties.headers || {};
|
490 |
|
491 | this._logger.debug(`Attempting to handle incoming reply. correlationId: ${msg.properties.correlationId || 'undefined'}`);
|
492 |
|
493 | if (!msg.properties.correlationId || !this._replyHandlers[msg.properties.correlationId] || msg.properties.replyTo !== this._topology.queues.reply.name) {
|
494 | this._logger.warn(`Reply handler received an invalid reply! correlationId: ${msg.properties.correlationId}`);
|
495 | } else {
|
496 | if (body.err) {
|
497 | this._replyHandlers[msg.properties.correlationId](new Error(body.err));
|
498 | } else {
|
499 | this._replyHandlers[msg.properties.correlationId](null, body);
|
500 | }
|
501 | delete this._replyHandlers[msg.properties.correlationId];
|
502 | this._logger.debug(`Finished processing reply. correlationId: ${msg.properties.correlationId || 'undefined'}`);
|
503 | }
|
504 | }
|
505 |
|
506 | |
507 |
|
508 |
|
509 |
|
510 |
|
511 | _setHandlerTiming(pattern, start) {
|
512 | this._logger.debug(`Setting handler timings for message: ${pattern}`);
|
513 | const elapsed = new Date().getTime() - start;
|
514 | this._handlers[pattern].timings = this._handlers[pattern].timings || {
|
515 | messageCount: 0,
|
516 | elapsedTime: 0,
|
517 | minElapsedTime: 0,
|
518 | maxElapsedTime: 0
|
519 | };
|
520 | this._handlers[pattern].timings.messageCount++;
|
521 | this._handlers[pattern].timings.elapsedTime += elapsed;
|
522 |
|
523 | if (this._handlers[pattern].timings.minElapsedTime > elapsed ||
|
524 | this._handlers[pattern].timings.minElapsedTime === 0) {
|
525 | this._handlers[pattern].timings.minElapsedTime = elapsed;
|
526 | }
|
527 | if (this._handlers[pattern].timings.maxElapsedTime < elapsed) {
|
528 | this._handlers[pattern].timings.maxElapsedTime = elapsed;
|
529 | }
|
530 | }
|
531 |
|
532 | |
533 |
|
534 |
|
535 | _resetHandlerTimings() {
|
536 | this._logger.debug('Resetting handler timings.');
|
537 | for (const key of Object.keys(this._handlers)) {
|
538 | delete this._handlers[key].timings;
|
539 | }
|
540 |
|
541 |
|
542 | if (this._handlerTimingResetInterval) {
|
543 | this._logger.debug(`Setting next timing refresh in ${this._handlerTimingResetInterval} ms.`);
|
544 | this._handlerTimingsTimeout = setTimeout(() => {
|
545 | this._resetHandlerTimings();
|
546 | }, this._handlerTimingResetInterval);
|
547 | }
|
548 | }
|
549 |
|
550 | |
551 |
|
552 |
|
553 |
|
554 |
|
555 |
|
556 |
|
557 | async addListener(pattern, callback, options) {
|
558 | this._logger.debug(`Starting to add a listener for message: ${pattern}...`);
|
559 | options = options || {};
|
560 |
|
561 | const topic = this._resolveTopic(pattern);
|
562 |
|
563 |
|
564 | options.queue = options.queue || {};
|
565 | options.queue.deadLetterExchange = options.deadLetterExchange || this._deadLetterExchange;
|
566 | options.queue.deadLetterRoutingKey = topic;
|
567 |
|
568 | const queueName = (options.queue.prefix || this._queuePrefix) + '.' + topic;
|
569 |
|
570 |
|
571 | options.exchange = options.exchange || this._defaultExchange;
|
572 |
|
573 |
|
574 | this._logger.debug(`Asserting listener topology for message: ${pattern}...`);
|
575 | if (!this._channels.consumers[queueName]) {
|
576 | this._channels.consumers[queueName] = await this._createChannel();
|
577 | }
|
578 | await this.assertExchange(options.exchange.name, options.exchange.type, options.exchange);
|
579 | await this.assertQueue(queueName, options.queue);
|
580 | await this.assertBinding(queueName, options.exchange.name, topic, options.binding);
|
581 |
|
582 |
|
583 | this._handlers[topic] = { outstandingMessages: new Set() };
|
584 | this._handlers[topic].callback = async (msg) => {
|
585 | const start = new Date().getTime();
|
586 |
|
587 | try {
|
588 | msg.properties = msg.properties || {};
|
589 | msg.properties.headers = msg.properties.headers || {};
|
590 | msg.properties.messageId = msg.properties.messageId || msg.properties.correlationId;
|
591 |
|
592 | this._logger.debug(`Handling incoming message: ${pattern} messageId: ${msg.properties.messageId}...`);
|
593 |
|
594 | this._handlers[topic].outstandingMessages.add(`${pattern}_${msg.properties.messageId}`);
|
595 |
|
596 | let body = (msg.content || '{}').toString();
|
597 | body = JSON.parse(body);
|
598 |
|
599 | const reply = await callback(body, msg.properties.headers);
|
600 | await this._ackMessageAndReply(queueName, msg, pattern, reply);
|
601 | this._setHandlerTiming(topic, start);
|
602 | this._logger.debug(`Finished handling incoming message: ${pattern} messageId: ${msg.properties.messageId}.`);
|
603 | } catch (err) {
|
604 | this._logger.error(`Message handler failed and cannot retry! message: ${pattern} err: ${err.message}`);
|
605 | await this._nackMessageAndReply(queueName, msg, pattern, err.message);
|
606 | this._setHandlerTiming(topic, start);
|
607 | }
|
608 | };
|
609 |
|
610 | this._logger.debug(`Finished adding a listener for message: ${pattern}.`);
|
611 | }
|
612 |
|
613 | |
614 |
|
615 |
|
616 |
|
617 |
|
618 |
|
619 |
|
620 |
|
621 |
|
622 | async removeListener(pattern, exchange, prefix) {
|
623 | let attempts = 0;
|
624 |
|
625 | const topic = this._resolveTopic(pattern);
|
626 | exchange = exchange || this._defaultExchange.name;
|
627 | const queueName = (prefix || this._queuePrefix) + '.' + topic;
|
628 |
|
629 | const attempt = async (skipIncrement) => {
|
630 | if (!skipIncrement) {
|
631 | attempts++;
|
632 | }
|
633 |
|
634 | this._logger.debug(`Attempting to remove a listener for message: ${pattern}...`);
|
635 |
|
636 | if (this._connecting) {
|
637 | this._logger.debug(`Cannot remove a listener for message: ${pattern} while reconnecting! Will retry in ${this._removeListenerRetryDelay} ms...`);
|
638 | await Promise.delay(this._removeListenerRetryDelay);
|
639 | return attempt(true);
|
640 | }
|
641 |
|
642 | try {
|
643 | if (this._channels.consumers[queueName]) {
|
644 | this._logger.debug(`Cancelling consumer for message: ${pattern}...`);
|
645 | if (this._handlers[topic].consumerTag) {
|
646 | await this._channels.consumers[queueName].cancel(this._handlers[topic].consumerTag);
|
647 | }
|
648 | this._handlers[topic].outstandingMessages.clear();
|
649 | await this._channels.consumers[queueName].close();
|
650 | delete this._channels.consumers[queueName];
|
651 | }
|
652 | delete this._handlers[topic];
|
653 | delete this._topology.bindings[`${queueName}_${exchange}`];
|
654 | this._logger.debug(`Finished removing listener for message: ${pattern}.`);
|
655 | } catch (err) {
|
656 | if (attempts < this._removeListenerRetryLimit) {
|
657 | await Promise.delay(this._removeListenerRetryDelay);
|
658 | return attempt();
|
659 | }
|
660 | throw err;
|
661 | }
|
662 | };
|
663 |
|
664 | return attempt();
|
665 | }
|
666 |
|
667 | |
668 |
|
669 |
|
670 |
|
671 |
|
672 |
|
673 |
|
674 | async publish(routingKey, message, options) {
|
675 | try {
|
676 | let publishAttempts = 0;
|
677 |
|
678 |
|
679 | options = options || {};
|
680 | options.contentType = 'application/json';
|
681 | options.contentEncoding = 'utf8';
|
682 | options.messageId = options.messageId || uuidv4();
|
683 | options.timestamp = new Date().getTime();
|
684 |
|
685 | const exchange = options.exchange || this._defaultExchange.name;
|
686 | const msgBody = JSON.stringify(message || '{}');
|
687 | const msgData = Buffer.from(msgBody);
|
688 |
|
689 | const attempt = async (skipIncrement) => {
|
690 | this._logger.debug(`Attempting to publish fire-and-forget message: ${routingKey}...`);
|
691 | if (!skipIncrement) {
|
692 | publishAttempts++;
|
693 | }
|
694 |
|
695 | if (this._connecting) {
|
696 | this._logger.debug(`Unable to publish fire-and-forget message: ${routingKey} while reconnecting. Will retry...`);
|
697 | await Promise.delay(this._publishRetryDelay);
|
698 | return attempt(true);
|
699 | }
|
700 |
|
701 | try {
|
702 | const published = this._channels.publish.publish(exchange, this._resolveTopic(routingKey), msgData, options);
|
703 | if (published) {
|
704 | publishAttempts = 0;
|
705 | } else {
|
706 | throw new Error(`Publish buffer full!`);
|
707 | }
|
708 | } catch (err) {
|
709 | if (publishAttempts < this._publishRetryLimit) {
|
710 | this._logger.debug(`Failed to publish fire-and-forget message and will retry! message: ${routingKey} err: ${err.message}`);
|
711 | await Promise.delay(this._publishRetryDelay);
|
712 | return attempt();
|
713 | }
|
714 | throw err;
|
715 | }
|
716 | };
|
717 |
|
718 | await attempt();
|
719 | } catch (err) {
|
720 | this._logger.error(`Failed to publish a fire-and-forget message! message: ${routingKey} err: ${err.message}`);
|
721 | }
|
722 | }
|
723 |
|
724 | |
725 |
|
726 |
|
727 |
|
728 |
|
729 |
|
730 |
|
731 | async request(routingKey, message, options) {
|
732 | let publishAttempts = 0;
|
733 |
|
734 |
|
735 | options = options || {};
|
736 | options.contentType = 'application/json';
|
737 | options.contentEncoding = 'utf8';
|
738 | options.messageId = options.messageId || uuidv4();
|
739 | options.correlationId = options.correlationId || options.messageId;
|
740 | options.replyTo = this._topology.queues.reply.name;
|
741 | options.timestamp = new Date().getTime();
|
742 |
|
743 | const exchange = options.exchange || this._defaultExchange.name;
|
744 | const msgData = Buffer.from(JSON.stringify(message || '{}'));
|
745 |
|
746 | const attempt = async (skipIncrement) => {
|
747 | this._logger.debug(`Attempting to publish RPC message: ${routingKey}...`);
|
748 | if (!skipIncrement) {
|
749 | publishAttempts++;
|
750 | }
|
751 |
|
752 | if (this._connecting) {
|
753 | this._logger.debug(`Unable to publish RPC message: ${routingKey} while reconnecting. Will retry...`);
|
754 | await Promise.delay(this._publishRetryDelay);
|
755 | return attempt(true);
|
756 | }
|
757 |
|
758 | try {
|
759 | const published = this._channels.publish.publish(exchange, this._resolveTopic(routingKey), msgData, options);
|
760 | if (published) {
|
761 | publishAttempts = 0;
|
762 | } else {
|
763 | throw new Error(`Publish buffer full!`);
|
764 | }
|
765 |
|
766 | return new Promise((resolve, reject) => {
|
767 | this._replyHandlers[options.correlationId] = (err, data) => {
|
768 | if (err) {
|
769 | reject(err);
|
770 | } else {
|
771 | resolve(data);
|
772 | }
|
773 | };
|
774 | }).timeout(this._replyTimeout);
|
775 | } catch (err) {
|
776 | if (publishAttempts < this._publishRetryLimit) {
|
777 | this._logger.debug(`Failed to publish RPC message and will retry! message: ${routingKey} err: ${err.message}`);
|
778 | await Promise.delay(this._publishRetryDelay);
|
779 | return attempt();
|
780 | }
|
781 | this._logger.debug(`Failed to publish RPC message: ${routingKey} err: ${err.message}`);
|
782 | throw err;
|
783 | }
|
784 | };
|
785 |
|
786 | return attempt();
|
787 | }
|
788 | }
|
789 |
|
790 | module.exports = PostmasterGeneral;
|