UNPKG

19.4 kBJavaScriptView Raw
1/*!
2 * Copyright (c) 2012-2019 Digital Bazaar, Inc. All rights reserved.
3 */
4'use strict';
5
6const async = require('async');
7const brUtil = require('./util');
8const cc = brUtil.config.main.computer();
9const cluster = require('cluster');
10const config = require('./config');
11const cycle = require('cycle');
12const errio = require('errio');
13const events = require('./events');
14const loggers = require('./loggers');
15const path = require('path');
16const pkginfo = require('pkginfo');
17const program = require('commander');
18const {promisify} = require('util');
19const {BedrockError} = brUtil;
20
21// core API
22const api = {};
23api.config = config;
24api.events = events;
25api.loggers = loggers;
26api.util = brUtil;
27module.exports = api;
28
29// read package.json fields
30pkginfo(module, 'version');
31
32// register error class
33errio.register(BedrockError);
34
35// config paths
36// configured here instead of config.js due to util dependency issues
37// FIXME: v2.0.0: remove when removing warnings below.
38const _warningShown = {
39 cache: false,
40 log: false
41};
42cc({
43 'paths.cache': () => {
44 // FIXME: v2.0.0: remove warning and default and throw exception .
45 //throw new BedrockError(
46 // 'bedrock.config.paths.cache not set.',
47 // 'ConfigError');
48 const cachePath = path.join(__dirname, '..', '.cache');
49 if(!_warningShown.cache) {
50 loggers.get('app').error(
51 `"bedrock.config.paths.cache" not set, using default: "${cachePath}"`);
52 _warningShown.cache = true;
53 }
54 return cachePath;
55 },
56 'paths.log': () => {
57 // FIXME: v2.0.0: remove warning and default and throw exception .
58 //throw new BedrockError(
59 // 'bedrock.config.paths.log not set.',
60 // 'ConfigError');
61 const logPath = path.join('/tmp/bedrock-dev');
62 if(!_warningShown.log) {
63 // Using console since this config value used during logger setup.
64 console.warn('WARNING: ' +
65 `"bedrock.config.paths.log" not set, using default: "${logPath}"`);
66 _warningShown.log = true;
67 }
68 return logPath;
69 }
70});
71
72// expose bedrock program
73api.program = program.version(api.version);
74
75/**
76 * Starts the bedrock application.
77 *
78 * @param options the options to use:
79 * [script] the script to execute when forking bedrock workers, by
80 * default this will be process.argv[1].
81 * @param done(err) called by bedrock workers once the bedrock application has
82 * started or once an error has occured.
83 */
84api.start = function(options, done) {
85 if(typeof options === 'function') {
86 done = options;
87 options = null;
88 }
89 options = options || {};
90
91 const startTime = Date.now();
92
93 function collect(val, memo) {
94 memo.push(val);
95 return memo;
96 }
97
98 // add built-in CLI options
99 program
100 .option('--config <config>',
101 'Load a config file. (repeatable)', collect, [])
102 .option('--log-level <level>',
103 'Console log level: ' +
104 'silly, verbose, debug, info, warning, error, critical.')
105 .option('--log-timestamps <timestamps>',
106 'Override console log timestamps config. (boolean)', brUtil.boolify)
107 .option('--log-colorize <colorize>',
108 'Override console log colorization config. (boolean)', brUtil.boolify)
109 .option('--log-exclude <modules>',
110 'Do not log events from the specified comma separated modules.')
111 .option('--log-only <modules>',
112 'Only log events from the specified comma separated modules.')
113 .option('--log-transports <spec>',
114 'Transport spec. Use category=[-|+]transport[;...][, ...] ' +
115 'eg, access=+console;-access,app=-console')
116 .option('--silent', 'Show no console output.')
117 .option('--workers <num>',
118 'The number of workers to use (0: # of cpus).', parseInt);
119
120 async.auto({
121 beforeInit: function(callback) {
122 events.emit('bedrock-cli.init', callback);
123 },
124 cliParse: ['beforeInit', function(results, callback) {
125 _parseCommandLine();
126 events.emit('bedrock-cli.parsed', callback);
127 }],
128 init: ['cliParse', function(results, callback) {
129 _loadConfigs();
130 _configureLoggers();
131 _configureWorkers();
132 _configureProcess();
133 events.emit('bedrock-loggers.init', callback);
134 }],
135 initLoggers: ['init', function(results, callback) {
136 loggers.init(function(err) {
137 if(err) {
138 // can't log, quit
139 console.error('Failed to initialize logging system:', err);
140 process.exit(1);
141 }
142 callback();
143 });
144 }],
145 run: ['initLoggers', function(results, callback) {
146 if(cluster.isMaster) {
147 _runMaster(options);
148 // don't call callback in master process
149 return;
150 }
151 _runWorker(startTime, function(err) {
152 if(err) {
153 return events.emit('bedrock.error', err, function() {
154 callback(err);
155 });
156 }
157 callback();
158 });
159 }]
160 }, function(err) {
161 if(done) {
162 return done(err);
163 }
164 if(err) {
165 throw err;
166 }
167 });
168};
169
170/**
171 * Called from workers to set the worker and master process user if it hasn't
172 * already been set.
173 */
174let _switchedProcessUser = false;
175api.setProcessUser = function() {
176 if(_switchedProcessUser) {
177 return;
178 }
179 _switchedProcessUser = true;
180 // switch group
181 if(config.core.running.groupId && process.setgid) {
182 process.setgid(config.core.running.groupId);
183 }
184 // switch user
185 if(config.core.running.userId && process.setuid) {
186 process.setuid(config.core.running.userId);
187 }
188 // send message to master
189 process.send({type: 'bedrock.switchProcessUser'});
190};
191
192/**
193 * Called from a worker to execute the given function in only one worker.
194 *
195 * @param id a unique identifier for the function to execute.
196 * @param fn the function to execute; if it takes a parameter, it will be
197 * assumed to be a callback and `fn` will be executed asynchronously.
198 * @param [options] the options to use:
199 * [allowOnRestart] true to allow this function to execute again,
200 * but only once, on worker restart; this option is useful for
201 * behavior that persists but should only run in a single worker.
202 * @param callback(err, ran) called once the operation has completed in
203 * one worker.
204 */
205api.runOnce = function(id, fn, options, callback) {
206 if(typeof options === 'function') {
207 callback = options;
208 options = {};
209 }
210 options = options || {};
211 if(fn.length === 1) {
212 fn = promisify(fn);
213 }
214 _runOnce({type: 'bedrock.runOnce', id, fn, options})
215 .then(ran => callback(null, ran), callback);
216};
217
218/**
219 * Called from a worker to execute the given function in only one worker.
220 *
221 * @param {string} id - a unique identifier for the function to execute.
222 * @param {Function} fn - The async function to execute.
223 * @param {Object} [options={}] the options to use:
224 * [allowOnRestart] true to allow this function to execute again,
225 * but only once, on worker restart; this option is useful for
226 * behavior that persists but should only run in a single worker.
227 *
228 * @returns {Promise} Resolves when the operation completes.
229 */
230api.runOnceAsync = async (id, fn, options = {}) =>
231 _runOnce({type: 'bedrock.runOnceAsync', id, fn, options});
232
233/**
234 * Called from a worker to exit gracefully. Typically used by subcommands.
235 */
236api.exit = function() {
237 cluster.worker.kill();
238};
239
240async function _runOnce({type, id, fn, options}) {
241 // notify master to schedule work (if not already scheduled/run)
242 process.send({type, id, options});
243
244 // wait for scheduling result
245 let msg = await _waitForOneMessage({type, id});
246
247 // work completed in another worker, finish
248 if(msg.done) {
249 if(msg.error) {
250 throw errio.fromObject(msg.error, {stack: true});
251 }
252 return;
253 }
254
255 // run in this worker
256 msg = {type, id, done: true};
257 let error;
258 try {
259 await fn();
260 } catch(e) {
261 error = e;
262 msg.error = cycle.decycle(errio.toObject(e, {stack: true}));
263 }
264
265 // notify other workers that work is complete
266 process.send(msg);
267
268 if(error) {
269 throw error;
270 }
271}
272
273async function _waitForOneMessage({type, id}) {
274 // get coordinated message from master
275 return new Promise(resolve => {
276 // listen to run function once
277 process.on('message', _listenOnce);
278 function _listenOnce(msg) {
279 // ignore other messages
280 if(!(_isMessageType(msg, type) && msg.id === id)) {
281 return;
282 }
283 process.removeListener('message', _listenOnce);
284 resolve(msg);
285 }
286 });
287}
288
289function _parseCommandLine() {
290 program.parse(process.argv);
291 if(config.cli.command === null) {
292 // set default command
293 config.cli.command = new program.Command('bedrock');
294 }
295}
296
297function _loadConfigs() {
298 program.config.forEach(function(cfg) {
299 require(path.resolve(process.cwd(), cfg));
300 });
301}
302
303function _configureLoggers() {
304 // set console log flags
305 if('logLevel' in program) {
306 config.loggers.console.level = program.logLevel;
307 }
308 if('logColorize' in program) {
309 config.loggers.console.colorize = program.logColorize;
310 }
311 if('logTimestamps' in program) {
312 config.loggers.console.timestamp = program.logTimestamps;
313 }
314 if('logExclude' in program) {
315 config.loggers.console.bedrock.excludeModules =
316 program.logExclude.split(',');
317 }
318 if('logOnly' in program) {
319 config.loggers.console.bedrock.onlyModules = program.logOnly.split(',');
320 }
321 // adjust transports
322 if('logTransports' in program) {
323 const t = program.logTransports;
324 const cats = t.split(',');
325 cats.forEach(function(cat) {
326 const catName = cat.split('=')[0];
327 let catTransports;
328 if(catName in config.loggers.categories) {
329 catTransports = config.loggers.categories[catName];
330 } else {
331 catTransports = config.loggers.categories[catName] = [];
332 }
333 const transports = cat.split('=')[1].split(';');
334 transports.forEach(function(transport) {
335 if(transport.indexOf('-') === 0) {
336 const tName = transport.slice(1);
337 const tIndex = catTransports.indexOf(tName);
338 if(tIndex !== -1) {
339 catTransports.splice(tIndex, 1);
340 }
341 } else if(transport.indexOf('+') === 0) {
342 const tName = transport.slice(1);
343 const tIndex = catTransports.indexOf(tName);
344 if(tIndex === -1) {
345 catTransports.push(tName);
346 }
347 } else {
348 const tName = transport;
349 const tIndex = catTransports.indexOf(tName);
350 if(tIndex === -1) {
351 catTransports.push(tName);
352 }
353 }
354 });
355 });
356 }
357 if(program.silent || program.logLevel === 'none') {
358 config.loggers.console.silent = true;
359 }
360}
361
362function _configureWorkers() {
363 if('workers' in program) {
364 config.core.workers = program.workers;
365 }
366 if(config.core.workers <= 0) {
367 config.core.workers = require('os').cpus().length;
368 }
369}
370
371function _configureProcess() {
372 // set no limit on event listeners
373 process.setMaxListeners(0);
374
375 // exit on terminate
376 process.on('SIGTERM', function() {
377 process.exit();
378 });
379
380 if(cluster.isMaster) {
381 cluster.setupMaster({
382 exec: path.join(__dirname, 'worker.js')
383 });
384
385 // set group before initializing loggers
386 if(config.core.starting.groupId && process.setgid) {
387 try {
388 process.setgid(config.core.starting.groupId);
389 } catch(ex) {
390 console.warn('Failed to set master starting gid: ' + ex);
391 }
392 }
393 // set user before initializing loggers
394 if(config.core.starting.userId && process.setuid) {
395 try {
396 process.setuid(config.core.starting.userId);
397 } catch(ex) {
398 console.warn('Failed to set master starting uid: ' + ex);
399 }
400 }
401 }
402}
403
404function _setupUncaughtExceptionHandler(mode, logger) {
405 // log uncaught exception and exit, except in test mode
406 if(config.cli.command.name() !== 'test') {
407 process.on('uncaughtException', function(err) {
408 process.removeAllListeners('uncaughtException');
409 logger.critical(mode + ': uncaught error:', err);
410 process.exit(1);
411 });
412 }
413}
414
415function _runMaster(options) {
416 // get starting script
417 const script = options.script || process.argv[1];
418
419 const logger = loggers.get('app');
420 logger.info('starting bedrock...');
421
422 // setup cluster if running with istanbul coverage
423 if(process.env.running_under_istanbul) {
424 // TODO: does this need adjusting after fixing the worker `cwd` issue?
425 // re-call cover with no reporting and using pid named output
426 cluster.setupMaster({
427 exec: './node_modules/.bin/istanbul',
428 args: [
429 'cover', '--report', 'none', '--print', 'none', '--include-pid',
430 process.argv[1], '--'].concat(process.argv.slice(2))
431 });
432 }
433
434 // set 'ps' title
435 const args = process.argv.slice(2).join(' ');
436 process.title = config.core.master.title + (args ? (' ' + args) : '');
437
438 _setupUncaughtExceptionHandler('master', logger);
439
440 logger.info('running bedrock master process "' +
441 config.core.master.title + '"', {pid: process.pid});
442
443 // keep track of master state
444 const masterState = {
445 switchedUser: false,
446 runOnce: {}
447 };
448
449 // notify workers to exit if master exits
450 process.on('exit', function() {
451 for(const id in cluster.workers) {
452 cluster.workers[id].send({type: 'bedrock.core', message: 'exit'});
453 }
454 });
455
456 // handle worker exit
457 cluster.on('exit', function(worker, code) {
458 // if the worker called kill() or disconnect(), it was intentional, so exit
459 // the process
460 if(worker.exitedAfterDisconnect) {
461 logger.info(
462 'worker ' + worker.process.pid + ' exited on purpose with code ' +
463 code + '; exiting master process.');
464 process.exit(code);
465 }
466
467 // accidental worker exit (crash)
468 logger.critical(
469 'worker ' + worker.process.pid + ' exited with code ' + code);
470
471 // if configured, fork a replacement worker
472 if(config.core.worker.restart) {
473 // clear any runOnce records w/allowOnRestart option set
474 for(const id in masterState.runOnce) {
475 if(masterState.runOnce[id].worker === worker.id &&
476 masterState.runOnce[id].options.allowOnRestart) {
477 delete masterState.runOnce[id];
478 }
479 }
480 _startWorker(masterState, script);
481 } else {
482 process.exit(1);
483 }
484 });
485
486 // fork each app process
487 const workers = config.core.workers;
488 for(let i = 0; i < workers; ++i) {
489 _startWorker(masterState, script);
490 }
491}
492
493function _runWorker(startTime, done) {
494 const logger = loggers.get('app');
495
496 // set 'ps' title
497 const args = process.argv.slice(2).join(' ');
498 process.title = config.core.worker.title + (args ? (' ' + args) : '');
499
500 _setupUncaughtExceptionHandler('worker', logger);
501
502 logger.info('running bedrock worker process "' +
503 config.core.worker.title + '"');
504
505 // listen for master process exit
506 let bedrockStarted = false;
507 process.on('message', function(msg) {
508 if(!_isMessageType(msg, 'bedrock.core') || msg.message !== 'exit') {
509 return;
510 }
511
512 if(!bedrockStarted) {
513 return events.emit('bedrock-cli.exit', function() {
514 process.exit();
515 });
516 }
517
518 events.emit('bedrock.stop', function(err) {
519 if(err) {
520 throw err;
521 }
522 events.emit('bedrock-cli.exit', function() {
523 process.exit();
524 });
525 });
526 });
527
528 async.auto({
529 cliReady: function(callback) {
530 events.emit('bedrock-cli.ready', callback);
531 },
532 configure: ['cliReady', function(results, callback) {
533 if(results.cliReady === false) {
534 // skip default behavior (do not emit bedrock core events)
535 return done();
536 }
537 bedrockStarted = true;
538 events.emit('bedrock.configure', callback);
539 }],
540 adminInit: ['configure', function(results, callback) {
541 events.emit('bedrock.admin.init', callback);
542 }],
543 init: ['adminInit', function(results, callback) {
544 // set process user
545 api.setProcessUser();
546 events.emit('bedrock.init', callback);
547 }],
548 start: ['init', function(results, callback) {
549 events.emit('bedrock.start', callback);
550 }],
551 ready: ['start', function(results, callback) {
552 const dtTime = Date.now() - startTime;
553 logger.info('startup time: ' + dtTime + 'ms');
554 events.emit('bedrock.ready', callback);
555 }],
556 started: ['ready', function(results, callback) {
557 events.emit('bedrock.started', callback);
558 }]
559 }, function(err) {
560 done(err);
561 });
562}
563
564function _startWorker(state, script) {
565 const worker = cluster.fork();
566 loggers.attach(worker);
567
568 // listen to start requests from workers
569 worker.on('message', initWorker);
570
571 // TODO: simplify with cluster.on('online')?
572 function initWorker(msg) {
573 if(!_isMessageType(msg, 'bedrock.worker.started')) {
574 return;
575 }
576 // notify worker to initialize and provide the cwd and script to run
577 worker.removeListener('message', initWorker);
578 worker.send({
579 type: 'bedrock.worker.init',
580 cwd: process.cwd(),
581 script
582 });
583 }
584
585 // listen to exit requests from workers
586 worker.on('message', function(msg) {
587 if(_isMessageType(msg, 'bedrock.core') && msg.message === 'exit') {
588 process.exit(msg.status);
589 }
590 });
591
592 // if app process user hasn't been switched yet, wait for a message
593 // from a worker indicating to do so
594 if(!state.switchedUser) {
595 worker.on('message', switchProcessUserListener);
596 }
597
598 function switchProcessUserListener(msg) {
599 if(!_isMessageType(msg, 'bedrock.switchProcessUser')) {
600 return;
601 }
602 worker.removeListener('message', switchProcessUserListener);
603 if(!state.switchedUser) {
604 state.switchedUser = true;
605 // switch group
606 if(config.core.running.groupId && process.setgid) {
607 process.setgid(config.core.running.groupId);
608 }
609 // switch user
610 if(config.core.running.userId && process.setuid) {
611 process.setuid(config.core.running.userId);
612 }
613 }
614 }
615
616 // listen to schedule run once functions
617 worker.on('message', function(msg) {
618 if(!(_isMessageType(msg, 'bedrock.runOnce') ||
619 _isMessageType(msg, 'bedrock.runOnceAsync'))) {
620 return;
621 }
622
623 const {type} = msg;
624
625 if(msg.done) {
626 state.runOnce[msg.id].done = true;
627 state.runOnce[msg.id].error = msg.error || null;
628 // notify workers to call callback
629 const notify = state.runOnce[msg.id].notify;
630 while(notify.length > 0) {
631 const id = notify.shift();
632 if(id in cluster.workers) {
633 cluster.workers[id].send({
634 type,
635 id: msg.id,
636 done: true,
637 error: msg.error
638 });
639 }
640 }
641 return;
642 }
643
644 if(msg.id in state.runOnce) {
645 if(state.runOnce[msg.id].done) {
646 // already ran, notify worker immediately
647 worker.send({
648 type,
649 id: msg.id,
650 done: true,
651 error: state.runOnce[msg.id].error
652 });
653 } else {
654 // still running, add worker ID to notify queue for later notification
655 state.runOnce[msg.id].notify.push(worker.id);
656 }
657 return;
658 }
659
660 // run in this worker
661 state.runOnce[msg.id] = {
662 worker: worker.id,
663 notify: [],
664 options: msg.options,
665 done: false,
666 error: null
667 };
668 worker.send({type, id: msg.id, done: false});
669 });
670}
671
672function _isMessageType(msg, type) {
673 return (typeof msg === 'object' && msg.type === type);
674}