UNPKG

2.35 kBtext/coffeescriptView Raw
1cluster = require 'cluster'
2path = require 'path'
3uuid = require 'node-uuid'
4num_cpus = require('os').cpus().length
5
6WorkerError = (e) ->
7 this.message = e.message
8 this.filename = e.filename
9 this.lineNumber = e.lineNumber
10 this.name = e.name
11 this.stack = e.stack
12
13WorkerError.prototype = new Error
14
15module.exports =
16 WorkerError: WorkerError
17
18 create: (num_workers) ->
19 workers = []
20 worker_queue = []
21 empty = []
22 handlers = {}
23 killing = false
24
25 unless cluster.isMaster is true
26 throw new Error 'needs to be cluster master'
27
28 unless typeof num_workers is 'number' and num_workers >= 1
29 num_workers = num_cpus
30
31 cluster.setupMaster exec: path.join(__dirname, 'worker.coffee')
32
33 worker_msg_handler = (num) ->
34 (msg) ->
35 switch msg.type
36 when 'result'
37 handlers[msg.contents.id] msg.contents.callback_params...
38
39 when 'exception'
40 if msg.contents.is_error
41 handlers[msg.contents.id] (new WorkerError msg.contents.error.parameters)
42 else
43 handlers[msg.contents.id] msg.contents.exception
44
45 when 'empty'
46 worker_queue = worker_queue.filter (x) -> x isnt num
47 worker_queue.unshift num
48 empty[num] = true
49
50 if killing
51 workers[num].kill()
52
53
54 for i in [0...num_workers]
55 w = cluster.fork()
56 w.__num__ = i
57 workers.push w
58 worker_queue.unshift i
59 empty.push false
60
61 w.on 'message', worker_msg_handler(i)
62
63 get_worker = ->
64 worker_queue.push worker_queue.shift()
65 workers[worker_queue[worker_queue.length - 1]]
66
67 return {
68 run: (data..., work, callback) ->
69 if killing
70 throw new Error 'kill has been called, no new work accepted'
71
72 unless typeof work is 'function'
73 throw new TypeError 'work parameter must be a function'
74
75 unless typeof callback is 'function'
76 throw new TypeError 'callback parameter must be a function'
77
78 id = uuid.v1()
79 handlers[id] = callback
80
81 w = get_worker()
82
83 w.send
84 id: id
85 data: data
86 work: work.toString()
87
88 empty[w.__num__] = false
89
90 kill: (timeout=0) ->
91 unless typeof timeout is 'number' and timeout >= 0
92 throw new TypeError 'kill timeout must be a non-negative number of milliseconds'
93
94 unless killing
95 killing = true
96
97 for w in workers
98 if timeout is 0 or empty[w.__num__]
99 w.kill()
100 else
101 setTimeout w.kill, timeout
102 }
\No newline at end of file