// Retrieved from UNPKG (3.25 kB, JavaScript, raw view).
1const path = require('path')
2const cpuCount = require('os').cpus().length
3const cluster = require('cluster')
4const mkdirp = require('mkdirp')
5
6const Worker = require('./../class/Worker')
7const logs = require('./log')
8const killWorkers = require('./killWorkers')
9
// Options handed to `stream.pipe()` when forwarding a worker's stdout/stderr
// into the app's log streams. `end: false` keeps the destination open when a
// single worker's output ends, so the same log streams can be piped into by
// several workers.
const NOEND = {
  end: false
}
13
14module.exports = function startWorkers(config, dir, pack, args = [], env = {}) {
15 const workers = []
16
17 return new Promise((resolve, reject) => {
18 if (pack.name === 'z1') {
19 throw new Error('the name "z1" is invalid')
20 }
21
22 const ports = pack.ports
23 const out = logs.get(pack.name)
24
25 // output path
26 let output = null
27 if (typeof pack.output === 'string') {
28 if (path.isAbsolute(pack.output)) {
29 output = pack.output
30 } else {
31 output = path.join(dir, pack.output)
32 }
33 } else {
34 output = path.join(process.env.HOME, '.z1', pack.name)
35 }
36
37 const workerCount = Math.abs(+pack.workers) || cpuCount
38
39 const e = []
40 const q = []
41 const n = []
42
43 // create output dir
44 n.push(new Promise((resolve, reject) => {
45 mkdirp(output, err => {
46 if (err && err.code !== 'EEXIST') {
47 reject(err)
48 return
49 }
50
51 logs.setup(pack.name, output)
52
53 resolve()
54 })
55 }))
56
57 // setup master
58 cluster.setupMaster({
59 stdio: ['ignore', 'pipe', 'pipe', 'ipc'],
60 args
61 })
62
63 const ENV = Object.assign({}, env, {
64 PWD: dir,
65 APPNAME: pack.name,
66 PORT: ports[0],
67 PORTS: ports.join()
68 })
69
70 for (let i = 0; i < workerCount; i++) {
71 const worker = new Worker(dir, pack.main, pack.name, ports, ENV)
72 const w = worker.w
73 w.process.stdout.pipe(out.log, NOEND)
74 w.process.stderr.pipe(out.err, NOEND)
75 w.on('error', handle)
76
77 workers.push(worker)
78
79 e.push(worker.once('exit').then(code => {
80 throw new Error(`worker of app "${pack.name}" not started (exit code: ${code})`)
81 }))
82 q.push(worker.once('available').then(() => {
83 worker.once('exit', code => {
84 log('worker exit', code, worker.state)
85 if (code && worker.state !== Worker.KILLED) {
86 // revive worker
87 log(`worker ${worker.id} of "${worker.name}" crashed. (code: ${code})`)
88 log(`starting 1 new worker for "${worker.name}"`)
89
90 const app = config.apps.find(app => app.name === worker.name)
91
92 log('found app', app.name)
93
94 if (!app) {
95 return
96 }
97
98 if (!app.reviveCount) {
99 app.reviveCount = 0
100 }
101
102 app.reviveCount += 1
103
104 const pkg = Object.assign({}, pack, {
105 workers: 1
106 })
107
108 startWorkers(config, dir, pkg, args, env).catch(handle)
109 }
110 })
111 }))
112 log('listening for exit')
113 }
114
115 // when all workers are online before an error
116 e.push(Promise.all(q))
117 n.push(Promise.race(e))
118
119 Promise.all(n).then(() => {
120 resolve({
121 app: pack.name,
122 dir,
123 started: workerCount,
124 ports
125 })
126 }).catch(err => {
127 // if one worker crashes => kill all workers
128 killWorkers(workers, 0).then(() => reject(err))
129 })
130 })
131}