UNPKG

9.36 kBJavaScriptView Raw
1'use strict'
2
3const DEFAULT_OPTIONS = {
4 maxCallsPerWorker : Infinity
5 , maxConcurrentWorkers : (require('os').cpus() || { length: 1 }).length
6 , maxConcurrentCallsPerWorker : 10
7 , maxConcurrentCalls : Infinity
8 , maxCallTime : Infinity // exceed this and the whole worker is terminated
9 , maxRetries : Infinity
10 , forcedKillTime : 100
11 , autoStart : false
12 }
13
14const extend = require('xtend')
15 , fork = require('./fork')
16 , TimeoutError = require('errno').create('TimeoutError')
17 , ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
18 , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
19
20
/**
 * Farm constructor.
 *
 * @param {Object} options caller overrides, merged over DEFAULT_OPTIONS
 * @param {*}      path    handed to fork() each time a worker is started
 */
function Farm (options, path) {
  Object.assign(this, {
      options     : extend(DEFAULT_OPTIONS, options)
    , path        : path
      // farm-wide count of calls currently handed to workers
    , activeCalls : 0
  })
}
26
27
// make a handle to pass back in the form of an external API
//
// `method` is the name of the worker function the handle targets (setup()
// passes nothing for a single-function export). The returned function takes
// the call's arguments, optionally followed by a callback.
//
// When the farm is already at `maxConcurrentCalls` the handle reports a
// MaxConcurrentCallsError: through the callback on the next tick if one was
// supplied, otherwise by throwing synchronously.
Farm.prototype.mkhandle = function (method) {
  return (...args) => {
    const last        = args[args.length - 1]
    const hasCallback = typeof last === 'function'

    if (this.activeCalls >= this.options.maxConcurrentCalls) {
      const err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')')
      if (hasCallback)
        return process.nextTick(last.bind(null, err))
      throw err
    }

    // Only strip the trailing argument when it really is a callback. Popping
    // it unconditionally would steal a data argument from the call and later
    // try to invoke that value as a function when the result arrives.
    if (hasCallback)
      args.pop()

    this.addCall({
        method   : method
        // without a caller callback, results are dropped but errors still surface
      , callback : hasCallback ? last : function (err) { if (err) throw err }
      , args     : args
      , retries  : 0
    })
  }
}
46
47
// a constructor of sorts
//
// Builds the caller-facing interface - a single handle when `methods` is
// falsy, otherwise an object with one handle per listed method name - and
// resets the worker bookkeeping. With `autoStart` set, workers are started
// right away until `maxConcurrentWorkers` is reached.
Farm.prototype.setup = function (methods) {
  let iface

  if (methods) { // multiple functions on the export
    iface = {}
    methods.forEach((name) => {
      iface[name] = this.mkhandle(name)
    })
  } else { // single-function export
    iface = this.mkhandle()
  }

  this.searchStart    = -1
  this.childId        = -1
  this.children       = {}
  this.activeChildren = 0
  this.callQueue      = []

  if (this.options.autoStart) {
    while (this.activeChildren < this.options.maxConcurrentWorkers) {
      this.startChild()
    }
  }

  return iface
}
73
74
// when a child exits, check if there are any outstanding jobs and requeue them
//
// Calls that have used up `maxRetries` are completed with a
// ProcessTerminatedError via receive(); all others go back to the front of
// the queue with their retry counter bumped.
Farm.prototype.onExit = function (childId) {
  // delay this to give any sends a chance to finish
  setTimeout(() => {
    const child = this.children[childId]
    let doQueue = false

    if (child && child.activeCalls) {
      child.calls.forEach((call, i) => {
        if (!call)
          return

        if (call.retries >= this.options.maxRetries) {
          this.receive({
              idx   : i
            , child : childId
            , args  : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
          })
          return
        }

        // The call is no longer in flight: drop its pending timeout and give
        // back its slot in the farm-wide counter. send() will count it again
        // when it is re-dispatched; without this the counter only ever grows
        // and eventually trips maxConcurrentCalls for no reason.
        clearTimeout(call.timer)
        this.activeCalls--

        call.retries++
        this.callQueue.unshift(call)
        doQueue = true
      })
    }

    this.stopChild(childId)
    if (doQueue)
      this.processQueue()
  }, 10)
}
100
101
// start a new worker
//
// Forks a process for `this.path`, registers it under a fresh id and wires
// its 'message' and 'exit' events back into the farm.
Farm.prototype.startChild = function () {
  const id     = ++this.childId
  const forked = fork(this.path)
  const worker = {
      send        : forked.send
    , child       : forked.child
    , calls       : []
    , activeCalls : 0
    , exitCode    : null
  }

  forked.child.on('message', this.receive.bind(this))
  forked.child.once('exit', (code) => {
    // recorded so stopChild() can tell the process is already gone
    worker.exitCode = code
    this.onExit(id)
  })

  this.activeChildren += 1
  this.children[id] = worker
}
125
126
// stop a worker, identified by id
//
// Sends the worker a 'die' message and, if no exit code has been recorded
// once `forcedKillTime` ms have passed, sends it SIGKILL. The worker is
// removed from the farm's bookkeeping immediately. Unknown ids are ignored.
Farm.prototype.stopChild = function (childId) {
  const child = this.children[childId]
  if (!child)
    return

  child.send('die')

  const killTimer = setTimeout(() => {
    if (child.exitCode === null)
      child.child.kill('SIGKILL')
  }, this.options.forcedKillTime)
  // The kill timer is only a safety net; it must not by itself keep the
  // parent's event loop alive once the worker has already gone away.
  if (killTimer && typeof killTimer.unref === 'function')
    killTimer.unref()

  delete this.children[childId]
  this.activeChildren--
}
140
141
// called from a child process, the data contains information needed to
// look up the child and the original call so we can invoke the callback
//
// `data` carries `idx` (slot in the child's call list), `child` (worker id)
// and `args` (arguments for the callback). A first argument tagged with
// `$error` is turned back into an Error instance before the callback runs.
Farm.prototype.receive = function (data) {
  const idx     = data.idx
  const childId = data.child
  const args    = data.args
  const child   = this.children[childId]

  if (!child) {
    return console.error(
        'Worker Farm: Received message for unknown child. '
      + 'This is likely as a result of premature child death, '
      + 'the operation will have been re-queued.'
    )
  }

  const call = child.calls[idx]
  if (!call) {
    return console.error(
        'Worker Farm: Received message for unknown index for existing child. '
      + 'This should not happen!'
    )
  }

  if (this.options.maxCallTime !== Infinity)
    clearTimeout(call.timer)

  const wire = args[0]
  if (wire && wire.$error == '$error') {
    // built-in error classes that are restored by name; anything else
    // comes back as a plain Error
    const builtins = new Map([
        [ 'TypeError', TypeError ]
      , [ 'RangeError', RangeError ]
      , [ 'EvalError', EvalError ]
      , [ 'ReferenceError', ReferenceError ]
      , [ 'SyntaxError', SyntaxError ]
      , [ 'URIError', URIError ]
    ])
    const Ctor     = builtins.get(wire.type) || Error
    const restored = new Ctor(wire.message)

    restored.type  = wire.type
    restored.stack = wire.stack

    // Copy any custom properties to pass it on.
    for (const key of Object.keys(wire)) {
      restored[key] = wire[key]
    }

    args[0] = restored
  }

  // the caller's callback always runs asynchronously
  process.nextTick(() => {
    call.callback.apply(null, args)
  })

  // free the slot; the list keeps its length so later indices stay valid
  delete child.calls[idx]
  child.activeCalls -= 1
  this.activeCalls -= 1

  const quotaReached = child.calls.length >= this.options.maxCallsPerWorker
  const drained      = Object.keys(child.calls).length === 0
  if (quotaReached && drained) {
    // this child has finished its run, kill it
    this.stopChild(childId)
  }

  // allow any outstanding calls to be processed
  this.processQueue()
}
207
208
// A call on this worker exceeded `maxCallTime`: every call still registered
// on the worker is completed with a TimeoutError and the worker is stopped.
// Does nothing when the worker id is no longer known.
Farm.prototype.childTimeout = function (childId) {
  const child = this.children[childId]
  if (!child)
    return

  // iterate over the occupied slots only (finished calls leave holes)
  Object.keys(child.calls).forEach((idx) => {
    this.receive({
        idx   : idx
      , child : childId
      , args  : [ new TimeoutError('worker call timed out!') ]
    })
  })

  this.stopChild(childId)
}
225
226
// send a call to a worker, identified by id
//
// Registers the call in the worker's call list, bumps the per-worker and
// farm-wide active counters and posts the call to the worker. When
// `maxCallTime` is finite a timer is armed that triggers childTimeout().
Farm.prototype.send = function (childId, call) {
  const worker = this.children[childId]
  // push() returns the new length, so the call's slot is one below it
  const slot   = worker.calls.push(call) - 1

  worker.activeCalls += 1
  this.activeCalls += 1

  worker.send({
      idx    : slot
    , child  : childId
    , method : call.method
    , args   : call.args
  })

  if (this.options.maxCallTime === Infinity)
    return

  call.timer = setTimeout(
      this.childTimeout.bind(this, childId)
    , this.options.maxCallTime
  )
}
248
249
// a list of active worker ids, in order, but the starting offset is
// shifted each time this method is called, so we work our way through
// all workers when handing out jobs
Farm.prototype.childKeys = function () {
  const ids = Object.keys(this.children)

  // advance the rotation offset, wrapping once it reaches the last id
  if (this.searchStart >= ids.length - 1)
    this.searchStart = 0
  else
    this.searchStart++

  const offset = this.searchStart
  const tail   = ids.slice(offset)
  const head   = ids.slice(0, offset)

  return tail.concat(head)
}
266
267
// Calls are added to a queue, this processes the queue and is called
// whenever there might be a chance to send more calls to the workers.
// The various options all impact on when we're able to send calls,
// they may need to be kept in a queue until a worker is ready.
//
// Each pass starts at most one new worker and hands at most one queued call
// to every worker that still has capacity.
Farm.prototype.processQueue = function () {
  if (!this.callQueue.length)
    return this.ending && this.end()

  if (this.activeChildren < this.options.maxConcurrentWorkers)
    this.startChild()

  for (const key of this.childKeys()) {
    const childId = +key
    const worker  = this.children[childId]

    // skip workers that are saturated or have used up their call quota
    if (!(worker.activeCalls < this.options.maxConcurrentCallsPerWorker))
      continue
    if (!(worker.calls.length < this.options.maxCallsPerWorker))
      continue

    this.send(childId, this.callQueue.shift())
    if (!this.callQueue.length)
      return this.ending && this.end()
  }

  if (this.ending)
    this.end()
}
300
301
// add a new call to the call queue, then trigger a process of the queue
//
// Once the farm is ending the call is not queued; end() is invoked instead.
Farm.prototype.addCall = function (call) {
  if (!this.ending) {
    this.callQueue.push(call)
    this.processQueue()
    return
  }

  return this.end() // don't add anything new to the queue
}
309
310
// kills child workers when they're all done
//
// Marks the farm as ending and stops every worker that has no active calls.
// Workers that are still busy are left alone; processQueue() calls end()
// again as the queue drains. Once no worker remains busy, `callback` (if one
// was ever supplied) is invoked on the next tick with the farm as `this`,
// and `this.ending` is set to false, after which further end() calls are
// no-ops.
Farm.prototype.end = function (callback) {
  if (this.ending === false)
    return

  if (callback)
    this.ending = callback
  else if (this.ending == null)
    this.ending = true

  let complete = true
  Object.keys(this.children).forEach((id) => {
    const child = this.children[id]
    if (!child)
      return
    if (!child.activeCalls)
      this.stopChild(id)
    else
      complete = false
  })

  if (complete && typeof this.ending === 'function') {
    process.nextTick(() => {
      // end() can run several times before this tick fires (for example a
      // second end() or a late call on a handle). Only the first tick may
      // notify; later ones would otherwise try to call `false`.
      const done = this.ending
      if (typeof done !== 'function')
        return
      this.ending = false
      done.call(this)
    })
  }
}
336
337
338module.exports = Farm
339module.exports.TimeoutError = TimeoutError