UNPKG

17.4 kBJavaScriptView Raw
1/*!
2 * Copyright (c) 2012-2019 Digital Bazaar, Inc. All rights reserved.
3 */
4'use strict';
5
6const brUtil = require('./util');
7const cc = brUtil.config.main.computer();
8const cluster = require('cluster');
9const config = require('./config');
10const cycle = require('cycle');
11const errio = require('errio');
12const events = require('./events');
13const loggers = require('./loggers');
14const path = require('path');
15const pkginfo = require('pkginfo');
16const program = require('commander');
17const {deprecate} = require('util');
18const {BedrockError} = brUtil;
19
// core API: the object exported by this module, exposing the core submodules
const api = {
  config,
  events,
  loggers,
  util: brUtil
};
module.exports = api;

// read package.json fields (`api.version` is used below for the CLI program)
pkginfo(module, 'version');

// register error class with errio; errio is used by `runOnce` to convert
// errors to and from plain objects
errio.register(BedrockError);
33
// config paths
// These computed defaults are configured here instead of in config.js due to
// util dependency issues.
// FIXME: v2.0.0: remove when removing warnings below.
const _warningShown = {
  cache: false,
  log: false
};
cc({
  // default cache path: `.cache` in the parent directory of this file
  'paths.cache': () => {
    // FIXME: v2.0.0: remove warning and default and throw exception:
    //throw new BedrockError(
    //  'bedrock.config.paths.cache not set.',
    //  'ConfigError');
    const fallback = path.join(__dirname, '..', '.cache');
    if(_warningShown.cache) {
      // only report the missing setting once
      return fallback;
    }
    loggers.get('app').error(
      `"bedrock.config.paths.cache" not set, using default: "${fallback}"`);
    _warningShown.cache = true;
    return fallback;
  },
  // default log path: a fixed directory under /tmp
  'paths.log': () => {
    // FIXME: v2.0.0: remove warning and default and throw exception:
    //throw new BedrockError(
    //  'bedrock.config.paths.log not set.',
    //  'ConfigError');
    const fallback = path.join('/tmp/bedrock-dev');
    if(_warningShown.log) {
      // only report the missing setting once
      return fallback;
    }
    // Using console since this config value is used during logger setup.
    console.warn(
      `WARNING: "bedrock.config.paths.log" not set, using default: ` +
      `"${fallback}"`);
    _warningShown.log = true;
    return fallback;
  }
});
70
// expose the bedrock CLI program, with its version set from `api.version`
api.program = program.version(api.version);
73
/**
 * Starts the bedrock application.
 *
 * Registers the built-in CLI options, parses the command line, loads any
 * `--config` files, applies the logger, worker and process settings and then
 * initializes the logging system. If logging cannot be initialized the error
 * is printed to the console and the process exits with status 1.
 *
 * Events emitted directly by this function, in order: `bedrock-cli.init`,
 * `bedrock-cli.parsed`, `bedrock-loggers.init`.
 *
 * @param {object} [options={}] - The options to use:
 *   [script] the script to execute when forking bedrock workers, by
 *     default this will be process.argv[1].
 *
 * @returns {Promise} In the master process, resolves after the workers have
 *   been forked. In a worker process, resolves when the worker has started
 *   (or its CLI command cancelled the default startup) and rejects with the
 *   startup error after emitting `bedrock.error` if an error has occurred.
 */
api.start = async options => {
  // tolerate any falsy `options` (including `null`) without mutating the
  // parameter
  const startOptions = options || {};

  const startTime = Date.now();

  // accumulates the values of a repeatable CLI option
  const collect = (val, memo) => {
    memo.push(val);
    return memo;
  };

  // commander invokes option processors as `(value, previous)`; handing it
  // `parseInt` directly would use `previous` as the radix, so parse
  // explicitly as base 10
  const parseWorkerCount = val => parseInt(val, 10);

  // add built-in CLI options
  program
    .option('--config <config>',
      'Load a config file. (repeatable)', collect, [])
    .option('--log-level <level>',
      'Console log level: ' +
      'silly, verbose, debug, info, warning, error, critical.')
    .option('--log-timestamps <timestamps>',
      'Override console log timestamps config. (boolean)', brUtil.boolify)
    .option('--log-colorize <colorize>',
      'Override console log colorization config. (boolean)', brUtil.boolify)
    .option('--log-exclude <modules>',
      'Do not log events from the specified comma separated modules.')
    .option('--log-only <modules>',
      'Only log events from the specified comma separated modules.')
    .option('--log-transports <spec>',
      'Transport spec. Use category=[-|+]transport[;...][, ...] ' +
      'eg, access=+console;-access,app=-console')
    .option('--silent', 'Show no console output.')
    .option('--workers <num>',
      'The number of workers to use (0: # of cpus).', parseWorkerCount);

  await events.emit('bedrock-cli.init');
  _parseCommandLine();
  await events.emit('bedrock-cli.parsed');
  _loadConfigs();
  _configureLoggers();
  _configureWorkers();
  _configureProcess();
  await events.emit('bedrock-loggers.init');

  try {
    await loggers.init();
  } catch(err) {
    // can't log, quit
    console.error('Failed to initialize logging system:', err);
    process.exit(1);
  }

  // run
  if(cluster.isMaster) {
    // the master only forks and supervises workers; nothing more to await
    _runMaster(startTime, startOptions);
    return;
  }
  try {
    await _runWorker(startTime);
  } catch(err) {
    // give listeners a chance to react before the failure propagates
    await events.emit('bedrock.error', err);
    throw err;
  }
};
145
// whether this process has already switched to the configured running
// group/user
let _switchedProcessUser = false;

/**
 * Called from workers to set the worker and master process user if it hasn't
 * already been set.
 *
 * On the first call this switches the current process to
 * `config.core.running.groupId` / `config.core.running.userId` (each only
 * when configured and supported by the platform) and then sends a
 * `bedrock.switchProcessUser` message to the master. Subsequent calls do
 * nothing.
 */
api.setProcessUser = () => {
  if(!_switchedProcessUser) {
    _switchedProcessUser = true;
    const {running} = config.core;
    // switch group
    if(running.groupId && process.setgid) {
      process.setgid(running.groupId);
    }
    // switch user
    if(running.userId && process.setuid) {
      process.setuid(running.userId);
    }
    // send message to master
    process.send({type: 'bedrock.switchProcessUser'});
  }
};
167
/**
 * Called from a worker to execute the given function in only one worker.
 *
 * The worker asks the master to schedule the work and waits for the reply.
 * If the reply says the work is already done, this returns (or throws the
 * recorded error). Otherwise `fn` is run here and the outcome is reported
 * back to the master.
 *
 * @param {string} id - A unique identifier for the function to execute.
 * @param {Function} fn - The async function to execute.
 * @param {object} [options={}] - The options to use:
 *   [allowOnRestart] true to allow this function to execute again,
 *     but only once, on worker restart; this option is useful for
 *     behavior that persists but should only run in a single worker.
 *
 * @returns {Promise} Resolves when the operation completes.
 */
api.runOnce = async (id, fn, options = {}) => {
  const type = 'bedrock.runOnce';

  // ask the master to schedule the work (if not already scheduled/run) and
  // wait for its decision
  process.send({type, id, options});
  const scheduled = await _waitForOneMessage({type, id});

  // work completed in another worker: surface its error, if any, and finish
  if(scheduled.done && scheduled.error) {
    throw errio.fromObject(scheduled.error, {stack: true});
  }
  if(scheduled.done) {
    return;
  }

  // this worker was chosen to run the function
  const completion = {type, id, done: true};
  let failure;
  try {
    await fn();
  } catch(e) {
    failure = e;
    // convert the error to a plain, cycle-free object for the message
    completion.error = cycle.decycle(errio.toObject(e, {stack: true}));
  }

  // tell the master the work is complete so it can notify other workers
  process.send(completion);

  if(failure) {
    throw failure;
  }
};
214
/**
 * **DEPRECATED**: runOnceAsync() is deprecated. Use runOnce() instead.
 *
 * This is `runOnce` wrapped with Node's `util.deprecate`.
 */
api.runOnceAsync = deprecate(
  api.runOnce,
  'runOnceAsync() is deprecated. Use runOnce() instead.');
220
/**
 * Called from a worker to exit gracefully. Typically used by subcommands.
 *
 * Kills the current cluster worker via `cluster.worker.kill()`.
 */
api.exit = () => {
  cluster.worker.kill();
};
227
/**
 * Waits for the next process message that has the given type and id.
 *
 * @param {object} options - The options to use.
 * @param {string} options.type - The message type to wait for.
 * @param {string} options.id - The message id to wait for.
 *
 * @returns {Promise<object>} Resolves to the first matching message.
 */
async function _waitForOneMessage({type, id}) {
  // get coordinated message from master
  return new Promise(resolve => {
    const onMessage = msg => {
      // ignore other messages
      if(!_isMessageType(msg, type) || msg.id !== id) {
        return;
      }
      // stop listening once the matching message has arrived
      process.removeListener('message', onMessage);
      resolve(msg);
    };
    process.on('message', onMessage);
  });
}
243
/**
 * Parses `process.argv` with the CLI program and makes sure
 * `config.cli.command` is set.
 */
function _parseCommandLine() {
  program.parse(process.argv);
  if(config.cli.command !== null) {
    return;
  }
  // no command was set during parsing, so set the default command
  config.cli.command = new program.Command('bedrock');
}
251
/**
 * Loads every config file given via `--config`, in the order given. Paths
 * are resolved against the current working directory.
 */
function _loadConfigs() {
  for(const configFile of program.config) {
    require(path.resolve(process.cwd(), configFile));
  }
}
257
/**
 * Applies the logging related CLI options to `config.loggers`.
 *
 * Only options that were actually given on the command line override the
 * config. `--silent` or a log level of `none` silences the console logger.
 *
 * @throws {Error} If a `--log-transports` entry is not of the form
 *   `category=transports`.
 */
function _configureLoggers() {
  // set console log flags
  if('logLevel' in program) {
    config.loggers.console.level = program.logLevel;
  }
  if('logColorize' in program) {
    config.loggers.console.colorize = program.logColorize;
  }
  if('logTimestamps' in program) {
    config.loggers.console.timestamp = program.logTimestamps;
  }
  if('logExclude' in program) {
    config.loggers.console.bedrock.excludeModules =
      program.logExclude.split(',');
  }
  if('logOnly' in program) {
    config.loggers.console.bedrock.onlyModules = program.logOnly.split(',');
  }
  // adjust transports
  if('logTransports' in program) {
    _applyLogTransports(config.loggers.categories, program.logTransports);
  }
  if(program.silent || program.logLevel === 'none') {
    config.loggers.console.silent = true;
  }
}

/**
 * Applies a `--log-transports` spec to the logger categories, in place.
 *
 * The spec is `category=[-|+]transport[;...][, ...]`: a `-` prefix removes a
 * transport from the category, a `+` prefix or no prefix adds it if it is not
 * already present. Whitespace around entries and names is ignored and empty
 * entries (e.g. from a trailing separator) are skipped.
 *
 * @param {object} categories - Map of category name to transport name array.
 * @param {string} spec - The transport spec to apply.
 *
 * @throws {Error} If an entry has no `=` separator.
 */
function _applyLogTransports(categories, spec) {
  for(const rawEntry of spec.split(',')) {
    const entry = rawEntry.trim();
    if(entry === '') {
      continue;
    }
    const [rawName, rawTransports] = entry.split('=');
    if(rawTransports === undefined) {
      // fail with a clear message instead of an opaque TypeError
      throw new Error(
        `Invalid --log-transports entry "${entry}"; expected ` +
        '"category=[-|+]transport[;...]".');
    }
    const catName = rawName.trim();
    if(!(catName in categories)) {
      categories[catName] = [];
    }
    const catTransports = categories[catName];
    for(const rawTransport of rawTransports.split(';')) {
      const transport = rawTransport.trim();
      const remove = transport.startsWith('-');
      const tName = (remove || transport.startsWith('+')) ?
        transport.slice(1) : transport;
      if(tName === '') {
        // nothing to add or remove
        continue;
      }
      const tIndex = catTransports.indexOf(tName);
      if(remove) {
        if(tIndex !== -1) {
          catTransports.splice(tIndex, 1);
        }
      } else if(tIndex === -1) {
        catTransports.push(tName);
      }
    }
  }
}
316
317function _configureWorkers() {
318 if('workers' in program) {
319 config.core.workers = program.workers;
320 }
321 if(config.core.workers <= 0) {
322 config.core.workers = require('os').cpus().length;
323 }
324}
325
/**
 * Applies process level settings: unlimited event listeners, exit on
 * SIGTERM and, in the master process only, the worker entry script and the
 * configured starting group/user.
 */
function _configureProcess() {
  // set no limit on event listeners
  process.setMaxListeners(0);

  // exit on terminate
  process.on('SIGTERM', () => process.exit());

  if(!cluster.isMaster) {
    return;
  }

  // workers are forked from worker.js in this directory
  cluster.setupMaster({exec: path.join(__dirname, 'worker.js')});

  const {starting} = config.core;

  // set group before initializing loggers; failure is only warned about
  if(starting.groupId && process.setgid) {
    try {
      process.setgid(starting.groupId);
    } catch(ex) {
      console.warn('Failed to set master starting gid: ' + ex);
    }
  }
  // set user before initializing loggers; failure is only warned about
  if(starting.userId && process.setuid) {
    try {
      process.setuid(starting.userId);
    } catch(ex) {
      console.warn('Failed to set master starting uid: ' + ex);
    }
  }
}
358
/**
 * Installs a handler that logs an uncaught exception as critical and exits
 * the process with status 1. Nothing is installed when the CLI command is
 * named `test`.
 *
 * @param {object} logger - The logger to report the error with.
 * @param {string} logPrefix - The prefix for the log message.
 */
function _setupUncaughtExceptionHandler(logger, logPrefix) {
  // no handler in test mode
  if(config.cli.command.name() === 'test') {
    return;
  }
  process.on('uncaughtException', error => {
    // drop all uncaughtException listeners (including this one) before
    // logging and exiting
    process.removeAllListeners('uncaughtException');
    logger.critical(`${logPrefix} uncaught error`, {error});
    process.exit(1);
  });
}
369
/**
 * Runs the master process: sets the process title, installs exit handling
 * and forks the configured number of workers.
 *
 * @param {number} startTime - Timestamp (ms) at which startup began; used to
 *   log the startup duration.
 * @param {object} options - The start options:
 *   [script] the script for workers to run, defaults to process.argv[1].
 */
function _runMaster(startTime, options) {
  const logPrefix = '[bedrock/master]';
  // FIXME: use child logger
  const logger = loggers.get('app');

  // setup cluster if running with istanbul coverage
  if(process.env.running_under_istanbul) {
    // TODO: does this need adjusting after fixing the worker `cwd` issue?
    // re-call cover with no reporting and using pid named output
    const coverArgs = [
      'cover', '--report', 'none', '--print', 'none', '--include-pid',
      process.argv[1], '--', ...process.argv.slice(2)
    ];
    cluster.setupMaster({
      exec: './node_modules/.bin/istanbul',
      args: coverArgs
    });
  }

  // set 'ps' title: configured title followed by the CLI arguments, if any
  const cliArgs = process.argv.slice(2).join(' ');
  const titleSuffix = cliArgs ? ` ${cliArgs}` : '';
  process.title = config.core.master.title + titleSuffix;

  _setupUncaughtExceptionHandler(logger, logPrefix);

  logger.info(
    `${logPrefix} starting process "${config.core.master.title}"`,
    {pid: process.pid});

  // get starting script
  const script = options.script || process.argv[1];

  // keep track of master state
  const masterState = {
    switchedUser: false,
    runOnce: {}
  };

  // notify workers to exit if master exits
  process.on('exit', () => {
    Object.keys(cluster.workers).forEach(workerId => {
      cluster.workers[workerId].send({type: 'bedrock.core', message: 'exit'});
    });
  });

  // handle worker exit
  cluster.on('exit', (worker, code) => {
    // if the worker called kill() or disconnect(), it was intentional, so exit
    // the process
    if(worker.exitedAfterDisconnect) {
      logger.info(
        `${logPrefix} worker "${worker.process.pid}" exited on purpose ` +
        `with code "${code}"; exiting master process.`);
      process.exit(code);
    }

    // accidental worker exit (crash)
    logger.critical(
      `${logPrefix} worker "${worker.process.pid}" exited with code "${code}"`);

    // unless configured to restart workers, a crash takes the master down
    if(!config.core.worker.restart) {
      process.exit(1);
      return;
    }

    // clear any runOnce records w/allowOnRestart option set that belonged to
    // the exited worker, then fork a replacement
    for(const [runId, record] of Object.entries(masterState.runOnce)) {
      if(record.worker === worker.id && record.options.allowOnRestart) {
        delete masterState.runOnce[runId];
      }
    }
    _startWorker(masterState, script);
  });

  // fork each app process
  const workerCount = config.core.workers;
  for(let forked = 0; forked < workerCount; ++forked) {
    _startWorker(masterState, script);
  }
  logger.info(`${logPrefix} started`, {timeMs: Date.now() - startTime});
}
450
/**
 * Runs a worker process: sets the process title, listens for the master's
 * exit request and emits the bedrock startup events in order.
 *
 * If a `bedrock-cli.ready` listener cancels the event (the emit resolves to
 * `false`), the core bedrock events are not emitted.
 *
 * @param {number} startTime - Timestamp (ms) at which startup began; used to
 *   log the startup duration.
 *
 * @returns {Promise} Resolves once startup is complete or was cancelled.
 */
async function _runWorker(startTime) {
  const logPrefix = '[bedrock/worker]';
  // FIXME: use child logger
  const logger = loggers.get('app');

  // set 'ps' title: configured title followed by the CLI arguments, if any
  const cliArgs = process.argv.slice(2).join(' ');
  const titleSuffix = cliArgs ? ` ${cliArgs}` : '';
  process.title = config.core.worker.title + titleSuffix;

  _setupUncaughtExceptionHandler(logger, logPrefix);

  logger.info(`${logPrefix} starting process "${config.core.worker.title}"`);

  // listen for master process exit
  let bedrockStarted = false;
  // emits `bedrock-cli.exit` and exits the process whether or not it failed
  const emitCliExit = () =>
    events.emit('bedrock-cli.exit').finally(() => process.exit());
  process.on('message', msg => {
    const isExitRequest =
      _isMessageType(msg, 'bedrock.core') && msg.message === 'exit';
    if(!isExitRequest) {
      return;
    }
    // `bedrock.stop` is only emitted if the core events were started
    return bedrockStarted ?
      events.emit('bedrock.stop').then(emitCliExit) : emitCliExit();
  });

  const cliReady = await events.emit('bedrock-cli.ready');
  // skip default behavior if cancelled (do not emit bedrock core events)
  // used for CLI commands
  if(cliReady === false) {
    return;
  }
  bedrockStarted = true;

  for(const name of ['bedrock.configure', 'bedrock.admin.init']) {
    await events.emit(name);
  }
  // set process user
  api.setProcessUser();
  const remaining = [
    'bedrock.init', 'bedrock.start', 'bedrock.ready', 'bedrock.started'
  ];
  for(const name of remaining) {
    await events.emit(name);
  }
  logger.info(`${logPrefix} started`, {timeMs: Date.now() - startTime});
}
501
/**
 * Forks a worker and wires up the master side handling of its messages:
 * worker initialization, exit requests, switching the process user and
 * `runOnce` scheduling.
 *
 * @param {object} state - The shared master state (`switchedUser` flag and
 *   `runOnce` records keyed by runOnce id).
 * @param {string} script - The script the worker is told to run.
 */
function _startWorker(state, script) {
  const worker = cluster.fork();
  loggers.attach(worker);

  // listen to start requests from workers
  // TODO: simplify with cluster.on('online')?
  const onWorkerStarted = msg => {
    if(!_isMessageType(msg, 'bedrock.worker.started')) {
      return;
    }
    // notify worker to initialize and provide the cwd and script to run
    worker.removeListener('message', onWorkerStarted);
    worker.send({
      type: 'bedrock.worker.init',
      cwd: process.cwd(),
      script
    });
  };
  worker.on('message', onWorkerStarted);

  // listen to exit requests from workers
  worker.on('message', msg => {
    const isExitRequest =
      _isMessageType(msg, 'bedrock.core') && msg.message === 'exit';
    if(isExitRequest) {
      process.exit(msg.status);
    }
  });

  // if app process user hasn't been switched yet, wait for a message
  // from a worker indicating to do so
  const onSwitchProcessUser = msg => {
    if(!_isMessageType(msg, 'bedrock.switchProcessUser')) {
      return;
    }
    worker.removeListener('message', onSwitchProcessUser);
    // another worker may have triggered the switch in the meantime
    if(state.switchedUser) {
      return;
    }
    state.switchedUser = true;
    const {running} = config.core;
    // switch group
    if(running.groupId && process.setgid) {
      process.setgid(running.groupId);
    }
    // switch user
    if(running.userId && process.setuid) {
      process.setuid(running.userId);
    }
  };
  if(!state.switchedUser) {
    worker.on('message', onSwitchProcessUser);
  }

  // listen to schedule run once functions
  worker.on('message', msg => {
    if(!_isMessageType(msg, 'bedrock.runOnce')) {
      return;
    }

    const {type, id: runId} = msg;

    // the worker that ran the function reports completion
    if(msg.done) {
      const finished = state.runOnce[runId];
      finished.done = true;
      finished.error = msg.error || null;
      // notify every worker that was waiting on this function
      const {notify} = finished;
      while(notify.length > 0) {
        const waitingId = notify.shift();
        if(waitingId in cluster.workers) {
          cluster.workers[waitingId].send({
            type,
            id: runId,
            done: true,
            error: msg.error
          });
        }
      }
      return;
    }

    // the function is already scheduled or has already run
    if(runId in state.runOnce) {
      const existing = state.runOnce[runId];
      if(existing.done) {
        // already ran, notify worker immediately
        worker.send({
          type,
          id: runId,
          done: true,
          error: existing.error
        });
      } else {
        // still running, add worker ID to notify queue for later notification
        existing.notify.push(worker.id);
      }
      return;
    }

    // first request for this id: record it and run in this worker
    state.runOnce[runId] = {
      worker: worker.id,
      notify: [],
      options: msg.options,
      done: false,
      error: null
    };
    worker.send({type, id: runId, done: false});
  });
}
608
/**
 * Checks whether a message is an object whose `type` equals the given type.
 *
 * @param {*} msg - The received message; may be any value.
 * @param {string} type - The expected message type.
 *
 * @returns {boolean} `true` if `msg` is a non-null object with a matching
 *   `type`, `false` otherwise.
 */
function _isMessageType(msg, type) {
  // `typeof null` is also 'object', so rule out null before reading `type`
  return (msg !== null && typeof msg === 'object' && msg.type === type);
}