1 |
|
2 | chai = require 'chai' unless chai
|
3 | path = require 'path'
|
4 | async = require 'async'
|
5 |
|
6 | transport = require '../src/transport'
|
7 | common = require '../src/common'
|
8 |
|
9 |
|
10 | transports =
|
11 | 'direct': 'direct://broker2'
|
12 | 'MQTT': 'mqtt://localhost'
|
13 | 'AMQP': 'amqp://localhost'
|
14 |
|
15 | randomString = (n) ->
|
16 | text = ""
|
17 | possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
|
18 | for i in [0...n]
|
19 | idx = Math.floor Math.random()*possible.length
|
20 | text += possible.charAt idx
|
21 | return text
|
22 |
|
23 | zip = () ->
|
24 | lengthArray = (arr.length for arr in arguments)
|
25 | length = Math.min(lengthArray...)
|
26 | for i in [0...length]
|
27 | arr[i] for arr in arguments
|
28 |
|
29 |
|
30 | createConnectClients = (address, names, callback) ->
|
31 | createConnect = (name, cb) ->
|
32 | client = transport.getClient address
|
33 | client.connect (err) ->
|
34 | cb err, client
|
35 |
|
36 | async.map names, createConnect, (err, clients) ->
|
37 | return callback err if err
|
38 | ret = {}
|
39 | for nc in zip names, clients
|
40 | ret[nc[0]] = nc[1]
|
41 | return callback null, ret
|
42 |
|
43 | createQueues = (queueMapping, callback) ->
|
44 | createQueue = (det, cb) ->
|
45 | [client, type, queueName] = det
|
46 | client.removeQueue type, queueName, (err) ->
|
47 | return cb err if err
|
48 | client.createQueue type, queueName, cb
|
49 |
|
50 | async.map queueMapping, createQueue, callback
|
51 |
|
52 | createBindQueues = (broker, queueMapping, callback) ->
|
53 | createBindQueue = (det, cb) ->
|
54 | [client, type, srcQ, tgtQ] = det
|
55 | createQ = if type == 'outqueue' then srcQ else tgtQ
|
56 | client.removeQueue type, createQ, (err) ->
|
57 | return cb err if err
|
58 | client.createQueue type, createQ, (err) ->
|
59 | return cb err if err
|
60 | broker.addBinding {type:'pubsub', src:srcQ, tgt:tgtQ}, cb
|
61 |
|
62 | async.map queueMapping, createBindQueue, callback
|
63 |
|
64 | sendPackets = (packets, callback) ->
|
65 | send = (p, cb) ->
|
66 | [client, queue, data] = p
|
67 | client.sendTo 'outqueue', queue, data, cb
|
68 |
|
69 | async.map packets, send, callback
|
70 |
|
71 | subscribeData = (handlers, callback) ->
|
72 | sub = (h, cb) ->
|
73 | [client, queue, handler] = h
|
74 | ackHandler = (msg) ->
|
75 | client.ackMessage msg
|
76 | return handler msg
|
77 | client.subscribeToQueue queue, ackHandler, cb
|
78 |
|
79 | async.map handlers, sub, callback
|
80 |
|
81 | subscribeDataNoAck = (handlers, callback) ->
|
82 | sub = (h, cb) ->
|
83 | [client, queue, handler] = h
|
84 | client.subscribeToQueue queue, handler, cb
|
85 | async.map handlers, sub, callback
|
86 |
|
87 |
|
88 | setupBindings = (broker, bindings, callback) ->
|
89 | send = (b, cb) ->
|
90 | broker.addBinding b, cb
|
91 |
|
92 | async.map bindings, send, callback
|
93 |
|
94 |
|
95 |
|
96 |
|
97 | transportTests = (type) ->
|
98 | address = transports[type]
|
99 | broker = null
|
100 |
|
101 | describeIfRoundRobinSupport = if type == 'AMQP' then describe else describe.skip
|
102 |
|
103 | beforeEach (done) ->
|
104 | broker = transport.getBroker address
|
105 | broker.connect (err) ->
|
106 | err = null if not err?
|
107 | chai.expect(err).to.be.a 'null'
|
108 | done()
|
109 |
|
110 | afterEach (done) ->
|
111 | broker.disconnect () ->
|
112 | broker = null
|
113 | done()
|
114 |
|
115 | describe 'starting client', ->
|
116 | it 'should not error', (done) ->
|
117 | clientA = transport.getClient address
|
118 | clientA.connect (err) ->
|
119 | done err
|
120 |
|
121 | describe 'outqueue without subscribers', ->
|
122 | it 'sending should not error', (done) ->
|
123 | payload = { foo: 'bar91' }
|
124 | outQueue = 'myoutqueue3344'
|
125 | createConnectClients address, ['sender'], (err, clients) ->
|
126 | createQueues [
|
127 | [ clients.sender, 'outqueue', outQueue ]
|
128 | ], (err) ->
|
129 | chai.expect(err).to.not.exist
|
130 |
|
131 | clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
|
132 | chai.expect(err).to.not.exist
|
133 | done()
|
134 |
|
135 | describe 'inqueue==outqueue without binding', ->
|
136 | it 'sending should be received on other end', (done) ->
|
137 | payload = { foo: 'bar91' }
|
138 | sharedQueue = 'myqueue33'
|
139 | onReceive = (msg) ->
|
140 | chai.expect(msg).to.include.keys 'data'
|
141 | chai.expect(msg.data).to.eql payload
|
142 | done()
|
143 | createConnectClients address, ['sender', 'receiver'], (err, clients) ->
|
144 | createQueues [
|
145 | [ clients.receiver, 'inqueue', sharedQueue ]
|
146 | [ clients.sender, 'outqueue', sharedQueue ]
|
147 | ], (err) ->
|
148 | chai.expect(err).to.not.exist
|
149 |
|
150 | clients.receiver.subscribeToQueue sharedQueue, onReceive, (err) ->
|
151 | chai.expect(err).to.be.a 'null'
|
152 | clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
|
153 | chai.expect(err).to.be.a 'null'
|
154 |
|
155 |
|
156 | describe 'inqueue==outqueue with binding', ->
|
157 | it 'sending should be received on other end', (done) ->
|
158 | payload = { foo: 'bar92' }
|
159 | sharedQueue = 'myqueue35'
|
160 | onReceive = (msg) ->
|
161 | chai.expect(msg).to.include.keys 'data'
|
162 | chai.expect(msg.data).to.eql payload
|
163 | done()
|
164 | createConnectClients address, ['sender', 'receiver'], (err, clients) ->
|
165 | createQueues [
|
166 | [ clients.receiver, 'inqueue', sharedQueue ]
|
167 | [ clients.sender, 'outqueue', sharedQueue ]
|
168 | ], (err) ->
|
169 | chai.expect(err).to.not.exist
|
170 |
|
171 | broker.addBinding {type:'pubsub', src:sharedQueue, tgt:sharedQueue}, (err) ->
|
172 | chai.expect(err).to.be.a 'null'
|
173 |
|
174 | clients.receiver.subscribeToQueue sharedQueue, onReceive, (err) ->
|
175 | chai.expect(err).to.be.a 'null'
|
176 | clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
|
177 | chai.expect(err).to.be.a 'null'
|
178 |
|
179 |
|
180 | describe 'outqueue bound to inqueue', ->
|
181 | it 'sending to inqueue, show up on outqueue', (done) ->
|
182 | payload = { foo: 'bar99' }
|
183 | inQueue = 'inqueue232'
|
184 | outQueue = 'outqueue353'
|
185 | createConnectClients address, ['sender', 'receiver'], (err, clients) ->
|
186 | createQueues [
|
187 | [ clients.receiver, 'inqueue', inQueue ]
|
188 | [ clients.sender, 'outqueue', outQueue ]
|
189 | ], (err) ->
|
190 | chai.expect(err).to.not.exist
|
191 |
|
192 | onReceive = (msg) ->
|
193 | clients.receiver.ackMessage msg
|
194 | chai.expect(msg).to.include.keys 'data'
|
195 | chai.expect(msg.data).to.eql payload
|
196 | done()
|
197 |
|
198 | broker.addBinding {type:'pubsub', src:outQueue, tgt:inQueue}, (err) ->
|
199 | chai.expect(err).to.be.a 'null'
|
200 |
|
201 | clients.receiver.subscribeToQueue inQueue, onReceive, (err) ->
|
202 | chai.expect(err).to.be.a 'null'
|
203 | clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
|
204 | chai.expect(err).to.be.a 'null'
|
205 |
|
206 | describe 'outqueue bound to inqueue then removed', ->
|
207 | it 'sending to inqueue, show up on outqueue', (done) ->
|
208 | payload = { foo: 'bar922' }
|
209 | inQueue = 'inqueue922'
|
210 | outQueue = 'outqueue922'
|
211 | createConnectClients address, ['sender', 'receiver'], (err, clients) ->
|
212 | createQueues [
|
213 | [ clients.receiver, 'inqueue', inQueue ]
|
214 | [ clients.sender, 'outqueue', outQueue ]
|
215 | ], (err) ->
|
216 | chai.expect(err).to.not.exist
|
217 |
|
218 | binding = { type:'pubsub', src:outQueue, tgt:inQueue }
|
219 | bindingRemoved = false
|
220 |
|
221 | onReceive = (msg) ->
|
222 | if bindingRemoved
|
223 | done new Error "Received data on removed binding"
|
224 | done = null
|
225 | return
|
226 |
|
227 | clients.receiver.ackMessage msg
|
228 | chai.expect(msg).to.include.keys 'data'
|
229 | chai.expect(msg.data).to.eql payload
|
230 | bindingRemoved = true
|
231 | broker.removeBinding binding, (err) ->
|
232 | chai.expect(err).to.be.a 'null'
|
233 | clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
|
234 | chai.expect(err).to.be.a 'null'
|
235 | setTimeout () ->
|
236 | done null if done
|
237 | done = null
|
238 | return
|
239 | , 300
|
240 |
|
241 | clients.receiver.subscribeToQueue inQueue, onReceive, (err) ->
|
242 | chai.expect(err).to.be.a 'null'
|
243 | broker.addBinding binding, (err) ->
|
244 | chai.expect(err).to.be.a 'null'
|
245 | clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
|
246 | chai.expect(err).to.be.a 'null'
|
247 |
|
248 |
|
249 | describe 'multiple outqueues bound to one inqueue', ->
|
250 | it 'all sent on outqueues shows up on inqueue', (done) ->
|
251 | @timeout 3000
|
252 | senders = [ 'sendA', 'sendB', 'sendC' ]
|
253 | clientNames = ['receive']
|
254 | clientNames.push.apply clientNames, senders
|
255 | createConnectClients address, clientNames, (err, clients) ->
|
256 | chai.expect(err).to.be.a 'null'
|
257 |
|
258 | expect = [ {name:'sendA'}, {name:'sendB'}, {name:'sendC'} ]
|
259 |
|
260 | received = []
|
261 | onReceive = (msg) ->
|
262 | clients.receive.ackMessage msg
|
263 | chai.expect(msg).to.include.keys 'data'
|
264 | received.push msg.data
|
265 | if received.length == expect.length
|
266 | received.sort (a,b) ->
|
267 | return -1 if a.name < b.name
|
268 | return 1 if a.name > b.name
|
269 | return 0
|
270 | chai.expect(received).to.eql expect
|
271 | done()
|
272 |
|
273 | inQueue = 'inqueue27'
|
274 |
|
275 | createQueues [ [ clients.receive, 'inqueue', inQueue] ], (err) ->
|
276 | chai.expect(err).to.not.exist
|
277 | clients.receive.subscribeToQueue inQueue, onReceive, (err) ->
|
278 | chai.expect(err).to.not.exist
|
279 |
|
280 |
|
281 | queueMapping = []
|
282 | for name in senders
|
283 | queueMapping.push [ clients[name], 'outqueue', name, inQueue ]
|
284 | createBindQueues broker, queueMapping, (err) ->
|
285 | chai.expect(err).to.not.exist
|
286 |
|
287 | packets = []
|
288 | for name in senders
|
289 | packets.push [ clients[name], name, { name: name } ]
|
290 | sendPackets packets, (err) ->
|
291 | chai.expect(err).to.not.exist
|
292 |
|
293 |
|
294 | describe 'multiple inqueues bound to one outqueue', ->
|
295 | it 'data sent on outqueue shows up on all inqueues', (done) ->
|
296 | @timeout 3000
|
297 | senders = [ 'sender' ]
|
298 | receivers = ['r1', 'r2', 'r3']
|
299 | clientNames = common.clone receivers
|
300 | clientNames.push.apply clientNames, senders
|
301 | createConnectClients address, clientNames, (err, clients) ->
|
302 | chai.expect(err).to.not.exist
|
303 |
|
304 | expect = [ {q:'r1',d:'ident'}, {q:'r2',d:'ident'}, {q:'r3',d:'ident'} ]
|
305 |
|
306 | received = []
|
307 | checkExpected = (q, msg) ->
|
308 | received.push { q: q, d: msg.data.data }
|
309 | if received.length == expect.length
|
310 | received.sort (a,b) ->
|
311 | return -1 if a.q < b.q
|
312 | return 1 if a.q > b.q
|
313 | return 0
|
314 | chai.expect(received).to.eql expect
|
315 | done()
|
316 |
|
317 | onReceives =
|
318 | r1: (msg) -> checkExpected 'r1', msg
|
319 | r2: (msg) -> checkExpected 'r2', msg
|
320 | r3: (msg) -> checkExpected 'r3', msg
|
321 |
|
322 | outQueue2 = 'outqueue39'
|
323 | createQueues [ [clients.sender, 'outqueue', outQueue2] ], (err) ->
|
324 | chai.expect(err).to.not.exist
|
325 |
|
326 |
|
327 | queueMapping = []
|
328 | for name in receivers
|
329 | queueMapping.push [ clients[name], 'inqueue', outQueue2, name ]
|
330 | createBindQueues broker, queueMapping, (err) ->
|
331 | chai.expect(err).to.not.exist
|
332 |
|
333 | handlers = []
|
334 | for name in receivers
|
335 | handlers.push [ clients[name], name, onReceives[name] ]
|
336 | subscribeData handlers, (err) ->
|
337 | chai.expect(err).to.not.exist
|
338 | clients.sender.sendTo 'outqueue', outQueue2, {data: 'ident'}, (err) ->
|
339 | chai.expect(err).to.not.exist
|
340 |
|
341 | describeIfRoundRobinSupport 'Roundrobin binding', ->
|
342 | describe 'sending ACKed message, then NACKed message', ->
|
343 | received = null
|
344 | beforeEach (done) ->
|
345 | received = { worker1: [], worker2: [], deadletter: [] }
|
346 | r = randomString '3'
|
347 | outq =
|
348 | sender: 'outQ-'+r
|
349 | inq =
|
350 | worker1: 'workerQ-'+r
|
351 | worker2: 'workerQ-'+r
|
352 | deadletter: 'deadletterQ-'+r
|
353 | clientNames = Object.keys inq
|
354 | clientNames = clientNames.concat Object.keys(outq)
|
355 | createConnectClients address, clientNames, (err, clients) ->
|
356 | chai.expect(err).to.not.exist
|
357 |
|
358 | queues = []
|
359 | for clientName, queueName of outq
|
360 | queues.push [ clients[clientName], 'outqueue', queueName ]
|
361 | for clientName, queueName of inq
|
362 | queues.push [ clients[clientName], 'inqueue', queueName ]
|
363 |
|
364 | createQueues queues, (err) ->
|
365 | chai.expect(err).to.not.exist
|
366 |
|
367 | bindings = [
|
368 | { type: 'roundrobin', tgt: inq.worker1, deadletter: inq.deadletter }
|
369 | { type: 'roundrobin', src: outq.sender, tgt: inq.worker1 }
|
370 | ]
|
371 | setupBindings broker, bindings, (err) ->
|
372 | chai.expect(err).to.not.exist
|
373 |
|
374 |
|
375 | ackFunc = (data) ->
|
376 | return 'nackMessage' if data.foo == 'nack'
|
377 | return 'ackMessage'
|
378 | onReceives =
|
379 | worker1: (msg) ->
|
380 | received.worker1.push msg.data
|
381 | clients.worker1[ackFunc(msg.data)] msg, () ->
|
382 | worker2: (msg) ->
|
383 | received.worker2.push msg.data
|
384 | clients.worker2[ackFunc(msg.data)] msg, () ->
|
385 | deadletter: (msg) ->
|
386 | received.deadletter.push msg.data
|
387 | clients.deadletter.ackMessage msg, () ->
|
388 | done()
|
389 | handlers = []
|
390 | for name in Object.keys inq
|
391 | handlers.push [ clients[name], inq[name], onReceives[name] ]
|
392 | subscribeDataNoAck handlers, (err) ->
|
393 | chai.expect(err).to.not.exist
|
394 |
|
395 | packets = [
|
396 | [ clients.sender, outq.sender, {foo: 'ack'} ]
|
397 | [ clients.sender, outq.sender, {foo: 'nack'} ]
|
398 | ]
|
399 | sendPackets packets, (err) ->
|
400 | chai.expect(err).to.not.exist
|
401 |
|
402 | it 'each message is only sent to one worker', () ->
|
403 | workerData = received.worker1.concat received.worker2
|
404 | chai.expect(workerData).to.have.length 2
|
405 | it 'only NACKed message is sent to deadletter', ->
|
406 | chai.expect(received.deadletter).to.eql [ { foo: 'nack'} ]
|
407 |
|
408 | describe 'subscribing to bound topics', ->
|
409 | sendQueue = 'sub-send-36'
|
410 | receiveQueue = 'sub-receive-36'
|
411 | binding = { type:'pubsub', src:sendQueue, tgt:receiveQueue }
|
412 | connectionData = []
|
413 | clients = null
|
414 |
|
415 |
|
416 | setup = (done) ->
|
417 | createConnectClients address, ['sender', 'receiver'], (err, c) ->
|
418 | clients = c
|
419 | createQueues [
|
420 | [ clients.receiver, 'inqueue', receiveQueue ]
|
421 | [ clients.sender, 'outqueue', sendQueue ]
|
422 | ], (err) ->
|
423 | chai.expect(err).to.not.exist
|
424 | broker.addBinding binding, (err) ->
|
425 | chai.expect(err).to.be.a 'null'
|
426 | return done null
|
427 |
|
428 | it 'should provide data sent on connection', (done) ->
|
429 | payloads =
|
430 | one: { foo: 'sub-96' }
|
431 | two: { bar: 'sub-97' }
|
432 |
|
433 | onData = (bind, data) ->
|
434 | chai.expect(bind.src).to.equal binding.src
|
435 | chai.expect(bind.tgt).to.equal binding.tgt
|
436 | connectionData.push data
|
437 |
|
438 | if connectionData.length == 2
|
439 | [one, two] = connectionData
|
440 | chai.expect(one).to.eql payloads.one
|
441 | chai.expect(two).to.eql payloads.two
|
442 | return done null
|
443 | else if connectionData.length > 2
|
444 | return done new Error "Got more data than expected"
|
445 |
|
446 | setup (err) ->
|
447 | return done err if err
|
448 | broker.subscribeData binding, onData, (err) ->
|
449 | return done err if err
|
450 | clients.sender.sendTo 'outqueue', sendQueue, payloads.one, (err) ->
|
451 | return done err if err
|
452 | clients.sender.sendTo 'outqueue', sendQueue, payloads.two, (err) ->
|
453 | return done err if err
|
454 |
|
455 | describe 'subscribing to binding with srcQueue==tgtQueue', ->
|
456 | sendQueue = 'sub-shared-37'
|
457 | receiveQueue = sendQueue
|
458 | binding = { type:'pubsub', src:sendQueue, tgt:receiveQueue }
|
459 | connectionData = []
|
460 | clients = null
|
461 |
|
462 |
|
463 | setup = (done) ->
|
464 | createConnectClients address, ['sender', 'receiver'], (err, c) ->
|
465 | clients = c
|
466 | createQueues [
|
467 | [ clients.receiver, 'inqueue', receiveQueue ]
|
468 | [ clients.sender, 'outqueue', sendQueue ]
|
469 | ], (err) ->
|
470 | chai.expect(err).to.not.exist
|
471 | broker.addBinding binding, (err) ->
|
472 | chai.expect(err).to.be.a 'null'
|
473 | return done null
|
474 |
|
475 | it 'should provide data sent on connection', (done) ->
|
476 | payloads =
|
477 | one: { foo: 'sub-106' }
|
478 | two: { bar: 'sub-107' }
|
479 |
|
480 | onData = (bind, data) ->
|
481 | chai.expect(bind.src).to.equal binding.src
|
482 | chai.expect(bind.tgt).to.equal binding.tgt
|
483 | connectionData.push data
|
484 |
|
485 | if connectionData.length == 2
|
486 | [one, two] = connectionData
|
487 | chai.expect(one).to.eql payloads.one
|
488 | chai.expect(two).to.eql payloads.two
|
489 | return done null
|
490 | else if connectionData.length > 2
|
491 | return done new Error "Got more data than expected"
|
492 |
|
493 | setup (err) ->
|
494 | return done err if err
|
495 | broker.subscribeData binding, onData, (err) ->
|
496 | return done err if err
|
497 | clients.sender.sendTo 'outqueue', sendQueue, payloads.one, (err) ->
|
498 | return done err if err
|
499 | clients.sender.sendTo 'outqueue', sendQueue, payloads.two, (err) ->
|
500 | return done err if err
|
501 |
|
502 | describe 'Transport', ->
|
503 | Object.keys(transports).forEach (type) =>
|
504 | describe "#{type}", () ->
|
505 | transportTests type
|
506 |
|