UNPKG

5.85 kBtext/coffeescriptView Raw
1
2common = require './common'
3transport = require './transport'
4
5path = require 'path'
6fs = require 'fs'
7debug = require('debug')('msgflo:participant')
8chance = require 'chance'
9async = require 'async'
10EventEmitter = require('events').EventEmitter
11uuid = require 'uuid'
12fbp = require 'fbp'
13
14random = new chance.Chance 10202
15
16findPort = (def, type, portName) ->
17 ports = if type == 'inport' then def.inports else def.outports
18 for port in ports
19 return port if port.id == portName
20 return null
21
22definitionToFbp = (d) ->
23 def = common.clone d
24 portsWithQueue = (ports) ->
25 # these cannot be wired, so should not show. For Sources/Sinks
26 return ports.filter (p) -> return p.queue?
27
28 def.inports = portsWithQueue def.inports
29 def.outports = portsWithQueue def.outports
30 return def
31
32addQueues = (ports, role) ->
33 for p in ports
34 p.hidden = false if not p.hidden?
35 name = role+'.'+p.id.toUpperCase()
36 p.queue = name if not p.queue and not p.hidden
37
38 return ports
39
40instantiateDefinition = (d, role) ->
41 def = common.clone d
42
43 id = uuid.v4()
44 def.role = role
45 def.id = "#{def.role}-#{id}" if not def.id
46
47 def.inports = addQueues def.inports, def.role
48 def.outports = addQueues def.outports, def.role
49
50 return def
51
52defaultDiscoveryPeriod = 60
53defaultDiscoveryPeriod = parseInt process.env.MSGFLO_DISCOVERY_PERIOD if process.env.MSGFLO_DISCOVERY_PERIOD
54
55class Participant extends EventEmitter
56 # @func gets called with inport, , and should return outport, outdata
57 constructor: (client, def, @func, role, options={}) ->
58 client = transport.getClient(client) if typeof client == 'string'
59 @messaging = client
60 role = 'unknown' if not role
61 @definition = instantiateDefinition def, role
62 @running = false
63 newrelic = require './newrelic'
64 @_transactions = new newrelic.Transactions @definition
65 @options = options
66 @options.discoveryPeriod = defaultDiscoveryPeriod if not @options.discoveryPeriod # seconds
67
68 start: (callback) ->
69 @messaging.connect (err) =>
70
71 debug 'connected', err
72 return callback err if err
73 @setupPorts (err) =>
74 @running = true
75 return callback err if err
76 @register (err) =>
77 return callback err if err
78 period = @options.discoveryPeriod*1000/2.2 # try to send 2 messages before deadline
79 setTimeout () =>
80 @register (err) ->
81 console.log 'Could not send discovery message', err if err
82 , period
83 return callback null
84
85 stop: (callback) ->
86 @running = false
87 @messaging.disconnect callback
88
89 # Send data on inport
90 # Normally only used directly for Source type participants
91 # For Transform or Sink type, is called on data from input queue
92 send: (inport, data, callback = -> ) ->
93 debug 'got msg from send()', inport
94 @func inport, data, (outport, err, data) =>
95 return callback err if err
96 @onResult outport, data, callback
97
98 # Emit data on outport
99 emitData: (outport, data) ->
100 @emit 'data', outport, data
101
102 onResult: (outport, data, callback) =>
103 port = findPort @definition, 'outport', outport
104 @emitData port.id, data
105 if port.queue
106 @messaging.sendTo 'outqueue', port.queue, data, callback
107 else
108 return callback null
109
110 setupPorts: (callback) ->
111 setupOutPort = (def, callback) =>
112 return callback null if not def.queue
113
114 options = {}
115 options.persistent = def.persistent if def.persistent?
116 @messaging.createQueue 'outqueue', def.queue, options, callback
117
118 setupInPort = (def, callback) =>
119 return callback null if not def.queue
120
121 callFunc = (msg) =>
122 debug 'got msg from queue', def.queue
123 msgid = uuid.v4() # need something cross-transport, only AMQP has deliveryTag
124 @_transactions.open msgid, def.id
125 @func def.id, msg.data, (outport, err, data) =>
126 @_transactions.close msgid, outport
127 debug 'process() error', err.message if err
128 return @messaging.nackMessage msg if err and not data?
129 @onResult outport, data, (sendErr) =>
130 return @messaging.nackMessage msg if err or sendErr
131 @messaging.ackMessage msg if msg
132
133 options = {}
134 options.persistent = def.persistent if def.persistent?
135 @messaging.createQueue 'inqueue', def.queue, options, (err) =>
136 return callback err if err
137 @messaging.subscribeToQueue def.queue, callFunc, callback
138 debug 'subscribe to', def.queue
139
140 async.map @definition.outports, setupOutPort, (err) =>
141 return callback err if err
142 async.map @definition.inports, setupInPort, (err) =>
143 return callback err if err
144 return callback null
145
146 register: (callback) ->
147 # Send discovery package to broker on 'fbp' queue
148 debug 'register'
149 definition = definitionToFbp @definition
150 @messaging.registerParticipant definition, (err) =>
151 debug 'registered', err
152 return callback err
153
154# TODO: consider making component api a bit more like NoFlo.WirePattern
155#
156# inputs = { portA: { data: dataA1, groups: ['A', '1'] }, portB: { data: B1 } }
157# outfunc = (type, outputs) -> # type can be 'data', 'end'
158# process(inputs, outfunc)
159#
160# Core ideas:
161# groups attached to the packet, avoids separate lifetime handling, but still allows modification
162# should one enforce use of promises? calling process returns a promise?
163
164startParticipant = (library, client, componentName, id, callback) ->
165 debug 'starting', componentName, id
166
167 component = library[componentName]
168 return callback new Error "No Participant factory in library for #{componentName}" if not component?
169 part = component client, id
170
171 part.start (err) ->
172 return callback err, part
173
174exports.Participant = Participant
175exports.startParticipant = startParticipant
176exports.instantiateDefinition = instantiateDefinition