UNPKG

10.4 kBJavaScriptView Raw
1#!/usr/bin/env node
2
3/*
4 ___ usage ___ en_US ___
5 usage: prolific <options> <program>
6
7 options:
8
9 -i, --inherit <number>
10 file handles to inherit
11
12 -s, --scram <number>
13 number of seconds to wait for children to exit
14
15 -p, --processor <path>
16 path to processor
17
18 --help
19 display this message
20
21 ___ $ ___ en_US ___
22
23 log is required:
24 the `--log` address and port is a required argument
25
26 port is not an integer:
27 the `--log` port must be an integer
28
29 ___ . ___
30*/
31
32// Node.js API.
33const assert = require('assert')
34const path = require('path')
35const url = require('url')
36const processes = require('child_process')
37const crypto = require('crypto')
38const fs = require('fs').promises
39const net = require('net')
40const os = require('os')
41
42const noop = require('nop')
43
44const once = require('prospective/once')
45const callback = require('prospective/callback')
46
47const rimraf = require('rimraf')
48
49const Queue = require('avenue')
50
51const fnv = require('hash.fnv')
52
53const Isochronous = require('isochronous')
54
55// Exceptions that you can catch by type.
56const Interrupt = require('interrupt').create('prolific')
57
58// Command line and environment interpretation utilities.
59const inherit = require('prolific.inherit')
60
61// Monitoring of streams that contain logging messages.
62const Collector = require('prolific.collector')
63const Watcher = require('prolific.watcher')
64
65const coalesce = require('extant')
66
67const Tmp = require('./tmp')
68const Killer = require('./killer')
69const Header = require('./header')
70
71const Cubbyhole = require('cubbyhole')
72
// Signals that the supervisor forwards to the supervised child.
//
// These are the signals a user would send to the monitored child from the
// terminal, or that a supervisor program would expect the supervised program
// to respond to. The Prolific supervisor has no plan of its own for handling
// them, so it passes them along to the child.
//
// Signals that are not listed are the sort that the operating system sends
// directly to the affected process.
//
// At some point we may add command line switches to specify which signals to
// forward. For now this list is a convention that we will adjust according to
// user feedback. We could also offer an option to cancel all forwarding, in
// which case the user would be responsible for sending signals directly to
// the supervised child process by its pid.
//
// https://nodejs.org/api/process.html#process_signal_events
//
const signals = [
    'SIGINT', 'SIGTERM', 'SIGHUP', 'SIGQUIT', 'SIGABRT', 'SIGUSR2'
]
92
93require('arguable')(module, { $trap: false }, async arguable => {
// Hand the `--help` switch to Arguable before doing any real work.
arguable.helpIf(arguable.ultimate.help)

// Value of `--processor`; passed on the command line of every sidecar.
const processor = arguable.ultimate.processor

// Publish the supervisor's pid through the environment.
process.env.PROLIFIC_SUPERVISOR_PROCESS_ID = process.pid

// Sidecar child processes, keyed by the pid carried on the `start` message
// that caused the sidecar to be spawned.
const sidecars = {}

// TODO What do you really want to name this?
const Destructible = require('destructible')
const destructible = new Destructible(1500, 'prolific')

// Sub-destructibles used to stage the shutdown.
const countdown = destructible.ephemeral('countdown')
const children = destructible.ephemeral('children')
const supervisor = destructible.durable('supervisor')

// Taken here and released by `supervise.child` once the supervised child has
// exited.
countdown.increment()
111
// Waiters that follow a process until it exits and then release one count on
// `countdown`.
const supervise = {
    // Wait for the sidecar's `exit` event, assert that it exited with code
    // zero (raising a `sidecar.exit` interrupt otherwise), forget the sidecar
    // and release its count.
    async sidecar (monitor, pid) {
        const [ exitCode, signal ] = await once(monitor, 'exit').promise
        Interrupt.assert(exitCode == 0, 'sidecar.exit', {
            exitCode,
            signal,
            argv: arguable.argv
        })
        delete sidecars[pid]
        countdown.decrement()
    },
    // Wait for the supervised child's `exit` promise and release its count.
    // The argument is not used; the shared `exit` promise is awaited instead.
    async child (child) {
        await exit
        countdown.decrement()
    }
}
128
// Rendezvous between a sidecar reporting `receiving` for a pid and the socket
// pump waiting to dispatch that pid's connection.
const cubbyhole = new Cubbyhole()

const Printer = require('./printer')

// Generates a hexadecimal name from 16 random bytes.
const randomName = async () => {
    const [ bytes ] = await callback(done => crypto.randomBytes(16, done))
    return bytes.toString('hex')
}

// Working directory under `TMPDIR` (or `/tmp` when unset); removed again when
// the supervisor is destroyed.
const tmp = await Tmp(coalesce(process.env.TMPDIR, '/tmp'), randomName, process.pid)
supervisor.destruct(() => callback(done => rimraf(tmp, done)))

// Writes whatever the printer emits to standard out.
const write = lines => {
    console.log(lines)
}

const printer = new Printer(supervisor.durable('printer'), write, JSON.stringify, 1000)
142
// Connections accepted by the server are pushed onto `sockets.queue` and
// consumed one at a time by the pump below.
const sockets = new Queue().shifter().paired

// Read the header from a connection and hand the socket to the sidecar of the
// process that announced itself on it.
const dispatch = async socket => {
    // Nothing to dispatch when the pump hands us `null`.
    if (socket == null) {
        return
    }
    const header = await Header(socket)
    printer.say('header', { header })
    if (header == null) {
        return
    }
    assert.equal(header.method, 'announce', 'announce missing')
    // The process has announced itself, so the start killer can let it go.
    killers.start.unwatch(header.pid)
    // Wait for the entry that is set when the sidecar reports `receiving`.
    await cubbyhole.get(header.pid)
    cubbyhole.remove(header.pid)
    printer.say('dispatch', { header, destroyed: socket.destroyed })
    // **TODO** Uncomment to hang unit test.
    // throw new Error
    sidecars[header.pid].send({ module: 'prolific', method: 'socket' }, socket)
}

children.durable('sockets', sockets.shifter.pump(dispatch))
children.destruct(() => sockets.shifter.destroy())
children.destruct(() => printer.say('destruct.children', {}))
162
// Accept connections on a socket file inside the temporary directory and queue
// each one for the socket pump.
const server = net.createServer(socket => {
    sockets.queue.push(socket)
})
children.destruct(() => server.close())
await callback(listening => server.listen(path.resolve(tmp, 'socket'), listening))

// Create the working directories, `stage` first and then `publish`.
for (const directory of [ 'stage', 'publish' ]) {
    await fs.mkdir(path.resolve(tmp, directory))
}

// Publish the temporary directory through the environment.
process.env.PROLIFIC_TMPDIR = tmp
173
// Every minute, refresh the access and modification times of the temporary
// directory and its `stage` and `publish` subdirectories.
//
// `fs.utimes` interprets numeric times as seconds since the epoch, so passing
// `Date.now()`, which is in milliseconds, would stamp the directories with a
// date tens of thousands of years in the future. A `Date` object is
// unambiguous.
const toucher = new Isochronous(60000, true, async () => {
    const now = new Date()
    const directories = [
        tmp,
        path.resolve(tmp, 'stage'),
        path.resolve(tmp, 'publish')
    ]
    for (const directory of directories) {
        await fs.utimes(directory, now, now)
    }
})

children.durable('toucher', toucher.start())
children.destruct(() => toucher.stop())
183
// Watches the `publish` directory, checksumming buffers with FNV.
const checksum = buffer => fnv(0, buffer, 0, buffer.length)
const watcher = new Watcher(children.durable('watcher'), checksum, path.join(tmp, 'publish'))
countdown.destruct(() => watcher.drain())

// Two killers: one for processes watched from spawn until they announce
// themselves, and one for processes watched for exit.
const killers = {
    start: new Killer(100, printer),
    exit: new Killer(100, printer)
}
Error.stackTraceLimit = 32
// A pid reported `killed` by the start killer is handed to the exit killer; a
// pid reported `killed` by the exit killer is reported to the watcher.
killers.start.on('killed', (...vargs) => killers.exit.watch(...vargs))
killers.exit.on('killed', (...vargs) => watcher.killed(...vargs))
children.durable([ 'killer', 'start' ], killers.start.run())
children.durable([ 'killer', 'exit' ], killers.exit.run())
children.destruct(() => killers.start.destroy())
children.destruct(() => killers.exit.destroy())
200
// Everything the watcher reads is fed to the collector.
const collector = new Collector()

// watcher.on('notice', notice => console.log(notice))

watcher.on('data', data => collector.data(data))

// The first positional argument is the program to run; the remainder are its
// arguments. `arguable.argv` itself is left untouched.
const [ main, ...argv ] = arguable.argv
209
// Act on each collected message according to `body.method`:
//
//  * `start` spawns a sidecar for the reporting pid,
//  * `say` is pushed straight to the printer,
//  * `eos` tells the sidecar, once the supervised child has exited, that there
//    is nothing more to come,
//  * anything else is forwarded to the pid's sidecar.
collector.on('data', ({ pid, body }) => {
    if (body.method === 'start') {
        const sidecar = processes.spawn('node', [
            path.join(__dirname, 'sidecar.bin.js'),
            '--processor', processor,
            '--supervisor', process.pid,
            '--tmp', tmp,
            '--child', pid,
            '--main', main
        ], { stdio: [ 'ignore', 'inherit', 'inherit', 'ipc' ] })
        // Log every IPC message from the sidecar and act on the two that the
        // supervisor cares about.
        const relay = message => {
            printer.say(message.method, message)
            if (message.method === 'receiving') {
                // Releases the socket pump waiting on this child.
                cubbyhole.set(message.child, true)
            } else if (message.method === 'closed') {
                killers.exit.watch(message.child)
            }
        }
        sidecar.on('message', relay)
        sidecar.on('exit', () => sidecar.removeListener('message', relay))
        countdown.increment()
        sidecars[pid] = sidecar
        children.ephemeral([ 'sidecar', sidecar.pid ], supervise.sidecar(sidecar, pid))
        return
    }

    if (body.method === 'say') {
        printer.push(body)
        return
    }

    if (body.method === 'eos') {
        // TODO No need to `killer.purge()`, we can absolutely remove
        // the pid from the `Killer` here.
        destructible.ephemeral('exit', (async () => {
            // Look the sidecar up before waiting; its entry is deleted when
            // the sidecar exits.
            const sidecar = sidecars[pid]
            await exit
            // The exit code and signal of the child are not forwarded; a
            // `null` body is sent instead.
            sidecar.send({
                module: 'prolific',
                method: 'synchronous',
                body: null
            })
        })())
        return
    }

    killers.exit.watch(pid)
    // **TODO** Wait on callback?
    sidecars[pid].send({
        module: 'prolific',
        method: 'synchronous',
        body: body
    })
})
280
281 printer.say('start', {})
282
283 const stdio = inherit(arguable)
284
285 // TODO Restore inheritance.
286 const child = processes.spawn(main, argv, { stdio: stdio })
287 killers.start.watch(child.pid)
288
289 const traps = {}
290 for (const signal of signals) {
291 process.on(signal, traps[signal] = child.kill.bind(child, signal))
292 }
293
294 const exit = once(child, 'exit').promise
295
296 children.ephemeral('close', supervise.child(child))
297
298 // TODO How do we propogate signals?
299 // await arguable.destroyed
300 // destructible.destroy()
301 await children.destructed
302 supervisor.destroy()
303 await supervisor.destructed
304 const [ code, signal ] = await exit
305 for (const signal in traps) {
306 process.removeListener(signal, traps[signal])
307 }
308 if (signal != null) {
309 setInterval(noop, 1000)
310 setImmediate(() => process.kill(process.pid, signal))
311 }
312 return code
313})