UNPKG

2.12 kBJavaScriptView Raw
// Worker provides a simple framework around the cluster library to orchestrate a multicore ETL pipeline.
// A task list is scheduled across a number of workers, and a process function is defined to handle
// each element of that list.
4
5const cluster = require('cluster');
6const os = require('os');
7const Promise = require('bluebird');
8
// Role of the current process as reported by the cluster module; kept as
// locals for use inside this file and re-exported for consumers.
const isMaster = cluster.isMaster;
const isWorker = cluster.isWorker;

module.exports.isMaster = isMaster;
module.exports.isWorker = isWorker;
11
12
/**
 * Schedule a list of jobs to be distributed to workers.
 *
 * Forks cluster workers and hands each one a job from a private copy of
 * `list` when it comes online, then another job every time it reports
 * `{id: 'done'}`. Jobs are taken from the end of the list. A worker is
 * disconnected as soon as it asks for work and none is left.
 *
 * @param {*|Array} list - A job or an array of jobs; each job is passed to
 *   `worker.send`. `null`/`undefined` entries are treated as "no job" and skipped.
 * @param {number} [threads] - Number of workers to fork. Falls back to
 *   `os.cpus().length` when missing or not a positive finite number.
 * @param {boolean|number} [reporting] - Truthy to log progress to the console.
 *   A positive number is used as the reporting period in milliseconds; any
 *   other truthy value reports every 1000 ms.
 * @returns {Promise<number>} Resolves with the sum of `items` received in
 *   `progress` messages once every worker has been released; rejects if a
 *   worker exits before it was released.
 * @throws {Error} When called from a worker process.
 */
module.exports.schedule = function(list, threads, reporting) {
  if (!isMaster) {
    throw new Error('No scheduling from a worker');
  }

  // A bounded worker count: negative, fractional or non-numeric values must
  // never turn the fork loop into an endless one.
  const requested = Math.floor(Number(threads));
  const workerCount = Number.isFinite(requested) && requested > 0
    ? requested
    : os.cpus().length;

  // Private copy so the caller's array is not consumed. Only null/undefined
  // are dropped; falsy jobs such as 0, '' or false are real work items.
  const queue = [].concat(list).filter((job) => job != null);
  const pending = [];
  let processed = 0;
  let lastReported = 0;
  let reportInterval;

  function createWorker(num) {
    const worker = cluster.fork();
    worker.num = num;

    pending.push(new Promise((resolve, reject) => {
      const next = () => {
        if (queue.length === 0) {
          if (reporting) {
            console.log('Worker done', worker.num);
          }
          worker.disconnect();
          resolve(true);
          return;
        }
        worker.send(queue.pop());
      };

      // Listen on the worker itself rather than on `cluster`, so workers
      // forked by other schedule() calls never trigger this queue.
      worker.once('online', next);

      worker.on('message', (msg) => {
        if (msg.id === 'done') {
          next();
        }
        if (msg.id === 'progress') {
          processed += msg.items;
        }
      });

      // A normal exit happens after resolve(), which makes this a no-op.
      // An exit before that means the worker died while still owing work.
      worker.on('exit', (code, signal) => {
        reject(new Error(
          `Worker ${num} exited before finishing (code ${code}, signal ${signal})`
        ));
      });
    }));
  }

  // Worker numbers run from workerCount - 1 down to 0.
  for (let num = workerCount - 1; num >= 0; num--) {
    createWorker(num);
  }

  if (reporting) {
    // `true` must not be coerced into a 1 ms timer.
    const numeric = typeof reporting === 'boolean' ? NaN : Number(reporting);
    const period = Number.isFinite(numeric) && numeric > 0 ? numeric : 1000;

    reportInterval = setInterval(() => {
      console.log(processed - lastReported, lastReported);
      lastReported = processed;
    }, period);
  }

  return Promise.all(pending)
    .then(() => processed)
    .finally(() => {
      // Stop reporting on success and on failure alike.
      clearInterval(reportInterval);
    });
};
64
/**
 * Default job handler; meant to be overwritten in the worker.
 *
 * Does no work and signals completion immediately. It declares two
 * parameters, so the worker's message listener treats it as callback-style.
 *
 * @param {*} d - The job received from the master (ignored here).
 * @param {Function} callback - Invoked with no arguments to mark the job done.
 */
module.exports.process = function defaultProcess(d, callback) {
  callback();
};
69
/**
 * Report progress from a worker to the master.
 *
 * Sends `{id: 'progress', items: d}` over the IPC channel. Does nothing when
 * the current process is not a cluster worker.
 *
 * @param {*} d - Value sent as the `items` field of the progress message.
 */
module.exports.progress = function(d) {
  if (!isWorker) {
    return;
  }

  const message = {id: 'progress', items: d};
  process.send(message);
};
74
// In a worker: run every job received from the master through
// `module.exports.process` and answer with `{id: 'done'}` when it finishes.
if (isWorker) {
  process.on('message', (job) => {
    const done = () => process.send({id: 'done'});

    // A rejected job must not leave the worker idling with the master waiting
    // forever. Rethrow outside the promise chain so it surfaces as an uncaught
    // exception, exactly like a synchronous throw from the handler does.
    const fail = (err) => {
      setImmediate(() => {
        throw err;
      });
    };

    // Handlers declaring two or more parameters are callback-style;
    // otherwise the return value is treated as a value or promise.
    if (module.exports.process.length > 1) {
      module.exports.process(job, done);
      return;
    }

    Promise.resolve(module.exports.process(job))
      .then(done)
      .catch(fail);
  });
}
\No newline at end of file