UNPKG

11.1 kBJavaScriptView Raw
1var mocha = require('mocha');
2var sinon = require('sinon');
3var should = require('chai').should();
4
5var AMQPHutch = require('..');
6
7describe('Hutch', function() {
8
// After the instance emits 'ready', isConnected() must report true.
it('isConnected should return true when hutch is connected', function(complete) {
  var settings = {
    connectionString: 'amqp://localhost',
    retryWait: 100000
  };
  var client = new AMQPHutch();

  // Runs on 'ready': checks the connection flag, then ends the test.
  function assertConnected() {
    var connected = client.isConnected();
    connected.should.equal(true);
    complete();
  }

  client.initialise(settings);
  client.on('ready', assertConnected);
});
22
// An instance that was constructed but never initialised must report false.
it('isConnected should return false when hutch is not connected', function(complete) {
  var untouched = new AMQPHutch();
  var connected = untouched.isConnected();

  connected.should.equal(false);
  complete();
});
28
// Closing a queue name that was never consumed must call back with an error
// whose name is 'NoChannelError'.
it('should return a no channel error when attempting to close a channel that doesnt exist', function(complete) {
  var settings = {
    connectionString: 'amqp://localhost',
    retryWait: 100000
  };
  var missingQueue = "not/going/to/exist";
  var client = new AMQPHutch();

  // Verifies the error handed to the close callback, then ends the test.
  function expectNoChannelError(err) {
    err.name.should.equal('NoChannelError');
    complete();
  }

  client.initialise(settings);
  client.on('ready', function() {
    client.close(missingQueue, expectNoChannelError);
  });
});
44
// Sets up a consumer, publishes one message and expects the consumer to
// receive it. The test ends once the queue's channel has been closed.
it('should bind and consumer a message', function(complete) {

  var hutch = new AMQPHutch();

  hutch.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  hutch.on('ready', function() {

    var options = {
      exchange: {
        name: 'example.exchange.1',
        type: 'topic'
      },
      queue: {
        name: 'example.queue',
        prefetch: 1,
        durable: true
      },
      publish: {
        persistent: true,
        expiration: 86400000
      }
    };

    var consumer = function(message, done, fail) {
      // The content is parsed as JSON and must match the published string.
      JSON.parse(message.content).should.equal('Example Message!');
      done();

      hutch.close(options.queue.name, function(){
        complete();
      });
    };

    hutch.consume(options, consumer, function(err) {
      // Fail right away if the consumer could not be set up, instead of
      // publishing anyway and waiting for the mocha timeout.
      if (err) { return complete(err); }

      hutch.publish(options, "Example Message!", function(err, res){});
    });
  });
});
86
// Publishes two messages back to back while spying on _getChannelByExchange.
// The first lookup must return nothing and the second must return an object
// whose `exchange` equals the exchange name.
it('should publish a message using the same channel', function(complete) {

  var hutch = new AMQPHutch();
  var spy = sinon.spy(hutch, '_getChannelByExchange');

  hutch.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  hutch.on('ready', function() {

    var options = {
      exchange: {
        name: 'example.exchange.1',
        type: 'topic'
      },
      queue: {
        name: 'example.queue',
        prefetch: 1,
        durable: true
      },
      publish: {
        persistent: true,
        expiration: 86400000
      }
    };

    var count = 0;
    var consumer = function(message, done, fail) {
      count++;
      done();

      // Only inspect the spy once both published messages have arrived.
      if(count === 2){
        should.not.exist(spy.firstCall.returnValue);
        spy.secondCall.returnValue.exchange.should.equal(options.exchange.name);

        hutch.close(options.queue.name, function(){
          complete();
        });
      }
    };

    hutch.consume(options, consumer, function(err) {
      // Fail right away if the consumer could not be set up, instead of
      // publishing anyway and waiting for the mocha timeout.
      if (err) { return complete(err); }

      // The second publish is issued from the first publish's callback.
      hutch.publish(options, "Example Message!", function(err, res){
        hutch.publish(options, "Example Message!", function(err, res){});
      });
    });
  });
});
137
// Initialising against the 'amqp://bad' connection string must make the
// instance emit an 'error' event.
it('should error when not connected', function(complete) {
  this.timeout(5000); // Allow up to 5 seconds for windows

  var badSettings = {
    connectionString: 'amqp://bad',
    retryWait: 100000
  };
  var client = new AMQPHutch();

  // Any emitted error is enough to pass; its content is not inspected.
  function finishOnError(err) {
    complete();
  }

  client.initialise(badSettings);
  client.on('error', finishOnError);
});
152
// Sets up a consumer, publishes one message, and destroys the queue and
// exchange from inside the consumer. destroy() must call back without an error.
it('should destroy a queue', function(complete) {

  var hutch = new AMQPHutch();

  hutch.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  hutch.on('ready', function() {

    var options = {
      exchange: {
        name: 'example.exchange.2',
        type: 'topic'
      },
      queue: {
        name: 'example.queue.1',
        prefetch: 1,
        durable: true
      },
      publish: {
        persistent: true,
        expiration: 86400000
      }
    };

    // NOTE(review): `done` is never called here before destroying the queue;
    // presumably intentional - confirm.
    var consumer = function(message, done, fail) {
      hutch.destroy(options.queue.name, options.exchange.name, function(err){
        should.not.exist(err);
        complete();
      });
    };

    hutch.consume(options, consumer, function(err) {
      // Fail right away if the consumer could not be set up, instead of
      // publishing anyway and waiting for the mocha timeout.
      if (err) { return complete(err); }

      hutch.publish(options, "Example Message!", function(err, res){});
    });
  });
});
192
// Closes the queue's channel from inside the consumer and expects the
// instance to emit 'channelClosed' with the queue name as its argument.
it('should trigger a closeChannel event with the queue name for the channel', function(complete) {

  var hutch = new AMQPHutch();

  hutch.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  hutch.on('ready', function() {

    var options = {
      exchange: {
        name: 'example.exchange.2',
        type: 'topic'
      },
      queue: {
        name: 'example.queue.1',
        prefetch: 1,
        durable: true
      },
      publish: {
        persistent: true,
        expiration: 86400000
      }
    };

    // The close result is not checked here; the 'channelClosed' listener
    // below is what ends the test.
    var consumer = function(message, done, fail) {
      hutch.close(options.queue.name, function(err){});
    };

    hutch.consume(options, consumer, function(err) {
      // Fail right away if the consumer could not be set up, instead of
      // publishing anyway and waiting for the mocha timeout.
      if (err) { return complete(err); }

      hutch.publish(options, "Example Message!", function(err, res){});
    });
  });

  hutch.on('channelClosed', function(queue){
    queue.should.equal('example.queue.1');
    complete();
  });
});
234
// Calling consume() twice for the same queue on one instance must make the
// second call fail with an error named 'ChannelAlreadyExists'.
it('should error when one instance of hutch tries to consumer against the same queue', function(complete) {

  var hutch = new AMQPHutch();

  hutch.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  hutch.on('ready', function() {

    var consumer = function(message, done, fail) {done();};

    var options = {
      exchange: {
        name: 'example.exchange.1',
        type: 'topic'
      },
      queue: {
        name: 'example.queue',
        prefetch: 1,
        durable: true
      },
      publish: {
        persistent: true,
        expiration: 86400000
      }
    };

    hutch.consume(options, consumer, function(err) {
      // The first consume must succeed; otherwise the second call would be
      // exercising a different failure than the one under test.
      if (err) { return complete(err); }

      hutch.consume(options, consumer, function(err) {
        // Assert the error exists before reading its name, so a missing error
        // shows up as an assertion failure rather than a TypeError.
        should.exist(err);
        err.name.should.equal('ChannelAlreadyExists');

        hutch.close(options.queue.name, function(){
          complete();
        });
      });
    });
  });
});
275
// With `skipNext: true`, three published messages must result in exactly two
// consumer invocations.
it('should bind and skip the next message', function(complete) {

  var hutch = new AMQPHutch();

  hutch.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  hutch.on('ready', function() {

    var options = {
      exchange: {
        name: 'example.exchange.1',
        type: 'topic'
      },
      queue: {
        name: 'example.queue',
        prefetch: 1,
        durable: true
      },
      publish: {
        persistent: true,
        expiration: 86400000
      },
      skipNext: true,
      exclusive: true
    };

    var count = 0;
    var consumer = function(message, done, fail) {
      count++;
      done();
    };

    hutch.consume(options, consumer, function(err) {
      // Fail right away if the consumer could not be set up.
      if (err) { return complete(err); }

      hutch.publish(options, "Example Message!", function(err, res){});
      hutch.publish(options, "Example Message!", function(err, res){});
      hutch.publish(options, "Example Message!", function(err, res){});

      // Start the verification window only after the messages have been
      // handed to publish, so consumer setup time does not eat into the
      // 500ms allowed for delivery.
      setTimeout(function(){
        count.should.equal(2);
        hutch.close(options.queue.name, function(){
          complete();
        });
      }, 500);
    });
  });
});
325
// Two instances consume the same queue with `exclusive: true`. The second
// consume callback is expected to fire without an error; the first instance's
// channel is closed on a 4 second timer in the meantime.
it('should retry for exclusive consumer after consumer one goes down', function(complete) {

  this.timeout(20000); // Allow up to 20 seconds for windows

  var config = {
    connectionString: 'amqp://localhost',
    retryWait: 100
  };

  var options = {
    exchange: {
      name: 'example.exchange.1',
      type: 'topic'
    },
    queue: {
      name: 'example.queue',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    },
    exclusive: true
  };

  var consumer = function(message, done, fail) { done(); };

  var instance1 = new AMQPHutch();
  var instance2 = new AMQPHutch();

  // Both instances are used below, so wait until each has emitted 'ready'
  // rather than assuming instance2 is ready by the time instance1 is.
  var awaitingReady = 2;
  var onInstanceReady = function() {
    awaitingReady -= 1;
    // Strict comparison with 0 also ignores any later 'ready' re-emission.
    if (awaitingReady !== 0) { return; }

    console.log("Hutch Instance Ready");

    instance1.consume(options, consumer, function(err) {
      // Consumer 1 must be in place for the scenario to mean anything.
      if (err) { return complete(err); }

      console.log("Setup Consumer 1");

      instance2.consume(options, consumer, function(err) {
        console.log("Setup Consumer 2");
        should.not.exist(err);

        instance1.close(options.queue.name, function(){
          instance2.close(options.queue.name, function(){ complete(); });
        });
      });

      setTimeout(function(){
        instance1.close(options.queue.name, function(){
          // This will trigger consumer 2 to start
          console.log("Shutting down Consumer 1");
        })
      }, 4000)
    });
  };

  instance1.initialise(config);
  instance2.initialise(config);

  instance1.on('ready', onInstanceReady);
  instance2.on('ready', onInstanceReady);
});
385
// Two instances consume the same queue with `exclusive: true` and
// `skipNext: true`. After 8 seconds exactly one message must have been
// recorded, and it must have been handled by consumer 2.
it('should clear skip option for exclusive consumer after consumer is rejected', function(complete) {

  this.timeout(20000); // Allow up to 20 seconds for windows

  var config = {
    connectionString: 'amqp://localhost',
    retryWait: 100
  };

  var options = {
    exchange: {
      name: 'example.exchange.1',
      type: 'topic'
    },
    queue: {
      name: 'example.queue',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    },
    exclusive: true,
    skipNext: true
  };

  // Records which consumer handled each delivered message.
  var messages = [];
  var consumer1 = function(message, done, fail) { messages.push('c1'); done(); };
  var consumer2 = function(message, done, fail) { messages.push('c2'); done(); };

  var instance1 = new AMQPHutch();
  var instance2 = new AMQPHutch();

  // Ends the test with an error and cancels the pending verification so
  // `complete` is not called a second time.
  var verdictTimer = null;
  var abort = function(err) {
    clearTimeout(verdictTimer);
    complete(err);
  };

  // Both instances are used below, so wait until each has emitted 'ready'
  // rather than assuming instance2 is ready by the time instance1 is.
  var awaitingReady = 2;
  var onInstanceReady = function() {
    awaitingReady -= 1;
    // Strict comparison with 0 also ignores any later 'ready' re-emission.
    if (awaitingReady !== 0) { return; }

    instance1.consume(options, consumer1, function(err) {
      if (err) { return abort(err); }

      instance2.consume(options, consumer2, function(err) {
        if (err) { return abort(err); }

        instance1.close(options.queue.name, function(){
          // Send a message to consumer 2;
          instance2.publish(options, "Example Message!", function(err, res){});
        });
      });

      // Send a message to consumer 1 then close it.
      instance1.publish(options, "Example Message!", function(err, res){
        instance1.close(options.queue.name, function(err){});
      });

      verdictTimer = setTimeout(function(){
        // We should have only processed one of the messages.
        messages.length.should.equal(1);
        messages[0].should.equal('c2');
        instance2.close(options.queue.name, function(){
          complete();
        });
      }, 8000)
    });
  };

  instance1.initialise(config);
  instance2.initialise(config);

  instance1.on('ready', onInstanceReady);
  instance2.on('ready', onInstanceReady);
});
449});