# fbp-protocol-client
# Version:
# Client library for the FBP protocol
# 154 lines (133 loc) • 4.2 kB
# text/coffeescript
# Reports whether the code appears to run outside of Node.js.
# The environment counts as Node only when a global `process` exists and its
# `execPath` contains the substring 'node'; everything else is a "browser".
isBrowser = ->
  return true if typeof process is 'undefined'
  return true unless process.execPath
  process.execPath.indexOf('node') is -1
EventEmitter = if isBrowser() then require('emitter') else require('events').EventEmitter
WebSocketServer = require('websocket').server
http = require 'http'
path = require 'path'
# Normalize one port definition, or a list of them, into a list of complete
# port definitions.
#
# ports   - a single port definition object, or an array of them. Anything
#           without a `length` property is treated as a single definition.
#
# Returns a new array with one fresh object per port. Each object starts from
# the default fields (type, description, addressable, required) and is then
# overlaid with the fields given in the definition, so given fields win.
normalizePorts = (ports) ->
  defaults =
    type: 'any'
    description: ''
    addressable: false
    required: false

  ports = [ ports ] if not ports.length?

  normalizePort = (port) ->
    normal = {}
    # Seed from `defaults`; the previous code iterated the empty `normal`
    # object here, so no default was ever applied.
    normal[k] = v for k, v of defaults
    normal[k] = v for k, v of port
    return normal

  return (normalizePort p for p in ports)
# TODO: implement array ports such that each connection gets its own index,
# and that data sent on a specific index is only sent to that connection
# Lightweight stand-in for a component: it declares ports, accepts a single
# receive handler, and emits an 'output' event for every packet it sends.
#
# NOTE(review): the instance-member references (`@...`) were missing from this
# text. `@ports` and the 'output' event are read by PseudoRuntime in this
# file; `@receiver` is a private name reconstructed from usage - confirm
# against the upstream source.
class PseudoComponent extends EventEmitter
  constructor: () ->
    # Initialise the emitter before touching `this`.
    super()
    # Handler registered through receive(); called by _receive().
    @receiver = null
    @ports =
      inPorts: {}
      outPorts: {}

  # Declare the input ports (one definition or an array). Chainable.
  inports: (p) ->
    @ports.inPorts = normalizePorts p
    return this

  # Declare the output ports (one definition or an array). Chainable.
  outports: (p) ->
    @ports.outPorts = normalizePorts p
    return this

  # Register the handler called as f(port, event, index, payload, send) for
  # every packet delivered through _receive(). Chainable.
  receive: (f) ->
    @receiver = f
    return this

  # Publish a packet by emitting 'output' with the same four arguments.
  send: (port, event, index, payload) ->
    @emit 'output', port, event, index, payload

  # Deliver an incoming packet to the registered handler, handing it a `send`
  # function bound to this component.
  _receive: (port, event, index, payload) ->
    # A splat is used instead of `arguments`: inside a fat arrow, `arguments`
    # can refer to the enclosing function's arguments once compiled to an
    # ES2015 arrow function.
    send = (args...) =>
      @send args...
    @receiver port, event, index, payload, send
# Minimal runtime that speaks the 'runtime' protocol over WebSocket for a
# single component: it answers 'getruntime', forwards incoming 'packet'
# messages to the component, and broadcasts the component's output packets to
# every open connection.
#
# NOTE(review): the instance-member references (`@...`) were missing from this
# text. `@connections`, `@server` and `@component` are private names
# reconstructed from usage - confirm against the upstream source.
class PseudoRuntime extends EventEmitter
  # httpServer - HTTP server the WebSocket server attaches to.
  constructor: (httpServer) ->
    # Initialise the emitter before touching `this`.
    super()
    @connections = []
    @server = new WebSocketServer { httpServer: httpServer }
    @server.on 'request', (request) =>
      connection = request.accept 'noflo', request.origin
      @connections.push connection
      connection.on 'message', (message) =>
        @handleMessage message, connection
      connection.on 'close', () =>
        position = @connections.indexOf connection
        return if position == -1
        @connections.splice position, 1

  # Decode one WebSocket message and dispatch it by protocol/command.
  # Messages that are not UTF-8 text or not valid JSON are ignored.
  handleMessage: (message, connection) ->
    # `not message.type == 'utf8'` parsed as `(not message.type) == 'utf8'`,
    # which is never true, so this guard used to let every message through.
    return if message.type != 'utf8'
    try
      msg = JSON.parse message.utf8Data
    catch e
      return
    # Valid JSON may still be null or a primitive; nothing to dispatch then.
    return unless msg? and msg.protocol == 'runtime'

    if msg.command == 'getruntime'
      rt =
        type: 'remote-subgraph-test'
        version: '0.4'
        capabilities: ['protocol:runtime']
      reply = { protocol: 'runtime', command: 'runtime', payload: rt }
      connection.sendUTF JSON.stringify reply
    else if msg.command == 'packet'
      @receivePacket msg.payload, connection

  # Attach the component whose 'output' events are broadcast as packets.
  setComponent: (component) ->
    @component = component
    @component.on 'output', (port, event, index, payload) =>
      packet =
        port: port
        event: event
        payload: payload
        index: index
      @sendPacket packet

  # Hand an incoming packet payload to the attached component.
  receivePacket: (p) ->
    @component._receive p.port, p.event, p.index, p.payload

  # Broadcast a packet payload to all connections.
  sendPacket: (p) ->
    msg =
      protocol: 'runtime'
      command: 'packet'
      payload: p
    @sendAll msg

  # Broadcast the attached component's port declarations.
  sendPorts: () ->
    msg =
      protocol: 'runtime'
      command: 'ports'
      payload: @component.ports
    @sendAll msg

  # Serialize `msg` once and send it to every open connection.
  sendAll: (msg) ->
    data = JSON.stringify msg
    for connection in @connections
      connection.sendUTF data
# Factory for a fresh PseudoComponent. `name` is forwarded to the constructor.
component = (name) ->
  new PseudoComponent name
# Build a component with one inport ('in') and one outport ('out') that
# re-sends every received packet on 'out'.
Echo = () ->
  echo = component 'Echo'
  echo.inports { id: 'in', description: 'Data to echo' }
  echo.outports { id: 'out', description: 'Echoed data' }
  # Parameters are named in the order PseudoComponent._receive passes them
  # (port, event, index, payload, send); the values forwarded to `send` are
  # the same as before.
  echo.receive (port, event, index, payload, send) ->
    send 'out', event, index, payload
# Start an HTTP server on `port` with a PseudoRuntime serving the Echo
# component attached.
#
# port     - port to listen on.
# callback - called as callback(null, server) once the server is listening,
#            or callback(err, server) if the server emits 'error' before that
#            (for example when the port is already in use).
createServer = (port, callback) ->
  server = new http.Server
  runtime = new PseudoRuntime server
  runtime.setComponent Echo()

  # The listen callback is a plain 'listening' listener and never receives an
  # error, so startup failures have to be picked up from the 'error' event.
  onError = (err) ->
    callback err, server
  server.once 'error', onError

  server.listen port, ->
    server.removeListener 'error', onError
    callback null, server
# Start an HTTP server on `port` and attach the runtime exported by
# 'noflo-runtime-websocket' to it, using the parent directory of this file as
# base directory. Calls callback(null, server) once listening.
createNoFloServer = (port, callback) ->
  websocketRuntime = require 'noflo-runtime-websocket'
  projectRoot = path.join __dirname, '../'
  # Plain HTTP requests get an empty handler.
  server = http.createServer(->)
  runtimeOptions =
    baseDir: projectRoot
    captureOutput: false
    catchExceptions: false
  websocketRuntime server, runtimeOptions
  server.listen port, ->
    callback null, server
# Public API of this module.
module.exports = {
  Echo
  Component: PseudoComponent
  Server: PseudoRuntime
  createServer
  createNoFloServer
}