UNPKG

10.6 kBJavaScriptView Raw
1'use strict';
2
3var amqp = require('amqplib/callback_api');
4var async = require('async');
5var util = require('util');
6
7var Emitter = require('events').EventEmitter;
8
9var AMQPConnectionError = require('./errors/amqp-connection-error');
10var NoChannelError = require('./errors/no-channel-error');
11var ChannelAlreadyExists = require('./errors/channel-already-exists');
12var ChannelClosed = require('./errors/channel-closed');
13var ConsumerCreationError = require('./errors/consumer-creation-error');
14
15//TODO: Replace with SSL pem file.
16process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = '0';
17
18var CONNECTED = "CONNECTED";
19var DISCONNECTED = "DISCONNECTED";
20
21var PUBLISH_CHANNEL = "publish.channel";
22
23function AMQPHutch() {
24 this.channels = {};
25 this.closed = {};
26 this.status = DISCONNECTED;
27}
28
29util.inherits(AMQPHutch, Emitter);
30
31AMQPHutch.prototype._isConnectionEstablished = function() {
32 return this._conn != undefined && this.status === CONNECTED;
33};
34
35AMQPHutch.prototype._handleConnectionEstablishError = function(callback) {
36 return callback(new AMQPConnectionError('Could not Establish a Connection to RabbitMQ Server'));
37};
38
39/**
40 * Configuration Accessor
41 *
42 * @type {undefined}
43 */
44AMQPHutch.prototype.configuration = null;
45
46/**
47 * Initialise AMQPHutch Connection
48 */
49AMQPHutch.prototype.initialise = function (configuration) {
50 this.configuration = configuration;
51 this._connectionString = configuration.connectionString;
52 this._retryWait = configuration.retryWait;
53 this._connect();
54};
55
56/**
57 * Publish message
58 */
59AMQPHutch.prototype.publish = function (options, payload, callback) {
60
61 if (!this._isConnectionEstablished()) return this._handleConnectionEstablishError(callback);
62
63 if (options.exchange) {
64 this.publishToExchange(options.exchange.name, options.exchange.type, options, payload, callback);
65 }
66};
67
68/**
69 * Publish message to the exchange
70 */
71AMQPHutch.prototype.publishToExchange = function (exchange, type, options, payload, callback) {
72 var self = this;
73 var channel = this._getChannelByExchange(exchange);
74
75 if (!this._isConnectionEstablished()) return this._handleConnectionEstablishError(callback);
76
77 options = options || {exchange: {}, publish: {}};
78
79 // Load Existing Channel for publishing to this exchange.
80 if (channel){
81 channel.publish(exchange, '', new Buffer(JSON.stringify(payload)), options.publish);
82 channel.waitForConfirms(function (err) {
83 callback(err);
84 });
85 }
86 // Create new channel for publishing.
87 else {
88 this._conn.createConfirmChannel(function (err, channel) {
89 if (err) return callback(err);
90
91 channel.on('error', function (err) {
92 // This will be handled by the 'close' event.npm
93 });
94
95 channel.on('close', function () {
96 delete self.channels[PUBLISH_CHANNEL];
97 });
98
99 channel.assertExchange(exchange, type, options.exchange, function (err, ex) {
100 if (err) return callback(err);
101
102 channel.exchange = exchange;
103 self.channels[PUBLISH_CHANNEL] = channel;
104
105 channel.publish(ex.exchange, '', new Buffer(JSON.stringify(payload)), options.publish);
106 channel.waitForConfirms(function (err) {
107 callback(err);
108 });
109 });
110 });
111 }
112};
113
114/**
115 * Consume Queue to Exchange wrapper.
116 */
117AMQPHutch.prototype.consume = function (options, consumer, callback) {
118
119 var self = this;
120
121 if (!self._isConnectionEstablished(self)) return self._handleConnectionEstablishError(callback);
122 if (self._getChannelByQueue(options.queue.name)) return callback(new ChannelAlreadyExists("Channel already exists for " + options.queue.name));
123 if (self.closed[options.queue.name]) delete self.closed[options.queue.name];
124
125 function subscribe(){
126 if ( self.closed[options.queue.name]) return callback(new ChannelClosed("Channel has already been closed"));
127
128 var createChannel = function(next){
129 try {
130 self._conn.createChannel(function (err, channel) {
131 next(err, channel);
132 });
133 }
134 catch(e){
135 /* Random error thrown during the operation of creating a new channel. This could be trigger due to connectivity issues or a unknown uncaught error */
136 next(new ChannelClosed("Channel closed unexpectedly"));
137 }
138 };
139
140 var checkExchange = function(channel, next){
141 channel.on('error', function () {
142 /* Silently handle channel error as we are only interested in the channel closure. */
143 });
144
145 channel.assertExchange(options.exchange.name, options.exchange.type, options.exchange, function (err, ok) {
146 next(err, channel);
147 });
148 };
149
150 var checkQueue = function(channel, next){
151 var opts = { noAck: false };
152
153 channel.assertQueue(options.queue.name, options.queue, function (err, ok) {
154 next(err, ok, channel);
155 }, opts);
156 };
157
158 var bindAndConsume = function(ok, channel, next) {
159 var opts = { exclusive: options.exclusive };
160
161 channel.bindQueue(ok.queue, options.exchange.name, options.routingKey || '#');
162 channel.prefetch(options.queue.prefetch);
163 channel.queue = ok.queue;
164 self.channels[channel.ch] = channel;
165
166 channel.consume(ok.queue, function (message) {
167
168 var done = function () {
169 try { channel.ack(message); }
170 catch (e) {
171 /* Random error thrown during the operation of a message ack. This could be trigger due to connectivity issues or a unknown uncaught error */
172 console.log("Error during message Ack");
173 }
174 };
175
176 var fail = function () {
177 setTimeout(function () {
178 try { channel.nack(message); }
179 catch (e) {
180 /* Random error thrown during the operation of a message nack. This could be trigger due to connectivity issues or a unknown uncaught error */
181 console.log("Error during message Nack");
182 }
183 }, self._retryWait);
184 };
185
186 // If the skip next option has been set remove the next message and remove the temp parameter.
187 if(options.skipNext){
188 delete options.skipNext;
189 return done();
190 }
191
192 consumer(message, done, fail);
193 }, opts, function (err, ok) {
194
195 if (!err){
196 channel.on('close', function () {
197 self.emit('channelClosed', options.queue.name);
198 delete self.channels[channel.ch];
199 });
200
201 next();
202 }
203 else if (err != undefined && err.message != undefined && err.message.includes('403')) {
204 // Delete skipNext when not the active consumer.
205 delete options.skipNext;
206 // Retry with Random Timeout for Exclusive consumer.
207 setTimeout(subscribe, Math.floor(Math.random() * 4000) + 2000);
208 }
209 else {
210 next(err);
211 }
212 });
213 };
214
215 async.waterfall([ createChannel, checkExchange, checkQueue, bindAndConsume], function(err){
216 if(!err) return callback();
217
218 self.close(options.queue.name, function(){
219 callback(new ConsumerCreationError("Unable to create a consumer: " + err.message));
220 });
221 });
222 }
223
224 subscribe();
225};
226
227AMQPHutch.prototype.destroy = function (queue, exchange, callback) {
228
229 // Add the queue to the closed object.
230 this.closed[queue] = true;
231
232 if (!this._isConnectionEstablished(this)) return this._handleConnectionEstablishError(callback);
233
234 // Load channel instance
235 this.get(queue, function (err, channel) {
236 if (err) return callback(err);
237
238 channel.unbindQueue(queue, exchange, '', {}, function (err, ok) {
239 if (err) return callback(err);
240
241 channel.deleteQueue(queue, {}, function (err, ok) {
242 if (err) return callback(err);
243
244 channel.close(function (err) {
245 callback(err);
246 });
247 });
248 });
249 });
250};
251
252AMQPHutch.prototype.get = function (queue, callback) {
253
254 if (!this._isConnectionEstablished(this)) return this._handleConnectionEstablishError(callback);
255
256 // Load channel instance
257 var channel = this._getChannelByQueue(queue);
258 if (channel) return callback(null, channel);
259
260 // If we dont have one create a new one
261 this._conn.createChannel(function (err, channel) {
262 if (err) return callback(err);
263
264 channel.on('error', function (err) {
265 // This will be handled by the 'close' event.
266 });
267
268 return callback(err, channel);
269 });
270};
271
272/**
273 * close consumer.
274 */
275AMQPHutch.prototype.close = function (queue, callback) {
276
277 if (!this._isConnectionEstablished(this)) return this._handleConnectionEstablishError(callback);
278
279 // Add the queue to the closed object.
280 this.closed[queue] = true;
281
282 // Load channel instance
283 var channel = this._getChannelByQueue(queue);
284 if (!channel) return callback(new NoChannelError('Could not close channel that does not exist'));
285
286 channel.checkQueue(queue, function (err) {
287 if (err) return callback(err);
288
289 channel.close(function (err) {
290 callback(err);
291 });
292 });
293};
294
295AMQPHutch.prototype.isConnected = function () {
296 return this.status === CONNECTED;
297};
298
299/**
300 * Initialise Connection
301 */
302AMQPHutch.prototype._connect = function () {
303
304 var self = this;
305
306 var retry = function () {
307 setTimeout(function () {
308 self._connect();
309 }, self._retryWait);
310 };
311
312 // Establish RabbisMQ Connection
313 amqp.connect(self._connectionString, function (err, conn) {
314
315 if (err) {
316 self.emit('error', err);
317 return retry();
318 }
319
320 conn.on("error", function (err) {
321 self.emit('error', err);
322 });
323
324 conn.once("close", function (err) {
325 self.emit('close', err);
326 self.status = DISCONNECTED;
327 retry();
328 });
329
330 self.status = CONNECTED;
331 self._conn = conn;
332 self.emit('ready', conn);
333 });
334};
335
336AMQPHutch.prototype._getChannelByQueue = function(queue){
337 for(var id in this.channels) {
338
339 // Ignore the explicit publish channel.
340 if(id !== PUBLISH_CHANNEL && this.channels[id].queue === queue) return this.channels[id];
341 }
342};
343
344AMQPHutch.prototype._getChannelByExchange = function(exchange){
345 for(var id in this.channels) {
346 if(this.channels[id].exchange === exchange) return this.channels[id];
347 }
348};
349
350module.exports = AMQPHutch;
351module.exports.AMQPConnectionError = AMQPConnectionError;
352module.exports.NoChannelError = NoChannelError;
353module.exports.ChannelAlreadyExists = ChannelAlreadyExists;
354module.exports.ChannelClosed = ChannelClosed;
355module.exports.ConsumerCreationError = ConsumerCreationError;
356module.exports.CONNECTED = CONNECTED;
357module.exports.DISCONNECTED = DISCONNECTED;