1 | var mocha = require('mocha');
|
2 | var sinon = require('sinon');
|
3 | var should = require('chai').should();
|
4 |
|
5 | var AMQPHutch = require('..');
|
6 |
|
7 | describe('Hutch', function() {
|
8 |
|
it('isConnected should return true when hutch is connected', function(complete) {
  // Settings handed to initialise(); the test needs a broker reachable at this URL.
  var settings = {
    connectionString: 'amqp://localhost',
    retryWait: 100000
  };
  var client = new AMQPHutch();

  // Runs on the 'ready' event: at that point isConnected() must report true.
  function assertConnected() {
    client.isConnected().should.equal(true);
    complete();
  }

  client.initialise(settings);
  client.on('ready', assertConnected);
});
|
22 |
|
it('isConnected should return false when hutch is not connected', function(complete) {
  // initialise() is never called on this instance, so no connection is requested.
  var idleClient = new AMQPHutch();
  var connected = idleClient.isConnected();

  connected.should.equal(false);
  complete();
});
|
28 |
|
it('should return a no channel error when attempting to close a channel that doesnt exist', function(complete) {
  var client = new AMQPHutch();
  // No consumer is ever set up for this queue name in the test.
  var missingQueue = "not/going/to/exist";

  // The close callback must be handed an error whose name is 'NoChannelError'.
  function verifyNoChannel(err) {
    err.name.should.equal('NoChannelError');
    complete();
  }

  client.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100000
  });

  client.on('ready', function() {
    client.close(missingQueue, verifyNoChannel);
  });
});
|
44 |
|
it('should bind and consumer a message', function(complete) {
  var client = new AMQPHutch();

  // Exchange / queue / publish settings shared by consume() and publish().
  var bindings = {
    exchange: {
      name: 'example.exchange.1',
      type: 'topic'
    },
    queue: {
      name: 'example.queue',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    }
  };

  // Closes the queue's channel and only then finishes the test.
  function shutDown() {
    client.close(bindings.queue.name, function() {
      complete();
    });
  }

  // Consumer: the JSON-decoded message content must equal the published string.
  function onMessage(message, done, fail) {
    JSON.parse(message.content).should.equal('Example Message!');
    done();

    shutDown();
  }

  // Publish a single message once consume() has called back.
  function onConsuming(err) {
    client.publish(bindings, "Example Message!", function(err, res){});
  }

  client.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  client.on('ready', function() {
    client.consume(bindings, onMessage, onConsuming);
  });
});
|
86 |
|
it('should publish a message using the same channel', function(complete) {
  var client = new AMQPHutch();
  // Spy installed before initialise() so every _getChannelByExchange call is recorded.
  var channelLookup = sinon.spy(client, '_getChannelByExchange');

  var bindings = {
    exchange: {
      name: 'example.exchange.1',
      type: 'topic'
    },
    queue: {
      name: 'example.queue',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    }
  };

  var received = 0;

  // Consumer: acknowledge each message; after the second one, inspect the spy.
  function onMessage(message, done, fail) {
    received += 1;
    done();

    if (received !== 2) {
      return;
    }

    // First lookup is expected to return nothing, the second an object for this exchange.
    should.not.exist(channelLookup.firstCall.returnValue);
    channelLookup.secondCall.returnValue.exchange.should.equal(bindings.exchange.name);

    client.close(bindings.queue.name, function() {
      complete();
    });
  }

  // Publish two messages back to back, the second from the first's callback.
  function publishTwice(err) {
    client.publish(bindings, "Example Message!", function(err, res) {
      client.publish(bindings, "Example Message!", function(err, res){});
    });
  }

  client.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  client.on('ready', function() {
    client.consume(bindings, onMessage, publishTwice);
  });
});
|
137 |
|
it('should error when not connected', function(complete) {
  // Raise the Mocha timeout for this test to 5 seconds.
  this.timeout(5000);

  var unreachable = {
    connectionString: 'amqp://bad',
    retryWait: 100000
  };
  var client = new AMQPHutch();

  // The test passes as soon as the instance emits 'error'; the error itself is not inspected.
  function onError(err) {
    complete();
  }

  client.initialise(unreachable);
  client.on('error', onError);
});
|
152 |
|
it('should destroy a queue', function(complete) {
  var client = new AMQPHutch();

  var bindings = {
    exchange: {
      name: 'example.exchange.2',
      type: 'topic'
    },
    queue: {
      name: 'example.queue.1',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    }
  };

  // destroy() must call back without an error.
  function onDestroyed(err) {
    should.not.exist(err);
    complete();
  }

  // Consumer: the first delivered message triggers destroy(); done/fail are not called here.
  function onMessage(message, done, fail) {
    client.destroy(bindings.queue.name, bindings.exchange.name, onDestroyed);
  }

  // Publish one message so the consumer above gets invoked.
  function onConsuming(err) {
    client.publish(bindings, "Example Message!", function(err, res){});
  }

  client.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  client.on('ready', function() {
    client.consume(bindings, onMessage, onConsuming);
  });
});
|
192 |
|
// Title corrected: the event this test listens for is 'channelClosed', not 'closeChannel'.
it('should trigger a channelClosed event with the queue name for the channel', function(complete) {

  var hutch = new AMQPHutch();

  hutch.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  hutch.on('ready', function() {

    var options = {
      exchange: {
        name: 'example.exchange.2',
        type: 'topic'
      },
      queue: {
        name: 'example.queue.1',
        prefetch: 1,
        durable: true
      },
      publish: {
        persistent: true,
        expiration: 86400000
      }
    };

    // Consumer: the first delivered message closes the queue's channel.
    // The close callback is intentionally empty; the assertion lives in the
    // 'channelClosed' listener below.
    var consumer = function(message, done, fail) {
      hutch.close(options.queue.name, function(err){});
    };

    // Publish one message once consume() has called back so the consumer runs.
    hutch.consume(options, consumer, function(err) {
      hutch.publish(options, "Example Message!", function(err, res){});
    });
  });

  // The event payload must be the name of the queue whose channel was closed.
  hutch.on('channelClosed', function(queue){
    queue.should.equal('example.queue.1');
    complete();
  });
});
|
234 |
|
it('should error when one instance of hutch tries to consumer against the same queue', function(complete) {
  var client = new AMQPHutch();

  var bindings = {
    exchange: {
      name: 'example.exchange.1',
      type: 'topic'
    },
    queue: {
      name: 'example.queue',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    }
  };

  // Consumer that simply acknowledges whatever it receives.
  function ackImmediately(message, done, fail) {done();}

  // Second consume() on the same queue must call back with 'ChannelAlreadyExists'.
  function onSecondConsume(err) {
    err.name.should.equal('ChannelAlreadyExists');

    client.close(bindings.queue.name, function() {
      complete();
    });
  }

  // After the first consume() calls back, try to consume the same queue again.
  function onFirstConsume(err) {
    client.consume(bindings, ackImmediately, onSecondConsume);
  }

  client.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  client.on('ready', function() {
    client.consume(bindings, ackImmediately, onFirstConsume);
  });
});
|
275 |
|
it('should bind and skip the next message', function(complete) {
  var client = new AMQPHutch();

  // Same bindings as the other tests, plus the skipNext and exclusive flags.
  var bindings = {
    exchange: {
      name: 'example.exchange.1',
      type: 'topic'
    },
    queue: {
      name: 'example.queue',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    },
    skipNext: true,
    exclusive: true
  };

  var handled = 0;

  // Consumer: count and acknowledge every message that reaches it.
  function onMessage(message, done, fail) {
    handled += 1;
    done();
  }

  // Publish three messages once consume() has called back.
  function publishThree(err) {
    for (var i = 0; i < 3; i++) {
      client.publish(bindings, "Example Message!", function(err, res){});
    }
  }

  // Of the three published messages, exactly two must have reached the consumer.
  function verifyAndClose() {
    handled.should.equal(2);
    client.close(bindings.queue.name, function() {
      complete();
    });
  }

  client.initialise({
    connectionString: 'amqp://localhost',
    retryWait: 100
  });

  client.on('ready', function() {
    client.consume(bindings, onMessage, publishThree);

    // The check runs 500ms after 'ready', independently of the consume callback.
    setTimeout(verifyAndClose, 500);
  });
});
|
325 |
|
it('should retry for exclusive consumer after consumer one goes down', function(complete) {

  // The scenario waits 4s before releasing the queue, so raise the Mocha timeout.
  this.timeout(20000);

  var config = {
    connectionString: 'amqp://localhost',
    retryWait: 100
  };

  var options = {
    exchange: {
      name: 'example.exchange.1',
      type: 'topic'
    },
    queue: {
      name: 'example.queue',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    },
    exclusive: true
  };

  var consumer = function(message, done, fail) { done(); };

  var instance1 = new AMQPHutch();
  var instance2 = new AMQPHutch();

  // Readiness bookkeeping: the scenario uses BOTH instances, so it must not
  // start until each has emitted 'ready', and it must only start once.
  var instance1Ready = false;
  var instance2Ready = false;
  var started = false;

  // Instance 1 takes the exclusive queue, instance 2 then asks for the same
  // queue, and after 4s instance 1 closes its channel. Instance 2's consume
  // callback must eventually fire without an error.
  function runScenario() {

    console.log("Hutch Instance Ready");

    instance1.consume(options, consumer, function(err) {
      console.log("Setup Consumer 1");

      instance2.consume(options, consumer, function(err) {
        console.log("Setup Consumer 2");
        should.not.exist(err);

        // instance1 was already closed by the timer below; its callback's
        // arguments are ignored, so this only sequences the final clean-up.
        instance1.close(options.queue.name, function(){
          instance2.close(options.queue.name, function(){ complete(); });
        });
      });

      setTimeout(function(){
        instance1.close(options.queue.name, function(){

          console.log("Shutting down Consumer 1");
        })
      }, 4000)
    });
  }

  function startWhenBothReady() {
    if (started || !instance1Ready || !instance2Ready) {
      return;
    }
    started = true;
    runScenario();
  }

  instance1.initialise(config);
  instance2.initialise(config);

  instance1.on('ready', function() {
    instance1Ready = true;
    startWhenBothReady();
  });

  instance2.on('ready', function() {
    instance2Ready = true;
    startWhenBothReady();
  });
});
|
385 |
|
it('should clear skip option for exclusive consumer after consumer is rejected', function(complete) {

  // The final assertions run on an 8s timer, so raise the Mocha timeout.
  this.timeout(20000);

  var config = {
    connectionString: 'amqp://localhost',
    retryWait: 100
  };

  var options = {
    exchange: {
      name: 'example.exchange.1',
      type: 'topic'
    },
    queue: {
      name: 'example.queue',
      prefetch: 1,
      durable: true
    },
    publish: {
      persistent: true,
      expiration: 86400000
    },
    exclusive: true,
    skipNext: true
  };

  // Records which consumer handled each delivered message, in arrival order.
  var messages = [];
  var consumer1 = function(message, done, fail) { messages.push('c1'); done(); };
  var consumer2 = function(message, done, fail) { messages.push('c2'); done(); };

  var instance1 = new AMQPHutch();
  var instance2 = new AMQPHutch();

  // Readiness bookkeeping: the scenario uses BOTH instances, so it must not
  // start until each has emitted 'ready', and it must only start once.
  var instance1Ready = false;
  var instance2Ready = false;
  var started = false;

  function runScenario() {
    instance1.consume(options, consumer1, function(err) {

      // Once instance2's consume calls back, make sure instance1 is closed
      // and publish a second message through instance2.
      instance2.consume(options, consumer2, function(err) {
        instance1.close(options.queue.name, function(){

          instance2.publish(options, "Example Message!", function(err, res){});
        });
      });

      // First message goes out via instance1, which then closes its channel.
      instance1.publish(options, "Example Message!", function(err, res){
        instance1.close(options.queue.name, function(err){});
      });

      setTimeout(function(){

        // Exactly one message must have been handled, and by consumer2.
        messages.length.should.equal(1);
        messages[0].should.equal('c2');
        instance2.close(options.queue.name, function(){
          complete();
        });
      }, 8000)
    });
  }

  function startWhenBothReady() {
    if (started || !instance1Ready || !instance2Ready) {
      return;
    }
    started = true;
    runScenario();
  }

  instance1.initialise(config);
  instance2.initialise(config);

  instance1.on('ready', function() {
    instance1Ready = true;
    startWhenBothReady();
  });

  instance2.on('ready', function() {
    instance2Ready = true;
    startWhenBothReady();
  });
});
|
449 | });
|