1 |
|
msgflo_nodejs = require '../'
participants = require './fixtures/participants'

chai = require 'chai' unless chai

# Specs for the fixture participants (FooSource, Hello, DevNullSink).
# Each suite checks the port definition the participant exposes, and the
# source and sink suites also exercise data flowing through a started participant.
describe 'Participant', ->
  # Every participant, broker and client in this file uses the same address.
  address = 'direct://broker3'

  # FooSource: the inport is not backed by a queue, the outport is.
  describe 'Source participant', ->
    source = null
    beforeEach () ->
      source = participants.FooSource address, 'foo'

    it 'has inports without queues', ->
      ports = source.definition.inports
      chai.expect(ports).to.have.length 1
      chai.expect(ports[0].id).to.equal 'interval'
      chai.expect(ports[0].queue).to.be.a 'undefined'
    it 'has outports with queues', ->
      ports = source.definition.outports
      chai.expect(ports).to.have.length 1
      chai.expect(ports[0].id).to.equal 'out'
      chai.expect(ports[0].queue).to.be.a 'string'
      chai.expect(ports[0].queue).to.contain 'foo'
      chai.expect(ports[0].queue).to.contain 'OUT'
    describe 'running data', ->
      messages = []
      beforeEach (done) ->
        # Reset the log so the `messages.length == 3` check below never counts
        # messages that were collected by a previous test.
        messages = []
        broker = msgflo_nodejs.transport.getBroker address
        broker.connect (err) ->
          chai.expect(err).to.be.a 'null'
          source.start done
      # Mirror the Sink suite: whatever beforeEach started is stopped again.
      # NOTE(review): assumes source participants expose `stop` exactly like
      # the sink participant used further down - confirm against the fixtures.
      afterEach (done) ->
        source.stop done
      it 'does nothing when just started'


      it 'produces data when sending interval=100', (done) ->
        # Finish as soon as the third message has been observed.
        onOutput = (msg) ->
          messages.push msg
          done() if messages.length == 3
        observer = msgflo_nodejs.transport.getClient address
        observer.connect (err) ->
          chai.expect(err).to.be.a 'null'
          port = source.definition.outports[0]
          observer.subscribeToQueue port.queue, onOutput, (err) ->
            chai.expect(err).to.be.a 'null'
            # Only send on the inport once the observer is subscribed.
            source.send 'interval', 100
      it 'stops producing when sending interval=0'


  # Hello: both the inport and the outport are backed by queues.
  describe 'Transform participant', ->
    transform = null
    beforeEach () ->
      transform = participants.Hello address, 'hello'
    it 'has inports with queues', ->
      ports = transform.definition.inports
      chai.expect(ports).to.have.length 1
      chai.expect(ports[0].id).to.equal 'name'
      chai.expect(ports[0].queue).to.be.a 'string'
      chai.expect(ports[0].queue).to.contain 'NAME'
      chai.expect(ports[0].queue).to.contain 'hello'
    it 'has outports with queues', ->
      ports = transform.definition.outports
      chai.expect(ports).to.have.length 1
      chai.expect(ports[0].id).to.equal 'out'
      chai.expect(ports[0].queue).to.be.a 'string'
      chai.expect(ports[0].queue).to.contain 'OUT'
      chai.expect(ports[0].queue).to.contain 'hello'
    describe 'sending data on input queue', ->
      it 'produces data on output queue'


  # DevNullSink: the inport is backed by a queue, the outport is not.
  describe 'Sink participant', ->
    sink = null
    beforeEach (done) ->
      sink = participants.DevNullSink address, 'devnull'
      sink.start done
    afterEach (done) ->
      sink.stop done

    it 'has inports with queues', ->
      ports = sink.definition.inports
      chai.expect(ports).to.have.length 1
      chai.expect(ports[0].id).to.equal 'drop'
      chai.expect(ports[0].queue).to.be.a 'string'
      chai.expect(ports[0].queue).to.contain 'DROP'
      chai.expect(ports[0].queue).to.contain 'devnull'
    it 'has outports without queues', ->
      ports = sink.definition.outports
      chai.expect(ports).to.have.length 1
      chai.expect(ports[0].id).to.equal 'dropped'
      chai.expect(ports[0].queue).to.be.a 'undefined'
    describe 'sending data on input queue', ->
      it 'produces data on output port', (done) ->
        sink.on 'data', (outport, data) ->
          chai.expect(outport).to.equal 'dropped'
          chai.expect(data).to.equal 'myinput32'
          done()
        # The send callback is intentionally empty: the test completes from
        # the 'data' listener registered above.
        sink.send 'drop', "myinput32", () ->
|
100 |
|