UNPKG

7.81 kBJavaScriptView Raw
'use strict';

// Namespaced logger; all diagnostics in this module go through it.
const debug = require('debug')('pandora:cluster:monitor');

const cluster = require('cluster');
const os = require('os');
const util = require('util');

// Scheduler used to postpone installing the default event handlers:
// setImmediate when the runtime provides it, process.nextTick otherwise.
const defer = global.setImmediate || process.nextTick;

module.exports = fork;
12
// Set once the cluster-level 'disconnect' / 'exit' listeners are installed, so
// repeated fork() calls in the same process do not register them again.
// NOTE: because of this, the listeners close over the state (refork, limit,
// duration, env, counters) of the FIRST fork() call only.
let bind = false;

/**
 * Fork cluster workers (and optional slave processes) and re-fork them when
 * they disconnect or exit unexpectedly.
 *
 * @param {Object} [options]
 * - {String} [exec] exec file path; when set, `cluster.setupMaster()` is called with it
 * - {Array} [args] exec arguments
 * - {Array} [execArgv] forwarded to `cluster.setupMaster()` when `exec` is set
 * - {Number} [uid] forwarded to `cluster.setupMaster()` when `exec` is set
 * - {Number} [gid] forwarded to `cluster.setupMaster()` when `exec` is set
 * - {Object} [env] passed to every `cluster.fork()` call
 * - {Array|String|Object} [slaves] slave processes; each entry is an exec path
 *   or a settings object with an `exec` property, entries without `exec` are skipped
 * - {Boolean} [silent] whether or not to send output to parent's stdio, default is `false`
 * - {Number} [count] worker num, default is `os.cpus().length`
 * - {Boolean} [refork] refork when disconnect and unexpected exit, default is `true`
 * - {Number} [limit] size of the refork history window, default is `60`
 * - {Number} [duration] minimum time span in ms the full window must cover
 *   for another refork to be allowed, default is `60000`
 * - {Boolean} [autoCoverage] auto fork with istanbul when `running_under_istanbul` env set, default is `false`
 * @return {Cluster} the `cluster` module, or `undefined` when called inside a worker
 */
function fork(options) {
  if (cluster.isWorker) {
    return;
  }

  // Apply the default BEFORE reading any property so that `fork()` without
  // arguments works instead of throwing on `options.env`.
  options = options || {};
  const env = options.env;
  const count = options.count || os.cpus().length;
  const refork = options.refork !== false;
  const limit = options.limit || 60;
  const duration = options.duration || 60000; // 1 min
  const reforks = [];
  let newWorker;

  if (options.exec) {
    const opts = {
      exec: options.exec,
    };

    // Copy only the settings the caller actually provided.
    ['execArgv', 'gid', 'uid', 'args', 'silent'].forEach(function (key) {
      if (options[key] !== undefined) {
        opts[key] = options[key];
      }
    });

    // https://github.com/gotwarlost/istanbul#multiple-process-usage
    // Multiple Process under istanbul
    if (options.autoCoverage && process.env.running_under_istanbul) {
      // use coverage for forked process
      // disabled reporting and output for child process
      // enable pid in child process coverage filename
      let args = [
        'cover', '--report', 'none', '--print', 'none', '--include-pid',
        opts.exec,
      ];
      if (opts.args && opts.args.length > 0) {
        args.push('--');
        args = args.concat(opts.args);
      }

      opts.exec = './node_modules/.bin/istanbul';
      opts.args = args;
    }

    cluster.setupMaster(opts);
  }

  // pid -> timestamp of workers that disconnected first; their later 'exit'
  // is then treated as expected.
  const disconnects = {};
  let disconnectCount = 0;
  let unexpectedCount = 0;

  if (!bind) {
    bind = true;

    cluster.on('disconnect', function (worker) {
      disconnectCount++;
      const isDead = worker.isDead && worker.isDead();
      debug('[%s] [cfork:master:%s] worker:%s disconnect (exitedAfterDisconnect: %s, state: %s, isDead: %s)',
        Date(), process.pid, worker.process.pid, worker.exitedAfterDisconnect, worker.state, isDead);
      if (isDead) {
        // worker has terminated before disconnect
        debug('[%s] [cfork:master:%s] don\'t fork, because worker:%s exit event emit before disconnect',
          Date(), process.pid, worker.process.pid);
        return;
      }

      disconnects[worker.process.pid] = Date();
      if (allow(worker)) {
        newWorker = forkWorker(worker._clusterSettings, env);
        newWorker._clusterSettings = worker._clusterSettings;
        debug('[%s] [cfork:master:%s] new worker:%s fork (state: %s)',
          Date(), process.pid, newWorker.process.pid, newWorker.state);
      } else {
        debug('[%s] [cfork:master:%s] don\'t fork new work (refork: %s)',
          Date(), process.pid, refork);
      }
    });

    cluster.on('exit', function (worker, code, signal) {
      // Handled on the next tick rather than synchronously inside 'exit'.
      process.nextTick(function () {
        const isExpected = !!disconnects[worker.process.pid];
        const isDead = worker.isDead && worker.isDead();
        debug('[%s] [cfork:master:%s] worker:%s exit (code: %s, exitedAfterDisconnect: %s, state: %s, isDead: %s, isExpected: %s)',
          Date(), process.pid, worker.process.pid, code, worker.exitedAfterDisconnect, worker.state, isDead, isExpected);
        if (isExpected) {
          delete disconnects[worker.process.pid];
          // worker disconnect first, exit expected
          return;
        }

        unexpectedCount++;
        if (allow(worker)) {
          newWorker = forkWorker(worker._clusterSettings, env);
          newWorker._clusterSettings = worker._clusterSettings;
          debug('[%s] [cfork:master:%s] new worker:%s fork (state: %s)',
            Date(), process.pid, newWorker.process.pid, newWorker.state);
        } else {
          debug('[%s] [cfork:master:%s] don\'t fork new work (refork: %s)',
            Date(), process.pid, refork);
        }
        cluster.emit('unexpectedExit', worker, code, signal);
      });
    });
  }

  // defer to set the listeners
  // so you can listen this by your own
  defer(function () {
    if (process.listeners('uncaughtException').length === 0) {
      process.on('uncaughtException', onerror);
    }
    if (cluster.listeners('unexpectedExit').length === 0) {
      cluster.on('unexpectedExit', onUnexpected);
    }
    if (cluster.listeners('reachReforkLimit').length === 0) {
      cluster.on('reachReforkLimit', onReachReforkLimit);
    }
  });

  for (let i = 0; i < count; i++) {
    newWorker = forkWorker(null, env);
    // Remember the settings in effect so a refork can reuse them.
    newWorker._clusterSettings = cluster.settings;
  }

  // fork slaves after workers are forked
  if (options.slaves) {
    const slaves = Array.isArray(options.slaves) ? options.slaves : [options.slaves];
    slaves.map(normalizeSlaveConfig)
      .forEach(function (settings) {
        if (settings) {
          newWorker = forkWorker(settings, env);
          newWorker._clusterSettings = settings;
        }
      });
  }

  return cluster;

  /**
   * allow refork
   *
   * Records the attempt in a window of at most `limit` timestamps. Returns
   * `true` while the window is not full, or when the full window spans more
   * than `duration` ms; otherwise emits `reachReforkLimit` and returns `false`.
   * Always `false` when reforking is disabled globally or `worker._refork === false`.
   */
  function allow(worker) {
    if (!refork) {
      return false;
    }

    if (worker._refork === false) {
      return false;
    }

    const times = reforks.push(Date.now());

    if (times > limit) {
      reforks.shift();
    }

    const span = reforks[reforks.length - 1] - reforks[0];
    const canFork = reforks.length < limit || span > duration;

    if (!canFork) {
      cluster.emit('reachReforkLimit');
    }

    return canFork;
  }

  /**
   * uncaughtException default handler
   */
  function onerror(err) {
    if (!err) {
      return;
    }
    debug('[%s] [cfork:master:%s] master uncaughtException: %s', Date(), process.pid, err.stack);
    debug(err);
    debug('(total %d disconnect, %d unexpected exit)', disconnectCount, unexpectedCount);
  }

  /**
   * unexpectedExit default handler
   *
   * Reports the exit code read from `worker.process.exitCode`; the `code`
   * argument of the event is not used.
   */
  function onUnexpected(worker, code, signal) {
    const exitCode = worker.process.exitCode;
    const err = new Error(util.format('worker:%s died unexpected (code: %s, signal: %s, exitedAfterDisconnect: %s, state: %s)',
      worker.process.pid, exitCode, signal, worker.exitedAfterDisconnect, worker.state));
    err.name = 'WorkerDiedUnexpectedError';

    debug('[%s] [cfork:master:%s] (total %d disconnect, %d unexpected exit) %s',
      Date(), process.pid, disconnectCount, unexpectedCount, err.stack);
  }

  /**
   * reachReforkLimit default handler
   */
  function onReachReforkLimit() {
    debug('[%s] [cfork:master:%s] worker died too fast (total %d disconnect, %d unexpected exit)',
      Date(), process.pid, disconnectCount, unexpectedCount);
  }

  /**
   * normalize slave config
   *
   * Accepts an exec path string or a settings object. Returns the settings
   * object, or `null` when the entry is empty or has no `exec`.
   */
  function normalizeSlaveConfig(opt) {
    // exec path
    if (typeof opt === 'string') {
      opt = { exec: opt };
    }
    // Skip null/undefined entries instead of throwing on `opt.exec`.
    if (!opt || !opt.exec) {
      return null;
    }
    return opt;
  }

  /**
   * fork worker with certain settings
   *
   * When `settings` is given it replaces `cluster.settings` and
   * `cluster.setupMaster()` is re-run before forking.
   */
  function forkWorker(settings, env) {
    if (settings) {
      cluster.settings = settings;
      cluster.setupMaster();
    }
    return cluster.fork(env);
  }
}