1 | var assert = require('assert');
|
2 | var crypto = require('crypto');
|
3 | var api = require('../lib/channel_api');
|
4 | var mock = require('./mocknet');
|
5 | var succeed = mock.succeed, fail = mock.fail;
|
6 | var when = require('when');
|
7 | var defer = when.defer;
|
8 |
|
9 | URL = process.env.URL || 'amqp://localhost';
|
10 |
|
11 | function connect() {
|
12 | return api.connect(URL);
|
13 | }
|
14 |
|
15 |
|
16 | function expectFail(promise) {
|
17 | var rev = defer();
|
18 | promise.then(rev.reject.bind(rev), rev.resolve.bind(rev));
|
19 | return rev.promise;
|
20 | }
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 | function doAll(/* promise... */) {
|
29 | return when.all(arguments)
|
30 | .then(function(results) {
|
31 | return results[results.length - 1];
|
32 | });
|
33 | }
|
34 |
|
35 |
|
36 |
|
37 | function ignore() {}
|
38 | function ignoreErrors(c) {
|
39 | c.on('error', ignore); return c;
|
40 | }
|
41 | function logErrors(c) {
|
42 | c.on('error', console.warn); return c;
|
43 | }
|
44 |
|
45 | function randomString() {
|
46 | var hash = crypto.createHash('sha1');
|
47 | hash.update(crypto.randomBytes(64));
|
48 | return hash.digest('base64');
|
49 | }
|
50 |
|
51 |
|
52 |
|
53 |
|
54 | function channel_test(chmethod, name, chfun) {
|
55 | test(name, function(done) {
|
56 | connect(URL).then(logErrors).then(function(c) {
|
57 | c[chmethod]().then(ignoreErrors).then(chfun)
|
58 | .then(succeed(done), fail(done))
|
59 |
|
60 | .then(function() {c.close();});
|
61 | });
|
62 | });
|
63 | }
|
64 |
|
65 | var chtest = channel_test.bind(null, 'createChannel');
|
66 |
|
67 | suite("connect", function() {
|
68 |
|
69 | test("at all", function(done) {
|
70 | connect(URL).then(function(c) {
|
71 | return c.close()
|
72 | ;}).then(succeed(done), fail(done));
|
73 | });
|
74 |
|
75 | chtest("create channel", ignore);
|
76 |
|
77 | });
|
78 |
|
79 | var QUEUE_OPTS = {durable: false};
|
80 | var EX_OPTS = {durable: false};
|
81 |
|
82 | suite("assert, check, delete", function() {
|
83 |
|
84 | chtest("assert and check queue", function(ch) {
|
85 | return ch.assertQueue('test.check-queue', QUEUE_OPTS)
|
86 | .then(function(qok) {
|
87 | return ch.checkQueue('test.check-queue');
|
88 | });
|
89 | });
|
90 |
|
91 | chtest("assert and check exchange", function(ch) {
|
92 | return ch.assertExchange('test.check-exchange', 'direct', EX_OPTS)
|
93 | .then(function(eok) {
|
94 | return ch.checkExchange('test.check-exchange');
|
95 | });
|
96 | });
|
97 |
|
98 | chtest("fail on reasserting queue with different options",
|
99 | function(ch) {
|
100 | var q = 'test.reassert-queue';
|
101 | return ch.assertQueue(
|
102 | q, {durable: false, autoDelete: true})
|
103 | .then(function() {
|
104 | return expectFail(
|
105 | ch.assertQueue(q, {durable: false,
|
106 | autoDelete: false}));
|
107 | });
|
108 | });
|
109 |
|
110 | chtest("fail on checking a queue that's not there", function(ch) {
|
111 | return expectFail(ch.checkQueue('test.random-' + randomString()));
|
112 | });
|
113 |
|
114 | chtest("fail on checking an exchange that's not there", function(ch) {
|
115 | return expectFail(ch.checkExchange('test.random-' + randomString()));
|
116 | });
|
117 |
|
118 | chtest("fail on reasserting exchange with different type",
|
119 | function(ch) {
|
120 | var ex = 'test.reassert-ex';
|
121 | return ch.assertExchange(ex, 'fanout', EX_OPTS)
|
122 | .then(function() {
|
123 | return expectFail(
|
124 | ch.assertExchange(ex, 'direct', EX_OPTS));
|
125 | });
|
126 | });
|
127 |
|
128 | chtest("channel break on publishing to non-exchange", function(ch) {
|
129 | var bork = defer();
|
130 | ch.on('error', bork.resolve.bind(bork));
|
131 | ch.publish(randomString(), '', new Buffer('foobar'));
|
132 | return bork.promise;
|
133 | });
|
134 |
|
135 | chtest("delete queue", function(ch) {
|
136 | var q = 'test.delete-queue';
|
137 | return doAll(
|
138 | ch.assertQueue(q, QUEUE_OPTS),
|
139 | ch.checkQueue(q))
|
140 | .then(function() {
|
141 | return ch.deleteQueue(q);})
|
142 | .then(function() {
|
143 | return expectFail(ch.checkQueue(q));});
|
144 | });
|
145 |
|
146 | chtest("fail to delete no queue", function(ch) {
|
147 | return expectFail(ch.deleteQueue('test.random-' + randomString()));
|
148 | });
|
149 |
|
150 | chtest("delete exchange", function(ch) {
|
151 | var ex = 'test.delete-exchange';
|
152 | return doAll(
|
153 | ch.assertExchange(ex, 'fanout', EX_OPTS),
|
154 | ch.checkExchange(ex))
|
155 | .then(function() {
|
156 | return ch.deleteExchange(ex);})
|
157 | .then(function() {
|
158 | return expectFail(ch.checkExchange(ex));});
|
159 | });
|
160 |
|
161 | });
|
162 |
|
163 |
|
164 |
|
165 | function waitForQueue(q, condition) {
|
166 | var ready = defer();
|
167 | connect(URL).then(function(c) {
|
168 | return c.createChannel()
|
169 | .then(function(ch) {
|
170 | function check() {
|
171 | ch.checkQueue(q).then(function(qok) {
|
172 | if (condition(qok)) {
|
173 | c.close();
|
174 | ready.resolve(qok);
|
175 | }
|
176 | else setImmediate(check);
|
177 | });
|
178 | }
|
179 | check();
|
180 | });
|
181 | });
|
182 | return ready.promise;
|
183 | }
|
184 |
|
185 |
|
186 |
|
187 | function waitForMessages(q, num) {
|
188 | var min = (num === undefined) ? 1 : num;
|
189 | return waitForQueue(q, function(qok) {
|
190 | return qok.messageCount >= min;
|
191 | });
|
192 | }
|
193 |
|
194 | suite("binding, consuming", function() {
|
195 |
|
196 |
|
197 | chtest("route message", function(ch) {
|
198 | var ex = 'test.route-message';
|
199 | var q = 'test.route-message-q';
|
200 | var msg = randomString();
|
201 |
|
202 | return doAll(
|
203 | ch.assertExchange(ex, 'fanout', EX_OPTS),
|
204 | ch.assertQueue(q, QUEUE_OPTS),
|
205 | ch.purgeQueue(q),
|
206 | ch.bindQueue(q, ex, '', {}))
|
207 | .then(function() {
|
208 | ch.publish(ex, '', new Buffer(msg));
|
209 | return waitForMessages(q);})
|
210 | .then(function() {
|
211 | return ch.get(q, {noAck: true});})
|
212 | .then(function(m) {
|
213 | assert(m);
|
214 | assert.equal(msg, m.content.toString());
|
215 | });
|
216 | });
|
217 |
|
218 |
|
219 | chtest("purge queue", function(ch) {
|
220 | var q = 'test.purge-queue';
|
221 | return ch.assertQueue(q, {durable: false})
|
222 | .then(function() {
|
223 | ch.sendToQueue(q, new Buffer('foobar'));
|
224 | return waitForMessages(q);})
|
225 | .then(function() {
|
226 | ch.purgeQueue(q);
|
227 | return ch.get(q, {noAck: true});})
|
228 | .then(function(m) {
|
229 | assert(!m);
|
230 | });
|
231 | });
|
232 |
|
233 |
|
234 | chtest("unbind queue", function(ch) {
|
235 | var ex = 'test.unbind-queue-ex';
|
236 | var q = 'test.unbind-queue';
|
237 | var viabinding = randomString();
|
238 | var direct = randomString();
|
239 |
|
240 | return doAll(
|
241 | ch.assertExchange(ex, 'fanout', EX_OPTS),
|
242 | ch.assertQueue(q, QUEUE_OPTS),
|
243 | ch.purgeQueue(q),
|
244 | ch.bindQueue(q, ex, '', {}))
|
245 | .then(function() {
|
246 | ch.publish(ex, '', new Buffer('foobar'));
|
247 | return waitForMessages(q);})
|
248 | .then(function() {
|
249 | return ch.get(q, {noAck:true})
|
250 | .then(function(m) {assert(m);});})
|
251 | .then(function() {
|
252 | return ch.unbindQueue(q, ex, '', {});})
|
253 | .then(function() {
|
254 |
|
255 | ch.publish(ex, '', new Buffer(viabinding));
|
256 |
|
257 | ch.sendToQueue(q, new Buffer(direct));
|
258 | return waitForMessages(q);})
|
259 | .then(function() {return ch.get(q)})
|
260 | .then(function(m) {
|
261 |
|
262 |
|
263 | assert.equal(direct, m.content.toString());
|
264 | });
|
265 | });
|
266 |
|
267 |
|
268 |
|
269 | chtest("consume via exchange-exchange binding", function(ch) {
|
270 | var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2';
|
271 | var q = 'test.ex-ex-binding-q';
|
272 | var rk = 'test.routing.key', msg = randomString();
|
273 | return doAll(
|
274 | ch.assertExchange(ex1, 'direct', EX_OPTS),
|
275 | ch.assertExchange(ex2, 'fanout',
|
276 | {durable: false, internal: true}),
|
277 | ch.assertQueue(q, QUEUE_OPTS),
|
278 | ch.purgeQueue(q),
|
279 | ch.bindExchange(ex2, ex1, rk, {}),
|
280 | ch.bindQueue(q, ex2, '', {}))
|
281 | .then(function() {
|
282 | var arrived = defer();
|
283 | function delivery(m) {
|
284 | if (m.content.toString() === msg) arrived.resolve();
|
285 | else arrived.reject(new Error("Wrong message"));
|
286 | }
|
287 | ch.consume(q, delivery, {noAck: true})
|
288 | .then(function() {
|
289 | ch.publish(ex1, rk, new Buffer(msg));
|
290 | });
|
291 | return arrived.promise;
|
292 | });
|
293 | });
|
294 |
|
295 |
|
296 | chtest("cancel consumer", function(ch) {
|
297 | var q = 'test.consumer-cancel';
|
298 | var recv1 = defer();
|
299 | var ctag;
|
300 |
|
301 | doAll(
|
302 | ch.assertQueue(q, QUEUE_OPTS),
|
303 | ch.purgeQueue(q),
|
304 |
|
305 | ch.consume(q, function() { recv1.resolve(); }, {noAck:true})
|
306 | .then(function(ok) {
|
307 | ctag = ok.consumerTag;
|
308 | ch.sendToQueue(q, new Buffer('foo'));
|
309 | }));
|
310 |
|
311 | return recv1.promise.then(function() {
|
312 |
|
313 | recv1 = defer();
|
314 |
|
315 | return doAll(
|
316 | ch.cancel(ctag).then(function() {
|
317 | ch.sendToQueue(q, new Buffer('bar'));
|
318 | }),
|
319 |
|
320 | waitForMessages(q))
|
321 | .then(function() {
|
322 | ch.get(q, {noAck:true})
|
323 | .then(function(m) {
|
324 |
|
325 |
|
326 | if (m.content.toString() === 'bar')
|
327 | recv1.reject();
|
328 | });
|
329 | return expectFail(recv1.promise);
|
330 |
|
331 | });
|
332 | });
|
333 | });
|
334 |
|
335 |
|
336 | chtest("ack", function(ch) {
|
337 | var q = 'test.ack';
|
338 | var msg1 = randomString(), msg2 = randomString();
|
339 |
|
340 | return doAll(
|
341 | ch.assertQueue(q, QUEUE_OPTS),
|
342 | ch.purgeQueue(q))
|
343 | .then(function() {
|
344 | ch.sendToQueue(q, new Buffer(msg1));
|
345 | ch.sendToQueue(q, new Buffer(msg2));
|
346 | return waitForMessages(q, 2);
|
347 | })
|
348 | .then(function() {
|
349 | return ch.get(q, {noAck: false})
|
350 | })
|
351 | .then(function(m) {
|
352 | assert.equal(msg1, m.content.toString());
|
353 | ch.ack(m);
|
354 |
|
355 |
|
356 | return ch.get(q);
|
357 | })
|
358 | .then(function(m) {
|
359 | assert(m);
|
360 | assert.equal(msg2, m.content.toString());
|
361 | });
|
362 | });
|
363 |
|
364 |
|
365 |
|
366 | chtest("nack", function(ch) {
|
367 | var q = 'test.nack';
|
368 | var msg1 = randomString();
|
369 |
|
370 | return doAll(
|
371 | ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q))
|
372 | .then(function() {
|
373 | ch.sendToQueue(q, new Buffer(msg1));
|
374 | return waitForMessages(q);})
|
375 | .then(function() {
|
376 | return ch.get(q, {noAck: false})})
|
377 | .then(function(m) {
|
378 | assert.equal(msg1, m.content.toString());
|
379 | ch.nack(m);
|
380 | return waitForMessages(q);})
|
381 | .then(function() {
|
382 | return ch.get(q);})
|
383 | .then(function(m) {
|
384 | assert(m);
|
385 | assert.equal(msg1, m.content.toString());
|
386 | });
|
387 | });
|
388 |
|
389 | chtest("prefetch", function(ch) {
|
390 | var q = 'test.prefetch';
|
391 | return doAll(
|
392 | ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q),
|
393 | ch.prefetch(1))
|
394 | .then(function() {
|
395 | ch.sendToQueue(q, new Buffer('foobar'));
|
396 | ch.sendToQueue(q, new Buffer('foobar'));
|
397 | return waitForMessages(q, 2);
|
398 | })
|
399 | .then(function() {
|
400 | var first = defer();
|
401 | return doAll(
|
402 | ch.consume(q, function(m) {
|
403 | first.resolve(m);
|
404 | }, {noAck: false}),
|
405 | first.promise.then(function(m) {
|
406 | first = defer();
|
407 | ch.ack(m);
|
408 | return first.promise.then(function(m) {
|
409 | ch.ack(m);
|
410 | })
|
411 | }));
|
412 | });
|
413 | });
|
414 |
|
415 | });
|
416 |
|
417 | confirmtest = channel_test.bind(null, 'createConfirmChannel');
|
418 |
|
419 | suite("confirms", function() {
|
420 |
|
421 | confirmtest('message is confirmed', function(ch) {
|
422 | var q = 'test.confirm-message';
|
423 | return doAll(
|
424 | ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q))
|
425 | .then(function() {
|
426 | return ch.sendToQueue(q, new Buffer('bleep'));
|
427 | });
|
428 | });
|
429 |
|
430 |
|
431 |
|
432 |
|
433 |
|
434 |
|
435 | confirmtest('multiple confirms', function(ch) {
|
436 | var q = 'test.multiple-confirms';
|
437 | return doAll(
|
438 | ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q))
|
439 | .then(function() {
|
440 | var multipleRainbows = false;
|
441 | ch.on('ack', function(a) {
|
442 | if (a.multiple) multipleRainbows = true;
|
443 | });
|
444 |
|
445 | function prod(num) {
|
446 | var cs = [];
|
447 | for (var i=0; i < num; i++)
|
448 | cs.push(ch.sendToQueue(q, new Buffer('bleep')));
|
449 | return when.all(cs).then(function() {
|
450 | if (multipleRainbows) return true;
|
451 | else if (num > 500) throw new Error(
|
452 | "Couldn't provoke the server" +
|
453 | "into multi-acking with " + num +
|
454 | " messages; giving up");
|
455 | else {
|
456 |
|
457 | return prod(num * 2);
|
458 | }
|
459 | });
|
460 | }
|
461 | return prod(5);
|
462 | });
|
463 | });
|
464 |
|
465 | });
|