UNPKG

3.27 kB · JavaScript · View Raw
1const {EventEmitter} = require('events');
2const os = require('os');
3const Farm = require('worker-farm/lib/farm');
4const promisify = require('./utils/promisify');
5const logger = require('./Logger');
6
7let shared = null;
8
/**
 * Farm that can execute a job either in-process (the "local" worker) or in a
 * child process (a "remote" worker).
 *
 * Jobs run locally until the remote workers have been initialised and warmed
 * up; after that they are dispatched to the remote workers.
 * A module-level `shared` variable holds the instance handed out by
 * `WorkerFarm.getShared()`.
 */
class WorkerFarm extends Farm {
  /**
   * @param {object} options - Forwarded unchanged to `init()`, which passes
   *   it to the local worker and to every remote worker.
   */
  constructor(options) {
    const opts = {
      maxConcurrentWorkers: getNumWorkers()
    };

    // Node < 8 loads the worker from ../lib, newer versions from ../src
    // (presumably ../lib is a transpiled build - confirm against the package).
    const workerPath =
      parseInt(process.versions.node, 10) < 8
        ? require.resolve('../lib/worker')
        : require.resolve('../src/worker');

    super(opts, workerPath);

    this.localWorker = this.promisifyWorker(require('./worker'));
    // `setup` is inherited from Farm; the result is treated as an object
    // exposing `init` and `run`.
    this.remoteWorker = this.promisifyWorker(this.setup(['init', 'run']));

    this.started = false;
    this.warmWorkers = 0;
    this.init(options);
  }

  /**
   * (Re)initialise the local worker and all remote workers with `options`.
   *
   * Remote initialisation runs in the background. If it fails, the farm
   * stays in its "not started" state, so `run()` keeps using the local
   * worker; the failure is reported as a process warning instead of being
   * left as an unhandled promise rejection.
   *
   * @param {object} options - Options handed to each worker's `init`.
   */
  init(options) {
    this.localWorker.init(options);
    this.initRemoteWorkers(options).catch(err => {
      const reason = err && err.message ? err.message : String(err);
      process.emitWarning(
        `WorkerFarm: remote worker initialisation failed, jobs will run on the local worker: ${reason}`
      );
    });
  }

  /**
   * Wrap every enumerable member of `worker` with `promisify`, bound to
   * `worker`.
   *
   * @param {object} worker - Object whose members are callable.
   * @returns {object} New object with the same keys and promisified members.
   */
  promisifyWorker(worker) {
    let res = {};

    for (let key in worker) {
      res[key] = promisify(worker[key].bind(worker));
    }

    return res;
  }

  /**
   * Issue one `init` call per configured remote worker and mark the farm as
   * started once all of them have resolved.
   *
   * @param {object} options - Options handed to each remote `init` call.
   * @returns {Promise<void>} Rejects if any remote `init` call rejects.
   */
  async initRemoteWorkers(options) {
    this.started = false;
    this.warmWorkers = 0;

    let promises = [];
    for (let i = 0; i < this.options.maxConcurrentWorkers; i++) {
      promises.push(this.remoteWorker.init(options));
    }

    await Promise.all(promises);
    // With zero workers there is nothing to dispatch to, so never "start".
    if (this.options.maxConcurrentWorkers > 0) {
      this.started = true;
    }
  }

  /**
   * Handle a message coming from a child.
   *
   * - Messages with an `event` are re-emitted on this farm.
   * - Messages of type `'logger'` are forwarded to the logger, but only while
   *   remote workers are actually in use.
   * - Anything else is passed to Farm's `receive`, provided the originating
   *   child is still registered in `this.children`.
   *
   * @param {object} data - Message received from a child.
   */
  receive(data) {
    if (data.event) {
      // Tolerate event messages that carry no argument list.
      this.emit(data.event, ...(data.args || []));
    } else if (data.type === 'logger') {
      if (this.shouldUseRemoteWorkers()) {
        logger.handleMessage(data);
      }
    } else if (this.children[data.child]) {
      super.receive(data);
    }
  }

  /**
   * @returns {boolean} True once the remote workers are started and at least
   *   as many warm-up jobs have succeeded as there are active children
   *   (`activeChildren` is not set in this class; presumably maintained by
   *   Farm).
   */
  shouldUseRemoteWorkers() {
    return this.started && this.warmWorkers >= this.activeChildren;
  }

  /**
   * Run a job, choosing between the remote and the local worker.
   *
   * @param {...*} args - Job arguments; a trailing boolean is appended
   *   (`true` for background warm-up calls, `false` otherwise).
   * @returns {Promise<*>} Result of the worker's `run`.
   */
  async run(...args) {
    // Child process workers are slow to start (~600ms).
    // While we're waiting, just run on the main thread.
    // This significantly speeds up startup time.
    if (this.shouldUseRemoteWorkers()) {
      return this.remoteWorker.run(...args, false);
    } else {
      // Workers have started, but are not warmed up yet.
      // Send the job to a remote worker in the background,
      // but use the result from the local worker - it will be faster.
      if (this.started) {
        this.remoteWorker.run(...args, true).then(
          () => {
            this.warmWorkers++;
          },
          () => {
            // Deliberately ignored: the warm-up result is never used, the
            // local worker below produces the value returned to the caller.
          }
        );
      }

      return this.localWorker.run(...args, false);
    }
  }

  /**
   * Stop every child and, if this farm is the shared instance, release it so
   * the next `getShared()` call builds a fresh farm.
   */
  end() {
    // Force kill all children
    this.ending = true;
    for (let child in this.children) {
      this.stopChild(child);
    }

    this.ending = false;
    // Only drop the shared reference when it points at this farm; ending an
    // independently constructed farm must not orphan the shared one.
    if (shared === this) {
      shared = null;
    }
  }

  /**
   * Return the shared farm, creating it on first use and re-initialising it
   * with `options` on later calls.
   *
   * @param {object} options - Options for the constructor or `init()`.
   * @returns {WorkerFarm} The shared instance.
   */
  static getShared(options) {
    if (!shared) {
      shared = new WorkerFarm(options);
    } else {
      shared.init(options);
    }

    return shared;
  }
}
122
// WorkerFarm already extends Farm, so the EventEmitter API is mixed in by
// copying every enumerable member of EventEmitter.prototype onto it.
const emitterProto = EventEmitter.prototype;
for (const member in emitterProto) {
  WorkerFarm.prototype[member] = emitterProto[member];
}
126
/**
 * Decide how many workers the farm should use.
 *
 * The `PARCEL_WORKERS` environment variable wins when it parses as an
 * integer (an explicit `0` is honoured). Otherwise the count comes from the
 * `physical-cpu-count` module, or from `os.cpus().length` when that module
 * cannot be loaded, with a minimum of 1.
 *
 * @returns {number} Number of workers to use.
 */
function getNumWorkers() {
  if (process.env.PARCEL_WORKERS) {
    const requested = parseInt(process.env.PARCEL_WORKERS, 10);
    // A non-numeric value parses to NaN; fall through to CPU detection
    // rather than handing NaN to the farm as its worker count.
    if (!Number.isNaN(requested)) {
      return requested;
    }
  }

  let cores;
  try {
    cores = require('physical-cpu-count');
  } catch (err) {
    cores = os.cpus().length;
  }
  return cores || 1;
}
140
141module.exports = WorkerFarm;