1 | 'use strict';
|
2 |
|
3 | var amqp = require('amqplib/callback_api');
|
4 | var async = require('async');
|
5 | var util = require('util');
|
6 |
|
7 | var Emitter = require('events').EventEmitter;
|
8 |
|
9 | var AMQPConnectionError = require('./errors/amqp-connection-error');
|
10 | var NoChannelError = require('./errors/no-channel-error');
|
11 | var ChannelAlreadyExists = require('./errors/channel-already-exists');
|
12 | var ChannelClosed = require('./errors/channel-closed');
|
13 | var ConsumerCreationError = require('./errors/consumer-creation-error');
|
14 |
|
15 |
|
// NOTE(review): a value of '0' switches off TLS certificate verification for
// the whole Node.js process, not only for connections opened by this module.
// Confirm this is intended before deploying against a production broker.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';

// Values stored in an instance's `status` field.
var CONNECTED = 'CONNECTED';
var DISCONNECTED = 'DISCONNECTED';

// Key under which the publishing channel is kept in the `channels` registry.
var PUBLISH_CHANNEL = 'publish.channel';
|
22 |
|
/**
 * AMQP client wrapper. Instances are event emitters and start out in the
 * DISCONNECTED state with empty channel and closed-queue registries.
 *
 * @constructor
 */
function AMQPHutch() {
  // Run the EventEmitter constructor so the emitter state is initialised on
  // this instance rather than lazily on first use.
  Emitter.call(this);

  // Open channels, keyed by channel id (or by the publish-channel key).
  this.channels = {};
  // Queue names that were explicitly closed or destroyed.
  this.closed = {};
  this.status = DISCONNECTED;
}

util.inherits(AMQPHutch, Emitter);
|
30 |
|
/**
 * Tells whether a connection object is present and the status is CONNECTED.
 *
 * @returns {boolean} true only when both conditions hold.
 * @private
 */
AMQPHutch.prototype._isConnectionEstablished = function () {
  // Loose comparison on purpose: it rejects both null and undefined.
  if (this._conn == null) {
    return false;
  }

  return this.status === CONNECTED;
};
|
34 |
|
/**
 * Reports a missing connection to the caller.
 *
 * @param {Function} callback - invoked with a new AMQPConnectionError.
 * @returns {*} whatever the callback returns.
 * @private
 */
AMQPHutch.prototype._handleConnectionEstablishError = function (callback) {
  var connectionError = new AMQPConnectionError('Could not Establish a Connection to RabbitMQ Server');

  return callback(connectionError);
};
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
|
// Last configuration object handed to initialise(); null until then.
AMQPHutch.prototype.configuration = null;

/**
 * Stores the configuration and starts connecting.
 *
 * @param {Object} configuration
 * @param {*} configuration.connectionString - kept as `_connectionString`.
 * @param {*} configuration.retryWait - kept as `_retryWait`.
 */
AMQPHutch.prototype.initialise = function (configuration) {
  var settings = configuration;

  this.configuration = settings;
  this._connectionString = settings.connectionString;
  this._retryWait = settings.retryWait;

  this._connect();
};
|
55 |
|
56 |
|
57 |
|
58 |
|
/**
 * Publishes a payload to the exchange named in `options.exchange`.
 *
 * @param {Object} options
 * @param {Object} options.exchange - must carry `name` and `type`; forwarded
 *   to publishToExchange together with the whole options object.
 * @param {*} payload - message body handed on to publishToExchange.
 * @param {Function} callback - called with an error when there is no
 *   connection or no exchange was given; otherwise passed to
 *   publishToExchange.
 */
AMQPHutch.prototype.publish = function (options, payload, callback) {
  if (!this._isConnectionEstablished()) return this._handleConnectionEstablishError(callback);

  // Without an exchange there is nothing to publish to. Report it instead of
  // returning silently, which would leave the caller waiting forever.
  if (!options || !options.exchange) {
    return callback(new Error('Could not publish: no exchange was specified in options'));
  }

  this.publishToExchange(options.exchange.name, options.exchange.type, options, payload, callback);
};
|
67 |
|
68 |
|
69 |
|
70 |
|
/**
 * Publishes a JSON-serialised payload to an exchange and waits for the
 * confirmation before calling back.
 *
 * A channel already registered for the exchange is reused. Otherwise a
 * confirm channel is created, the exchange is asserted, and the channel is
 * stored in the registry under the publish-channel key.
 *
 * @param {string} exchange - exchange name.
 * @param {string} type - exchange type passed to assertExchange.
 * @param {Object} [options] - `options.exchange` is passed to assertExchange
 *   and `options.publish` to channel.publish.
 * @param {*} payload - value serialised with JSON.stringify.
 * @param {Function} callback - called with an error, or with the result of
 *   waitForConfirms.
 */
AMQPHutch.prototype.publishToExchange = function (exchange, type, options, payload, callback) {
  var self = this;

  if (!this._isConnectionEstablished()) return this._handleConnectionEstablishError(callback);

  options = options || {exchange: {}, publish: {}};

  // Serialise up front so a bad payload (for example a circular structure)
  // reaches the callback instead of throwing from inside a channel callback.
  var content;
  try {
    content = Buffer.from(JSON.stringify(payload));
  }
  catch (e) {
    return callback(e);
  }

  // Sends the prepared content on the given channel and reports the confirm.
  function publishAndConfirm(channel, exchangeName) {
    try {
      channel.publish(exchangeName, '', content, options.publish);
    }
    catch (e) {
      // publish can throw synchronously (presumably when the channel has
      // already been closed); hand that to the caller as an error.
      return callback(e);
    }

    channel.waitForConfirms(function (err) {
      callback(err);
    });
  }

  var existing = this._getChannelByExchange(exchange);
  if (existing) return publishAndConfirm(existing, exchange);

  this._conn.createConfirmChannel(function (err, channel) {
    if (err) return callback(err);

    // Deliberately empty: an 'error' event without a listener would throw.
    channel.on('error', function () {});

    channel.on('close', function () {
      // Only drop the registry entry if it still points at this channel; a
      // newer publish channel may have replaced it in the meantime.
      if (self.channels[PUBLISH_CHANNEL] === channel) {
        delete self.channels[PUBLISH_CHANNEL];
      }
    });

    channel.assertExchange(exchange, type, options.exchange, function (err, ex) {
      if (err) return callback(err);

      channel.exchange = exchange;
      self.channels[PUBLISH_CHANNEL] = channel;

      publishAndConfirm(channel, ex.exchange);
    });
  });
};
|
113 |
|
114 |
|
115 |
|
116 |
|
/**
 * Creates a channel, asserts the exchange and queue, binds them and starts
 * consuming messages from the queue.
 *
 * @param {Object} options
 * @param {Object} options.exchange - `name` and `type` are used; the whole
 *   object is passed as the assertExchange options.
 * @param {Object} options.queue - `name` and `prefetch` are used; the whole
 *   object is passed as the assertQueue options.
 * @param {string} [options.routingKey] - binding pattern, '#' when absent.
 * @param {boolean} [options.exclusive] - passed to channel.consume.
 * @param {boolean} [options.skipNext] - when set, the next delivered message
 *   is acked without calling the consumer and the flag is removed.
 * @param {Function} consumer - called as consumer(message, done, fail);
 *   `done` acks the message, `fail` nacks it after `_retryWait` ms.
 * @param {Function} callback - called with no arguments on success, or with
 *   an AMQPConnectionError, ChannelAlreadyExists, ChannelClosed or
 *   ConsumerCreationError.
 */
AMQPHutch.prototype.consume = function (options, consumer, callback) {

  var self = this;

  if (!self._isConnectionEstablished()) return self._handleConnectionEstablishError(callback);
  if (self._getChannelByQueue(options.queue.name)) return callback(new ChannelAlreadyExists("Channel already exists for " + options.queue.name));

  // A fresh consume request clears an earlier close/destroy marker.
  if (self.closed[options.queue.name]) delete self.closed[options.queue.name];

  function subscribe() {
    // The queue may have been closed while a retry was waiting on its timer.
    if (self.closed[options.queue.name]) return callback(new ChannelClosed("Channel has already been closed"));

    var createChannel = function (next) {
      try {
        self._conn.createChannel(function (err, channel) {
          next(err, channel);
        });
      }
      catch (e) {
        next(new ChannelClosed("Channel closed unexpectedly"));
      }
    };

    var checkExchange = function (channel, next) {
      // Deliberately empty: an 'error' event without a listener would throw.
      channel.on('error', function () {});

      channel.assertExchange(options.exchange.name, options.exchange.type, options.exchange, function (err) {
        next(err, channel);
      });
    };

    var checkQueue = function (channel, next) {
      channel.assertQueue(options.queue.name, options.queue, function (err, ok) {
        next(err, ok, channel);
      });
    };

    var bindAndConsume = function (ok, channel, next) {
      var opts = { exclusive: options.exclusive };

      channel.bindQueue(ok.queue, options.exchange.name, options.routingKey || '#');
      channel.prefetch(options.queue.prefetch);
      channel.queue = ok.queue;
      self.channels[channel.ch] = channel;

      channel.consume(ok.queue, function (message) {

        var done = function () {
          try { channel.ack(message); }
          catch (e) {
            console.log("Error during message Ack");
          }
        };

        var fail = function () {
          setTimeout(function () {
            try { channel.nack(message); }
            catch (e) {
              console.log("Error during message Nack");
            }
          }, self._retryWait);
        };

        if (options.skipNext) {
          delete options.skipNext;
          return done();
        }

        consumer(message, done, fail);
      }, opts, function (err) {

        if (!err) {
          channel.on('close', function () {
            self.emit('channelClosed', options.queue.name);
            delete self.channels[channel.ch];
          });

          next();
        }
        else if (err.message != undefined && err.message.includes('403')) {
          // A message containing '403' is treated as a temporary refusal:
          // try again after a random delay of 2 to 6 seconds.
          delete options.skipNext;

          // The failed channel has no 'close' handler yet, so remove it from
          // the registry here. Otherwise lookups by queue would keep
          // returning a channel that never started consuming.
          if (self.channels[channel.ch] === channel) {
            delete self.channels[channel.ch];
          }

          setTimeout(subscribe, Math.floor(Math.random() * 4000) + 2000);
        }
        else {
          next(err);
        }
      });
    };

    async.waterfall([createChannel, checkExchange, checkQueue, bindAndConsume], function (err) {
      if (!err) return callback();

      // Best-effort cleanup; the close result is ignored on purpose.
      self.close(options.queue.name, function () {
        callback(new ConsumerCreationError("Unable to create a consumer: " + err.message));
      });
    });
  }

  subscribe();
};
|
226 |
|
/**
 * Marks a queue as closed, then unbinds it from the exchange, deletes it and
 * closes the channel used for those operations.
 *
 * @param {string} queue - queue name.
 * @param {string} exchange - exchange the queue is unbound from (the unbind
 *   uses an empty routing pattern).
 * @param {Function} callback - called with the first error encountered, or
 *   with the result of closing the channel.
 */
AMQPHutch.prototype.destroy = function (queue, exchange, callback) {
  var channel;

  // Last step: close the channel and report the outcome.
  function onDeleted(deleteErr) {
    if (deleteErr) {
      return callback(deleteErr);
    }

    channel.close(function (closeErr) {
      callback(closeErr);
    });
  }

  // Second step: the binding is gone, remove the queue itself.
  function onUnbound(unbindErr) {
    if (unbindErr) {
      return callback(unbindErr);
    }

    channel.deleteQueue(queue, {}, onDeleted);
  }

  // First step: a channel is available, drop the binding.
  function onChannel(getErr, found) {
    if (getErr) {
      return callback(getErr);
    }

    channel = found;
    channel.unbindQueue(queue, exchange, '', {}, onUnbound);
  }

  // The marker is set even when the connection check below fails.
  this.closed[queue] = true;

  if (!this._isConnectionEstablished(this)) {
    return this._handleConnectionEstablishError(callback);
  }

  this.get(queue, onChannel);
};
|
251 |
|
/**
 * Supplies a channel for a queue: the registered one when it exists,
 * otherwise a newly created channel.
 *
 * @param {string} queue - queue name used for the registry lookup.
 * @param {Function} callback - called as callback(err, channel).
 */
AMQPHutch.prototype.get = function (queue, callback) {
  if (!this._isConnectionEstablished(this)) {
    return this._handleConnectionEstablishError(callback);
  }

  var registered = this._getChannelByQueue(queue);
  if (registered) {
    return callback(null, registered);
  }

  this._conn.createChannel(function onChannelCreated(createErr, created) {
    if (createErr) {
      return callback(createErr);
    }

    // Deliberately empty: an 'error' event without a listener would throw.
    created.on('error', function ignoreChannelError() {});

    return callback(createErr, created);
  });
};
|
271 |
|
272 |
|
273 |
|
274 |
|
/**
 * Closes the channel registered for a queue and marks the queue as closed.
 *
 * @param {string} queue - queue name.
 * @param {Function} callback - called with an AMQPConnectionError, a
 *   NoChannelError, the checkQueue error, or the result of closing the
 *   channel.
 */
AMQPHutch.prototype.close = function (queue, callback) {
  if (!this._isConnectionEstablished(this)) {
    return this._handleConnectionEstablishError(callback);
  }

  // Unlike destroy(), the marker is only set once the connection check passed.
  this.closed[queue] = true;

  var target = this._getChannelByQueue(queue);
  if (!target) {
    return callback(new NoChannelError('Could not close channel that does not exist'));
  }

  target.checkQueue(queue, function onQueueChecked(checkErr) {
    if (checkErr) {
      return callback(checkErr);
    }

    target.close(function onChannelClosed(closeErr) {
      callback(closeErr);
    });
  });
};
|
294 |
|
/**
 * Tells whether the status field currently holds the CONNECTED value.
 *
 * @returns {boolean}
 */
AMQPHutch.prototype.isConnected = function () {
  var currentStatus = this.status;

  return currentStatus === CONNECTED;
};
|
298 |
|
299 |
|
300 |
|
301 |
|
/**
 * Opens the AMQP connection and keeps it alive: a failed attempt or a closed
 * connection schedules another attempt after `_retryWait` milliseconds.
 *
 * Emits 'error' (connect or connection errors), 'close' (connection closed)
 * and 'ready' (connection established, with the connection as argument).
 *
 * @private
 */
AMQPHutch.prototype._connect = function () {
  var self = this;

  // Queues another connection attempt.
  function scheduleReconnect() {
    setTimeout(function reconnect() {
      self._connect();
    }, self._retryWait);
  }

  function onConnectionError(connectionErr) {
    self.emit('error', connectionErr);
  }

  function onConnectionClosed(closeErr) {
    self.emit('close', closeErr);
    self.status = DISCONNECTED;
    scheduleReconnect();
  }

  function onConnectResult(connectErr, conn) {
    if (connectErr) {
      self.emit('error', connectErr);
      return scheduleReconnect();
    }

    conn.on("error", onConnectionError);
    conn.once("close", onConnectionClosed);

    self.status = CONNECTED;
    self._conn = conn;
    self.emit('ready', conn);
  }

  amqp.connect(self._connectionString, onConnectResult);
};
|
335 |
|
/**
 * Looks up the registered channel whose `queue` property equals the given
 * name. The publish-channel entry is never considered.
 *
 * @param {string} queue - queue name.
 * @returns {Object|undefined} the matching channel, or undefined.
 * @private
 */
AMQPHutch.prototype._getChannelByQueue = function (queue) {
  for (var key in this.channels) {
    if (key === PUBLISH_CHANNEL) {
      continue;
    }

    var candidate = this.channels[key];
    if (candidate.queue === queue) {
      return candidate;
    }
  }

  return undefined;
};
|
343 |
|
/**
 * Looks up the registered channel whose `exchange` property equals the given
 * name.
 *
 * @param {string} exchange - exchange name.
 * @returns {Object|undefined} the matching channel, or undefined.
 * @private
 */
AMQPHutch.prototype._getChannelByExchange = function (exchange) {
  for (var key in this.channels) {
    var candidate = this.channels[key];

    if (candidate.exchange === exchange) {
      return candidate;
    }
  }

  return undefined;
};
|
349 |
|
// Public API: the constructor itself, with the error types and the status
// constants attached to it as properties.
AMQPHutch.AMQPConnectionError = AMQPConnectionError;
AMQPHutch.NoChannelError = NoChannelError;
AMQPHutch.ChannelAlreadyExists = ChannelAlreadyExists;
AMQPHutch.ChannelClosed = ChannelClosed;
AMQPHutch.ConsumerCreationError = ConsumerCreationError;
AMQPHutch.CONNECTED = CONNECTED;
AMQPHutch.DISCONNECTED = DISCONNECTED;

module.exports = AMQPHutch;
|