1 | mockery = require 'mockery'
|
2 | sinon = require 'sinon'
|
3 | expect = require 'expect.js'
|
4 | uuid = require 'node-uuid'
|
5 | path = require 'path'
|
6 | fs = require 'fs'
|
7 | num_cpus = require('os').cpus().length
|
8 |
|
9 | pass = -> undefined
|
10 | reflex = (cb) -> cb()
|
11 | slow_reflex = (cb) -> setTimeout cb, 10
|
12 |
|
13 | describe 'bobbin', ->
|
14 |
|
15 | cluster_mock =
|
16 | isMaster: true
|
17 | setupMaster: (opts) ->
|
18 | expect(fs.existsSync opts.exec).to.be.ok()
|
19 | fork: ->
|
20 | on: pass
|
21 | send: pass
|
22 |
|
23 | bobbin = null
|
24 |
|
25 | before ->
|
26 | mockery.enable useCleanCache: true
|
27 | mockery.registerAllowable '../src/bobbin.coffee', true
|
28 | mockery.registerAllowables ['os', 'node-uuid', 'path']
|
29 | mockery.registerMock 'cluster', cluster_mock
|
30 | bobbin = require '../src/bobbin.coffee'
|
31 |
|
32 | describe '.create()', ->
|
33 |
|
34 | it 'should fail if not cluster.isMaster', ->
|
35 | cluster_mock.isMaster = false
|
36 |
|
37 | expect(-> bobbin.create()).to.throwError()
|
38 |
|
39 | cluster_mock.isMaster = true
|
40 |
|
41 | it 'should create num_cpus processes by default', (done) ->
|
42 | handlers = {}
|
43 | id = undefined
|
44 |
|
45 | i = 0
|
46 |
|
47 | cluster_mock.fork = ->
|
48 | if ++i > num_cpus
|
49 | expect.fail 'bobbin created more processes than cpus'
|
50 |
|
51 | send: pass
|
52 | on: pass
|
53 |
|
54 | bobbin.create()
|
55 |
|
56 |
|
57 | did = false
|
58 |
|
59 | cluster_mock.fork = ->
|
60 | unless did
|
61 | expect(i).to.eql num_cpus
|
62 | done()
|
63 | did = true
|
64 |
|
65 | send: pass
|
66 | on: pass
|
67 |
|
68 | bobbin.create()
|
69 |
|
70 | it 'should create N processes for N >= 1', (done) ->
|
71 | handlers = {}
|
72 | id = undefined
|
73 |
|
74 | i = 0
|
75 |
|
76 | cluster_mock.fork = ->
|
77 | if ++i > 10
|
78 | expect.fail 'bobbin created more processes than cpus'
|
79 |
|
80 | send: pass
|
81 | on: pass
|
82 |
|
83 | bobbin.create(10)
|
84 |
|
85 |
|
86 | did = false
|
87 |
|
88 | cluster_mock.fork = ->
|
89 | unless did
|
90 | expect(i).to.eql 10
|
91 | did = true
|
92 |
|
93 | send: pass
|
94 | on: pass
|
95 |
|
96 | bobbin.create(1)
|
97 |
|
98 | i = 0
|
99 |
|
100 | cluster_mock.fork = ->
|
101 | if ++i > 2
|
102 | expect.fail 'bobbin created more processes than cpus'
|
103 |
|
104 | send: pass
|
105 | on: pass
|
106 |
|
107 | bobbin.create(2)
|
108 |
|
109 |
|
110 | did = false
|
111 |
|
112 | cluster_mock.fork = ->
|
113 | unless did
|
114 | expect(i).to.eql 2
|
115 | done()
|
116 | did = true
|
117 |
|
118 | send: pass
|
119 | on: pass
|
120 |
|
121 | bobbin.create(1)
|
122 |
|
123 | it 'should create num_cpus processes when N < 1', (done) ->
|
124 | handlers = {}
|
125 | id = undefined
|
126 |
|
127 | i = 0
|
128 |
|
129 | cluster_mock.fork = ->
|
130 | if ++i > num_cpus
|
131 | expect.fail 'bobbin created more processes than cpus'
|
132 |
|
133 | send: pass
|
134 | on: pass
|
135 |
|
136 | bobbin.create(0.9)
|
137 |
|
138 |
|
139 | did = false
|
140 |
|
141 | cluster_mock.fork = ->
|
142 | unless did
|
143 | expect(i).to.eql num_cpus
|
144 | done()
|
145 | did = true
|
146 |
|
147 | send: pass
|
148 | on: pass
|
149 |
|
150 | bobbin.create()
|
151 |
|
152 | it 'should create num_cpus processes when N invalid', (done) ->
|
153 | handlers = {}
|
154 | id = undefined
|
155 |
|
156 | i = 0
|
157 |
|
158 | cluster_mock.fork = ->
|
159 | if ++i > num_cpus
|
160 | expect.fail 'bobbin created more processes than cpus'
|
161 |
|
162 | send: pass
|
163 | on: pass
|
164 |
|
165 | bobbin.create('pickles')
|
166 |
|
167 |
|
168 | did = false
|
169 |
|
170 | cluster_mock.fork = ->
|
171 | unless did
|
172 | expect(i).to.eql num_cpus
|
173 | done()
|
174 | did = true
|
175 |
|
176 | send: pass
|
177 | on: pass
|
178 |
|
179 | bobbin.create()
|
180 |
|
181 | it 'should return a pool object (run, kill)', ->
|
182 | pool = bobbin.create()
|
183 | expect(pool).to.be.an 'object'
|
184 | expect(pool.run).to.be.a 'function'
|
185 | expect(pool.kill).to.be.a 'function'
|
186 |
|
187 | describe '[worker pool]', ->
|
188 |
|
189 | describe '.run()', ->
|
190 |
|
191 | it 'should fail unless work param is a function', ->
|
192 | pool = bobbin.create()
|
193 |
|
194 | expect(->
|
195 | pool.run 1, (-> undefined)
|
196 | ).to.throwError (e) -> expect(e).to.be.a TypeError
|
197 |
|
198 | it 'should fail unless callback param is a function', ->
|
199 | pool = bobbin.create()
|
200 |
|
201 | expect(->
|
202 | pool.run (-> undefined), 1
|
203 | ).to.throwError (e) -> expect(e).to.be.a TypeError
|
204 |
|
205 | it 'should send work to another process via cluster', (done) ->
|
206 | data = [true, false, {foo: 1, bar: {baz: 'abc'}, quux: null}]
|
207 | work = (some, fake, params, callback) -> callback null, 'this should be serializable!'
|
208 |
|
209 | cluster_mock.fork = ->
|
210 | on: pass
|
211 | send: (msg) ->
|
212 | expect(msg.data).to.eql data
|
213 | expect(msg.work).to.equal work.toString()
|
214 | expect(msg.id).not.to.be undefined
|
215 | done()
|
216 |
|
217 | bobbin.create().run data..., work, -> undefined
|
218 |
|
219 | it 'should callback with the passed-in function', (done) ->
|
220 | handlers = {}
|
221 | id = undefined
|
222 |
|
223 | cluster_mock.fork = ->
|
224 | send: (msg) ->
|
225 | id = msg.id
|
226 | on: (ev, handler) ->
|
227 | handlers[ev] = handler
|
228 |
|
229 | bobbin.create().run pass, done
|
230 |
|
231 | handlers['message'] {type: 'result', contents: {id: id}}
|
232 |
|
233 | it 'should callback with the parameters returned from the worker', (done) ->
|
234 | handlers = {}
|
235 | id = undefined
|
236 | params = ['foo', 'bar', 'baz', {quux: true, ziv: {a: 1, b:2}}]
|
237 |
|
238 | cluster_mock.fork = ->
|
239 | send: (msg) ->
|
240 | id = msg.id
|
241 | on: (ev, handler) ->
|
242 | handlers[ev] = handler
|
243 |
|
244 | bobbin.create().run pass, (data...) ->
|
245 | expect(data).to.eql params
|
246 | done()
|
247 |
|
248 | handlers['message'] {
|
249 | type: 'result'
|
250 | contents: {
|
251 | id: id
|
252 | callback_params: params
|
253 | }
|
254 | }
|
255 |
|
256 | it 'should apply a WorkerError to the callback when a worker throws an Error', (done) ->
|
257 | handlers = {}
|
258 | id = undefined
|
259 | params = ['foo', 'bar', 'baz', {quux: true, ziv: {a: 1, b:2}}]
|
260 |
|
261 | cluster_mock.fork = ->
|
262 | send: (msg) ->
|
263 | id = msg.id
|
264 | on: (ev, handler) ->
|
265 | handlers[ev] = handler
|
266 |
|
267 | bobbin.create().run pass, (err, result) ->
|
268 | expect(err).to.be.a bobbin.WorkerError
|
269 | expect(err.message).to.be 'what up'
|
270 | expect(err.name).to.be 'Error'
|
271 | expect(result).to.be undefined
|
272 | done()
|
273 |
|
274 | handlers['message'] {
|
275 | type: 'exception'
|
276 | contents: {
|
277 | id: id
|
278 | is_error: true
|
279 | error: {
|
280 | type: 'Error'
|
281 | parameters: {
|
282 | name: 'Error'
|
283 | message: 'what up'
|
284 | }
|
285 | }
|
286 | }
|
287 | }
|
288 |
|
289 | it 'should apply a thrown non-Error exception to the callback just like an error', (done) ->
|
290 | handlers = {}
|
291 | id = undefined
|
292 | params = ['foo', 'bar', 'baz', {quux: true, ziv: {a: 1, b:2}}]
|
293 |
|
294 | cluster_mock.fork = ->
|
295 | send: (msg) ->
|
296 | id = msg.id
|
297 | on: (ev, handler) ->
|
298 | handlers[ev] = handler
|
299 |
|
300 | bobbin.create().run pass, (err, result) ->
|
301 | expect(err).not.to.be.an Error
|
302 | expect(err).to.be 'bad thing'
|
303 | expect(result).to.be undefined
|
304 | done()
|
305 |
|
306 | handlers['message'] {
|
307 | type: 'exception'
|
308 | contents: {
|
309 | id: id
|
310 | is_error: false
|
311 | exception: 'bad thing'
|
312 | }
|
313 | }
|
314 |
|
315 | it 'should prefer idle workers', (done) ->
|
316 | workers = []
|
317 | count = 0
|
318 |
|
319 | cluster_mock.fork = ->
|
320 | handlers = {}
|
321 | id = undefined
|
322 |
|
323 |
|
324 | workers.push
|
325 | send: (msg) ->
|
326 | id = msg.id
|
327 | count++
|
328 |
|
329 | on: (ev, handler) ->
|
330 | handlers[ev] = handler
|
331 |
|
332 | handler: (ev) ->
|
333 | handlers[ev]
|
334 |
|
335 | id: -> id
|
336 |
|
337 | workers[workers.length - 1]
|
338 |
|
339 |
|
340 |
|
341 | pool = bobbin.create(30)
|
342 | expect(workers.length).to.be 30
|
343 | pool.run(pass, pass) for i in [1..60]
|
344 | expect(count).to.be 60
|
345 |
|
346 |
|
347 |
|
348 |
|
349 | busyfail = (i) ->
|
350 | -> expect().fail "sent work to busy worker #{i} when there was an idle worker available!"
|
351 |
|
352 | for i in [0...30]
|
353 | workers[i].send = busyfail(i)
|
354 |
|
355 |
|
356 |
|
357 |
|
358 | workers[14].handler('message')({
|
359 | type: 'result'
|
360 | contents:
|
361 | id: workers[14].id()
|
362 | callback_params: [null, 'foo']
|
363 | }) for i in [1..2]
|
364 | workers[14].handler('message')({type: 'empty'})
|
365 |
|
366 |
|
367 | workers[14].send = ->
|
368 | workers[22].handler('message')({
|
369 | type: 'result'
|
370 | contents:
|
371 | id: workers[22].id()
|
372 | callback_params: [null, 'foo']
|
373 | }) for i in [1..2]
|
374 | workers[22].handler('message')({type: 'empty'})
|
375 |
|
376 | workers[14].send = busyfail(14)
|
377 | workers[22].send = ->
|
378 | workers[6].handler('message')({
|
379 | type: 'result'
|
380 | contents:
|
381 | id: workers[6].id()
|
382 | callback_params: [null, 'foo']
|
383 | }) for i in [1..2]
|
384 | workers[6].handler('message')({type: 'empty'})
|
385 |
|
386 | workers[22].send = busyfail(22)
|
387 | workers[6].send = ->
|
388 |
|
389 | done()
|
390 |
|
391 | pool.run pass, pass
|
392 |
|
393 | pool.run pass, pass
|
394 |
|
395 | pool.run pass, pass
|
396 |
|
397 |
|
398 |
|
399 | describe '.kill()', ->
|
400 |
|
401 | it 'should kill all workers immediately when called with no timeout', ->
|
402 | spy = sinon.spy()
|
403 |
|
404 | cluster_mock.fork = ->
|
405 | killed = false
|
406 |
|
407 | send: pass
|
408 | on: pass
|
409 | kill: ->
|
410 | unless killed
|
411 | killed = true
|
412 | spy()
|
413 |
|
414 |
|
415 | pool = bobbin.create(10)
|
416 | pool.kill()
|
417 |
|
418 | expect(spy.callCount).to.eql 10
|
419 |
|
420 | it 'should kill workers after work finishes if called with sufficient timeout', (done) ->
|
421 | work_done_count = 0
|
422 | killed_count = 0
|
423 | handlers = {}
|
424 |
|
425 | inc = ->
|
426 | if ++killed_count is 10
|
427 | expect(work_done_count).to.eql 20
|
428 |
|
429 | cluster_mock.fork = ->
|
430 | killed = false
|
431 |
|
432 | send: (msg) ->
|
433 | setTimeout (->
|
434 | handlers['message'] {
|
435 | type: 'result'
|
436 | contents:
|
437 | id: msg.id
|
438 | callback_params: []
|
439 | }
|
440 | ), 10
|
441 | on: (ev, handler) ->
|
442 | handlers[ev] = handler
|
443 | kill: ->
|
444 | unless killed
|
445 | killed = true
|
446 | inc()
|
447 |
|
448 | pool = bobbin.create(10)
|
449 |
|
450 | inc_done = ->
|
451 | if ++work_done_count >= 20
|
452 | done()
|
453 |
|
454 | pool.run((-> 'dummy function -- this test ignores this function'), inc_done) for i in [1..20]
|
455 |
|
456 | pool.kill(1500)
|
457 |
|
458 | it 'should kill workers prematurely if called with small timeout', (done) ->
|
459 | work_done_count = 0
|
460 | killed_count = 0
|
461 | handlers = {}
|
462 |
|
463 | inc = ->
|
464 | if ++killed_count is 10
|
465 | expect(work_done_count).to.eql 0
|
466 | done()
|
467 |
|
468 | cluster_mock.fork = ->
|
469 | killed = false
|
470 |
|
471 | send: (msg) ->
|
472 | setTimeout (->
|
473 | handlers['message'] {
|
474 | type: 'result'
|
475 | contents:
|
476 | id: msg.id
|
477 | callback_params: []
|
478 | }
|
479 | ), 1000
|
480 | on: (ev, handler) ->
|
481 | handlers[ev] = handler
|
482 | kill: ->
|
483 | unless killed
|
484 | killed = true
|
485 | inc()
|
486 |
|
487 | pool = bobbin.create(10)
|
488 |
|
489 | inc_done = ->
|
490 | if ++work_done_count >= 1
|
491 | expect().fail('should not have run inc_done')
|
492 |
|
493 | pool.run((-> 'dummy function -- this test ignores this function'), inc_done) for i in [1..20]
|
494 |
|
495 | pool.kill(1)
|
496 |
|
497 | it 'should raise an error if new work is submitted after kill()', ->
|
498 | pool = bobbin.create()
|
499 | pool.kill()
|
500 |
|
501 | expect(-> pool.run reflex, pass).to.throwError /kill has been called/
|
502 |
|
503 | it 'should raise an error if a bad timeout value is passed', ->
|
504 | pool = bobbin.create()
|
505 | expect(-> pool.kill(-1)).to.throwError /non\-negative/
|
506 | expect(-> pool.kill(false)).to.throwError /non\-negative/
|
507 |
|
508 | after ->
|
509 | mockery.disable() |
\ | No newline at end of file |