UNPKG

17.9 kBtext/coffeescriptView Raw
1
2chai = require 'chai' unless chai
3path = require 'path'
4async = require 'async'
5
6transport = require '../src/transport'
7common = require '../src/common'
8
# Transport name -> broker address used by the test suites below.
# Note: most of these transports require an external broker service to be running.
transports = {
  direct: 'direct://broker2'
  MQTT: 'mqtt://localhost'
  AMQP: 'amqp://localhost'
}
14
# Return a string of `n` characters picked at random from [A-Za-z0-9].
# Based on Math.random, so only meant for generating throw-away names.
randomString = (n) ->
  alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
  picked = for k in [0...n]
    alphabet.charAt Math.floor(Math.random() * alphabet.length)
  return picked.join ''
22
# Combine several arrays element-wise.
# zip [1,2,3], ['a','b']  =>  [[1,'a'], [2,'b']]
# The result is as long as the shortest input. Called without any arrays it
# returns [] (Math.min() of nothing is Infinity, which would otherwise make
# the index range below unbounded).
zip = (arrays...) ->
  return [] if arrays.length is 0
  lengths = (arr.length for arr in arrays)
  shortest = Math.min lengths...
  for i in [0...shortest]
    arr[i] for arr in arrays
28
29#
# Create and connect one transport client per entry of `names`, all against `address`.
# callback(err) on the first connection failure, otherwise
# callback(null, clients) where `clients` maps each name to its connected client.
createConnectClients = (address, names, callback) ->
  connectOne = (unusedName, next) ->
    connection = transport.getClient address
    connection.connect (err) ->
      next err, connection

  async.map names, connectOne, (err, connected) ->
    if err
      return callback err
    # async.map keeps results in input order, so names and clients pair up by index
    byName = {}
    for name, index in names
      byName[name] = connected[index]
    return callback null, byName
42
# (Re)create a set of queues.
# Each entry of `queueMapping` is [client, type, queueName]: the queue is first
# removed through the client and then created again with the same type and name.
# `callback` is handed to async.map and receives its (err, results).
createQueues = (queueMapping, callback) ->
  recreate = ([owner, kind, name], next) ->
    owner.removeQueue kind, name, (removeErr) ->
      if removeErr
        return next removeErr
      owner.createQueue kind, name, next

  async.map queueMapping, recreate, callback
51
# (Re)create one queue per entry and bind source to target on the broker.
# Each entry of `queueMapping` is [client, type, srcQueue, tgtQueue]. The queue
# that gets recreated is srcQueue when type is 'outqueue', tgtQueue otherwise;
# afterwards a 'pubsub' binding src -> tgt is added through `broker`.
# `callback` is handed to async.map and receives its (err, results).
createBindQueues = (broker, queueMapping, callback) ->
  recreateAndBind = ([owner, kind, source, target], next) ->
    queueName = if kind is 'outqueue' then source else target

    # Steps are declared last-to-first so each can name its continuation
    bindQueues = (err) ->
      return next err if err
      broker.addBinding {type:'pubsub', src:source, tgt:target}, next
    createQueue = (err) ->
      return next err if err
      owner.createQueue kind, queueName, bindQueues

    owner.removeQueue kind, queueName, createQueue

  async.map queueMapping, recreateAndBind, callback
63
# Send a batch of packets.
# Each entry of `packets` is [client, queueName, data] and is sent with
# client.sendTo 'outqueue', queueName, data.
# `callback` is handed to async.map and receives its (err, results).
sendPackets = (packets, callback) ->
  deliver = ([sender, queueName, payload], next) ->
    sender.sendTo 'outqueue', queueName, payload, next

  async.map packets, deliver, callback
70
# Subscribe a batch of handlers, acknowledging every message automatically.
# Each entry of `handlers` is [client, queueName, handler]. Incoming messages
# are first passed to client.ackMessage and then to `handler`.
# `callback` is handed to async.map and receives its (err, results).
subscribeData = (handlers, callback) ->
  subscribeWithAck = ([subscriber, queueName, onMessage], next) ->
    ackThenHandle = (message) ->
      subscriber.ackMessage message
      return onMessage message
    subscriber.subscribeToQueue queueName, ackThenHandle, next

  async.map handlers, subscribeWithAck, callback
80
# Subscribe a batch of handlers without acknowledging anything on their behalf.
# Each entry of `handlers` is [client, queueName, handler]; the handler is
# passed to client.subscribeToQueue as-is, so any ack/nack is up to the handler.
# `callback` is handed to async.map and receives its (err, results).
subscribeDataNoAck = (handlers, callback) ->
  subscribePlain = ([subscriber, queueName, onMessage], next) ->
    subscriber.subscribeToQueue queueName, onMessage, next

  async.map handlers, subscribePlain, callback
86
87
# Add every binding in `bindings` to `broker` via broker.addBinding.
# `callback` is handed to async.map and receives its (err, results).
setupBindings = (broker, bindings, callback) ->
  addOne = (binding, next) ->
    broker.addBinding binding, next

  async.map bindings, addOne, callback
93# End utils
94
95
96# Tests
# Register the mocha suites exercising one transport.
#
# `type` is a key of `transports`; its address is used for the broker and for
# every client created by the tests. A fresh broker is connected before each
# test and disconnected after it. The round-robin suite only runs for 'AMQP'
# and is marked skipped for every other transport.
#
# Connection errors reported by createConnectClients fail the test through
# `done err` instead of surfacing later as a TypeError on an undefined client map.
transportTests = (type) ->
  address = transports[type]
  broker = null

  describeIfRoundRobinSupport = if type is 'AMQP' then describe else describe.skip

  beforeEach (done) ->
    broker = transport.getBroker address
    broker.connect (err) ->
      chai.expect(err).to.not.exist
      done()

  afterEach (done) ->
    broker.disconnect () ->
      broker = null
      done()

  describe 'starting client', ->
    it 'should not error', (done) ->
      clientA = transport.getClient address
      clientA.connect (err) ->
        done err

  describe 'outqueue without subscribers', ->
    it 'sending should not error', (done) ->
      payload = { foo: 'bar91' }
      outQueue = 'myoutqueue3344'
      createConnectClients address, ['sender'], (err, clients) ->
        return done err if err
        createQueues [
          [ clients.sender, 'outqueue', outQueue ]
        ], (err) ->
          chai.expect(err).to.not.exist

          clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
            chai.expect(err).to.not.exist
            done()

  describe 'inqueue==outqueue without binding', ->
    it 'sending should be received on other end', (done) ->
      payload = { foo: 'bar91' }
      sharedQueue = 'myqueue33'
      onReceive = (msg) ->
        chai.expect(msg).to.include.keys 'data'
        chai.expect(msg.data).to.eql payload
        done()
      createConnectClients address, ['sender', 'receiver'], (err, clients) ->
        return done err if err
        createQueues [
          [ clients.receiver, 'inqueue', sharedQueue ]
          [ clients.sender, 'outqueue', sharedQueue ]
        ], (err) ->
          chai.expect(err).to.not.exist

          # Subscribe first, send only once the subscription is confirmed
          clients.receiver.subscribeToQueue sharedQueue, onReceive, (err) ->
            chai.expect(err).to.be.a 'null'
            clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
              chai.expect(err).to.be.a 'null'

  describe 'inqueue==outqueue with binding', ->
    it 'sending should be received on other end', (done) ->
      payload = { foo: 'bar92' }
      sharedQueue = 'myqueue35'
      onReceive = (msg) ->
        chai.expect(msg).to.include.keys 'data'
        chai.expect(msg.data).to.eql payload
        done()
      createConnectClients address, ['sender', 'receiver'], (err, clients) ->
        return done err if err
        createQueues [
          [ clients.receiver, 'inqueue', sharedQueue ]
          [ clients.sender, 'outqueue', sharedQueue ]
        ], (err) ->
          chai.expect(err).to.not.exist

          broker.addBinding {type:'pubsub', src:sharedQueue, tgt:sharedQueue}, (err) ->
            chai.expect(err).to.be.a 'null'

            clients.receiver.subscribeToQueue sharedQueue, onReceive, (err) ->
              chai.expect(err).to.be.a 'null'
              clients.sender.sendTo 'outqueue', sharedQueue, payload, (err) ->
                chai.expect(err).to.be.a 'null'

  describe 'outqueue bound to inqueue', ->
    it 'sending to inqueue, show up on outqueue', (done) ->
      payload = { foo: 'bar99' }
      inQueue = 'inqueue232'
      outQueue = 'outqueue353'
      createConnectClients address, ['sender', 'receiver'], (err, clients) ->
        return done err if err
        createQueues [
          [ clients.receiver, 'inqueue', inQueue ]
          [ clients.sender, 'outqueue', outQueue ]
        ], (err) ->
          chai.expect(err).to.not.exist

          onReceive = (msg) ->
            clients.receiver.ackMessage msg
            chai.expect(msg).to.include.keys 'data'
            chai.expect(msg.data).to.eql payload
            done()

          broker.addBinding {type:'pubsub', src:outQueue, tgt:inQueue}, (err) ->
            chai.expect(err).to.be.a 'null'

            clients.receiver.subscribeToQueue inQueue, onReceive, (err) ->
              chai.expect(err).to.be.a 'null'
              clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
                chai.expect(err).to.be.a 'null'

  describe 'outqueue bound to inqueue then removed', ->
    it 'sending to inqueue, show up on outqueue', (done) ->
      payload = { foo: 'bar922' }
      inQueue = 'inqueue922'
      outQueue = 'outqueue922'
      createConnectClients address, ['sender', 'receiver'], (err, clients) ->
        return done err if err
        createQueues [
          [ clients.receiver, 'inqueue', inQueue ]
          [ clients.sender, 'outqueue', outQueue ]
        ], (err) ->
          chai.expect(err).to.not.exist

          binding = { type:'pubsub', src:outQueue, tgt:inQueue }
          bindingRemoved = false

          # Report the outcome exactly once: either the grace period below
          # elapses, or a message arrives over the removed binding. A late
          # message after the test has finished must not call done again.
          finished = false
          finish = (err) ->
            return if finished
            finished = true
            done err

          onReceive = (msg) ->
            if bindingRemoved
              return finish new Error "Received data on removed binding"

            clients.receiver.ackMessage msg
            chai.expect(msg).to.include.keys 'data'
            chai.expect(msg.data).to.eql payload
            bindingRemoved = true
            broker.removeBinding binding, (err) ->
              chai.expect(err).to.be.a 'null'
              clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
                chai.expect(err).to.be.a 'null'
                # Give a wrongly-routed message 300 ms to show up before passing
                setTimeout (-> finish null), 300

          clients.receiver.subscribeToQueue inQueue, onReceive, (err) ->
            chai.expect(err).to.be.a 'null'
            broker.addBinding binding, (err) ->
              chai.expect(err).to.be.a 'null'
              clients.sender.sendTo 'outqueue', outQueue, payload, (err) ->
                chai.expect(err).to.be.a 'null'

  describe 'multiple outqueues bound to one inqueue', ->
    it 'all sent on outqueues shows up on inqueue', (done) ->
      @timeout 3000
      senders = [ 'sendA', 'sendB', 'sendC' ]
      clientNames = ['receive'].concat senders
      createConnectClients address, clientNames, (err, clients) ->
        chai.expect(err).to.be.a 'null'

        expected = [ {name:'sendA'}, {name:'sendB'}, {name:'sendC'} ]

        received = []
        onReceive = (msg) ->
          clients.receive.ackMessage msg
          chai.expect(msg).to.include.keys 'data'
          received.push msg.data
          if received.length == expected.length
            # Arrival order is not asserted, so sort by sender name before comparing
            received.sort (a,b) ->
              return -1 if a.name < b.name
              return 1 if a.name > b.name
              return 0
            chai.expect(received).to.eql expected
            done()

        inQueue = 'inqueue27'

        createQueues [ [ clients.receive, 'inqueue', inQueue] ], (err) ->
          chai.expect(err).to.not.exist
          clients.receive.subscribeToQueue inQueue, onReceive, (err) ->
            chai.expect(err).to.not.exist

            # Bind all outqueues to same inqueue
            queueMapping = ([ clients[name], 'outqueue', name, inQueue ] for name in senders)
            createBindQueues broker, queueMapping, (err) ->
              chai.expect(err).to.not.exist

              packets = ([ clients[name], name, { name: name } ] for name in senders)
              sendPackets packets, (err) ->
                chai.expect(err).to.not.exist

  describe 'multiple inqueues bound to one outqueue', ->
    it 'data sent on outqueue shows up on all inqueues', (done) ->
      @timeout 3000
      senders = [ 'sender' ]
      receivers = ['r1', 'r2', 'r3']
      # concat returns a new array, so `receivers` itself stays untouched
      clientNames = receivers.concat senders
      createConnectClients address, clientNames, (err, clients) ->
        chai.expect(err).to.not.exist

        expected = [ {q:'r1',d:'ident'}, {q:'r2',d:'ident'}, {q:'r3',d:'ident'} ]

        received = []
        checkExpected = (q, msg) ->
          received.push { q: q, d: msg.data.data }
          if received.length == expected.length
            # Arrival order is not asserted, so sort by queue name before comparing
            received.sort (a,b) ->
              return -1 if a.q < b.q
              return 1 if a.q > b.q
              return 0
            chai.expect(received).to.eql expected
            done()

        onReceives =
          r1: (msg) -> checkExpected 'r1', msg
          r2: (msg) -> checkExpected 'r2', msg
          r3: (msg) -> checkExpected 'r3', msg

        outQueue2 = 'outqueue39'
        createQueues [ [clients.sender, 'outqueue', outQueue2] ], (err) ->
          chai.expect(err).to.not.exist

          # Bind same outqueue to all inqueues
          queueMapping = ([ clients[name], 'inqueue', outQueue2, name ] for name in receivers)
          createBindQueues broker, queueMapping, (err) ->
            chai.expect(err).to.not.exist

            handlers = ([ clients[name], name, onReceives[name] ] for name in receivers)
            subscribeData handlers, (err) ->
              chai.expect(err).to.not.exist
              clients.sender.sendTo 'outqueue', outQueue2, {data: 'ident'}, (err) ->
                chai.expect(err).to.not.exist

  describeIfRoundRobinSupport 'Roundrobin binding', ->
    describe 'sending ACKed message, then NACKed message', ->
      received = null
      beforeEach (done) ->
        received = { worker1: [], worker2: [], deadletter: [] }
        r = randomString 3
        outq =
          sender: 'outQ-'+r
        # worker1 and worker2 deliberately use the same queue name
        inq =
          worker1: 'workerQ-'+r
          worker2: 'workerQ-'+r
          deadletter: 'deadletterQ-'+r
        clientNames = Object.keys(inq).concat Object.keys(outq)
        createConnectClients address, clientNames, (err, clients) ->
          chai.expect(err).to.not.exist

          queues = []
          for clientName, queueName of outq
            queues.push [ clients[clientName], 'outqueue', queueName ]
          for clientName, queueName of inq
            queues.push [ clients[clientName], 'inqueue', queueName ]

          createQueues queues, (err) ->
            chai.expect(err).to.not.exist

            bindings = [
              { type: 'roundrobin', tgt: inq.worker1, deadletter: inq.deadletter }
              { type: 'roundrobin', src: outq.sender, tgt: inq.worker1 }
            ]
            setupBindings broker, bindings, (err) ->
              chai.expect(err).to.not.exist

              # Setup queue subscribers
              # Name of the client method to answer with: nack payloads marked 'nack', ack the rest
              ackFunc = (data) ->
                return 'nackMessage' if data.foo == 'nack'
                return 'ackMessage'
              onReceives =
                worker1: (msg) ->
                  received.worker1.push msg.data
                  clients.worker1[ackFunc(msg.data)] msg, () ->
                worker2: (msg) ->
                  received.worker2.push msg.data
                  clients.worker2[ackFunc(msg.data)] msg, () ->
                deadletter: (msg) ->
                  received.deadletter.push msg.data
                  clients.deadletter.ackMessage msg, () ->
                  # NOTE(review): the listing lost its indentation, so whether done()
                  # sat inside the ack callback is unknown. It is called directly so
                  # the hook completes even if ackMessage never invokes a callback.
                  done()
              handlers = ([ clients[name], inq[name], onReceives[name] ] for name in Object.keys inq)
              subscribeDataNoAck handlers, (err) ->
                chai.expect(err).to.not.exist

                packets = [
                  [ clients.sender, outq.sender, {foo: 'ack'} ]
                  [ clients.sender, outq.sender, {foo: 'nack'} ]
                ]
                sendPackets packets, (err) ->
                  chai.expect(err).to.not.exist

      it 'each message is only sent to one worker', () ->
        workerData = received.worker1.concat received.worker2
        chai.expect(workerData).to.have.length 2
      it 'only NACKed message is sent to deadletter', ->
        chai.expect(received.deadletter).to.eql [ { foo: 'nack'} ]

  # Shared body of the two "subscribing to ..." suites: bind sendQueue to
  # receiveQueue, subscribe to the binding's data on the broker, send
  # payloads.one then payloads.two, and expect both to be reported in that order.
  describeBindingSubscription = (title, sendQueue, receiveQueue, payloads) ->
    describe title, ->
      binding = { type:'pubsub', src:sendQueue, tgt:receiveQueue }
      connectionData = []
      clients = null

      # Should be a before, but the 'beforeEach' of higher scope are ran afterwards...
      setup = (done) ->
        createConnectClients address, ['sender', 'receiver'], (err, c) ->
          return done err if err
          clients = c
          createQueues [
            [ clients.receiver, 'inqueue', receiveQueue ]
            [ clients.sender, 'outqueue', sendQueue ]
          ], (err) ->
            chai.expect(err).to.not.exist
            broker.addBinding binding, (err) ->
              chai.expect(err).to.be.a 'null'
              return done null

      it 'should provide data sent on connection', (done) ->
        onData = (bind, data) ->
          chai.expect(bind.src).to.equal binding.src
          chai.expect(bind.tgt).to.equal binding.tgt
          connectionData.push data
          # wait until we've gotten two packets
          if connectionData.length == 2
            [one, two] = connectionData
            chai.expect(one).to.eql payloads.one
            chai.expect(two).to.eql payloads.two
            return done null
          else if connectionData.length > 2
            return done new Error "Got more data than expected"

        setup (err) ->
          return done err if err
          broker.subscribeData binding, onData, (err) ->
            return done err if err
            # The second send is nested in the first's callback to keep the order one, two
            clients.sender.sendTo 'outqueue', sendQueue, payloads.one, (err) ->
              return done err if err
              clients.sender.sendTo 'outqueue', sendQueue, payloads.two, (err) ->
                return done err if err

  describeBindingSubscription 'subscribing to bound topics', 'sub-send-36', 'sub-receive-36', { one: { foo: 'sub-96' }, two: { bar: 'sub-97' } }

  describeBindingSubscription 'subscribing to binding with srcQueue==tgtQueue', 'sub-shared-37', 'sub-shared-37', { one: { foo: 'sub-106' }, two: { bar: 'sub-107' } }
501
# Top-level suite: one nested suite per entry of `transports`, named after the
# transport and populated by transportTests.
describe 'Transport', ->
  for type in Object.keys transports
    # `do` pins the loop variable for the nested suite callback
    do (type) ->
      describe "#{type}", ->
        transportTests type
  return
506