UNPKG

msgflo

Version:

Polyglot FBP runtime based on message queues

331 lines (273 loc) 10.6 kB
debug = require('debug')('msgflo:coordinator') EventEmitter = require('events').EventEmitter fs = require 'fs' path = require 'path' async = require 'async' setup = require './setup' library = require './library' findPort = (def, type, portName) -> ports = if type == 'inport' then def.inports else def.outports for port in ports return port if port.id == portName return null connId = (fromId, fromPort, toId, toPort) -> return "#{fromId} #{fromPort} -> #{toPort} #{toId}" fromConnId = (id) -> t = id.split ' ' return [ t[0], t[1], t[4], t[3] ] iipId = (part, port) -> return "#{part} #{port}" fromIipId = (id) -> return id.split ' ' waitForParticipant = (coordinator, role, callback) -> existing = coordinator.participantsByRole role return callback null, coordinator.participants[existing[0]] if existing.length onTimeout = () => return callback new Error "Waiting for participant #{role} timed out" timeout = setTimeout onTimeout, 10000 onParticipantAdded = (part) => if part.role == role debug 'onParticipantAdded', part.role # FIXME: take into account multiple participants with same role clearTimeout timeout coordinator.removeListener 'participant-added', onParticipantAdded return callback null coordinator.on 'participant-added', onParticipantAdded class Coordinator extends EventEmitter constructor: (@broker, @options = {}) -> @participants = {} @connections = {} # connId -> { queue: opt String, handler: opt function } @iips = {} # iipId -> value @started = false @processes = {} @library = new library.Library { configfile: @options.library, componentdir: @options.componentdir } @exported = inports: {} outports: {} @on 'participant', @checkParticipantConnections start: (callback) -> @library.load (err) => return callback err if err @broker.connect (err) => debug 'connected', err return callback err if err @broker.subscribeParticipantChange (msg) => @handleFbpMessage msg.data @broker.ackMessage msg @started = true debug 'started', err, @started return callback null stop: (callback) -> @started = false @broker.disconnect (err) => return callback err if err setup.killProcesses @processes, 'SIGTERM', callback handleFbpMessage: (data) -> if data.protocol == 'discovery' and data.command == 'participant' @addParticipant data.payload else throw new Error 'Unknown FBP message' addParticipant: (definition) -> debug 'addParticipant', definition.id @participants[definition.id] = definition @emit 'participant-added', definition @emit 'participant', 'added', definition removeParticipant: (id) -> definition = @participants[id] @emit 'participant-removed', definition @emit 'participant', 'removed', definition addComponent: (name, language, code, callback) -> @library.addComponent name, language, code, callback startParticipant: (node, component, callback) -> iips = {} cmd = @library.componentCommand component, node, iips commands = {} commands[node] = cmd options = broker: @options.broker forward: '' # whether to forward subprocess communication setup.startProcesses commands, options, (err, processes) => return callback err if err for k, v of processes @processes[k] = v waitForParticipant @, node, (err) -> return callback err, processes stopParticipant: (node, component, callback) -> processes = {} for k, v of @processes if k == node processes[k] = v setup.killProcesses processes, 'SIGTERM', (err) -> return callback err for k, v of processes delete @process[k] return callback null, processes sendTo: (participantId, inport, message, callback) -> debug 'sendTo', participantId, inport, message defaultCallback = (err) -> throw err if err callback = defaultCallback if not callback part = @participants[participantId] part = @participants[@participantsByRole(participantId)] if not part? port = findPort part, 'inport', inport return @broker.sendTo 'inqueue', port.queue, message, callback subscribeTo: (participantId, outport, handler, callback) -> defaultCallback = (err) -> throw err if err callback = defaultCallback if not callback part = @participants[participantId] part = @participants[@participantsByRole(participantId)] if not part? debug 'subscribeTo', participantId, outport port = findPort part, 'outport', outport ackHandler = (msg) => return if not @started handler msg @broker.ackMessage msg # Cannot subscribe directly to an outqueue, must create and bind an inqueue readQueue = 'msgflo-export-' + Math.floor(Math.random()*999999) @broker.createQueue 'inqueue', readQueue, (err) => return callback err if err @broker.addBinding {type: 'pubsub', src: port.queue, tgt: readQueue}, (err) => return callback err if err @broker.subscribeToQueue readQueue, ackHandler, (err) -> return callback err, readQueue # caller should teardown readQueue unsubscribeFrom: () -> # FIXME: implement connect: (fromId, fromPort, toId, toName, callback) -> callback = ((err) ->) if not callback # XXX: there is now a mixture of participant id and role used here findQueue = (partId, dir, portName) => part = @participants[partId] part = @participants[@participantsByRole(partId)] if not part? for port in part[dir] return port.queue if port.id == portName # NOTE: adding partial connection info to make checkParticipantConnections logic work edgeId = connId fromId, fromPort, toId, toName edge = fromId: fromId fromPort: fromPort toId: toId toName: toName srcQueue: null tgtQueue: null @connections[edgeId] = edge # might be that it was just added/started, not yet discovered waitForParticipant @, fromId, (err) => return callback err if err waitForParticipant @, toId, (err) => return callback err if err # TODO: support roundtrip @connections[edgeId].srcQueue = findQueue fromId, 'outports', fromPort @connections[edgeId].tgtQueue = findQueue toId, 'inports', toName @broker.addBinding {type: 'pubsub', src:edge.srcQueue, tgt:edge.tgtQueue}, (err) => return callback err # TODO: introduce some "spying functionality" to provide edge messages, add tests disconnect: (fromId, fromPortId, toId, toPortId) -> # FIXME: implement checkParticipantConnections: (action, participant) -> findConnectedPorts = (dir, srcPort) => conn = [] # return conn if not srcPort.queue for id, part of @participants for port in part[dir] continue if not port.queue conn.push { part: part, port: port } if port.queue == srcPort.queue return conn isConnected = (e) => [fromId, fromPort, toId, toPort] = e id = connId fromId, fromPort, toId, toPort return @connections[id]? if action == 'added' id = participant.id # inbound for port in participant.inports matches = findConnectedPorts 'outports', port for m in matches e = [m.part.id, m.port.id, id, port.id] @connect e[0], e[1], e[2], e[3] if not isConnected e # outbound for port in participant.outports matches = findConnectedPorts 'inports', port for m in matches e = [id, port.id, m.part.id, m.port.id] @connect e[0], e[1], e[2], e[3] if not isConnected e else if action == 'removed' null # TODO: implement else null # ignored addInitial: (partId, portId, data) -> id = iipId partId, portId @iips[id] = data @sendTo partId, portId, data if @started removeInitial: (partId, portId) -> # FIXME: implement # Do we need to remove it from the queue?? exportPort: (direction, external, node, internal, callback) -> target = if direction.indexOf("in") == 0 then @exported.inports else @exported.outports target[external] = role: node port: internal subscriber: null queue: null graph = null # FIXME: capture # Wait for target node to exist waitForParticipant @, node, (err) => return callback err if err if direction.indexOf('out') == 0 handler = (msg) => @emit 'exported-port-data', external, msg.data, graph @subscribeTo node, internal, handler, (err, readQueue) -> return callback err if err target[external].subscriber = handler target[external].queue = readQueue return callback null else return callback null unexportPort: () -> # FIXME: implement sendToExportedPort: (port, data, callback) -> # FIXME lookup which node, port this corresponds to internal = @exported.inports[port] debug 'sendToExportedPort', port, internal return callback new Error "Cannot find exported port #{port}" if not internal @sendTo internal.role, internal.port, data, callback startNetwork: (networkId, callback) -> # Don't have a concept of started/stopped so far, no-op setTimeout callback, 10 stopNetwork: (networkId, callback) -> # Don't have a concept of started/stopped so far, no-op setTimeout callback, 10 serializeGraph: (name) -> graph = properties: name: name processes: {} connections: [] inports: [] outports: [] for id, part of @participants graph.processes[id] = component: part.component for id, conn of @connections parts = fromConnId id edge = src: process: parts[0] port: parts[1] tgt: process: parts[2] port: parts[3] graph.connections.push edge return graph loadGraphFile: (path, opts, callback) -> options = graphfile: path libraryfile: @library.configfile for k, v of opts options[k] = v setup.participants options, (err, proc) => return callback err if err @processes = proc setup.bindings options, callback participantsByRole: (role) -> matchRole = (id) => part = @participants[id] return part.role == role m = Object.keys(@participants).filter matchRole return m exports.Coordinator = Coordinator