UNPKG

32.2 kBJavaScriptView Raw
1'use strict';
2
3/**
4 * A simple library for making microservice message bus
5 * calls using RabbitMQ via amqplib.
6 * https://www.npmjs.com/package/amqplib
7 * @module lib/postmaster-general
8 */
9
10const EventEmitter = require('events');
11const amqp = require('amqplib');
12const log4js = require('log4js');
13const Promise = require('bluebird');
14const uuidv4 = require('uuid/v4');
15const defaults = require('./defaults');
16
/**
 * RabbitMQ message-bus client built on amqplib. Supports fire-and-forget
 * publishing, RPC-style request/reply through a per-instance reply queue,
 * topic listeners, and automatic reconnect with topology re-assertion.
 *
 * Emits `error` when a lost connection could not be re-established.
 */
class PostmasterGeneral extends EventEmitter {
  /**
   * Constructor function for the PostmasterGeneral object.
   * No I/O is performed here; call `connect()` to open the connection.
   * @param {object} [options] Overrides for the values in `./defaults`.
   */
  constructor(options) {
    super();

    // Set initial state values.
    this._connection = null;
    this._connecting = false;
    this._shuttingDown = false;
    this._channels = {};
    this._handlers = {};
    this._handlerTimingsTimeout = null;
    this._replyConsumerTag = null;
    this._replyHandlers = {};
    this._shouldConsume = false;
    // Copy the default exchange map: assertExchange() adds entries to it, and
    // those must not leak into the shared defaults module (or other instances).
    this._topology = { exchanges: Object.assign({}, defaults.exchanges), queues: {}, bindings: {} };
    this._createChannel = null;

    // Set options and defaults.
    options = options || {};
    // Only `undefined` falls back to the default so that 0 is a usable value.
    const optionOrDefault = (key) => (typeof options[key] === 'undefined' ? defaults[key] : options[key]);

    this._connectRetryDelay = optionOrDefault('connectRetryDelay');
    this._connectRetryLimit = optionOrDefault('connectRetryLimit');
    this._consumerPrefetch = optionOrDefault('consumerPrefetch');
    this._deadLetterExchange = options.deadLetterExchange || defaults.deadLetterExchange;
    this._defaultExchange = defaults.exchanges.topic;
    // A falsy interval disables the automatic timing reset (see _resetHandlerTimings),
    // so an explicit 0 must be honoured rather than replaced by the default.
    this._handlerTimingResetInterval = optionOrDefault('handlerTimingResetInterval');
    this._heartbeat = optionOrDefault('heartbeat');
    this._publishRetryDelay = optionOrDefault('publishRetryDelay');
    this._publishRetryLimit = optionOrDefault('publishRetryLimit');
    this._removeListenerRetryDelay = optionOrDefault('removeListenerRetryDelay');
    this._removeListenerRetryLimit = optionOrDefault('removeListenerRetryLimit');
    this._replyTimeout = this._publishRetryDelay * this._publishRetryLimit * 2;
    this._queuePrefix = options.queuePrefix || defaults.queuePrefix;
    this._shutdownTimeout = options.shutdownTimeout || defaults.shutdownTimeout;
    this._url = options.url || defaults.url;

    // Configure the logger.
    const logLevel = options.logLevel ? options.logLevel : defaults.logLevel;
    log4js.configure({
      appenders: { out: { type: 'stdout' } },
      categories: { default: { appenders: ['out'], level: logLevel } },
      pm2: options.pm2,
      pm2InstanceVar: options.pm2InstanceVar
    });
    this._logger = log4js.getLogger('postmaster-general');
    this._logger.level = logLevel;

    // Configure reply queue topology.
    // Reply queue belongs only to this instance, but we want it to survive reconnects. Thus, we set an expiration for the queue.
    const replyQueueName = `postmaster.reply.${this._queuePrefix}.${uuidv4()}`;
    const replyQueueExpiration = (this._connectRetryDelay * this._connectRetryLimit) + (60000 * this._connectRetryLimit);
    this._topology.queues = { reply: { name: replyQueueName, options: { noAck: true, expires: replyQueueExpiration } } };
  }

  /**
   * Accessor property for retrieving the number of messages being handled:
   * messages currently inside listener callbacks plus RPC requests still
   * waiting for a reply.
   */
  get outstandingMessageCount() {
    const listenerCount = Object.keys(this._handlers).reduce((sum, key) => {
      return sum + this._handlers[key].outstandingMessages.size;
    }, 0);

    return listenerCount + Object.keys(this._replyHandlers).length;
  }

  /**
   * Accessor property for getting the current handler timings, keyed by topic.
   * Handlers that have not processed a message since the last reset are omitted.
   */
  get handlerTimings() {
    const handlerTimings = {};
    for (const key of Object.keys(this._handlers)) {
      const handler = this._handlers[key];
      if (handler.timings) {
        handlerTimings[key] = handler.timings;
      }
    }
    return handlerTimings;
  }

  /**
   * Called to resolve the RabbitMQ topic key corresponding to an address.
   * Every `:` in the pattern is replaced with `.`.
   * @param {String} pattern
   * @returns {String} The topic key.
   */
  _resolveTopic(pattern) {
    return pattern.replace(/:/g, '.');
  }

  /**
   * Called to connect to RabbitMQ and build all channels, then assert the
   * known topology. Failed attempts are retried up to `connectRetryLimit`
   * times, `connectRetryDelay` ms apart.
   * @returns {Promise} Resolves once connected and the topology is asserted;
   * rejects with the last connection error when the retry limit is exhausted.
   */
  async connect() {
    let connectionAttempts = 0;

    const attemptConnect = async () => {
      this._logger.debug('Attempting to connect to RabbitMQ...');

      connectionAttempts++;
      this._connecting = true;

      // We always want to start on a clean-slate when we connect.
      // Cancel outstanding messages, clear all consumers, and reset the connection.
      this._logger.debug('Closing any existing connections...');
      this._replyConsumerTag = null;
      for (const key of Object.keys(this._handlers)) {
        const handler = this._handlers[key];
        delete handler.consumerTag;
        if (handler.outstandingMessages) {
          handler.outstandingMessages.clear();
        }
      }
      if (this._connection) {
        try {
          await this._connection.close();
        } catch (err) {
          // Best effort: the previous connection is usually already dead here.
          this._logger.debug(`Ignoring error while closing previous connection. err: ${err.message}`);
        }
      }

      // Error handler shared by the connection and every channel created on it.
      const reconnect = async (err) => {
        try {
          if (!this._connecting && !this._shuttingDown) {
            this._logger.warn(`Lost RabbitMQ connection and will try to reconnect! err: ${err.message}`);
            await attemptConnect();
            await this._assertTopology();
            if (this._shouldConsume) {
              await this.startConsuming();
            }
            this._logger.warn('Restored RabbitMQ connection successfully!');
          }
        } catch (err) {
          const errMessage = `Unable to re-establish RabbitMQ connection! err: ${err.message}`;
          this._logger.error(errMessage);
          this.emit('error', new Error(errMessage));
        }
      };

      try {
        this._logger.debug('Acquiring RabbitMQ connection...');
        this._connection = await amqp.connect(this._url, { heartbeat: this._heartbeat });
        this._connection.on('error', reconnect);

        this._createChannel = async () => {
          const channel = await this._connection.createChannel();
          channel.on('error', reconnect);
          return channel;
        };

        this._logger.debug('Initializing channels...');
        this._channels = await Promise.props({
          publish: this._createChannel(),
          replyPublish: this._createChannel(),
          topology: this._createChannel(),
          // One consumer channel per known queue, keyed by queue name.
          consumers: Promise.reduce(Object.keys(this._topology.queues), async (consumerMap, key) => {
            const queue = this._topology.queues[key];
            consumerMap[queue.name] = await this._createChannel();
            return consumerMap;
          }, {})
        });

        connectionAttempts = 0;
        this._connecting = false;
        this._logger.debug('Finished connecting to RabbitMQ!');
      } catch (err) {
        if (connectionAttempts < this._connectRetryLimit) {
          this._logger.warn('Failed to establish RabbitMQ connection! Retrying...');
          await Promise.delay(this._connectRetryDelay);
          return attemptConnect();
        }
        // Clear the flag before giving up; otherwise publish(), request() and
        // removeListener() would wait for a reconnect that will never finish.
        this._connecting = false;
        this._logger.error(`Failed to establish RabbitMQ connection after ${connectionAttempts} attempts! err: ${err.message}`);
        throw err;
      }
    };

    await attemptConnect();
    await this._assertTopology();
  }

  /**
   * Called to safely shutdown the RabbitMQ connection while allowing outstanding messages to process.
   * Listener consumers are cancelled first; the reply consumer stays active so
   * pending RPC replies can still arrive. The connection is closed once nothing
   * is outstanding, or after roughly `shutdownTimeout` ms of one-second retries.
   * @returns {Promise} Resolves when shutdown has finished. Never rejects.
   */
  async shutdown() {
    const shutdownRetryDelay = 1000;
    const retryLimit = this._shutdownTimeout / shutdownRetryDelay;
    let retryAttempts = 0;

    const attempt = async () => {
      retryAttempts++;
      this._shuttingDown = true;

      this._logger.debug('Attempting shutdown...');

      const busy = this._connecting || this.outstandingMessageCount > 0;
      if (busy && retryAttempts < retryLimit) {
        if (this._connecting) {
          this._logger.debug('Unable to shutdown while attempting reconnect! Will retry...');
        } else {
          this._logger.debug(`Unable to shutdown while processing ${this.outstandingMessageCount} messages! Will retry...`);
        }
        await Promise.delay(shutdownRetryDelay);
        return attempt();
      }

      try {
        await this.stopConsuming();
      } catch (err) {
        this._logger.debug(`Ignoring error while cancelling consumers during shutdown. err: ${err.message}`);
      }

      try {
        await this._connection.close();
      } catch (err) {
        this._logger.debug(`Ignoring error while closing connection during shutdown. err: ${err.message}`);
      }

      // stopConsuming() re-arms the timing reset timer; make sure it cannot keep the process alive.
      if (this._handlerTimingsTimeout) {
        clearTimeout(this._handlerTimingsTimeout);
        this._handlerTimingsTimeout = null;
      }

      this._logger.debug('Shutdown completed successfully!');
    };

    try {
      // Stop taking new work, but keep the reply consumer for in-flight requests.
      await this.stopConsuming(true);
    } catch (err) {
      this._logger.debug(`Ignoring error while cancelling listeners before shutdown. err: ${err.message}`);
    }

    return attempt();
  }

  /**
   * Asserts an exchange on RabbitMQ, adding it to the list of known topology.
   * @param {String} name The name of the exchange.
   * @param {String} type The type of exchange.
   * @param {Object} [options] Various exchange options.
   * @returns {Promise} Promise that resolves when the exchange has been asserted.
   */
  async assertExchange(name, type, options) {
    this._logger.debug(`Asserting exchange name: ${name} type: ${type} options: ${JSON.stringify(options)}`);
    options = options || {};
    await this._channels.topology.assertExchange(name, type, options);
    // Recorded only after success so a reconnect re-asserts valid topology only.
    this._topology.exchanges[name] = { name, type, options };
  }

  /**
   * Asserts a queue on RabbitMQ, adding it to the list of known topology.
   * @param {String} name The name of the queue.
   * @param {Object} [options] Various queue options.
   * @returns {Promise} Promise that resolves when the queue has been asserted.
   */
  async assertQueue(name, options) {
    this._logger.debug(`Asserting queue name: ${name} options: ${JSON.stringify(options)}`);
    options = options || {};
    await this._channels.topology.assertQueue(name, options);
    this._topology.queues[name] = { name, options };
  }

  /**
   * Binds a queue to an exchange, recording the binding in the topology definition.
   * Bindings are keyed by `<queue>_<exchange>`, so a later binding for the same
   * pair replaces the recorded one.
   * @param {String} queue The name of the queue to bind.
   * @param {String} exchange The exchange to bind.
   * @param {String} topic The routing key to bind.
   * @param {Object} [options] Various binding options.
   * @returns {Promise} Promise that resolves when the binding is complete.
   */
  async assertBinding(queue, exchange, topic, options) {
    this._logger.debug(`Asserting binding queue: ${queue} exchange: ${exchange} topic: ${topic} options: ${JSON.stringify(options)}`);
    options = options || {};
    await this._channels.topology.bindQueue(queue, exchange, topic, options);
    this._topology.bindings[`${queue}_${exchange}`] = { queue, exchange, topic, options };
  }

  /**
   * Called to assert any RabbitMQ topology after a successful connection is established.
   * Exchanges and queues are asserted concurrently; bindings follow once both exist.
   * @returns {Promise} Promise resolving when all defined topology has been confirmed.
   */
  async _assertTopology() {
    const topologyPromises = [];

    this._logger.debug('Asserting pre-defined topology...');

    // Assert exchanges.
    for (const key of Object.keys(this._topology.exchanges)) {
      const exchange = this._topology.exchanges[key];
      this._logger.debug(`Asserting exchange name: ${exchange.name} type: ${exchange.type} options: ${JSON.stringify(exchange.options)}...`);
      topologyPromises.push(this._channels.topology.assertExchange(exchange.name, exchange.type, exchange.options));
    }

    // Assert consumer queues.
    for (const key of Object.keys(this._topology.queues)) {
      const queue = this._topology.queues[key];
      this._logger.debug(`Asserting queue name: ${queue.name} options: ${JSON.stringify(queue.options)}...`);
      topologyPromises.push(this._channels.topology.assertQueue(queue.name, queue.options));
    }

    // Await all assertions before asserting bindings.
    await Promise.all(topologyPromises);

    // Bind listeners.
    await Promise.map(Object.keys(this._topology.bindings), (key) => {
      const binding = this._topology.bindings[key];
      this._logger.debug(`Asserting binding queue: ${binding.queue} exchange: ${binding.exchange} topic: ${binding.topic} options: ${JSON.stringify(binding.options)}...`);
      return this._channels.topology.bindQueue(binding.queue, binding.exchange, binding.topic, binding.options);
    });

    this._logger.debug('Finished asserting defined topology!');
  }

  /**
   * Called to start consuming incoming messages from all consumer channels.
   * Also resets handler timings and remembers that consumers should be
   * restarted after a reconnect.
   * @returns {Promise} Promise that resolves when all consumers have begun consuming.
   */
  async startConsuming() {
    this._shouldConsume = true;

    this._logger.debug('Starting up consumers...');

    this._resetHandlerTimings();

    // Since the reply queue isn't bound to an exchange, we need to handle it separately.
    if (this._topology.queues.reply) {
      const replyQueue = this._topology.queues.reply;
      const replyChannel = this._channels.consumers[replyQueue.name];
      const consumeOk = await replyChannel.consume(replyQueue.name, this._handleReply.bind(this), replyQueue.options);
      this._replyConsumerTag = consumeOk.consumerTag;
      this._logger.debug(`Starting consuming from reply queue: ${replyQueue.name}...`);
    }

    await Promise.map(Object.keys(this._topology.bindings), async (key) => {
      const binding = this._topology.bindings[key];
      const handler = this._handlers[binding.topic];
      const consumeOk = await this._channels.consumers[binding.queue].consume(binding.queue, handler.callback.bind(this), binding.options);
      handler.consumerTag = consumeOk.consumerTag;
      this._logger.debug(`Starting consuming from queue: ${binding.queue}...`);
    });

    this._logger.debug('Finished starting all consumers!');
  }

  /**
   * Called to stop consuming incoming messages from all channels.
   * @param {Boolean} [awaitReplies] If truthy, this function will skip cancelling the reply consumer.
   * @returns {Promise} Promise that resolves when all consumers have stopped consuming.
   */
  async stopConsuming(awaitReplies) {
    this._shouldConsume = false;

    this._logger.debug('Starting to cancel all consumers...');

    this._resetHandlerTimings();

    if (this._replyConsumerTag && !awaitReplies) {
      const replyQueueName = this._topology.queues.reply.name;
      await this._channels.consumers[replyQueueName].cancel(this._replyConsumerTag);
      this._replyConsumerTag = null;
      this._logger.debug(`Stopped consuming from queue ${replyQueueName}...`);
    }

    await Promise.map(Object.keys(this._topology.bindings), async (key) => {
      const binding = this._topology.bindings[key];
      const handler = this._handlers[binding.topic];
      if (handler && handler.consumerTag) {
        // Forget the tag before the async cancel so a concurrent call cannot cancel it twice.
        const consumerTag = handler.consumerTag;
        delete handler.consumerTag;
        await this._channels.consumers[binding.queue].cancel(consumerTag);
        this._logger.debug(`Stopped consuming from queue ${binding.queue}...`);
      }
    });

    this._logger.debug('Finished consumer shutdown!');
  }

  /**
   * Publishes a reply for an RPC-style message on the reply channel.
   * Does nothing unless the message carries both `replyTo` and `correlationId`.
   * @param {Object} msg The RabbitMQ message being answered.
   * @param {*} payload The JSON-serialisable reply body.
   */
  _sendReply(msg, payload) {
    const { replyTo, correlationId } = msg.properties;
    if (!replyTo || !correlationId) {
      return;
    }

    const options = {
      contentType: 'application/json',
      contentEncoding: 'utf8',
      messageId: uuidv4(),
      correlationId,
      timestamp: new Date().getTime(),
      replyTo
    };
    this._channels.replyPublish.sendToQueue(replyTo, Buffer.from(JSON.stringify(payload)), options);
  }

  /**
   * A "safe", promise-based method for acknowledging messages that is guaranteed to resolve.
   * The message is only acked while it is still tracked as outstanding; a reconnect
   * clears that tracking because the delivery is no longer valid.
   * @param {String} queueName The queue name of the channel the message was received on.
   * @param {Object} msg The RabbitMQ message to acknowledge.
   * @param {String} pattern The routing key of the message.
   * @param {Object} [reply] The request body of the response message to send.
   * @returns {Promise} Promise that resolves when the message is acknowledged.
   */
  async _ackMessageAndReply(queueName, msg, pattern, reply) {
    // Resolved defensively so the catch block below can never throw.
    const messageId = msg && msg.properties ? msg.properties.messageId : undefined;
    try {
      this._logger.debug(`Attempting to acknowledge message: ${pattern} messageId: ${messageId}`);

      const topic = this._resolveTopic(pattern);
      const handler = this._handlers[topic];
      const channel = this._channels.consumers[queueName];
      const messageKey = `${pattern}_${messageId}`;
      if (channel && handler && handler.outstandingMessages.has(messageKey)) {
        channel.ack(msg);
        // Settled on the broker: stop tracking it even if the reply below fails.
        handler.outstandingMessages.delete(messageKey);
        this._sendReply(msg, reply || {});
      } else {
        this._logger.warn(`Skipping message ack due to connection failure! message: ${pattern} messageId: ${messageId}`);
      }
    } catch (err) {
      this._logger.warn(`Failed to ack a message! message: ${pattern} messageId: ${messageId} err: ${err.message}`);
    }
  }

  /**
   * A "safe", promise-based method for nacking messages that is guaranteed to resolve.
   * Nacked messages will not be requeued.
   * @param {String} queueName The queue name of the channel the message was received on.
   * @param {Object} msg The RabbitMQ message to nack.
   * @param {String} pattern The routing key of the message.
   * @param {String} [reply] The error message to send in reply.
   * @returns {Promise} Promise that resolves when the message is nacked.
   */
  async _nackMessageAndReply(queueName, msg, pattern, reply) {
    const messageId = msg && msg.properties ? msg.properties.messageId : undefined;
    try {
      this._logger.debug(`Attempting to nack message: ${pattern} messageId: ${messageId}`);

      const topic = this._resolveTopic(pattern);
      const handler = this._handlers[topic];
      const channel = this._channels.consumers[queueName];
      const messageKey = `${pattern}_${messageId}`;
      if (channel && handler && handler.outstandingMessages.has(messageKey)) {
        // allUpTo = false, requeue = false.
        channel.nack(msg, false, false);
        handler.outstandingMessages.delete(messageKey);
        this._sendReply(msg, { err: reply || 'An unknown error occurred during processing!' });
      } else {
        this._logger.warn(`Skipping message nack due to connection failure! message: ${pattern} messageId: ${messageId}`);
      }
    } catch (err) {
      this._logger.warn(`Failed to nack a message! message: ${pattern} messageId: ${messageId} err: ${err.message}`);
    }
  }

  /**
   * A "safe", promise-based method for rejecting messages that is guaranteed to resolve.
   * Rejected messages will be requeued for retry.
   * @param {String} queueName The queueName of the channel the message was received on.
   * @param {Object} msg The RabbitMQ message to reject.
   * @param {String} pattern The routing key of the message.
   * @returns {Promise} Promise that resolves when the message is rejected.
   */
  async _rejectMessage(queueName, msg, pattern) {
    const messageId = msg && msg.properties ? msg.properties.messageId : undefined;
    try {
      this._logger.debug(`Attempting to reject message: ${pattern} messageId: ${messageId}`);

      const topic = this._resolveTopic(pattern);
      const handler = this._handlers[topic];
      const channel = this._channels.consumers[queueName];
      const messageKey = `${pattern}_${messageId}`;
      if (channel && handler && handler.outstandingMessages.has(messageKey)) {
        channel.reject(msg);
        handler.outstandingMessages.delete(messageKey);
      } else {
        this._logger.warn(`Skipping message rejection due to connection failure! message: ${pattern} messageId: ${messageId}`);
      }
    } catch (err) {
      this._logger.warn(`Failed to reject a message! message: ${pattern} messageId: ${messageId} err: ${err.message}`);
    }
  }

  /**
   * Called to handle a reply to an RPC-style message.
   * Looks up the pending handler by correlation id and settles it with the
   * parsed body, or with an Error when the body carries an `err` field or is
   * not valid JSON. Replies without a matching handler are logged and dropped.
   * @param {Object} msg The RabbitMQ message.
   */
  _handleReply(msg) {
    msg = msg || {};

    let body;
    try {
      body = JSON.parse((msg.content || '{}').toString());
    } catch (err) {
      // Report the failure through the normal error path. `body` must become an
      // object here: setting a property on the raw string would throw in strict mode.
      body = { err: 'Failed to parse message body due to invalid JSON!' };
    }

    msg.properties = msg.properties || {};
    msg.properties.headers = msg.properties.headers || {};

    const correlationId = msg.properties.correlationId;
    this._logger.debug(`Attempting to handle incoming reply. correlationId: ${correlationId || 'undefined'}`);

    const handler = correlationId ? this._replyHandlers[correlationId] : undefined;
    if (!handler || msg.properties.replyTo !== this._topology.queues.reply.name) {
      this._logger.warn(`Reply handler received an invalid reply! correlationId: ${correlationId}`);
      return;
    }

    // `body` may legitimately be null (the JSON literal), hence the truthiness guard.
    if (body && body.err) {
      handler(new Error(body.err));
    } else {
      handler(null, body);
    }
    delete this._replyHandlers[correlationId];
    this._logger.debug(`Finished processing reply. correlationId: ${correlationId || 'undefined'}`);
  }

  /**
   * Updates the timing data for message handler callbacks.
   * Tracks message count plus total, minimum and maximum elapsed milliseconds.
   * @param {String} pattern The topic key of the handler to record timing data for.
   * @param {Number} start The epoch-millisecond time at which the handler started execution.
   */
  _setHandlerTiming(pattern, start) {
    this._logger.debug(`Setting handler timings for message: ${pattern}`);

    // The listener may have been removed while its callback was still running.
    const handler = this._handlers[pattern];
    if (!handler) {
      return;
    }

    const elapsed = new Date().getTime() - start;
    const timings = handler.timings || {
      messageCount: 0,
      elapsedTime: 0,
      minElapsedTime: 0,
      maxElapsedTime: 0
    };
    handler.timings = timings;

    timings.messageCount++;
    timings.elapsedTime += elapsed;

    // A minimum of 0 doubles as "not recorded yet".
    if (timings.minElapsedTime > elapsed || timings.minElapsedTime === 0) {
      timings.minElapsedTime = elapsed;
    }
    if (timings.maxElapsedTime < elapsed) {
      timings.maxElapsedTime = elapsed;
    }
  }

  /**
   * Resets the handler timings to prevent unbounded accumulation of stale data.
   * When `handlerTimingResetInterval` is truthy the next reset is scheduled
   * automatically; at most one such timer is pending at any time.
   */
  _resetHandlerTimings() {
    this._logger.debug('Resetting handler timings.');
    for (const key of Object.keys(this._handlers)) {
      delete this._handlers[key].timings;
    }

    // Drop any pending timer first. This method is also called directly by
    // startConsuming()/stopConsuming(); without this every call would start
    // another self-rescheduling timer chain that shutdown() can never clear.
    if (this._handlerTimingsTimeout) {
      clearTimeout(this._handlerTimingsTimeout);
      this._handlerTimingsTimeout = null;
    }

    // If we're not manually managing the timing refresh, schedule the next timeout.
    if (this._handlerTimingResetInterval) {
      this._logger.debug(`Setting next timing refresh in ${this._handlerTimingResetInterval} ms.`);
      this._handlerTimingsTimeout = setTimeout(() => {
        this._resetHandlerTimings();
      }, this._handlerTimingResetInterval);
    }
  }

  /**
   * Adds a new listener for the specified pattern, asserting any associated topology.
   * The queue is named `<prefix>.<topic>` and gets its own consumer channel.
   * Consumption only begins on the next `startConsuming()` call.
   * Note: `options` is filled in with the resolved queue/exchange settings.
   * @param {String} pattern The pattern to bind to.
   * @param {Function} callback The callback function to handle messages. This function MUST return a promise!
   * It receives `(body, headers)`; its resolved value is sent as the RPC reply.
   * @param {Object} [options] Additional options for queues, exchanges, and binding.
   * @returns {Promise} A promise that resolves when the listener has been added.
   */
  async addListener(pattern, callback, options) {
    this._logger.debug(`Starting to add a listener for message: ${pattern}...`);
    options = options || {};

    const topic = this._resolveTopic(pattern);

    // Configure queue options.
    options.queue = options.queue || {};
    options.queue.deadLetterExchange = options.deadLetterExchange || this._deadLetterExchange;
    options.queue.deadLetterRoutingKey = topic;

    const queueName = `${options.queue.prefix || this._queuePrefix}.${topic}`;

    // Configure exchange options
    options.exchange = options.exchange || this._defaultExchange;

    // Grab a channel and assert the topology.
    this._logger.debug(`Asserting listener topology for message: ${pattern}...`);
    if (!this._channels.consumers[queueName]) {
      this._channels.consumers[queueName] = await this._createChannel();
    }
    await this.assertExchange(options.exchange.name, options.exchange.type, options.exchange);
    await this.assertQueue(queueName, options.queue);
    await this.assertBinding(queueName, options.exchange.name, topic, options.binding);

    // Define the callback handler.
    this._handlers[topic] = { outstandingMessages: new Set() };
    this._handlers[topic].callback = async (msg) => {
      // amqplib delivers null when the broker cancels the consumer; there is
      // nothing to parse, ack or nack in that case.
      if (!msg) {
        this._logger.warn(`Consumer for message: ${pattern} was cancelled by the server!`);
        return;
      }

      const start = new Date().getTime();

      try {
        msg.properties = msg.properties || {};
        msg.properties.headers = msg.properties.headers || {};
        msg.properties.messageId = msg.properties.messageId || msg.properties.correlationId;

        this._logger.debug(`Handling incoming message: ${pattern} messageId: ${msg.properties.messageId}...`);

        // Track the delivery so shutdown() waits for it and ack/nack can verify it is still valid.
        this._handlers[topic].outstandingMessages.add(`${pattern}_${msg.properties.messageId}`);

        const body = JSON.parse((msg.content || '{}').toString());

        const reply = await callback(body, msg.properties.headers);
        await this._ackMessageAndReply(queueName, msg, pattern, reply);
        this._setHandlerTiming(topic, start);
        this._logger.debug(`Finished handling incoming message: ${pattern} messageId: ${msg.properties.messageId}.`);
      } catch (err) {
        this._logger.error(`Message handler failed and cannot retry! message: ${pattern} err: ${err.message}`);
        await this._nackMessageAndReply(queueName, msg, pattern, err.message);
        this._setHandlerTiming(topic, start);
      }
    };

    this._logger.debug(`Finished adding a listener for message: ${pattern}.`);
  }

  /**
   * Called to remove a listener. Note that this call DOES NOT delete any queues
   * or exchanges. It is recommended that these constructs be made to auto-delete or expire
   * if they are not intended to be persistent.
   * Waits while a reconnect is in progress and retries failures up to
   * `removeListenerRetryLimit` times.
   * @param {String} pattern The pattern to match.
   * @param {String} [exchange] The name of the exchange to remove the binding.
   * @param {String} [prefix] The queue prefix to match.
   * @returns {Promise} Promise that resolves when the listener has been removed.
   */
  async removeListener(pattern, exchange, prefix) {
    let attempts = 0;

    const topic = this._resolveTopic(pattern);
    exchange = exchange || this._defaultExchange.name;
    const queueName = `${prefix || this._queuePrefix}.${topic}`;

    const attempt = async (skipIncrement) => {
      // Waiting for a reconnect does not count against the retry limit.
      if (!skipIncrement) {
        attempts++;
      }

      this._logger.debug(`Attempting to remove a listener for message: ${pattern}...`);

      if (this._connecting) {
        this._logger.debug(`Cannot remove a listener for message: ${pattern} while reconnecting! Will retry in ${this._removeListenerRetryDelay} ms...`);
        await Promise.delay(this._removeListenerRetryDelay);
        return attempt(true);
      }

      try {
        const channel = this._channels.consumers[queueName];
        const handler = this._handlers[topic];
        if (channel) {
          this._logger.debug(`Cancelling consumer for message: ${pattern}...`);
          if (handler && handler.consumerTag) {
            await channel.cancel(handler.consumerTag);
            // Do not cancel the same tag again if a later step fails and we retry.
            delete handler.consumerTag;
          }
          if (handler) {
            handler.outstandingMessages.clear();
          }
          await channel.close();
          delete this._channels.consumers[queueName];
        }
        delete this._handlers[topic];
        delete this._topology.bindings[`${queueName}_${exchange}`];
        this._logger.debug(`Finished removing listener for message: ${pattern}.`);
      } catch (err) {
        if (attempts < this._removeListenerRetryLimit) {
          await Promise.delay(this._removeListenerRetryDelay);
          return attempt();
        }
        throw err;
      }
    };

    return attempt();
  }

  /**
   * Waits until a channel's write buffer has drained, or until
   * `publishRetryDelay` ms have passed, whichever comes first. The time limit
   * keeps callers from hanging if the channel dies before emitting `drain`.
   * @param {Object} channel The channel whose publish call returned false.
   * @returns {Promise} Promise that always resolves.
   */
  _waitForDrain(channel) {
    return new Promise((resolve) => {
      let timer = null;
      const finish = () => {
        clearTimeout(timer);
        channel.removeListener('drain', finish);
        resolve();
      };
      timer = setTimeout(finish, this._publishRetryDelay);
      channel.once('drain', finish);
    });
  }

  /**
   * Publishes a fire-and-forget message that doesn't wait for an explicit response.
   * Failures are retried up to `publishRetryLimit` times and finally logged;
   * they are never thrown to the caller.
   * Note: `options` is filled in with the content type, message id and timestamp used.
   * @param {String} routingKey The routing key to attach to the message.
   * @param {Object} [message] The message data to publish. Defaults to an empty object.
   * @param {Object} [options] Optional publishing options.
   * @returns {Promise} A promise that resolves when the message is published or publishing has failed.
   */
  async publish(routingKey, message, options) {
    try {
      let publishAttempts = 0;

      // Set default publishing options.
      options = options || {};
      options.contentType = 'application/json';
      options.contentEncoding = 'utf8';
      options.messageId = options.messageId || uuidv4();
      options.timestamp = new Date().getTime();

      const exchange = options.exchange || this._defaultExchange.name;
      const topic = this._resolveTopic(routingKey);
      // Default to an empty object; the string '{}' would be sent as a JSON string.
      const msgData = Buffer.from(JSON.stringify(message || {}));

      const attempt = async (skipIncrement) => {
        this._logger.debug(`Attempting to publish fire-and-forget message: ${routingKey}...`);
        // Waiting for a reconnect does not count against the retry limit.
        if (!skipIncrement) {
          publishAttempts++;
        }

        if (this._connecting) {
          this._logger.debug(`Unable to publish fire-and-forget message: ${routingKey} while reconnecting. Will retry...`);
          await Promise.delay(this._publishRetryDelay);
          return attempt(true);
        }

        let channel;
        let accepted;
        try {
          channel = this._channels.publish;
          accepted = channel.publish(exchange, topic, msgData, options);
        } catch (err) {
          if (publishAttempts < this._publishRetryLimit) {
            this._logger.debug(`Failed to publish fire-and-forget message and will retry! message: ${routingKey} err: ${err.message}`);
            await Promise.delay(this._publishRetryDelay);
            return attempt();
          }
          throw err;
        }

        // A false return means the write buffer is full, not that the message
        // was dropped. Publishing again would duplicate it, so just apply
        // back-pressure until the channel drains.
        if (!accepted) {
          this._logger.debug(`Publish buffer full after message: ${routingKey}. Waiting for drain...`);
          await this._waitForDrain(channel);
        }
      };

      await attempt();
    } catch (err) {
      this._logger.error(`Failed to publish a fire-and-forget message! message: ${routingKey} err: ${err.message}`);
    }
  }

  /**
   * Publishes an RPC-style message that waits for a response.
   * The reply is matched by correlation id on this instance's reply queue and
   * must arrive within `publishRetryDelay * publishRetryLimit * 2` ms.
   * Note: `options` is filled in with the ids, reply queue and timestamp used.
   * @param {String} routingKey The routing key to attach to the message.
   * @param {Object} [message] The message data to publish. Defaults to an empty object.
   * @param {Object} [options] Optional publishing options.
   * @returns {Promise} A promise that resolves when the message is successfully published and a reply is received.
   * Rejects when publishing keeps failing, the reply carries an error, or the reply times out.
   */
  async request(routingKey, message, options) {
    let publishAttempts = 0;

    // Set default publishing options.
    options = options || {};
    options.contentType = 'application/json';
    options.contentEncoding = 'utf8';
    options.messageId = options.messageId || uuidv4();
    options.correlationId = options.correlationId || options.messageId;
    options.replyTo = this._topology.queues.reply.name;
    options.timestamp = new Date().getTime();

    const exchange = options.exchange || this._defaultExchange.name;
    const topic = this._resolveTopic(routingKey);
    const correlationId = options.correlationId;
    // Default to an empty object; the string '{}' would be sent as a JSON string.
    const msgData = Buffer.from(JSON.stringify(message || {}));

    const attempt = async (skipIncrement) => {
      this._logger.debug(`Attempting to publish RPC message: ${routingKey}...`);
      // Waiting for a reconnect does not count against the retry limit.
      if (!skipIncrement) {
        publishAttempts++;
      }

      if (this._connecting) {
        this._logger.debug(`Unable to publish RPC message: ${routingKey} while reconnecting. Will retry...`);
        await Promise.delay(this._publishRetryDelay);
        return attempt(true);
      }

      try {
        const accepted = this._channels.publish.publish(exchange, topic, msgData, options);
        if (!accepted) {
          // Buffer full is back-pressure only; the request is already queued
          // for sending, so publishing it again would duplicate the call.
          this._logger.debug(`Publish buffer full after RPC message: ${routingKey}.`);
        }
      } catch (err) {
        if (publishAttempts < this._publishRetryLimit) {
          this._logger.debug(`Failed to publish RPC message and will retry! message: ${routingKey} err: ${err.message}`);
          await Promise.delay(this._publishRetryDelay);
          return attempt();
        }
        this._logger.debug(`Failed to publish RPC message: ${routingKey} err: ${err.message}`);
        throw err;
      }

      // Registered synchronously after publishing, so it is in place before
      // any reply can be delivered on a later tick.
      let replyHandler = null;
      return new Promise((resolve, reject) => {
        replyHandler = (err, data) => {
          if (err) {
            reject(err);
          } else {
            resolve(data);
          }
        };
        this._replyHandlers[correlationId] = replyHandler;
      }).timeout(this._replyTimeout).finally(() => {
        // Always release the handler (notably on timeout) so it neither leaks
        // nor keeps outstandingMessageCount above zero. Only remove our own.
        if (this._replyHandlers[correlationId] === replyHandler) {
          delete this._replyHandlers[correlationId];
        }
      });
    };

    return attempt();
  }
}
789
790module.exports = PostmasterGeneral;