// Retrieved via UNPKG (18.8 kB, JavaScript).
/*!
 * Copyright (c) 2012-2019 Digital Bazaar, Inc. All rights reserved.
 */
'use strict';

const brUtil = require('./util');
const cc = brUtil.config.main.computer();
const cluster = require('cluster');
const config = require('./config');
const cycle = require('cycle');
const errio = require('errio');
const events = require('./events');
const lodashGet = require('lodash.get');
const loggers = require('./loggers');
const os = require('os');
const path = require('path');
const pkginfo = require('pkginfo');
const program = require('commander');
const {deprecate} = require('util');
const {BedrockError} = brUtil;
20
// Build the core API object and publish it as this module's exports.
const api = {
  config,
  events,
  loggers,
  util: brUtil
};
module.exports = api;

// copy the `version` field from package.json onto the exported API
pkginfo(module, 'version');

// register BedrockError with errio (errio is used in this file to serialize
// errors that are passed between processes)
errio.register(BedrockError);
34
// Computed defaults for the config paths. They are declared here rather than
// in config.js because of util dependency issues.
// FIXME: v2.0.0: remove this bookkeeping together with the warnings below.
const _warningShown = {cache: false, log: false};

cc({
  'paths.cache': () => {
    // FIXME: v2.0.0: remove the warning and the default; instead throw
    // new BedrockError('bedrock.config.paths.cache not set.', 'ConfigError').
    const fallback = path.join(__dirname, '..', '.cache');
    if(_warningShown.cache) {
      return fallback;
    }
    // only report the missing setting the first time the default is used
    loggers.get('app').error(
      `"bedrock.config.paths.cache" not set, using default: "${fallback}"`);
    _warningShown.cache = true;
    return fallback;
  },
  'paths.log': () => {
    // FIXME: v2.0.0: remove the warning and the default; instead throw
    // new BedrockError('bedrock.config.paths.log not set.', 'ConfigError').
    const fallback = path.join('/tmp/bedrock-dev');
    if(_warningShown.log) {
      return fallback;
    }
    // the console is used (not a logger) because this config value is read
    // during logger setup
    console.warn(
      `WARNING: "bedrock.config.paths.log" not set, using default: ` +
      `"${fallback}"`);
    _warningShown.log = true;
    return fallback;
  }
});
71
// make the commander program, stamped with this package's version, available
// to API consumers
api.program = program.version(api.version);
74
/**
 * Starts the bedrock application.
 *
 * Registers the built-in CLI options, parses the command line, loads any
 * `--config` files, configures loggers, workers and the process, initializes
 * the logging system and then runs as either the cluster master or a worker.
 * If the logging system cannot be initialized the process exits with code 1.
 *
 * @param {Object} [options={}] - The options to use.
 * @param {string} [options.script] - The script to execute when forking
 *   bedrock workers; by default this will be `process.argv[1]`.
 *
 * @returns {Promise} Resolves when startup has completed in this process (in
 *   the master, once the workers have been forked). Rejects if worker startup
 *   fails, after a `bedrock.error` event has been emitted.
 */
api.start = async options => {
  options = options || {};

  const startTime = Date.now();

  // accumulates the values of a repeatable option into an array
  function collect(val, memo) {
    memo.push(val);
    return memo;
  }

  // commander calls option coercion functions with `(value, previousValue)`;
  // passing `parseInt` directly would treat the previous value as a radix
  // (e.g. a repeated `--workers` option would yield NaN), so wrap it and
  // always parse as base 10
  function parseInteger(val) {
    return parseInt(val, 10);
  }

  // add built-in CLI options
  program
    .option('--config <config>',
      'Load a config file. (repeatable)', collect, [])
    .option('--log-level <level>',
      'Console log level: ' +
      'silly, verbose, debug, info, warning, error, critical.')
    .option('--log-timestamps <timestamps>',
      'Override console log timestamps config. (boolean)', brUtil.boolify)
    .option('--log-colorize <colorize>',
      'Override console log colorization config. (boolean)', brUtil.boolify)
    .option('--log-exclude <modules>',
      'Do not log events from the specified comma separated modules.')
    .option('--log-only <modules>',
      'Only log events from the specified comma separated modules.')
    .option('--log-transports <spec>',
      'Transport spec. Use category=[-|+]transport[;...][, ...] ' +
      'eg, access=+console;-access,app=-console')
    .option('--silent', 'Show no console output.')
    .option('--workers <num>',
      'The number of workers to use (0: # of cpus).', parseInteger);

  await events.emit('bedrock-cli.init');
  _parseCommandLine();
  await events.emit('bedrock-cli.parsed');
  _loadConfigs();
  _configureLoggers();
  _configureWorkers();
  _configureProcess();
  await events.emit('bedrock-loggers.init');

  try {
    await loggers.init();
  } catch(err) {
    // can't log, quit
    console.error('Failed to initialize logging system:', err);
    process.exit(1);
  }

  // run
  if(cluster.isMaster) {
    // the master only forks and supervises workers; nothing more to await
    _runMaster(startTime, options);
    return;
  }
  try {
    await _runWorker(startTime);
  } catch(err) {
    // give listeners a chance to react before the failure is propagated
    await events.emit('bedrock.error', err);
    throw err;
  }
};
146
/**
 * Called from workers to switch this process to the configured "running"
 * group and user and to notify the master process (which performs the same
 * switch when it receives the message). Only the first call has any effect;
 * subsequent calls return immediately.
 */
let _switchedProcessUser = false;
api.setProcessUser = () => {
  if(!_switchedProcessUser) {
    _switchedProcessUser = true;

    // switch group first, then user
    const groupId = config.core.running.groupId;
    if(groupId && process.setgid) {
      process.setgid(groupId);
    }
    const userId = config.core.running.userId;
    if(userId && process.setuid) {
      process.setuid(userId);
    }

    // let the master know that the process user has been switched
    process.send({type: 'bedrock.switchProcessUser'});
  }
};
168
/**
 * Called from a worker to execute the given function in only one worker.
 *
 * The master is asked to schedule the work identified by `id`. If the master
 * replies that the work is already done, this call finishes (re-throwing the
 * recorded error, if any). Otherwise `fn` runs in this worker and the outcome
 * is reported back to the master.
 *
 * @param {string} id - A unique identifier for the function to execute.
 * @param {Function} fn - The async function to execute.
 * @param {Object} [options={}] - The options to use.
 * @param {boolean} [options.allowOnRestart] - `true` to allow this function
 *   to execute again, but only once, on worker restart; useful for behavior
 *   that persists but should only run in a single worker.
 *
 * @returns {Promise} Resolves when the operation completes.
 */
api.runOnce = async (id, fn, options = {}) => {
  const type = 'bedrock.runOnce';

  // ask the master to schedule the work (if not already scheduled/run)
  process.send({type, id, options});

  // the master answers with either "done" or "run it yourself"
  const reply = await _waitForOneMessage({type, id});

  if(reply.done) {
    // the work completed in another worker; surface its error, if any
    if(reply.error) {
      throw errio.fromObject(reply.error, {stack: true});
    }
    return;
  }

  // this worker has been chosen to do the work
  const completion = {type, id, done: true};
  let failure;
  try {
    await fn();
  } catch(e) {
    failure = e;
    // serialize the error so it can be sent to the other workers
    completion.error = cycle.decycle(errio.toObject(e, {stack: true}));
  }

  // report completion so that waiting workers can be notified
  process.send(completion);

  if(failure) {
    throw failure;
  }
};
215
/**
 * **DEPRECATED**: alias of `runOnce()` that prints a deprecation warning.
 *
 * @deprecated Use `runOnce()` instead.
 */
api.runOnceAsync = deprecate(
  api.runOnce,
  'runOnceAsync() is deprecated. Use runOnce() instead.');
221
/**
 * Called from a worker to exit gracefully by killing the current cluster
 * worker. Typically used by subcommands.
 */
api.exit = () => {
  cluster.worker.kill();
};
228
/**
 * Waits for a single process message that has the given type and id.
 *
 * @param {Object} options - The options to use.
 * @param {string} options.type - The message type to wait for.
 * @param {string} options.id - The message id to wait for.
 *
 * @returns {Promise<Object>} Resolves to the first matching message.
 */
async function _waitForOneMessage({type, id}) {
  return new Promise(resolve => {
    const onMessage = msg => {
      if(_isMessageType(msg, type) && msg.id === id) {
        // matched: stop listening before handing the message over
        process.removeListener('message', onMessage);
        resolve(msg);
      }
      // any other message is ignored
    };
    process.on('message', onMessage);
  });
}
244
/**
 * Parses `process.argv` with commander and, when no command has been
 * recorded in `config.cli.command`, installs the default `bedrock` command.
 */
function _parseCommandLine() {
  program.parse(process.argv);
  if(config.cli.command !== null) {
    return;
  }
  // no command was selected; fall back to the default one
  config.cli.command = new program.Command('bedrock');
}
252
/**
 * Loads every config file given via `--config`, resolving each path against
 * the current working directory.
 */
function _loadConfigs() {
  for(const configFile of program.config) {
    require(path.resolve(process.cwd(), configFile));
  }
}
258
/**
 * Applies the logging related CLI options to `config.loggers`.
 *
 * @throws {Error} If a `--log-transports` entry is not of the form
 *   `category=[-|+]transport[;...]`.
 */
function _configureLoggers() {
  // set console log flags
  if('logLevel' in program) {
    config.loggers.console.level = program.logLevel;
  }
  if('logColorize' in program) {
    config.loggers.console.colorize = program.logColorize;
  }
  if('logTimestamps' in program) {
    config.loggers.console.timestamp = program.logTimestamps;
  }
  if('logExclude' in program) {
    config.loggers.console.bedrock.excludeModules =
      program.logExclude.split(',');
  }
  if('logOnly' in program) {
    config.loggers.console.bedrock.onlyModules = program.logOnly.split(',');
  }
  // adjust transports, one comma separated category entry at a time
  if('logTransports' in program) {
    for(const entry of program.logTransports.split(',')) {
      _applyTransportSpec(entry);
    }
  }
  if(program.silent || program.logLevel === 'none') {
    config.loggers.console.silent = true;
  }
}

/**
 * Applies one `category=[-|+]transport[;...]` entry to
 * `config.loggers.categories`. A `-` prefix removes a transport from the
 * category; a `+` prefix or no prefix adds it if not already present.
 *
 * @param {string} entry - The category entry to apply.
 *
 * @throws {Error} If the entry has no `=` separator.
 */
function _applyTransportSpec(entry) {
  const [catName, transportList] = entry.split('=');
  // validate before touching the config so that a malformed entry neither
  // creates an empty category nor fails with an opaque TypeError
  if(transportList === undefined) {
    throw new Error(
      `Invalid --log-transports entry "${entry}"; ` +
      'expected "category=[-|+]transport[;...]".');
  }
  const categories = config.loggers.categories;
  if(!(catName in categories)) {
    categories[catName] = [];
  }
  const catTransports = categories[catName];
  for(const transport of transportList.split(';')) {
    const remove = transport.startsWith('-');
    const prefixed = remove || transport.startsWith('+');
    const tName = prefixed ? transport.slice(1) : transport;
    const tIndex = catTransports.indexOf(tName);
    if(remove) {
      if(tIndex !== -1) {
        catTransports.splice(tIndex, 1);
      }
    } else if(tIndex === -1) {
      catTransports.push(tName);
    }
  }
}
317
/**
 * Determines the number of workers to run and stores it in
 * `config.core.workers`. A `--workers` CLI value overrides the configured
 * value. Any value that is not a positive number (0, negative or NaN, e.g.
 * from a non-numeric `--workers` argument) selects one worker per CPU, with
 * a minimum of one worker.
 */
function _configureWorkers() {
  if('workers' in program) {
    config.core.workers = program.workers;
  }
  // `!(n > 0)` rather than `n <= 0` so that NaN also falls back to the CPU
  // count instead of silently starting zero workers
  if(!(config.core.workers > 0)) {
    // os.cpus() may report no CPUs in some environments; always run at least
    // one worker
    config.core.workers = Math.max(os.cpus().length, 1);
  }
}
326
/**
 * Configures the current process: removes the event listener limit, exits on
 * SIGTERM and, in the master only, sets the worker entry script and switches
 * to the configured "starting" group and user.
 */
function _configureProcess() {
  // set no limit on event listeners
  process.setMaxListeners(0);

  // exit on terminate
  process.on('SIGTERM', () => {
    process.exit();
  });

  if(!cluster.isMaster) {
    return;
  }

  cluster.setupMaster({exec: path.join(__dirname, 'worker.js')});

  // the group and then the user are switched before loggers are initialized;
  // failures are only reported as warnings
  const startingGroupId = config.core.starting.groupId;
  if(startingGroupId && process.setgid) {
    try {
      process.setgid(startingGroupId);
    } catch(ex) {
      console.warn('Failed to set master starting gid: ' + ex);
    }
  }
  const startingUserId = config.core.starting.userId;
  if(startingUserId && process.setuid) {
    try {
      process.setuid(startingUserId);
    } catch(ex) {
      console.warn('Failed to set master starting uid: ' + ex);
    }
  }
}
359
/**
 * Installs an `uncaughtException` handler that logs the error as critical
 * and exits with code 1. Nothing is installed when the CLI command is `test`.
 *
 * @param {Object} logger - The logger used to report the error.
 * @param {string} logPrefix - The prefix for the log message.
 */
function _setupUncaughtExceptionHandler(logger, logPrefix) {
  if(config.cli.command.name() === 'test') {
    // test mode: leave uncaught exceptions alone
    return;
  }
  process.on('uncaughtException', err => {
    // drop all handlers (including this one) before logging and exiting
    process.removeAllListeners('uncaughtException');
    logger.critical(`${logPrefix} uncaught error`, {error: err});
    process.exit(1);
  });
}
370
/**
 * Runs the master process: sets the process title, installs the uncaught
 * exception handler, forks the configured number of workers and reacts to
 * worker exits (exiting itself or forking a replacement).
 *
 * @param {number} startTime - The `Date.now()` timestamp taken at startup.
 * @param {Object} options - The options passed to `api.start`.
 * @param {string} [options.script] - The script workers should run; defaults
 *   to `process.argv[1]`.
 */
function _runMaster(startTime, options) {
  const logPrefix = '[bedrock/master]';
  // FIXME: use child logger
  const logger = loggers.get('app');

  // setup cluster if running with istanbul coverage
  if(process.env.running_under_istanbul) {
    // TODO: does this need adjusting after fixing the worker `cwd` issue?
    // re-call cover with no reporting and using pid named output
    const coverArgs = [
      'cover', '--report', 'none', '--print', 'none', '--include-pid',
      process.argv[1], '--', ...process.argv.slice(2)
    ];
    cluster.setupMaster({
      exec: './node_modules/.bin/istanbul',
      args: coverArgs
    });
  }

  // set 'ps' title
  const cliArgs = process.argv.slice(2).join(' ');
  process.title = config.core.master.title + (cliArgs ? ` ${cliArgs}` : '');

  _setupUncaughtExceptionHandler(logger, logPrefix);

  logger.info(
    `${logPrefix} starting process "${config.core.master.title}"`,
    {pid: process.pid});

  // get starting script
  const script = options.script || process.argv[1];

  // master state shared with every worker's message handlers
  const masterState = {
    switchedUser: false,
    runOnce: {}
  };

  // notify workers to exit if master exits
  process.on('exit', () => {
    Object.values(cluster.workers).forEach(worker => {
      worker.send({type: 'bedrock.core', message: 'exit'});
    });
  });

  // handle worker exit
  cluster.on('exit', (worker, code, signal) => {
    const {pid} = worker.process;

    // if the worker called kill() or disconnect(), it was intentional, so
    // exit the process
    if(worker.exitedAfterDisconnect) {
      logger.info(
        `${logPrefix} worker "${pid}" exited on purpose ` +
        `with code "${code}" and signal "${signal}"; exiting master process.`);
      process.exit(code);
    }

    // accidental worker exit (crash)
    logger.critical(
      `${logPrefix} worker "${pid}" exited with code ` +
      `"${code}" and signal "${signal}".`);

    if(!config.core.worker.restart) {
      process.exit(1);
    } else {
      // forget runOnce records owned by the dead worker that have the
      // allowOnRestart option set, then fork a replacement worker
      Object.keys(masterState.runOnce)
        .filter(id => {
          const record = masterState.runOnce[id];
          return record.worker === worker.id && record.options.allowOnRestart;
        })
        .forEach(id => {
          delete masterState.runOnce[id];
        });
      _startWorker(masterState, script);
    }
  });

  // fork each app process
  const workerCount = config.core.workers;
  for(let forked = 0; forked < workerCount; ++forked) {
    _startWorker(masterState, script);
  }
  logger.info(`${logPrefix} started`, {timeMs: Date.now() - startTime});
}
452
/**
 * Runs a worker process: sets the process title, installs the uncaught
 * exception handler and the shutdown message listener, then emits the
 * bedrock lifecycle events in order.
 *
 * If a `bedrock-cli.ready` listener cancels the event (the emit resolves to
 * `false`), none of the bedrock core events are emitted; this is used for
 * CLI commands.
 *
 * @param {number} startTime - The `Date.now()` timestamp taken at startup.
 *
 * @returns {Promise} Resolves once `bedrock.started` has been emitted (or
 *   the CLI cancelled startup); rejects if an event handler or the config
 *   override check fails.
 */
async function _runWorker(startTime) {
  // FIXME: use child logger
  const logger = loggers.get('app');
  const logPrefix = '[bedrock/worker]';

  // set 'ps' title
  const args = process.argv.slice(2).join(' ');
  process.title = config.core.worker.title + (args ? (' ' + args) : '');

  _setupUncaughtExceptionHandler(logger, logPrefix);

  logger.info(`${logPrefix} starting process "${config.core.worker.title}"`);

  // listen for master process exit
  let bedrockStarted = false;
  process.on('message', async msg => {
    if(!_isMessageType(msg, 'bedrock.core') || msg.message !== 'exit') {
      return;
    }

    try {
      // `bedrock.stop` is only emitted if the core events were started
      if(bedrockStarted) {
        await events.emit('bedrock.stop');
      }
      await events.emit('bedrock-cli.exit');
    } catch(e) {
      // a failing shutdown handler must neither be left as an unhandled
      // rejection nor prevent the worker from exiting
      logger.error(`${logPrefix} error during shutdown`, {error: e});
    } finally {
      process.exit();
    }
  });

  const cliReady = await events.emit('bedrock-cli.ready');
  // skip default behavior if cancelled (do not emit bedrock core events)
  // used for CLI commands
  if(cliReady === false) {
    return;
  }
  bedrockStarted = true;

  // snapshot the values of the fields that must be changed
  // during the `bedrock.configure` event
  let configOverrideSnapshot;
  if(config.ensureConfigOverride.enable) {
    configOverrideSnapshot = _snapshotOverrideFields({
      config,
      fields: config.ensureConfigOverride.fields,
    });
  }

  await events.emit('bedrock.configure');

  // ensure that the values captured in the snapshot have been changed
  if(configOverrideSnapshot) {
    // throws on failure which will prevent application startup
    _ensureConfigOverride({config, configOverrideSnapshot});
  }

  await events.emit('bedrock.admin.init');
  // set process user
  api.setProcessUser();
  await events.emit('bedrock.init');
  await events.emit('bedrock.start');
  await events.emit('bedrock.ready');
  await events.emit('bedrock.started');
  logger.info(`${logPrefix} started`, {timeMs: Date.now() - startTime});
}
521
/**
 * Forks a worker, attaches it to the loggers and installs the master-side
 * message handlers for it: worker initialization, exit requests, the process
 * user switch and `runOnce` scheduling.
 *
 * @param {Object} state - The master state (`switchedUser` flag and the
 *   `runOnce` records keyed by id).
 * @param {string} script - The script the worker is told to run.
 */
function _startWorker(state, script) {
  const worker = cluster.fork();
  loggers.attach(worker);

  // TODO: simplify with cluster.on('online')?
  // once the worker reports that it has started, tell it to initialize and
  // provide the cwd and script to run
  const onWorkerStarted = msg => {
    if(_isMessageType(msg, 'bedrock.worker.started')) {
      worker.removeListener('message', onWorkerStarted);
      worker.send({
        type: 'bedrock.worker.init',
        cwd: process.cwd(),
        script
      });
    }
  };
  worker.on('message', onWorkerStarted);

  // listen to exit requests from workers
  worker.on('message', msg => {
    if(_isMessageType(msg, 'bedrock.core') && msg.message === 'exit') {
      process.exit(msg.status);
    }
  });

  // switches the master's group and user the first time any worker asks
  const onSwitchProcessUser = msg => {
    if(!_isMessageType(msg, 'bedrock.switchProcessUser')) {
      return;
    }
    worker.removeListener('message', onSwitchProcessUser);
    if(state.switchedUser) {
      // another worker already triggered the switch
      return;
    }
    state.switchedUser = true;
    // switch group
    if(config.core.running.groupId && process.setgid) {
      process.setgid(config.core.running.groupId);
    }
    // switch user
    if(config.core.running.userId && process.setuid) {
      process.setuid(config.core.running.userId);
    }
  };

  // if app process user hasn't been switched yet, wait for a message
  // from a worker indicating to do so
  if(!state.switchedUser) {
    worker.on('message', onSwitchProcessUser);
  }

  // listen to schedule run once functions
  worker.on('message', msg => {
    if(!_isMessageType(msg, 'bedrock.runOnce')) {
      return;
    }

    const {type} = msg;
    const taskId = msg.id;

    if(msg.done) {
      // the worker that ran the function reports its outcome
      const record = state.runOnce[taskId];
      record.done = true;
      record.error = msg.error || null;
      // drain the queue of workers waiting for this result
      const waiting = record.notify;
      while(waiting.length > 0) {
        const waitingId = waiting.shift();
        if(waitingId in cluster.workers) {
          cluster.workers[waitingId].send({
            type,
            id: taskId,
            done: true,
            error: msg.error
          });
        }
      }
      return;
    }

    if(taskId in state.runOnce) {
      const record = state.runOnce[taskId];
      if(!record.done) {
        // still running, add worker ID to notify queue for later notification
        record.notify.push(worker.id);
      } else {
        // already ran, notify worker immediately
        worker.send({
          type,
          id: taskId,
          done: true,
          error: record.error
        });
      }
      return;
    }

    // first request for this id: run in this worker
    state.runOnce[taskId] = {
      worker: worker.id,
      notify: [],
      options: msg.options,
      done: false,
      error: null
    };
    worker.send({type, id: taskId, done: false});
  });
}
628
/**
 * Checks whether an IPC message is an object of the given type.
 *
 * @param {*} msg - The received message.
 * @param {string} type - The expected value of `msg.type`.
 *
 * @returns {boolean} `true` if `msg` is a non-null object whose `type`
 *   property strictly equals `type`.
 */
function _isMessageType(msg, type) {
  // `typeof null` is 'object', so null must be excluded explicitly to avoid
  // a TypeError when reading `msg.type`
  return (msg !== null && typeof msg === 'object' && msg.type === type);
}
632
/**
 * Verifies that every config field captured in the snapshot now has a value
 * different from the captured one.
 *
 * @param {Object} options - The options to use.
 * @param {Object} options.config - The config to inspect.
 * @param {Map<string, *>} options.configOverrideSnapshot - Map of field path
 *   to the value the field had when the snapshot was taken.
 *
 * @throws {Error} If any field still strictly equals its snapshot value.
 */
function _ensureConfigOverride({config, configOverrideSnapshot}) {
  const logger = loggers.get('app').child('bedrock/worker');
  logger.debug('Verifying configuration overrides.');
  for(const [key, value] of configOverrideSnapshot) {
    if(lodashGet(config, key) === value) {
      // the event emitted between snapshot and verification is named
      // `bedrock.configure`
      const error = new Error(
        `The config field "${key}" must be changed during the ` +
        '"bedrock.configure" event.');
      logger.error(error);
      throw error;
    }
  }
  logger.debug('Configuration overrides have been verified.', {
    fields: [...configOverrideSnapshot.keys()]});
}
648
/**
 * Captures the current value of each listed config field.
 *
 * @param {Object} options - The options to use.
 * @param {Object} options.config - The config to read from.
 * @param {Iterable<string>} options.fields - The field paths to capture.
 *
 * @returns {Map<string, *>} Map of field path to its current value.
 */
function _snapshotOverrideFields({config, fields}) {
  return new Map(
    [...fields].map(field => [field, lodashGet(config, field)]));
}