1 |
|
2 | common = require './common'
|
3 | transport = require './transport'
|
4 |
|
5 | path = require 'path'
|
6 | fs = require 'fs'
|
7 | debug = require('debug')('msgflo:participant')
|
8 | chance = require 'chance'
|
9 | async = require 'async'
|
10 | EventEmitter = require('events').EventEmitter
|
11 | uuid = require 'uuid'
|
12 | fbp = require 'fbp'
|
13 |
|
# Module-level random generator built with the fixed seed 10202.
# NOTE(review): not referenced elsewhere in the code visible here — confirm it is still needed.
random = new chance.Chance(10202)
|
15 |
|
# Look up a port by id in a participant definition.
#
# def      - definition object carrying `inports` and `outports` arrays
# type     - 'inport' searches def.inports; any other value searches def.outports
# portName - id of the port to find
#
# Returns the first port whose `id` equals portName, or null when none matches.
findPort = (def, type, portName) ->
  candidates = switch type
    when 'inport' then def.inports
    else def.outports
  for candidate in candidates when candidate.id is portName
    return candidate
  null
|
21 |
|
# Build the definition that is sent when registering the participant.
#
# Works on a clone of `d` (via common.clone), so the argument is not modified.
# Only ports that have a `queue` set (non-null) are kept in `inports` and
# `outports`; every other field of the definition is carried over as cloned.
#
# Returns the filtered clone.
definitionToFbp = (d) ->
  hasQueue = (port) -> port.queue?
  fbpDef = common.clone d
  for direction in ['inports', 'outports']
    fbpDef[direction] = fbpDef[direction].filter hasQueue
  fbpDef
|
31 |
|
# Fill in default `hidden` and `queue` values on a list of port definitions.
#
# ports - array of port objects; each must have a string `id`. Modified in place.
# role  - participant role, used as prefix of the generated queue name
#
# For every port:
#   * `hidden` becomes false when it is null/undefined
#   * `queue` becomes "<role>.<ID IN UPPERCASE>" when the port has no queue
#     and is not hidden
#
# Returns the same `ports` array.
addQueues = (ports, role) ->
  for port in ports
    port.hidden ?= false
    defaultQueue = "#{role}.#{port.id.toUpperCase()}"
    unless port.queue or port.hidden
      port.queue = defaultQueue
  ports
|
39 |
|
# Create a concrete participant definition from a component definition.
#
# d    - component definition with `inports` and `outports` arrays (left untouched;
#        a clone made with common.clone is what gets modified and returned)
# role - role name; stored as `role` and used to prefix the id and queue names
#
# The returned definition has:
#   * `role` set to the given role
#   * `id` set to "<role>-<uuid v4>" unless the definition already had a truthy id
#   * default `hidden`/`queue` values on all ports (see addQueues)
instantiateDefinition = (d, role) ->
  instance = common.clone d
  suffix = uuid.v4()
  instance.role = role
  instance.id or= "#{role}-#{suffix}"
  for key in ['inports', 'outports']
    instance[key] = addQueues instance[key], role
  instance
|
51 |
|
# Default discovery period in seconds (Participant.start multiplies it by 1000
# before handing it to a timer). Can be overridden through the
# MSGFLO_DISCOVERY_PERIOD environment variable.
defaultDiscoveryPeriod = 60
if process.env.MSGFLO_DISCOVERY_PERIOD
  # Always parse as base 10, and ignore values that are not a positive number:
  # a NaN or non-positive period would otherwise end up as a bogus timer delay.
  envDiscoveryPeriod = parseInt process.env.MSGFLO_DISCOVERY_PERIOD, 10
  defaultDiscoveryPeriod = envDiscoveryPeriod if envDiscoveryPeriod > 0
|
54 |
|
# A participant connects a processing function to a messaging client.
#
# Data arriving on the queue of an inport is passed to the processing function;
# whatever the function sends back is emitted as a 'data' event and, when the
# outport has a queue, forwarded to that queue.
#
# Events:
#   'data' (outportId, data) - emitted for every result sent on a known outport
class Participant extends EventEmitter

  # client  - messaging client, or a string that transport.getClient resolves to one
  # def     - component definition (see instantiateDefinition)
  # @func   - processing function, called as
  #           func(inportId, data, (outport, err, data) -> ...)
  # role    - role name; falls back to 'unknown' when falsy
  # options - optional settings. `discoveryPeriod` (seconds) defaults to
  #           defaultDiscoveryPeriod. The object is stored as-is in @options.
  constructor: (client, def, @func, role, options={}) ->
    client = transport.getClient(client) if typeof client == 'string'
    @messaging = client
    role = 'unknown' if not role
    @definition = instantiateDefinition def, role
    @running = false
    newrelic = require './newrelic'
    @_transactions = new newrelic.Transactions @definition
    @options = options
    @options.discoveryPeriod = defaultDiscoveryPeriod if not @options.discoveryPeriod
    # Handle of the periodic discovery timer; null while none is scheduled
    @_discoveryTimer = null

  # Connect the messaging client, set up the queues of all ports, register the
  # participant and start re-sending the discovery message periodically.
  #
  # callback(err) - called with the first error encountered, or null on success
  start: (callback) ->
    @messaging.connect (err) =>
      debug 'connected', err
      return callback err if err
      @setupPorts (err) =>
        return callback err if err
        # Only flagged as running once the ports are actually set up
        @running = true
        @register (err) =>
          return callback err if err
          @_scheduleDiscovery()
          return callback null

  # Stop re-sending discovery messages and disconnect the messaging client.
  #
  # callback - passed on to the client's disconnect()
  stop: (callback) ->
    @running = false
    # Without this the timer would call register() on a disconnected client
    @_cancelDiscovery()
    @messaging.disconnect callback

  # Private: (re)start the timer that periodically re-sends the discovery message.
  _scheduleDiscovery: ->
    @_cancelDiscovery()
    # Dividing by 2.2 makes the message go out a bit more than twice per period
    period = @options.discoveryPeriod*1000/2.2
    # A NaN or non-positive delay would make the timer fire continuously
    return if not (period > 0)
    @_discoveryTimer = setInterval () =>
      @register (err) ->
        console.log 'Could not send discovery message', err if err
    , period

  # Private: cancel the discovery timer, if one is scheduled.
  _cancelDiscovery: ->
    clearInterval @_discoveryTimer if @_discoveryTimer?
    @_discoveryTimer = null

  # Feed data to the processing function directly, bypassing the input queue.
  #
  # inport   - id of the inport the data is meant for
  # data     - payload handed to the processing function
  # callback - optional; called with the processing error, or with the outcome
  #            of onResult() for the produced result
  send: (inport, data, callback = -> ) ->
    debug 'got msg from send()', inport
    @func inport, data, (outport, err, data) =>
      return callback err if err
      @onResult outport, data, callback

  # Emit the 'data' event for a result.
  emitData: (outport, data) ->
    @emit 'data', outport, data

  # Handle a result produced by the processing function: emit it as 'data' and,
  # when the outport has a queue, send it there.
  #
  # outport  - id of the outport the result was sent on
  # data     - result payload
  # callback(err) - called with an Error when the outport is not part of the
  #                 definition, otherwise with the outcome of the send (null
  #                 when the port has no queue)
  onResult: (outport, data, callback) =>
    port = findPort @definition, 'outport', outport
    # An unknown outport is reported to the caller instead of throwing a
    # TypeError from inside an asynchronous callback
    if not port
      return callback new Error "Participant #{@definition.id} has no outport '#{outport}'"
    @emitData port.id, data
    if port.queue
      @messaging.sendTo 'outqueue', port.queue, data, callback
    else
      return callback null

  # Create the queues for all ports that have one and subscribe to the inport
  # queues. Outports are set up before inports.
  #
  # callback(err) - called with the first error, or null when all ports are set up
  setupPorts: (callback) ->
    setupOutPort = (def, callback) =>
      return callback null if not def.queue

      options = {}
      options.persistent = def.persistent if def.persistent?
      @messaging.createQueue 'outqueue', def.queue, options, callback

    setupInPort = (def, callback) =>
      return callback null if not def.queue

      # Called for each message received on the queue of this inport
      callFunc = (msg) =>
        debug 'got msg from queue', def.queue
        msgid = uuid.v4()
        @_transactions.open msgid, def.id
        @func def.id, msg.data, (outport, err, data) =>
          @_transactions.close msgid, outport
          debug 'process() error', err.message if err
          # Error without any data: nothing to forward, reject the message
          return @messaging.nackMessage msg if err and not data?
          # Error accompanied by data: the data is still forwarded, but the
          # incoming message is rejected afterwards
          @onResult outport, data, (sendErr) =>
            return @messaging.nackMessage msg if err or sendErr
            @messaging.ackMessage msg if msg

      options = {}
      options.persistent = def.persistent if def.persistent?
      @messaging.createQueue 'inqueue', def.queue, options, (err) =>
        return callback err if err
        @messaging.subscribeToQueue def.queue, callFunc, callback
        debug 'subscribe to', def.queue

    async.map @definition.outports, setupOutPort, (err) =>
      return callback err if err
      async.map @definition.inports, setupInPort, (err) =>
        return callback err if err
        return callback null

  # Send the discovery message: registers the definition, reduced to the ports
  # that have queues (see definitionToFbp), with the messaging client.
  #
  # callback(err) - called with the outcome of the registration
  register: (callback) ->
    debug 'register'
    definition = definitionToFbp @definition
    @messaging.registerParticipant definition, (err) =>
      debug 'registered', err
      return callback err
|
153 |
|
154 |
|
155 |
|
156 |
|
157 |
|
158 |
|
159 |
|
160 |
|
161 |
|
162 |
|
163 |
|
# Instantiate a participant from a component library and start it.
#
# library       - object mapping component names to factory functions, each
#                 called as factory(client, id) and returning a participant
# client        - passed through to the factory
# componentName - key of the factory in `library`
# id            - passed through to the factory
# callback(err, participant) - called with an Error (and no participant) when the
#                 library has no such component; otherwise called once start()
#                 has finished, with its error (if any) and the participant
startParticipant = (library, client, componentName, id, callback) ->
  debug 'starting', componentName, id

  factory = library[componentName]
  unless factory?
    return callback new Error "No Participant factory in library for #{componentName}"

  participant = factory client, id
  participant.start (startErr) -> callback startErr, participant
|
173 |
|
# Public API of this module: the Participant class plus the startParticipant
# and instantiateDefinition helpers.
for exportName, exported of {Participant, startParticipant, instantiateDefinition}
  exports[exportName] = exported
|