UNPKG

3.61 kBtext/coffeescriptView Raw
1
2msgflo_nodejs = require '../'
3participants = require './fixtures/participants'
4
5chai = require 'chai' unless chai
6
7describe 'Participant', ->
8 address = 'direct://broker3'
9
10 describe 'Source participant', ->
11 source = null
12 beforeEach () ->
13 source = participants.FooSource address, 'foo'
14
15 it 'has inports without queues', ->
16 ports = source.definition.inports
17 chai.expect(ports).to.have.length 1
18 chai.expect(ports[0].id).to.equal 'interval'
19 chai.expect(ports[0].queue).to.be.a 'undefined'
20 it 'has outports with queues', ->
21 ports = source.definition.outports
22 chai.expect(ports).to.have.length 1
23 chai.expect(ports[0].id).to.equal 'out'
24 chai.expect(ports[0].queue).to.be.a 'string'
25 chai.expect(ports[0].queue).to.contain 'foo'
26 chai.expect(ports[0].queue).to.contain 'OUT'
27 describe 'running data', ->
28 messages = []
29 beforeEach (done) ->
30 broker = msgflo_nodejs.transport.getBroker address
31 broker.connect (err) ->
32 chai.expect(err).to.be.a 'null'
33 source.start done
34 it 'does nothing when just started'
35
36
37 it 'produces data when sending interval=100', (done) ->
38 onOutput = (msg) ->
39 messages.push msg
40 done() if messages.length == 3
41 observer = msgflo_nodejs.transport.getClient address
42 observer.connect (err) ->
43 chai.expect(err).to.be.a 'null'
44 port = source.definition.outports[0] # out
45 observer.subscribeToQueue port.queue, onOutput, (err) ->
46 chai.expect(err).to.be.a 'null'
47 source.send 'interval', 100
48 it 'stops producing when sending interval=0'
49
50
51 describe 'Transform participant', ->
52 source = null
53 beforeEach () ->
54 source = participants.Hello address, 'hello'
55 it 'has inports with queues', ->
56 ports = source.definition.inports
57 chai.expect(ports).to.have.length 1
58 chai.expect(ports[0].id).to.equal 'name'
59 chai.expect(ports[0].queue).to.be.a 'string'
60 chai.expect(ports[0].queue).to.contain 'NAME'
61 chai.expect(ports[0].queue).to.contain 'hello'
62 it 'has outports with queues', ->
63 ports = source.definition.outports
64 chai.expect(ports).to.have.length 1
65 chai.expect(ports[0].id).to.equal 'out'
66 chai.expect(ports[0].queue).to.be.a 'string'
67 chai.expect(ports[0].queue).to.contain 'OUT'
68 chai.expect(ports[0].queue).to.contain 'hello'
69 describe 'sending data on input queue', ->
70 it 'produces data on output queue'
71
72
73 describe 'Sink participant', ->
74 sink = null
75 beforeEach (done) ->
76 sink = participants.DevNullSink address, 'devnull'
77 sink.start done
78 afterEach (done) ->
79 sink.stop done
80
81 it 'has inports with queues', ->
82 ports = sink.definition.inports
83 chai.expect(ports).to.have.length 1
84 chai.expect(ports[0].id).to.equal 'drop'
85 chai.expect(ports[0].queue).to.be.a 'string'
86 chai.expect(ports[0].queue).to.contain 'DROP'
87 chai.expect(ports[0].queue).to.contain 'devnull'
88 it 'has outports without queues', ->
89 ports = sink.definition.outports
90 chai.expect(ports).to.have.length 1
91 chai.expect(ports[0].id).to.equal 'dropped'
92 chai.expect(ports[0].queue).to.be.a 'undefined'
93 describe 'sending data on input queue', ->
94 it 'produces data on output port', (done) ->
95 sink.on 'data', (outport, data) ->
96 chai.expect(outport).to.equal 'dropped'
97 chai.expect(data).to.equal 'myinput32'
98 done()
99 sink.send 'drop', "myinput32", () ->
100