1 | 'use strict'
|
2 |
|
// Baseline settings for a Farm. The Farm constructor hands this object to
// extend() together with the caller-supplied options.
const DEFAULT_OPTIONS = {
  // compared against a worker's calls.length before more work is sent to it
  maxCallsPerWorker: Infinity,
  // upper bound on activeChildren; defaults to the CPU count reported by os
  maxConcurrentWorkers: require('os').cpus().length,
  // upper bound on a single worker's activeCalls
  maxConcurrentCallsPerWorker: 10,
  // limit checked by the handles from mkhandle() before a call is accepted
  maxConcurrentCalls: Infinity,
  // delay given to setTimeout for every sent call; Infinity means no timer
  maxCallTime: Infinity,
  // how many times a call may be re-queued after its worker exits
  maxRetries: Infinity,
  // delay given to setTimeout before a stopped worker is sent SIGKILL
  forcedKillTime: 100,
  // when truthy, setup() starts workers right away
  autoStart: false
}
|
13 |
|
14 | const extend = require('xtend')
|
15 | , fork = require('./fork')
|
16 | , TimeoutError = require('errno').create('TimeoutError')
|
17 | , ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
|
18 | , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
|
19 |
|
20 |
|
/**
 * Farm constructor.
 *
 * @param {Object} options - passed to extend() after DEFAULT_OPTIONS
 * @param path - stored as-is; later given to fork() when a worker is started
 */
function Farm (options, path) {
  const merged = extend(DEFAULT_OPTIONS, options)

  Object.assign(this, {
    options: merged,
    path: path,
    // number of calls currently sent to workers and not yet answered
    activeCalls: 0
  })
}
|
26 |
|
27 |
|
28 |
|
/**
 * Create the function that callers invoke to run `method` on a worker.
 *
 * The returned handle treats its last argument as the completion callback and
 * everything before it as the arguments forwarded to the worker.
 *
 * The `maxConcurrentCalls` limit is checked against calls already sent to
 * workers PLUS calls still waiting in the queue. Counting only active calls
 * would let the queue grow without bound, because the number of active calls
 * is itself capped by the worker limits.
 *
 * @param {string} [method] - method name stored on the queued call
 *   (undefined when setup() was called without a method list)
 * @returns {Function} handle; when the limit is reached it passes a
 *   MaxConcurrentCallsError to the trailing callback on the next tick, or
 *   throws it if the last argument is not a function
 */
Farm.prototype.mkhandle = function (method) {
  return (...args) => {
    const queued = this.callQueue.length

    if (this.activeCalls + queued >= this.options.maxConcurrentCalls) {
      const err = new MaxConcurrentCallsError(
        'Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + queued + ')'
      )
      const last = args[args.length - 1]

      // report asynchronously when there is a callback to report to
      if (typeof last == 'function')
        return process.nextTick(last.bind(null, err))
      throw err
    }

    this.addCall({
      method: method,
      callback: args.pop(),
      args: args,
      retries: 0
    })
  }
}
|
46 |
|
47 |
|
48 |
|
/**
 * Build the caller-facing interface and reset the farm's scheduling state.
 *
 * @param [methods] - collection of method names (iterated with forEach);
 *   when falsy a single handle function is returned instead of an object
 * @returns {Function|Object} one handle, or an object mapping each method
 *   name to its handle
 */
Farm.prototype.setup = function (methods) {
  let iface

  if (methods) {
    iface = {}
    methods.forEach((name) => {
      iface[name] = this.mkhandle(name)
    })
  } else {
    iface = this.mkhandle()
  }

  // fresh bookkeeping: no workers, nothing queued
  Object.assign(this, {
    searchStart: -1,
    childId: -1,
    children: {},
    activeChildren: 0,
    callQueue: []
  })

  if (this.options.autoStart) {
    // start workers until the configured maximum is reached
    while (this.activeChildren < this.options.maxConcurrentWorkers) {
      this.startChild()
    }
  }

  return iface
}
|
73 |
|
74 |
|
75 |
|
/**
 * React to a worker process exiting. The work runs 10ms after this is called.
 *
 * Every call still recorded on the worker is either put back at the front of
 * the queue (its retry counter is incremented) or, once it has reached
 * `maxRetries`, completed with a ProcessTerminatedError through receive().
 * The worker is then removed with stopChild(), and the queue is processed if
 * anything was re-queued.
 *
 * @param childId - id of the worker that exited
 */
Farm.prototype.onExit = function (childId) {
  const handleExit = () => {
    const worker = this.children[childId]
    let requeued = false

    if (worker && worker.activeCalls) {
      worker.calls.forEach((call, idx) => {
        // holes / already-answered slots
        if (!call) return

        if (call.retries >= this.options.maxRetries) {
          this.receive({
            idx: idx,
            child: childId,
            args: [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
          })
        } else {
          call.retries++
          this.callQueue.unshift(call)
          requeued = true
        }
      })
    }

    this.stopChild(childId)
    if (requeued) {
      this.processQueue()
    }
  }

  setTimeout(handleExit, 10)
}
|
100 |
|
101 |
|
102 |
|
/**
 * Fork a new worker for `this.path` and register it under the next child id.
 *
 * The record keeps the worker's send function, its process handle, the list
 * of calls sent to it, its active call count and its exit code (null until
 * the 'exit' event fires).
 */
Farm.prototype.startChild = function () {
  const id = ++this.childId
  const forked = fork(this.path)
  const worker = {
    send: forked.send,
    child: forked.child,
    calls: [],
    activeCalls: 0,
    exitCode: null
  }

  forked.child.on('message', this.receive.bind(this))
  forked.child.once('exit', (code) => {
    // remembered so stopChild's forced-kill timer can tell the process is gone
    worker.exitCode = code
    this.onExit(id)
  })

  this.activeChildren++
  this.children[id] = worker
}
|
125 |
|
126 |
|
127 |
|
/**
 * Retire a worker: send it 'die', schedule a SIGKILL after `forcedKillTime`
 * in case it has not exited by then, and drop it from the pool immediately.
 * Does nothing when the id is not registered.
 *
 * @param childId - id of the worker to stop
 */
Farm.prototype.stopChild = function (childId) {
  const worker = this.children[childId]
  if (!worker) return

  worker.send('die')

  const forceKill = () => {
    // exitCode stays null until the worker's 'exit' event has been seen
    if (worker.exitCode === null) {
      worker.child.kill('SIGKILL')
    }
  }
  setTimeout(forceKill, this.options.forcedKillTime)

  delete this.children[childId]
  this.activeChildren--
}
|
140 |
|
141 |
|
142 |
|
143 |
|
/**
 * Complete one outstanding call with the result carried by `data`.
 *
 * @param {Object} data
 * @param data.idx - slot of the call in the worker's `calls` list
 * @param data.child - id of the worker the call was sent to
 * @param data.args - arguments for the call's callback; when the first one is
 *   marked with `$error`, it is rebuilt into an Error instance first
 *
 * Logs to console.error and returns when the worker or the slot is unknown.
 * Otherwise the callback is scheduled on the next tick, the slot is freed,
 * the counters are decremented, the worker is stopped once it has used up
 * `maxCallsPerWorker` and has nothing outstanding, and the queue is processed.
 */
Farm.prototype.receive = function (data) {
  const { idx, child: childId, args } = data
  const worker = this.children[childId]

  if (!worker) {
    return console.error(
      'Worker Farm: Received message for unknown child. '
      + 'This is likely as a result of premature child death, '
      + 'the operation will have been re-queued.'
    )
  }

  const call = worker.calls[idx]
  if (!call) {
    return console.error(
      'Worker Farm: Received message for unknown index for existing child. '
      + 'This should not happen!'
    )
  }

  // a timer only exists when a finite maxCallTime is configured
  if (this.options.maxCallTime !== Infinity) {
    clearTimeout(call.timer)
  }

  const payload = args[0]
  if (payload && payload.$error == '$error') {
    // pick the matching built-in error class, falling back to Error
    const nativeTypes = new Map([
      [ 'TypeError', TypeError ],
      [ 'RangeError', RangeError ],
      [ 'EvalError', EvalError ],
      [ 'ReferenceError', ReferenceError ],
      [ 'SyntaxError', SyntaxError ],
      [ 'URIError', URIError ]
    ])
    const ErrorType = nativeTypes.get(payload.type) || Error
    const revived = new ErrorType(payload.message)

    args[0] = revived
    revived.type = payload.type
    revived.stack = payload.stack

    // carry over every own enumerable property of the payload
    for (const key of Object.keys(payload)) {
      revived[key] = payload[key]
    }
  }

  process.nextTick(() => {
    call.callback.apply(null, args)
  })

  delete worker.calls[idx]
  worker.activeCalls--
  this.activeCalls--

  const quotaReached = worker.calls.length >= this.options.maxCallsPerWorker
  if (quotaReached && Object.keys(worker.calls).length === 0) {
    this.stopChild(childId)
  }

  this.processQueue()
}
|
207 |
|
208 |
|
/**
 * Fail every outstanding call of a worker with a TimeoutError and retire the
 * worker. Does nothing when the id is no longer registered.
 *
 * The worker is removed from the pool BEFORE its calls are failed. Failing a
 * call frees capacity and triggers processQueue(); if the worker were still
 * registered at that point, queued calls could be dispatched to it and then
 * be lost when it is stopped (their callbacks would never run and
 * `this.activeCalls` would never be decremented for them).
 *
 * @param childId - id of the worker whose call timed out
 */
Farm.prototype.childTimeout = function (childId) {
  const child = this.children[childId]

  if (!child)
    return

  // take the worker out of scheduling first (also sends 'die' / arms SIGKILL)
  this.stopChild(childId)

  Object.keys(child.calls).forEach((idx) => {
    const call = child.calls[idx]
        , err = new TimeoutError('worker call timed out!')

    clearTimeout(call.timer)

    process.nextTick(() => {
      call.callback.apply(null, [ err ])
    })

    delete child.calls[idx]
    child.activeCalls--
    this.activeCalls--

    // capacity was freed; let the remaining workers (or a new one) pick up work
    this.processQueue()
  })
}
|
225 |
|
226 |
|
227 |
|
/**
 * Dispatch a call to a specific worker.
 *
 * The call is appended to the worker's `calls` list, both active-call
 * counters are incremented, and a message carrying the slot index, worker id,
 * method and arguments is sent. With a finite `maxCallTime` a timer is stored
 * on the call that invokes childTimeout() for this worker.
 *
 * @param childId - id of the target worker (must be registered)
 * @param {Object} call - queued call record (method, args, callback, ...)
 */
Farm.prototype.send = function (childId, call) {
  const worker = this.children[childId]
  // push() returns the new length, so the call's slot is one less
  const slot = worker.calls.push(call) - 1

  worker.activeCalls++
  this.activeCalls++

  worker.send({
    idx: slot,
    child: childId,
    method: call.method,
    args: call.args
  })

  if (this.options.maxCallTime !== Infinity) {
    const onTimeout = this.childTimeout.bind(this, childId)
    call.timer = setTimeout(onTimeout, this.options.maxCallTime)
  }
}
|
248 |
|
249 |
|
250 |
|
251 |
|
252 |
|
/**
 * Return the registered worker ids, rotated so that each invocation starts
 * one position further along than the previous one (wrapping to the start).
 * Advances `this.searchStart` as a side effect.
 *
 * @returns {string[]} keys of `this.children` in rotated order
 */
Farm.prototype.childKeys = function () {
  const ids = Object.keys(this.children)

  // advance the rotation offset, wrapping once it reaches the last index
  this.searchStart = this.searchStart >= ids.length - 1
    ? 0
    : this.searchStart + 1

  const tail = ids.slice(this.searchStart)
  const head = ids.slice(0, this.searchStart)

  return tail.concat(head)
}
|
266 |
|
267 |
|
268 |
|
269 |
|
270 |
|
271 |
|
/**
 * Hand queued calls to workers that have capacity.
 *
 * With an empty queue this only continues a pending shutdown. Otherwise one
 * more worker is started if the pool is below `maxConcurrentWorkers`, and
 * each worker (in childKeys() order) receives at most one queued call per
 * invocation, provided it is under both per-worker limits. A pending
 * shutdown is continued as soon as the queue drains or the pass finishes.
 */
Farm.prototype.processQueue = function () {
  if (!this.callQueue.length)
    return this.ending && this.end()

  if (this.activeChildren < this.options.maxConcurrentWorkers)
    this.startChild()

  for (const key of this.childKeys()) {
    const childId = +key
    const worker = this.children[childId]
    const hasCapacity =
      worker.activeCalls < this.options.maxConcurrentCallsPerWorker
      && worker.calls.length < this.options.maxCallsPerWorker

    if (!hasCapacity) continue

    this.send(childId, this.callQueue.shift())
    if (!this.callQueue.length)
      return this.ending && this.end()
  }

  if (this.ending)
    this.end()
}
|
300 |
|
301 |
|
302 |
|
/**
 * Queue a call and try to dispatch it. While the farm is ending the call is
 * not queued; end() is invoked instead and its result returned.
 *
 * @param {Object} call - call record built by a handle from mkhandle()
 */
Farm.prototype.addCall = function (call) {
  if (!this.ending) {
    this.callQueue.push(call)
    this.processQueue()
    return
  }

  return this.end()
}
|
309 |
|
310 |
|
311 |
|
/**
 * Begin (or continue) shutting the farm down.
 *
 * `this.ending` tracks the state: null/undefined = running, `true` = ending
 * without a callback, a function = ending with a completion callback,
 * `false` = shutdown finished (further calls return immediately).
 *
 * Workers with no active calls are stopped right away. When no worker has
 * active calls left and a callback is registered, it is invoked on the next
 * tick. This method is re-entered from processQueue()/addCall() while ending,
 * so several completion ticks can be scheduled before the first one runs;
 * each tick therefore re-checks the state and clears it before invoking the
 * callback, which guarantees the callback runs once and that a later tick
 * never tries to call `false`.
 *
 * @param {Function} [callback] - invoked (with the farm as `this`) once all
 *   workers are idle and stopped
 */
Farm.prototype.end = function (callback) {
  let complete = true

  if (this.ending === false)
    return

  if (callback)
    this.ending = callback
  else if (this.ending == null)
    this.ending = true

  Object.keys(this.children).forEach((id) => {
    const child = this.children[id]
    if (!child)
      return
    if (!child.activeCalls)
      this.stopChild(id)
    else
      complete = false
  })

  if (complete && typeof this.ending == 'function') {
    process.nextTick(() => {
      const done = this.ending

      // an earlier tick already completed the shutdown
      if (typeof done != 'function')
        return

      this.ending = false
      done.call(this)
    })
  }
}
|
336 |
|
337 |
|
338 | module.exports = Farm
|
339 | module.exports.TimeoutError = TimeoutError
|