UNPKG

11.1 kBtext/coffeescriptView Raw
1mockery = require 'mockery'
2sinon = require 'sinon'
3expect = require 'expect.js'
4uuid = require 'node-uuid'
5path = require 'path'
6fs = require 'fs'
7num_cpus = require('os').cpus().length
8
9pass = -> undefined
10reflex = (cb) -> cb()
11slow_reflex = (cb) -> setTimeout cb, 10
12
13describe 'bobbin', ->
14
15 cluster_mock =
16 isMaster: true
17 setupMaster: (opts) ->
18 expect(fs.existsSync opts.exec).to.be.ok()
19 fork: ->
20 on: pass
21 send: pass
22
23 bobbin = null
24
25 before ->
26 mockery.enable useCleanCache: true
27 mockery.registerAllowable '../src/bobbin.coffee', true
28 mockery.registerAllowables ['os', 'node-uuid', 'path']
29 mockery.registerMock 'cluster', cluster_mock
30 bobbin = require '../src/bobbin.coffee'
31
32 describe '.create()', ->
33
34 it 'should fail if not cluster.isMaster', ->
35 cluster_mock.isMaster = false
36
37 expect(-> bobbin.create()).to.throwError()
38
39 cluster_mock.isMaster = true
40
41 it 'should create num_cpus processes by default', (done) ->
42 handlers = {}
43 id = undefined
44
45 i = 0
46
47 cluster_mock.fork = ->
48 if ++i > num_cpus
49 expect.fail 'bobbin created more processes than cpus'
50
51 send: pass
52 on: pass
53
54 bobbin.create()
55
56 # to cleanly end the test
57 did = false
58
59 cluster_mock.fork = ->
60 unless did
61 expect(i).to.eql num_cpus
62 done()
63 did = true
64
65 send: pass
66 on: pass
67
68 bobbin.create()
69
70 it 'should create N processes for N >= 1', (done) ->
71 handlers = {}
72 id = undefined
73
74 i = 0
75
76 cluster_mock.fork = ->
77 if ++i > 10
78 expect.fail 'bobbin created more processes than cpus'
79
80 send: pass
81 on: pass
82
83 bobbin.create(10)
84
85 # to cleanly end the test
86 did = false
87
88 cluster_mock.fork = ->
89 unless did
90 expect(i).to.eql 10
91 did = true
92
93 send: pass
94 on: pass
95
96 bobbin.create(1)
97
98 i = 0
99
100 cluster_mock.fork = ->
101 if ++i > 2
102 expect.fail 'bobbin created more processes than cpus'
103
104 send: pass
105 on: pass
106
107 bobbin.create(2)
108
109 # to cleanly end the test
110 did = false
111
112 cluster_mock.fork = ->
113 unless did
114 expect(i).to.eql 2
115 done()
116 did = true
117
118 send: pass
119 on: pass
120
121 bobbin.create(1)
122
123 it 'should create num_cpus processes when N < 1', (done) ->
124 handlers = {}
125 id = undefined
126
127 i = 0
128
129 cluster_mock.fork = ->
130 if ++i > num_cpus
131 expect.fail 'bobbin created more processes than cpus'
132
133 send: pass
134 on: pass
135
136 bobbin.create(0.9)
137
138 # to cleanly end the test
139 did = false
140
141 cluster_mock.fork = ->
142 unless did
143 expect(i).to.eql num_cpus
144 done()
145 did = true
146
147 send: pass
148 on: pass
149
150 bobbin.create()
151
152 it 'should create num_cpus processes when N invalid', (done) ->
153 handlers = {}
154 id = undefined
155
156 i = 0
157
158 cluster_mock.fork = ->
159 if ++i > num_cpus
160 expect.fail 'bobbin created more processes than cpus'
161
162 send: pass
163 on: pass
164
165 bobbin.create('pickles')
166
167 # to cleanly end the test
168 did = false
169
170 cluster_mock.fork = ->
171 unless did
172 expect(i).to.eql num_cpus
173 done()
174 did = true
175
176 send: pass
177 on: pass
178
179 bobbin.create()
180
181 it 'should return a pool object (run, kill)', ->
182 pool = bobbin.create()
183 expect(pool).to.be.an 'object'
184 expect(pool.run).to.be.a 'function'
185 expect(pool.kill).to.be.a 'function'
186
187 describe '[worker pool]', ->
188
189 describe '.run()', ->
190
191 it 'should fail unless work param is a function', ->
192 pool = bobbin.create()
193
194 expect(->
195 pool.run 1, (-> undefined)
196 ).to.throwError (e) -> expect(e).to.be.a TypeError
197
198 it 'should fail unless callback param is a function', ->
199 pool = bobbin.create()
200
201 expect(->
202 pool.run (-> undefined), 1
203 ).to.throwError (e) -> expect(e).to.be.a TypeError
204
205 it 'should send work to another process via cluster', (done) ->
206 data = [true, false, {foo: 1, bar: {baz: 'abc'}, quux: null}]
207 work = (some, fake, params, callback) -> callback null, 'this should be serializable!'
208
209 cluster_mock.fork = ->
210 on: pass
211 send: (msg) ->
212 expect(msg.data).to.eql data
213 expect(msg.work).to.equal work.toString()
214 expect(msg.id).not.to.be undefined
215 done()
216
217 bobbin.create().run data..., work, -> undefined
218
219 it 'should callback with the passed-in function', (done) ->
220 handlers = {}
221 id = undefined
222
223 cluster_mock.fork = ->
224 send: (msg) ->
225 id = msg.id
226 on: (ev, handler) ->
227 handlers[ev] = handler
228
229 bobbin.create().run pass, done
230
231 handlers['message'] {type: 'result', contents: {id: id}}
232
233 it 'should callback with the parameters returned from the worker', (done) ->
234 handlers = {}
235 id = undefined
236 params = ['foo', 'bar', 'baz', {quux: true, ziv: {a: 1, b:2}}]
237
238 cluster_mock.fork = ->
239 send: (msg) ->
240 id = msg.id
241 on: (ev, handler) ->
242 handlers[ev] = handler
243
244 bobbin.create().run pass, (data...) ->
245 expect(data).to.eql params
246 done()
247
248 handlers['message'] {
249 type: 'result'
250 contents: {
251 id: id
252 callback_params: params
253 }
254 }
255
256 it 'should apply a WorkerError to the callback when a worker throws an Error', (done) ->
257 handlers = {}
258 id = undefined
259 params = ['foo', 'bar', 'baz', {quux: true, ziv: {a: 1, b:2}}]
260
261 cluster_mock.fork = ->
262 send: (msg) ->
263 id = msg.id
264 on: (ev, handler) ->
265 handlers[ev] = handler
266
267 bobbin.create().run pass, (err, result) ->
268 expect(err).to.be.a bobbin.WorkerError
269 expect(err.message).to.be 'what up'
270 expect(err.name).to.be 'Error'
271 expect(result).to.be undefined
272 done()
273
274 handlers['message'] {
275 type: 'exception'
276 contents: {
277 id: id
278 is_error: true
279 error: {
280 type: 'Error'
281 parameters: {
282 name: 'Error'
283 message: 'what up'
284 }
285 }
286 }
287 }
288
289 it 'should apply a thrown non-Error exception to the callback just like an error', (done) ->
290 handlers = {}
291 id = undefined
292 params = ['foo', 'bar', 'baz', {quux: true, ziv: {a: 1, b:2}}]
293
294 cluster_mock.fork = ->
295 send: (msg) ->
296 id = msg.id
297 on: (ev, handler) ->
298 handlers[ev] = handler
299
300 bobbin.create().run pass, (err, result) ->
301 expect(err).not.to.be.an Error
302 expect(err).to.be 'bad thing'
303 expect(result).to.be undefined
304 done()
305
306 handlers['message'] {
307 type: 'exception'
308 contents: {
309 id: id
310 is_error: false
311 exception: 'bad thing'
312 }
313 }
314
315 it 'should prefer idle workers', (done) ->
316 workers = []
317 count = 0
318
319 cluster_mock.fork = ->
320 handlers = {}
321 id = undefined
322
323 # extra handler and id accessors for twiddling with stuff later on
324 workers.push
325 send: (msg) ->
326 id = msg.id
327 count++
328
329 on: (ev, handler) ->
330 handlers[ev] = handler
331
332 handler: (ev) ->
333 handlers[ev]
334
335 id: -> id
336
337 workers[workers.length - 1]
338
339
340 # create 30 workers and give them each 2 jobs that will never finish
341 pool = bobbin.create(30)
342 expect(workers.length).to.be 30
343 pool.run(pass, pass) for i in [1..60]
344 expect(count).to.be 60
345
346
347
348 # fail if busy workers get work
349 busyfail = (i) ->
350 -> expect().fail "sent work to busy worker #{i} when there was an idle worker available!"
351
352 for i in [0...30]
353 workers[i].send = busyfail(i)
354
355
356
357 # emulate 'empty' message for worker 14
358 workers[14].handler('message')({
359 type: 'result'
360 contents:
361 id: workers[14].id()
362 callback_params: [null, 'foo']
363 }) for i in [1..2]
364 workers[14].handler('message')({type: 'empty'})
365
366 # once the #14 test passes, do it again with 22 and 6 for good measure
367 workers[14].send = ->
368 workers[22].handler('message')({
369 type: 'result'
370 contents:
371 id: workers[22].id()
372 callback_params: [null, 'foo']
373 }) for i in [1..2]
374 workers[22].handler('message')({type: 'empty'})
375
376 workers[14].send = busyfail(14)
377 workers[22].send = ->
378 workers[6].handler('message')({
379 type: 'result'
380 contents:
381 id: workers[6].id()
382 callback_params: [null, 'foo']
383 }) for i in [1..2]
384 workers[6].handler('message')({type: 'empty'})
385
386 workers[22].send = busyfail(22)
387 workers[6].send = ->
388 # if we got here without a busyfail going off, things worked correctly
389 done()
390
391 pool.run pass, pass
392
393 pool.run pass, pass
394
395 pool.run pass, pass
396
397
398
399 describe '.kill()', ->
400
401 it 'should kill all workers immediately when called with no timeout', ->
402 spy = sinon.spy()
403
404 cluster_mock.fork = ->
405 killed = false
406
407 send: pass
408 on: pass
409 kill: ->
410 unless killed
411 killed = true
412 spy()
413
414
415 pool = bobbin.create(10)
416 pool.kill()
417
418 expect(spy.callCount).to.eql 10
419
420 it 'should kill workers after work finishes if called with sufficient timeout', (done) ->
421 work_done_count = 0
422 killed_count = 0
423 handlers = {}
424
425 inc = ->
426 if ++killed_count is 10
427 expect(work_done_count).to.eql 20
428
429 cluster_mock.fork = ->
430 killed = false
431
432 send: (msg) ->
433 setTimeout (->
434 handlers['message'] {
435 type: 'result'
436 contents:
437 id: msg.id
438 callback_params: []
439 }
440 ), 10
441 on: (ev, handler) ->
442 handlers[ev] = handler
443 kill: ->
444 unless killed
445 killed = true
446 inc()
447
448 pool = bobbin.create(10)
449
450 inc_done = ->
451 if ++work_done_count >= 20
452 done()
453
454 pool.run((-> 'dummy function -- this test ignores this function'), inc_done) for i in [1..20]
455
456 pool.kill(1500)
457
458 it 'should kill workers prematurely if called with small timeout', (done) ->
459 work_done_count = 0
460 killed_count = 0
461 handlers = {}
462
463 inc = ->
464 if ++killed_count is 10
465 expect(work_done_count).to.eql 0
466 done()
467
468 cluster_mock.fork = ->
469 killed = false
470
471 send: (msg) ->
472 setTimeout (->
473 handlers['message'] {
474 type: 'result'
475 contents:
476 id: msg.id
477 callback_params: []
478 }
479 ), 1000
480 on: (ev, handler) ->
481 handlers[ev] = handler
482 kill: ->
483 unless killed
484 killed = true
485 inc()
486
487 pool = bobbin.create(10)
488
489 inc_done = ->
490 if ++work_done_count >= 1
491 expect().fail('should not have run inc_done')
492
493 pool.run((-> 'dummy function -- this test ignores this function'), inc_done) for i in [1..20]
494
495 pool.kill(1)
496
497 it 'should raise an error if new work is submitted after kill()', ->
498 pool = bobbin.create()
499 pool.kill()
500
501 expect(-> pool.run reflex, pass).to.throwError /kill has been called/
502
503 it 'should raise an error if a bad timeout value is passed', ->
504 pool = bobbin.create()
505 expect(-> pool.kill(-1)).to.throwError /non\-negative/
506 expect(-> pool.kill(false)).to.throwError /non\-negative/
507
508 after ->
509 mockery.disable()
\No newline at end of file