UNPKG

13.6 kBJavaScriptView Raw
1var assert = require('assert');
2var crypto = require('crypto');
3var api = require('../lib/channel_api');
4var mock = require('./mocknet');
5var succeed = mock.succeed, fail = mock.fail;
6var when = require('when');
7var defer = when.defer;
8
// Broker to run the tests against; override with the URL environment
// variable. Declared with `var` so the name stays local to this module
// instead of overwriting the global `URL` for the whole process.
var URL = process.env.URL || 'amqp://localhost';
10
// Connect to the broker named by `URL`, returning whatever
// `api.connect` returns (used as a promise throughout this file).
function connect() {
  var connecting = api.connect(URL);
  return connecting;
}
14
// Invert the outcome of `promise`: the returned promise resolves (with
// the rejection reason) if `promise` is rejected, and is rejected (with
// the resolution value) if `promise` resolves.
function expectFail(promise) {
  var flipped = defer();
  promise.then(
    function(value) { return flipped.reject(value); },
    function(reason) { return flipped.resolve(reason); });
  return flipped.promise;
}
21
// The channel implicitly serialises RPCs, so interdependent operations
// seldom need chaining with `.then`; waiting on the last one is enough
// as long as they all succeed. This takes any number of promises and
// returns a promise for the result of the last argument, which is
// rejected if any of the arguments is rejected.
function doAll(/* promise... */) {
  function lastOf(values) {
    var lastIndex = values.length - 1;
    return values[lastIndex];
  }
  var everything = when.all(arguments);
  return everything.then(lastOf);
}
34
// Failure is detected through rejected operations rather than through
// the channel close error, so 'error' events can be given a listener
// that does nothing.
function ignore() { /* deliberately empty */ }

// Register `ignore` as an 'error' listener on `c`, then hand `c` back
// so this can be used as a step in a promise chain.
function ignoreErrors(c) {
  c.on('error', ignore);
  return c;
}
// Register `console.warn` as an 'error' listener on `c`, then hand `c`
// back so this can be used as a step in a promise chain.
function logErrors(c) {
  c.on('error', console.warn);
  return c;
}
44
// Produce a fresh random string: the base64-encoded SHA-1 digest of 64
// random bytes.
function randomString() {
  var entropy = crypto.randomBytes(64);
  return crypto.createHash('sha1').update(entropy).digest('base64');
}
50
// Run a test with `name`, given a function that takes an open
// channel, and returns a promise that is resolved on test success or
// rejected on test failure.
//
// `chmethod` names the connection method used to open the channel
// (e.g. 'createChannel'). The outcome is passed to the test's `done`
// callback through `succeed`/`fail`, and the connection is closed
// afterwards either way.
function channel_test(chmethod, name, chfun) {
  test(name, function(done) {
    connect(URL).then(logErrors).then(function(c) {
      // Deliberately not returned: this inner chain reports to `done`
      // itself, and the handler below must not report a second time.
      c[chmethod]().then(ignoreErrors).then(chfun)
        .then(succeed(done), fail(done))
        // close the connection regardless of what happens with the test
        .then(function() {c.close();});
    })
    // If connecting fails, or opening the channel throws outright,
    // nothing above has reported to `done`; do it here so the test
    // fails straight away rather than timing out.
      .then(null, fail(done));
  });
}
64
// Shorthand for `channel_test` using a channel opened with
// `createChannel`.
var chtest = function(name, chfun) {
  return channel_test('createChannel', name, chfun);
};
66
suite("connect", function() {

  // Smoke test: open a connection, close it again, and report the
  // outcome of the close to `done`.
  test("at all", function(done) {
    var closed = connect(URL).then(function(c) {
      return c.close();
    });
    closed.then(succeed(done), fail(done));
  });

  // Opening the channel is the whole test; i.e., just don't bork
  chtest("create channel", ignore);

});
78
// Options used for the queue and exchange declarations in the tests
// below: both are declared non-durable.
var QUEUE_OPTS = {durable: false},
    EX_OPTS = {durable: false};
81
82suite("assert, check, delete", function() {
83
// Declare a queue, then check that it is there.
chtest("assert and check queue", function(ch) {
  var q = 'test.check-queue';
  function check() {
    return ch.checkQueue(q);
  }
  return ch.assertQueue(q, QUEUE_OPTS).then(check);
});
90
// Declare a direct exchange, then check that it is there.
chtest("assert and check exchange", function(ch) {
  var ex = 'test.check-exchange';
  function check() {
    return ch.checkExchange(ex);
  }
  return ch.assertExchange(ex, 'direct', EX_OPTS).then(check);
});
97
// Declaring an existing queue again with a different `autoDelete`
// setting is expected to be rejected.
chtest("fail on reasserting queue with different options",
       function(ch) {
         var q = 'test.reassert-queue';
         function reassertDifferently() {
           var conflicting = {durable: false, autoDelete: false};
           return expectFail(ch.assertQueue(q, conflicting));
         }
         var original = {durable: false, autoDelete: true};
         return ch.assertQueue(q, original).then(reassertDifferently);
       });
109
// Checking a queue with a freshly generated random name is expected to
// be rejected.
chtest("fail on checking a queue that's not there", function(ch) {
  var missing = 'test.random-' + randomString();
  var checked = ch.checkQueue(missing);
  return expectFail(checked);
});
113
// Checking an exchange with a freshly generated random name is
// expected to be rejected.
chtest("fail on checking an exchange that's not there", function(ch) {
  var missing = 'test.random-' + randomString();
  var checked = ch.checkExchange(missing);
  return expectFail(checked);
});
117
// Declaring an existing fanout exchange again as a direct exchange is
// expected to be rejected.
chtest("fail on reasserting exchange with different type",
       function(ch) {
         var ex = 'test.reassert-ex';
         function reassertAsDirect() {
           return expectFail(ch.assertExchange(ex, 'direct', EX_OPTS));
         }
         return ch.assertExchange(ex, 'fanout', EX_OPTS)
           .then(reassertAsDirect);
       });
127
// Publish to an exchange with a random name and wait for the channel
// to emit 'error'; the test passes once that event is seen.
chtest("channel break on publishing to non-exchange", function(ch) {
  var broken = defer();
  ch.on('error', function(err) {
    return broken.resolve(err);
  });
  var nowhere = randomString();
  ch.publish(nowhere, '', new Buffer('foobar'));
  return broken.promise;
});
134
// Declare and check a queue, delete it, then expect a further check to
// be rejected.
chtest("delete queue", function(ch) {
  var q = 'test.delete-queue';
  function remove() {
    return ch.deleteQueue(q);
  }
  function expectGone() {
    return expectFail(ch.checkQueue(q));
  }
  var declared = ch.assertQueue(q, QUEUE_OPTS);
  var present = ch.checkQueue(q);
  return doAll(declared, present).then(remove).then(expectGone);
});
145
// Deleting a queue with a freshly generated random name is expected to
// be rejected.
chtest("fail to delete no queue", function(ch) {
  var missing = 'test.random-' + randomString();
  var deleted = ch.deleteQueue(missing);
  return expectFail(deleted);
});
149
// Declare and check an exchange, delete it, then expect a further
// check to be rejected.
chtest("delete exchange", function(ch) {
  var ex = 'test.delete-exchange';
  function remove() {
    return ch.deleteExchange(ex);
  }
  function expectGone() {
    return expectFail(ch.checkExchange(ex));
  }
  var declared = ch.assertExchange(ex, 'fanout', EX_OPTS);
  var present = ch.checkExchange(ex);
  return doAll(declared, present).then(remove).then(expectGone);
});
160
161});
162
// Wait for the queue to meet the condition; useful for waiting for
// messages to arrive, for example.
//
// Opens its own connection and channel, then repeatedly calls
// `checkQueue(q)` until `condition(qok)` is truthy. The returned
// promise resolves with that `qok`. It is rejected if connecting,
// opening the channel, checking the queue or `condition` itself fails,
// so a caller sees the failure rather than waiting forever. Once the
// connection has been opened it is closed on either outcome.
function waitForQueue(q, condition) {
  var ready = defer();
  connect(URL).then(function(c) {
    // Stop polling: release the connection, then report the failure.
    function giveUp(err) {
      c.close();
      ready.reject(err);
    }
    return c.createChannel()
      .then(function(ch) {
        function check() {
          ch.checkQueue(q).then(function(qok) {
            if (condition(qok)) {
              c.close();
              ready.resolve(qok);
            }
            // yield to I/O between polls rather than recursing directly
            else setImmediate(check);
          }).then(null, giveUp);
        }
        check();
      })
      .then(null, giveUp);
  }).then(null, function(err) {
    // reached when the connection could not be opened at all
    ready.reject(err);
  });
  return ready.promise;
}
184
// Return a promise that resolves once the queue holds at least `num`
// messages. If `num` is not supplied it's assumed to be 1.
function waitForMessages(q, num) {
  var min = num;
  if (num === undefined) min = 1;
  function hasEnough(qok) {
    return qok.messageCount >= min;
  }
  return waitForQueue(q, hasEnough);
}
193
194suite("binding, consuming", function() {
195
// bind, publish, get
chtest("route message", function(ch) {
  var ex = 'test.route-message';
  var q = 'test.route-message-q';
  var msg = randomString();

  function publishAndWait() {
    ch.publish(ex, '', new Buffer(msg));
    return waitForMessages(q);
  }
  function getMessage() {
    return ch.get(q, {noAck: true});
  }
  // the message fetched must be the one published via the exchange
  function verify(m) {
    assert(m);
    assert.equal(msg, m.content.toString());
  }

  var setup = doAll(
    ch.assertExchange(ex, 'fanout', EX_OPTS),
    ch.assertQueue(q, QUEUE_OPTS),
    ch.purgeQueue(q),
    ch.bindQueue(q, ex, '', {}));
  return setup.then(publishAndWait).then(getMessage).then(verify);
});
217
// send to queue, purge, get-empty
chtest("purge queue", function(ch) {
  var q = 'test.purge-queue';

  function sendAndWait() {
    ch.sendToQueue(q, new Buffer('foobar'));
    return waitForMessages(q);
  }
  function purgeThenGet() {
    // the purge is issued but not waited on; only the get is chained
    ch.purgeQueue(q);
    return ch.get(q, {noAck: true});
  }
  function expectEmpty(m) {
    assert(!m); // get-empty
  }

  return ch.assertQueue(q, {durable: false})
    .then(sendAndWait)
    .then(purgeThenGet)
    .then(expectEmpty);
});
232
// bind again, unbind, publish, get-empty
chtest("unbind queue", function(ch) {
  var ex = 'test.unbind-queue-ex';
  var q = 'test.unbind-queue';
  var viabinding = randomString();
  var direct = randomString();

  function publishViaBinding() {
    ch.publish(ex, '', new Buffer('foobar'));
    return waitForMessages(q);
  }
  function takeFirst() { // message got through!
    return ch.get(q, {noAck:true})
      .then(function(m) {assert(m);});
  }
  function unbind() {
    return ch.unbindQueue(q, ex, '', {});
  }
  function publishBothWays() {
    // via the no-longer-existing binding
    ch.publish(ex, '', new Buffer(viabinding));
    // direct to the queue
    ch.sendToQueue(q, new Buffer(direct));
    return waitForMessages(q);
  }
  function getNext() {
    return ch.get(q);
  }
  function verify(m) {
    // the direct to queue message got through, the via-binding
    // message (sent first) did not
    assert.equal(direct, m.content.toString());
  }

  var setup = doAll(
    ch.assertExchange(ex, 'fanout', EX_OPTS),
    ch.assertQueue(q, QUEUE_OPTS),
    ch.purgeQueue(q),
    ch.bindQueue(q, ex, '', {}));
  return setup
    .then(publishViaBinding)
    .then(takeFirst)
    .then(unbind)
    .then(publishBothWays)
    .then(getNext)
    .then(verify);
});
266
// To some extent this is now just testing semantics of the server,
// but we can at least try out a few settings, and consume.
//
// Binds a direct exchange to an internal fanout exchange and the
// fanout exchange to a queue, consumes from the queue, then publishes
// to the direct exchange. The test passes when the published message
// is delivered to the consumer.
chtest("consume via exchange-exchange binding", function(ch) {
  var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2';
  var q = 'test.ex-ex-binding-q';
  var rk = 'test.routing.key', msg = randomString();
  return doAll(
    ch.assertExchange(ex1, 'direct', EX_OPTS),
    ch.assertExchange(ex2, 'fanout',
                      {durable: false, internal: true}),
    ch.assertQueue(q, QUEUE_OPTS),
    ch.purgeQueue(q),
    ch.bindExchange(ex2, ex1, rk, {}),
    ch.bindQueue(q, ex2, '', {}))
    .then(function() {
      var arrived = defer();
      function delivery(m) {
        if (m.content.toString() === msg) arrived.resolve();
        else arrived.reject(new Error("Wrong message"));
      }
      ch.consume(q, delivery, {noAck: true})
        .then(function() {
          // publish only once the consumer is in place
          ch.publish(ex1, rk, new Buffer(msg));
        })
        // A refused consume (or a publish that throws) would otherwise
        // leave `arrived` pending and the test hanging until it times
        // out; fail it with the actual error instead.
        .then(null, function(err) { arrived.reject(err); });
      return arrived.promise;
    });
});
294
// Checks that a cancelled consumer gets no further deliveries: one
// message is consumed, the consumer is cancelled, and a second message
// must then be found in the queue by `get` rather than being delivered
// to the consumer callback.
chtest("cancel consumer", function(ch) {
  var q = 'test.consumer-cancel';
  var recv1 = defer();
  var ctag;

  var setup = doAll(
    ch.assertQueue(q, QUEUE_OPTS),
    ch.purgeQueue(q),
    // My callback is 'resolve whichever deferred `recv1` currently
    // refers to'; `recv1` is swapped for a fresh one after the cancel
    ch.consume(q, function() { recv1.resolve(); }, {noAck:true})
      .then(function(ok) {
        ctag = ok.consumerTag;
        ch.sendToQueue(q, new Buffer('foo'));
      }));
  // If the setup fails no message will ever arrive; reject the first
  // deferred so the test reports the failure instead of hanging.
  setup.then(null, recv1.reject.bind(recv1));

  // A message should arrive because of the consume
  return recv1.promise.then(function() {
    // replace the promise resolved by the consume callback
    recv1 = defer();

    return doAll(
      ch.cancel(ctag).then(function() {
        ch.sendToQueue(q, new Buffer('bar'));
      }),
      // but check a message did arrive in the queue
      waitForMessages(q))
      .then(function() {
        var outcome = defer();
        // fail on delivery to the cancelled consumer ...
        recv1.promise.then(function() {
          outcome.reject(
            new Error("Message delivered to a cancelled consumer"));
        });
        // ... succeed on get-ok with the second message. A get-empty,
        // a different message, or a failed get all fail the test
        // rather than leaving it pending.
        ch.get(q, {noAck:true})
          .then(function(m) {
            assert(m, "Expected the second message to be in the queue");
            assert.equal('bar', m.content.toString());
            outcome.resolve();
          })
          .then(null, outcome.reject.bind(outcome));
        return outcome.promise;
      });
  });
});
334
// ack, by default, removes a single message from the queue
chtest("ack", function(ch) {
  var q = 'test.ack';
  var msg1 = randomString(), msg2 = randomString();

  function sendBoth() {
    ch.sendToQueue(q, new Buffer(msg1));
    ch.sendToQueue(q, new Buffer(msg2));
    return waitForMessages(q, 2);
  }
  function getFirst() {
    return ch.get(q, {noAck: false});
  }
  function ackFirstThenGet(m) {
    assert.equal(msg1, m.content.toString());
    ch.ack(m);
    // %%% is there a race here? may depend on
    // rabbitmq-specific semantics
    return ch.get(q);
  }
  // the second message must still be there after acking the first
  function verifySecond(m) {
    assert(m);
    assert.equal(msg2, m.content.toString());
  }

  var setup = doAll(
    ch.assertQueue(q, QUEUE_OPTS),
    ch.purgeQueue(q));
  return setup
    .then(sendBoth)
    .then(getFirst)
    .then(ackFirstThenGet)
    .then(verifySecond);
});
363
// Nack, by default, puts a message back on the queue (where in the
// queue is up to the server)
chtest("nack", function(ch) {
  var q = 'test.nack';
  var msg1 = randomString();

  function sendAndWait() {
    ch.sendToQueue(q, new Buffer(msg1));
    return waitForMessages(q);
  }
  function getUnacked() {
    return ch.get(q, {noAck: false});
  }
  function nackAndWait(m) {
    assert.equal(msg1, m.content.toString());
    ch.nack(m);
    return waitForMessages(q);
  }
  function getAgain() {
    return ch.get(q);
  }
  // the same message must be available again after the nack
  function verifyRequeued(m) {
    assert(m);
    assert.equal(msg1, m.content.toString());
  }

  var setup = doAll(
    ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q));
  return setup
    .then(sendAndWait)
    .then(getUnacked)
    .then(nackAndWait)
    .then(getAgain)
    .then(verifyRequeued);
});
388
// Set a prefetch of 1, queue two messages, then consume: the first
// delivery is acked, after which a second delivery is awaited and
// acked. The test passes once both have been acked.
chtest("prefetch", function(ch) {
  var q = 'test.prefetch';

  function sendTwo() {
    ch.sendToQueue(q, new Buffer('foobar'));
    ch.sendToQueue(q, new Buffer('foobar'));
    return waitForMessages(q, 2);
  }

  function consumeBoth() {
    // holds the deferred for the delivery currently being waited on;
    // the consumer callback resolves whichever one is current
    var pending = defer();
    function onDelivery(m) {
      pending.resolve(m);
    }
    function ackBoth(firstMessage) {
      // swap in a fresh deferred before acking, so that the next
      // delivery has something to resolve
      pending = defer();
      ch.ack(firstMessage);
      return pending.promise.then(function(secondMessage) {
        ch.ack(secondMessage);
      });
    }
    return doAll(
      ch.consume(q, onDelivery, {noAck: false}),
      pending.promise.then(ackBoth));
  }

  var setup = doAll(
    ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q),
    ch.prefetch(1));
  return setup.then(sendTwo).then(consumeBoth);
});
414
415});
416
// Shorthand for `channel_test` using a channel opened with
// `createConfirmChannel`. Declared with `var` so that it is not an
// implicit global (which would also be an error in strict mode).
var confirmtest = channel_test.bind(null, 'createConfirmChannel');
418
419suite("confirms", function() {
420
// Send one message on a confirm channel; the test's outcome is that of
// the promise returned by `sendToQueue`.
confirmtest('message is confirmed', function(ch) {
  var q = 'test.confirm-message';
  function send() {
    return ch.sendToQueue(q, new Buffer('bleep'));
  }
  var setup = doAll(
    ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q));
  return setup.then(send);
});
429
// Usually one can provoke the server into confirming more than one
// message in an ack by simply sending a few messages in quick
// succession; a bit unscientific I know. Luckily we can eavesdrop on
// the acknowledgements coming through to see if we really did get a
// multi-ack.
confirmtest('multiple confirms', function(ch) {
  var q = 'test.multiple-confirms';
  return doAll(
    ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q))
    .then(function() {
      var multipleRainbows = false;
      ch.on('ack', function(a) {
        if (a.multiple) multipleRainbows = true;
      });

      // Send `num` messages and wait for all of them to be confirmed.
      // If no multi-ack was seen, try again with twice as many, giving
      // up once a batch of more than 500 has also failed.
      function prod(num) {
        var cs = [];
        for (var i=0; i < num; i++)
          cs.push(ch.sendToQueue(q, new Buffer('bleep')));
        return when.all(cs).then(function() {
          if (multipleRainbows) return true;
          else if (num > 500) throw new Error(
            "Couldn't provoke the server " +
              "into multi-acking with " + num +
              " messages; giving up");
          else return prod(num * 2);
        });
      }
      return prod(5);
    });
});
464
465});