UNPKG

15.7 kBJavaScriptView Raw
1'use strict'
2
3const tape = require('tape')
4 , child_process = require('child_process')
5 , workerFarm = require('../')
6 , childPath = require.resolve('./child')
7 , fs = require('fs')
8
/**
 * Return the distinct values of `ar`, keeping the first occurrence of each
 * value and preserving the original order.
 *
 * Values are compared the way `Set` compares them (SameValueZero): there is
 * no type coercion, so `1` and `'1'` are two different entries, and `NaN`
 * is treated as equal to `NaN`.
 *
 * @param {Array} ar - values to de-duplicate (not modified)
 * @returns {Array} a new array containing each distinct value once
 */
function uniq (ar) {
  // a Set iterates in insertion order, so the first occurrence of a value wins
  return Array.from(new Set(ar))
}
17
18
19// a child where module.exports = function ...
tape('simple, exports=function test', (t) => {
  // three checks on the child's reply plus one for the farm shutting down
  t.plan(4)

  const farm = workerFarm(childPath)

  // the reply arrives as (err, pid, rnd); err is not inspected by this test
  const onReply = (err, pid, rnd) => {
    t.ok(pid > process.pid, 'pid makes sense')
    t.ok(pid < process.pid + 500, 'pid makes sense')
    t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  }

  farm(0, onReply)

  workerFarm.end(farm, () => {
    t.ok(true, 'workerFarm ended')
  })
})
34
35
36// a child where we have module.exports.fn = function ...
tape('simple, exports.fn test', (t) => {
  // three checks on the child's reply plus one for the farm shutting down
  t.plan(4)

  // request the child's `run0` export as a method on the returned object
  const farm = workerFarm(childPath, [ 'run0' ])

  // the reply arrives as (err, pid, rnd); err is not inspected by this test
  const onReply = (err, pid, rnd) => {
    t.ok(pid > process.pid, 'pid makes sense')
    t.ok(pid < process.pid + 500, 'pid makes sense')
    t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  }

  farm.run0(onReply)

  workerFarm.end(farm, () => {
    t.ok(true, 'workerFarm ended')
  })
})
51
52
53// use the returned pids to check that we're using a single child process
54// when maxConcurrentWorkers = 1
tape('single worker', (t) => {
  // one check on the collected pids plus one for the farm shutting down
  t.plan(2)

  const child = workerFarm({ maxConcurrentWorkers: 1 }, childPath)
  const pids = []
  const count = 10

  for (let i = 0; i < count; i++) {
    child(0, (err, pid) => {
      pids.push(pid)
      if (pids.length === count) {
        // tape's signature is t.equal(actual, expected, msg); keeping that
        // order makes a failure report label the two values correctly
        t.equal(uniq(pids).length, 1, 'only a single process (by pid)')
      } else if (pids.length > count) {
        t.fail('too many callbacks!')
      }
    })
  }

  workerFarm.end(child, () => {
    t.ok(true, 'workerFarm ended')
  })
})
76
77
78// use the returned pids to check that we're using two child processes
79// when maxConcurrentWorkers = 2
tape('two workers', (t) => {
  // one check on the collected pids plus one for the farm shutting down
  t.plan(2)

  const child = workerFarm({ maxConcurrentWorkers: 2 }, childPath)
  const pids = []
  const count = 10

  for (let i = 0; i < count; i++) {
    child(0, (err, pid) => {
      pids.push(pid)
      if (pids.length === count) {
        // tape's signature is t.equal(actual, expected, msg); keeping that
        // order makes a failure report label the two values correctly
        t.equal(uniq(pids).length, 2, 'only two child processes (by pid)')
      } else if (pids.length > count) {
        t.fail('too many callbacks!')
      }
    })
  }

  workerFarm.end(child, () => {
    t.ok(true, 'workerFarm ended')
  })
})
101
102
103// use the returned pids to check that we're using a child process per
104// call when maxConcurrentWorkers = 10
tape('many workers', (t) => {
  // one check on the collected pids plus one for the farm shutting down
  t.plan(2)

  const count = 10
  const child = workerFarm({ maxConcurrentWorkers: count }, childPath)
  const pids = []

  for (let i = 0; i < count; i++) {
    child(1, (err, pid) => {
      pids.push(pid)
      if (pids.length === count) {
        // every call is expected to be answered by a different process, so
        // the message must say "different" (it used to claim the opposite);
        // arguments follow tape's t.equal(actual, expected, msg) order
        t.equal(uniq(pids).length, count, 'pids are all different (by pid)')
      } else if (pids.length > count) {
        t.fail('too many callbacks!')
      }
    })
  }

  workerFarm.end(child, () => {
    t.ok(true, 'workerFarm ended')
  })
})
126
127
tape('auto start workers', (t) => {
  const child = workerFarm({ maxConcurrentWorkers: 3, autoStart: true }, childPath, ['uptime'])
  const count = 5
  const delay = 250

  // one assertion per uptime call plus one for the farm shutting down
  t.plan(count + 1)

  // hold the calls back for `delay` ms; each reply is then checked to report
  // an uptime above 10ms, i.e. the child was already running before the call
  setTimeout(() => {
    for (let i = 0; i < count; i++) {
      child.uptime((err, uptime) => {
        t.ok(uptime > 10, 'child has been up before the request (' + uptime + 'ms)')
      })
    }

    workerFarm.end(child, () => {
      t.ok(true, 'workerFarm ended')
    })
  }, delay)
})
148
149
150// use the returned pids to check that we're using a child process per
151// call when we set maxCallsPerWorker = 1 even when we have maxConcurrentWorkers = 1
tape('single call per worker', (t) => {
  // one check on the collected pids plus one for the farm shutting down
  t.plan(2)

  const child = workerFarm({
    maxConcurrentWorkers: 1,
    maxConcurrentCallsPerWorker: Infinity,
    maxCallsPerWorker: 1,
    autoStart: true
  }, childPath)
  const pids = []
  const count = 25

  for (let i = 0; i < count; i++) {
    child(0, (err, pid) => {
      pids.push(pid)
      if (pids.length === count) {
        // tape's signature is t.equal(actual, expected, msg)
        t.equal(uniq(pids).length, count, 'one process for each call (by pid)')
        // the farm is only ended once every call has reported back
        workerFarm.end(child, () => {
          t.ok(true, 'workerFarm ended')
        })
      } else if (pids.length > count) {
        t.fail('too many callbacks!')
      }
    })
  }
})
178
179
180// use the returned pids to check that we're using a child process per
181// two-calls when we set maxCallsPerWorker = 2 even when we have maxConcurrentWorkers = 1
tape('two calls per worker', (t) => {
  // one check on the collected pids plus one for the farm shutting down
  t.plan(2)

  const child = workerFarm({
    maxConcurrentWorkers: 1,
    maxConcurrentCallsPerWorker: Infinity,
    maxCallsPerWorker: 2,
    autoStart: true
  }, childPath)
  const pids = []
  const count = 20

  for (let i = 0; i < count; i++) {
    child(0, (err, pid) => {
      pids.push(pid)
      if (pids.length === count) {
        // half as many processes as calls are expected, so the message now
        // says so (it used to read "one process for each call"); arguments
        // follow tape's t.equal(actual, expected, msg) order
        t.equal(uniq(pids).length, count / 2, 'one process for every two calls (by pid)')
        // the farm is only ended once every call has reported back
        workerFarm.end(child, () => {
          t.ok(true, 'workerFarm ended')
        })
      } else if (pids.length > count) {
        t.fail('too many callbacks!')
      }
    })
  }
})
208
209
// use timing to confirm that one worker will process many calls concurrently
tape('many concurrent calls', (t) => {
  // one timing check plus one for the farm shutting down
  t.plan(2)

  const farm = workerFarm({
    maxConcurrentWorkers: 1,
    maxConcurrentCallsPerWorker: Infinity,
    maxCallsPerWorker: Infinity,
    autoStart: true
  }, childPath)
  const defer = 200
  const count = 200
  let finished = 0

  setTimeout(() => {
    const start = Date.now()

    // shared by every call; only the final reply triggers the timing check
    const onDone = () => {
      finished += 1
      if (finished > count) {
        t.fail('too many callbacks!')
        return
      }
      if (finished !== count) return

      const time = Date.now() - start
      // upper-limit not tied to `count` at all
      t.ok(time > defer && time < (defer * 2.5), 'processed tasks concurrently (' + time + 'ms)')
      workerFarm.end(farm, () => {
        t.ok(true, 'workerFarm ended')
      })
    }

    for (let n = 0; n < count; n++) {
      farm(defer, onDone)
    }
  }, 250)
})
243
244
245// use timing to confirm that one child processes calls sequentially with
246// maxConcurrentCallsPerWorker = 1
tape('single concurrent call', (t) => {
  // one timing check plus one for the farm shutting down
  t.plan(2)

  const farm = workerFarm({
    maxConcurrentWorkers: 1,
    maxConcurrentCallsPerWorker: 1,
    maxCallsPerWorker: Infinity,
    autoStart: true
  }, childPath)
  const defer = 20
  const count = 100
  let finished = 0

  setTimeout(() => {
    const start = Date.now()

    // shared by every call; only the final reply triggers the timing check
    const onDone = () => {
      finished += 1
      if (finished > count) {
        t.fail('too many callbacks!')
        return
      }
      if (finished !== count) return

      const time = Date.now() - start
      // upper-limit tied closely to `count`, 1.2 is generous but accounts for all the timers
      // coming back at the same time and the IPC overhead
      t.ok(time > (defer * count) && time < (defer * count * 1.2), 'processed tasks sequentially (' + time + 'ms)')
      workerFarm.end(farm, () => {
        t.ok(true, 'workerFarm ended')
      })
    }

    for (let n = 0; n < count; n++) {
      farm(defer, onDone)
    }
  }, 250)
})
280
281
282// use timing to confirm that one child processes *only* 5 calls concurrently
tape('multiple concurrent calls', (t) => {
  // one timing check plus one for the farm shutting down
  t.plan(2)

  const callsPerWorker = 5
  const farm = workerFarm({
    maxConcurrentWorkers: 1,
    maxConcurrentCallsPerWorker: callsPerWorker,
    maxCallsPerWorker: Infinity,
    autoStart: true
  }, childPath)
  const defer = 100
  const count = 100
  let finished = 0

  setTimeout(() => {
    const start = Date.now()

    // shared by every call; only the final reply triggers the timing check
    const onDone = () => {
      finished += 1
      if (finished > count) {
        t.fail('too many callbacks!')
        return
      }
      if (finished !== count) return

      const time = Date.now() - start
      // (defer * (count / callsPerWorker + 1)) - if precise it'd be count/callsPerWorker
      // but accounting for IPC and other overhead, we need to give it a bit of extra time,
      // hence the +1
      t.ok(time > (defer * 1.5) && time < (defer * (count / callsPerWorker + 1)), 'processed tasks concurrently (' + time + 'ms)')
      workerFarm.end(farm, () => {
        t.ok(true, 'workerFarm ended')
      })
    }

    for (let n = 0; n < count; n++) {
      farm(defer, onDone)
    }
  }, 250)
})
318
319
320// call a method that will die with a probability of 0.5 but expect that
321// we'll get results for each of our calls anyway
tape('durability', (t) => {
  // two checks on the collected results plus one for the farm shutting down
  t.plan(3)

  const farm = workerFarm({ maxConcurrentWorkers: 2 }, childPath, [ 'killable' ])
  const ids = []
  const pids = []
  const count = 10

  // shared by every call; the checks run once all `count` replies are in
  const onResult = (err, id, pid) => {
    ids.push(id)
    pids.push(pid)
    if (ids.length > count) {
      t.fail('too many callbacks!')
      return
    }
    if (ids.length !== count) return

    t.ok(uniq(pids).length > 2, 'processed by many (' + uniq(pids).length + ') workers, but got there in the end!')
    t.ok(uniq(ids).length === count, 'received a single result for each unique call')
    workerFarm.end(farm, () => {
      t.ok(true, 'workerFarm ended')
    })
  }

  // call ids are handed out from count - 1 down to 0
  for (let id = count - 1; id >= 0; id--) {
    farm.killable(id, onResult)
  }
})
346
347
348// a callback provided to .end() can and will be called (uses "simple, exports=function test" to create a child)
tape('simple, end callback', (t) => {
  // three checks on the child's reply plus one for the .end() callback
  t.plan(4)

  const farm = workerFarm(childPath)

  // the reply arrives as (err, pid, rnd); err is not inspected by this test
  const onReply = (err, pid, rnd) => {
    t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(pid < process.pid + 500, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  }

  farm(0, onReply)

  workerFarm.end(farm, () => {
    t.pass('an .end() callback was successfully called')
  })
})
363
364
tape('call timeout test', (t) => {
  // 3 per successful call, 4 per timed-out call, 1 for the farm shutting down
  t.plan(3 + 3 + 4 + 4 + 4 + 3 + 1)

  const farm = workerFarm({ maxCallTime: 250, maxConcurrentWorkers: 1 }, childPath)

  // three assertions: the reply carries a plausible pid and rnd
  const expectSuccess = (err, pid, rnd) => {
    t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(pid < process.pid + 500, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
  }

  // four assertions: the call failed with a TimeoutError and carried no results
  const expectTimeout = (err, pid, rnd) => {
    t.ok(err, 'got an error')
    t.equal(err.type, 'TimeoutError', 'correct error type')
    t.ok(pid === undefined, 'no pid')
    t.ok(rnd === undefined, 'no rnd')
  }

  // should come back ok
  farm(50, expectSuccess)

  // should come back ok
  farm(50, expectSuccess)

  // should die
  farm(500, expectTimeout)

  // should die
  farm(1000, expectTimeout)

  // should die even though it is only a 100ms task, it'll get caught up
  // in a dying worker
  setTimeout(() => {
    farm(100, expectTimeout)
  }, 200)

  // should be ok, new worker
  setTimeout(() => {
    farm(50, expectSuccess)
    workerFarm.end(farm, () => {
      t.ok(true, 'workerFarm ended')
    })
  }, 400)
})
423
424
tape('test error passing', (t) => {
  // three checks in each of the three callbacks plus one for the farm shutting down
  t.plan(10)

  const child = workerFarm(childPath, [ 'err' ])

  // every t.equal below uses tape's (actual, expected, msg) order, so a
  // failure report labels the received and the wanted value correctly

  child.err('Error', 'this is an Error', (err) => {
    t.ok(err instanceof Error, 'is an Error object')
    t.equal(err.type, 'Error', 'correct type')
    t.equal(err.message, 'this is an Error', 'correct message')
  })

  child.err('TypeError', 'this is a TypeError', (err) => {
    t.ok(err instanceof Error, 'is a TypeError object')
    t.equal(err.type, 'TypeError', 'correct type')
    t.equal(err.message, 'this is a TypeError', 'correct message')
  })

  // the extra object's properties are expected to show up on the received error
  child.err('Error', 'this is an Error with custom props', {foo: 'bar', 'baz': 1}, (err) => {
    t.ok(err instanceof Error, 'is an Error object')
    t.equal(err.foo, 'bar', 'passes data')
    t.equal(err.baz, 1, 'passes data')
  })

  workerFarm.end(child, () => {
    t.ok(true, 'workerFarm ended')
  })
})
449
450
tape('test maxConcurrentCalls', (t) => {
  // 5 error-free calls, 2 checks for each of the 2 rejected calls, 1 for shutdown
  t.plan(10)

  const farm = workerFarm({ maxConcurrentCalls: 5 }, childPath)
  const accepted = 5

  const expectNoError = (err) => {
    t.notOk(err, 'no error')
  }

  const expectRejected = (err) => {
    t.ok(err)
    t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  }

  // the first five calls are expected to complete without an error
  for (let n = 0; n < accepted; n++) {
    farm(50, expectNoError)
  }

  // the next two are expected to fail with a MaxConcurrentCallsError
  farm(50, expectRejected)
  farm(50, expectRejected)

  workerFarm.end(farm, () => {
    t.ok(true, 'workerFarm ended')
  })
})
474
475
476// this test should not keep the process running! if the test process
477// doesn't die then the problem is here
tape('test timeout kill', (t) => {
  // two checks on the error plus one for the farm shutting down
  t.plan(3)

  const farm = workerFarm({ maxCallTime: 250, maxConcurrentWorkers: 1 }, childPath, [ 'block' ])

  // the `block` call is expected to fail with a TimeoutError
  const onBlocked = (err) => {
    t.ok(err, 'got an error')
    t.equal(err.type, 'TimeoutError', 'correct error type')
  }

  farm.block(onBlocked)

  workerFarm.end(farm, () => {
    t.ok(true, 'workerFarm ended')
  })
})
491
492
tape('test max retries after process terminate', (t) => {
  // first farm: 2 result checks + 1 end; second farm: 3 error checks + 1 end
  t.plan(7)

  // Start a one-worker farm with the given retry budget, issue a single
  // `stubborn` call against `filepath`, then end the farm and delete the file.
  const runStubborn = (maxRetries, filepath, onResult) => {
    const farm = workerFarm({ maxConcurrentWorkers: 1, maxRetries: maxRetries }, childPath, [ 'stubborn' ])
    farm.stubborn(filepath, onResult)

    workerFarm.end(farm, () => {
      fs.unlinkSync(filepath)
      t.ok(true, 'workerFarm ended')
    })
  }

  // temporary file is used to store the number of retries among terminating workers
  runStubborn(5, '.retries1', (err, result) => {
    t.notOk(err, 'no error')
    t.equal(result, 12, 'correct result')
  })

  // with only 3 retries the call is expected to be cancelled with an error
  runStubborn(3, '.retries2', (err, result) => {
    t.ok(err, 'got an error')
    t.equal(err.type, 'ProcessTerminatedError', 'correct error type')
    t.equal(err.message, 'cancel after 3 retries!', 'correct message and number of retries')
  })
})
522
523
tape('ensure --debug/--inspect not propagated to children', (t) => {
  // exit code, FINISHED marker and absence of the debug flag in the output
  t.plan(3)

  const script = __dirname + '/debug.js'
  // major version of the running node, taken from e.g. "v8.11.3" -> 8
  const nodeMajor = Number(process.version.replace(/^v(\d+)\..*$/, '$1'))
  // --inspect is used from node 8 on, --debug=8881 on anything older
  const debugArg = nodeMajor >= 8 ? '--inspect' : '--debug=8881'
  const proc = child_process.spawn(process.execPath, [ debugArg, script ])
  let output = ''

  // collect everything the spawned script writes to stdout
  proc.stdout.on('data', (data) => {
    output += data.toString()
  })

  proc.on('close', (code) => {
    t.equal(code, 0, 'exited without error (' + code + ')')
    t.ok(output.indexOf('FINISHED') > -1, 'process finished')
    t.ok(output.indexOf('--debug') === -1, 'child does not receive debug flag')
  })
})
541})