UNPKG

5.57 kBtext/coffeescriptView Raw
1# FBP protocol dependent code
2
3common = require './common'
4debug = require('debug')('fbp-spec:protocol')
5
# Upload `graph` to `runtime` as a sequence of graph-protocol messages.
#
# Sends 'clear', then one message per process ('addnode'), connection
# ('addedge', or 'addinitial' when the connection has no `src`) and exported
# port ('addinport' / 'addoutport'). It then listens on the runtime's 'graph'
# event until every exported port has been echoed back.
#
# runtime  - object providing sendGraph(command, payload), on(event, fn)
#            and removeListener(event, fn)
# graph    - graph definition; reads name, properties, processes,
#            connections, inports and outports
# callback - called with (err) when `graph` is missing, otherwise with
#            (null, graphId) once no exported ports remain pending
exports.sendGraph = (runtime, graph , callback) ->
  main = false # this is a component?
  return callback new Error "Graph not defined" if not graph

  # Tolerate graphs that carry no `properties` object at all
  properties = graph.properties or {}

  graphId = graph.name or properties.id
  graphId = "fixture.#{common.randomString(10)}" if not graphId

  # Public names of exported ports the runtime has not confirmed yet
  pendingPorts =
    in: []
    out: []

  runtime.sendGraph 'clear',
    id: graphId
    name: graph.name
    main: main
    library: properties.project or ''
    icon: properties.icon or ''
    description: properties.description or ''

  for name, proc of graph.processes
    debug 'adding node', name, proc.component
    runtime.sendGraph 'addnode',
      id: name
      component: proc.component
      metadata: proc.metadata
      graph: graphId

  for connection in (graph.connections or [])
    if connection.src?
      debug 'connecting edge', connection
      runtime.sendGraph 'addedge',
        src:
          node: connection.src.process
          port: connection.src.port
        tgt:
          node: connection.tgt.process
          port: connection.tgt.port
        # Forward the metadata itself; `connection.metadata?` would send a boolean
        metadata: connection.metadata
        graph: graphId
    else
      # A connection without a source is an initial information packet (IIP)
      iip = connection
      debug 'adding IIP', iip
      runtime.sendGraph 'addinitial',
        src:
          data: iip.data
        tgt:
          node: iip.tgt.process
          port: iip.tgt.port
        metadata: iip.metadata
        graph: graphId

  if graph.inports
    for pub, priv of graph.inports
      debug 'exporting inport', pub
      runtime.sendGraph 'addinport',
        public: pub
        node: priv.process
        port: priv.port
        graph: graphId
      pendingPorts.in.push pub

  if graph.outports
    for pub, priv of graph.outports
      debug 'exporting outport', pub
      runtime.sendGraph 'addoutport',
        public: pub
        node: priv.process
        port: priv.port
        graph: graphId
      pendingPorts.out.push pub

  # Nothing was exported, so no port confirmation will ever arrive:
  # report success right away instead of waiting forever.
  unless pendingPorts.in.length or pendingPorts.out.length
    return callback null, graphId

  waitForPorts = ({command, payload}) ->
    return unless command in ['addinport', 'addoutport']
    debug 'received port', payload.public
    collection = if command is 'addinport' then pendingPorts.in else pendingPorts.out
    index = collection.indexOf payload.public
    if index is -1
      debug 'received unknown port', payload.public
      return
    collection.splice index, 1
    return if pendingPorts.in.length or pendingPorts.out.length
    # All exported ports confirmed: stop listening and report the graph id
    runtime.removeListener 'graph', waitForPorts
    return callback null, graphId

  debug 'sendGraph waiting for updated exported ports'
  runtime.on 'graph', waitForPorts
90
# Request that `runtime` start the network of `graphId`.
#
# Subscribes to the runtime's 'execution' event and calls `callback null`
# the first time a status with a truthy `started` field is seen; the
# listener is removed at that point.
exports.startNetwork = (runtime, graphId, callback) ->
  debug 'startnetwork', graphId

  onExecution = (status) ->
    debug 'start: runtime status change', status
    return unless status.started
    runtime.removeListener 'execution', onExecution
    callback null

  runtime.on 'execution', onExecution

  runtime.sendNetwork 'start', { graph: graphId }
104
# Request that `runtime` stop the network of `graphId`.
#
# Subscribes to the runtime's 'execution' event and calls `callback null`
# the first time a status with a falsy `running` field is seen; the
# listener is removed at that point.
exports.stopNetwork = (runtime, graphId, callback) ->
  debug 'stopnetwork', graphId

  onExecution = (status) ->
    debug 'stop: runtime status change', status
    return if status.running
    runtime.removeListener 'execution', onExecution
    callback null

  runtime.on 'execution', onExecution

  runtime.sendNetwork 'stop', { graph: graphId }
117
# Send one 'packet' runtime message with event 'data' per entry of `packets`.
#
# client   - object providing sendRuntime(command, payload)
# graphId  - id placed in the `graph` field of every message
# packets  - object mapping port name to the payload to send on that port
# callback - called with (null) right after the messages have been handed
#            to the client
exports.sendPackets = (client, graphId, packets, callback) ->
  debug 'sendpackets', graphId, packets

  for portName, data of packets
    message =
      event: 'data'
      port: portName
      payload: data
      graph: graphId
    client.sendRuntime 'packet', message

  callback null
129
# Ask `client` for its component list and collect the answers.
#
# Every 'component' message received on the client's 'component' event is
# stored under its payload's `name`. When 'componentsready' arrives the
# listener is removed and `callback` is called with (null, components).
exports.getComponents = getComponents = (client, callback) ->
  debug 'get components'

  found = {}

  onMessage = ({ command, payload }) ->
    debug 'got component?', command
    switch command
      when 'component'
        found[payload.name] = payload
      when 'componentsready'
        client.removeListener 'component', onMessage
        callback null, found

  client.on 'component', onMessage
  client.sendComponent 'list', {}
145
# Report the capabilities of `client`.
#
# If `client.definition` already lists capabilities they are returned
# immediately; otherwise the function waits for the client's next
# 'capabilities' event.
#
# callback - called with (null, capabilities, definition) on both paths
exports.getCapabilities = (client, callback) ->
  def = client.definition
  # Pass the definition along here too, so both paths call back with the same shape
  return callback null, def.capabilities, def if def?.capabilities?.length
  onCapabilities = (capabilities) ->
    client.removeListener 'capabilities', onCapabilities
    # `def` was read before the event and had no capabilities, so re-read it.
    # NOTE(review): presumably the client refreshes `definition` before emitting — confirm
    return callback null, capabilities, client.definition
  client.on 'capabilities', onCapabilities
153
# Collect the `tests` attached to the sources of all components of `client`.
#
# Lists the components via getComponents, sends one 'getsource' request per
# component name, and gathers `payload.tests` from the 'source' messages that
# come back on the client's 'component' event.
#
# callback - called with (err) if listing components fails, otherwise with
#            (null, tests) where tests maps component name to its tests;
#            components without tests are omitted
exports.getComponentTests = (client, callback) ->
  debug 'get component tests'

  responses = 0
  expectResponses = 0
  tests = {}

  # Stop listening and hand the outcome to the caller
  complete = (err, result) ->
    client.removeListener 'component', gotComponent
    return callback err, result

  gotComponent = (msg) ->
    { command, payload } = msg
    # Every message counts as a response, whatever its command
    responses += 1
    debug 'got component source?', command, payload?.name, payload?.tests?, responses, expectResponses

    # not all components have tests
    if command == 'source' and payload?.tests?
      tests[payload.name] = payload.tests

    # Check for completion on every counted message. Returning early on a
    # non-'source' message would leave the caller waiting forever whenever
    # the last counted message is not a 'source'.
    if responses == expectResponses
      debug 'got all component sources', Object.keys(tests).length
      return complete null, tests

  getComponents client, (err, components) ->
    return complete err if err

    componentNames = Object.keys components
    expectResponses = componentNames.length
    return complete null, tests if expectResponses == 0

    debug 'retrieving sources for', expectResponses

    client.on 'component', gotComponent
    for name in componentNames
      client.sendComponent 'getsource',
        name: name
188