1 |
|
2 |
|
3 |
|
4 |
|
5 | const cluster = require('cluster');
|
6 | const os = require('os');
|
7 | const Promise = require('bluebird');
|
8 |
|
// Capture this process's cluster role once, then re-export both flags so
// callers can branch on the role without requiring `cluster` themselves.
const isMaster = cluster.isMaster;
const isWorker = cluster.isWorker;

module.exports.isMaster = isMaster;
module.exports.isWorker = isWorker;
|
11 |
|
12 |
|
13 |
|
/**
 * Forks cluster workers and feeds them the entries of `list` one at a time.
 * Each worker receives its first item when it comes online and a further
 * item every time it reports `{id: 'done'}`. When the queue is empty the
 * worker is disconnected.
 *
 * @param {*|Array} list - Item or array of items to hand out. The input is
 *   copied, never mutated. `null`/`undefined` entries are dropped because
 *   they cannot represent a unit of work (`undefined` cannot be sent over IPC).
 * @param {number} [threads] - Number of workers to fork; defaults to the
 *   number of CPUs. Fractions are rounded down and at least one worker is
 *   always started.
 * @param {boolean|number} [reporting] - Truthy enables console reporting.
 *   A positive number is used as the reporting period in milliseconds,
 *   anything else (e.g. `true`) falls back to 1000 ms.
 * @returns {Promise<number>} Resolves with the sum of all `items` values
 *   reported by workers through `{id: 'progress'}` messages. Rejects if a
 *   worker exits before it was told to disconnect.
 * @throws {Error} When called from a worker process.
 */
module.exports.schedule = function(list, threads, reporting) {
  if (!isMaster)
    throw new Error('No scheduling from a worker');

  let i = 0;
  let last = 0;
  let reportInterval;
  const workers = [];

  // Guard against NaN / negative / fractional counts, which would otherwise
  // make a countdown loop fork workers without end.
  const requested = Math.floor(Number(threads));
  const workerCount = requested > 0 ? requested : os.cpus().length;

  // Private copy so the caller's array is left untouched.
  const queue = [].concat(list).filter((item) => item != null);

  // Hands the next queued item to `worker`, or retires it when none is left.
  function next(worker) {
    // Test the queue length, not the popped value: 0, '' and false are
    // legitimate items and must not be mistaken for "queue empty".
    if (queue.length === 0) {
      if (reporting) console.log('Worker done', worker.num);
      worker.disconnect();
      worker.done.resolve(true);
      return;
    }
    worker.send(queue.pop());
  }

  // Forks one worker and wires up its lifecycle and message handlers.
  function createWorker(num) {
    const worker = cluster.fork();

    worker.num = num;
    // Deferred-shaped object ({promise, resolve, reject}) built without the
    // deprecated Promise.defer().
    worker.done = {};
    worker.done.promise = new Promise((resolve, reject) => {
      worker.done.resolve = resolve;
      worker.done.reject = reject;
    });
    workers.push(worker.done.promise);

    // Listen on the worker itself rather than on `cluster`: a cluster-wide
    // listener would outlive this call and grab workers forked by later
    // schedule() calls.
    worker.on('online', () => next(worker));

    worker.on('message', (msg) => {
      if (!msg) return;

      if (msg.id === 'done')
        next(worker);

      if (msg.id === 'progress')
        i += msg.items;
    });

    // If the worker dies before being retired, fail instead of waiting
    // forever. After a regular retirement the promise is already resolved
    // and this reject is a no-op.
    worker.on('exit', (code, signal) => {
      worker.done.reject(new Error(
        'Worker ' + worker.num + ' exited unexpectedly (code ' + code +
        ', signal ' + signal + ')'
      ));
    });
  }

  // Numbers run from workerCount - 1 down to 0.
  for (let num = workerCount - 1; num >= 0; num--)
    createWorker(num);

  if (reporting) {
    // `true` must not be used as the delay itself (it would coerce to 1 ms).
    const asNumber = reporting === true ? NaN : Number(reporting);
    const period = asNumber > 0 && Number.isFinite(asNumber) ? asNumber : 1000;

    reportInterval = setInterval(() => {
      console.log(i - last, last);
      last = i;
    }, period);
  }

  return Promise.all(workers)
    // Stop reporting whether the run succeeded or failed.
    .finally(() => clearInterval(reportInterval))
    .then(() => i);
};
|
64 |
|
65 |
|
/**
 * Default per-item handler: does no work and signals completion at once.
 * Consumers are expected to replace it by assigning their own function to
 * `module.exports.process`.
 *
 * The worker message handler inspects the handler's declared arity: with
 * more than one parameter it is called callback-style, otherwise its return
 * value is treated as a promise. Both parameters are therefore kept here.
 *
 * @param {*} d - The item received from the master (unused here).
 * @param {Function} callback - Invoked with no arguments when the item is done.
 */
module.exports.process = function(d, callback) {
  callback();
};
|
69 |
|
/**
 * Sends a `{id: 'progress', items: d}` message to the master process.
 * Does nothing when called outside a worker process.
 *
 * @param {*} d - Value sent as the `items` field of the progress message.
 */
module.exports.progress = function(d) {
  if (!isWorker) {
    return;
  }

  const report = {id: 'progress', items: d};
  process.send(report);
};
|
74 |
|
// Worker side: every message from the master is one item to process. The
// handler is looked up on module.exports at call time so that a replacement
// assigned after this module was loaded is picked up.
if (isWorker)
  process.on('message', (d) => {
    // Tells the master this item is finished.
    const done = () => process.send({id: 'done'});

    // A handler declaring two or more parameters is callback-style.
    if (module.exports.process.length > 1) {
      module.exports.process(d, done);
      return;
    }

    // Otherwise the return value is treated as a (possibly synchronous) promise.
    Promise.resolve(module.exports.process(d))
      .then(done)
      .catch((err) => {
        // A rejected handler would otherwise never send `done`. Rethrow
        // outside the promise chain so the failure surfaces as an uncaught
        // exception, exactly like a synchronous throw from the handler.
        process.nextTick(() => {
          throw err;
        });
      });
  });
\ | No newline at end of file |