UNPKG

9.83 kBJavaScriptView Raw
1#!/usr/bin/env node
2
3/*
4 ___ usage ___ en_US ___
5 usage: prolific <options> <program>
6
7 options:
8
9 -i, --inherit <number>
10 file handles to inherit
11
12 -s, --scram <number>
13 number of seconds to wait for children to exit
14
15 -p, --processor <path>
16 path to processor
17
18 --help
19 display this message
20
21 ___ $ ___ en_US ___
22
23 log is required:
24 the `--log` address and port is a required argument
25
26 port is not an integer:
27 the `--log` port must be an integer
28
29 ___ . ___
30*/
31
32// Node.js API.
33const assert = require('assert')
34const path = require('path')
35const url = require('url')
36const processes = require('child_process')
37const crypto = require('crypto')
38const fs = require('fs').promises
39const net = require('net')
40const os = require('os')
41
42const noop = require('nop')
43
44const once = require('prospective/once')
45const callback = require('prospective/callback')
46
47const rimraf = require('rimraf')
48
49const Queue = require('avenue')
50
51const fnv = require('hash.fnv')
52
53const Isochronous = require('isochronous')
54
55// Exceptions that you can catch by type.
56const Interrupt = require('interrupt').create('prolific')
57
58// Command line and environment interpretation utilities.
59const inherit = require('prolific.inherit')
60
61// Monitoring of streams that contain logging messages.
62const Collector = require('prolific.collector')
63const Watcher = require('prolific.watcher')
64
65const coalesce = require('extant')
66
67const Tmp = require('./tmp')
68const Killer = require('./killer')
69const Header = require('./header')
70
71const Cubbyhole = require('cubbyhole')
72
73// Selected signals for propagation. These would appear to be the sort of
74// signals that a user would send to the monitored child from the terminal.
75// Either that or the sort of signals that a supervisor program would expect
76// the supervised program to respond to. Either way, the Prolific supervisor
77// doesn't really have a plan to handle them, so it forwards them to the
78// supervised child.
79//
80// Other signals would appear to be the sort of signals that the operating
81// system would send directly to the affected process.
82//
83// At some point we may add command line switches to specify with signals to
84// forward, but for now we'll provide this list as a convention and adjust the
85// convention to user feedback. We could give the option to cancel all
86// forwarding and then the user would be responsible for sending signals
87// directly to the supervised child process by its pid.
88//
89// https://nodejs.org/api/process.html#process_signal_events
90//
91const signals = [ 'SIGINT', 'SIGTERM', 'SIGHUP', 'SIGQUIT', 'SIGABRT', 'SIGUSR2' ]
92
93require('arguable')(module, { $trap: false }, async arguable => {
94 arguable.helpIf(arguable.ultimate.help)
95
96 const processor = arguable.ultimate.processor
97
98 process.env.PROLIFIC_SUPERVISOR_PROCESS_ID = process.pid
99
100 const sidecars = {}
101
102 // TODO What do you really want to name this?
103 const Destructible = require('destructible')
104 const destructible = new Destructible(1500, 'prolific')
105
106 const countdown = destructible.ephemeral('countdown')
107 const children = destructible.ephemeral('children')
108 const supervisor = destructible.durable('supervisor')
109
110 countdown.increment()
111
112 const supervise = {
113 sidecar: async (sidecar, pid) => {
114 const [ exitCode, signal ] = await once(sidecar, 'exit').promise
115 Interrupt.assert(exitCode == 0, 'sidecar.exit', {
116 exitCode: exitCode,
117 signal: signal,
118 argv: arguable.argv
119 })
120 delete sidecars[pid]
121 countdown.decrement()
122 },
123 child: async (child) => {
124 await exit
125 countdown.decrement()
126 }
127 }
128
129 const cubbyhole = new Cubbyhole
130
131 const Printer = require('./printer')
132
133 const tmp = await Tmp(coalesce(process.env.TMPDIR, '/tmp'), async () => {
134 const [ bytes ] = await callback(callback => crypto.randomBytes(16, callback))
135 return bytes.toString('hex')
136 }, process.pid)
137 supervisor.destruct(() => callback(callback => rimraf(tmp, callback)))
138
139 const printer = new Printer(supervisor.durable('printer'), lines => {
140 console.log(lines)
141 }, JSON.stringify, 1000)
142
143 const sockets = new Queue().shifter().paired
144 children.durable('sockets', sockets.shifter.pump(async socket => {
145 if (socket != null) {
146 const header = await Header(socket)
147 printer.say('header', header)
148 assert.equal(header.method, 'announce', 'announce missing')
149 await cubbyhole.get(header.pid)
150 cubbyhole.remove(header.pid)
151 printer.say('dispatch', { header, destroyed: socket.destroyed })
152 // **TODO** Uncomment to hang unit test.
153 // throw new Error
154 sidecars[header.pid].send({
155 module: 'prolific',
156 method: 'socket'
157 }, socket)
158 }
159 }))
160 children.destruct(() => sockets.shifter.destroy())
161 children.destruct(() => printer.say('destruct.children', {}))
162
163 const server = net.createServer(socket => {
164 sockets.queue.push(socket)
165 })
166 children.destruct(() => server.close())
167 await callback(callback => server.listen(path.resolve(tmp, 'socket'), callback))
168
169 await fs.mkdir(path.resolve(tmp, 'stage'))
170 await fs.mkdir(path.resolve(tmp, 'publish'))
171
172 process.env.PROLIFIC_TMPDIR = tmp
173
174 const toucher = new Isochronous(60000, true, async () => {
175 const now = Date.now()
176 await fs.utimes(tmp, now, now)
177 await fs.utimes(path.resolve(tmp, 'stage'), now, now)
178 await fs.utimes(path.resolve(tmp, 'publish'), now, now)
179 })
180
181 children.durable('toucher', toucher.start())
182 children.destruct(() => toucher.stop())
183
184 const killer = new Killer(100)
185 killer.on('killed', pid => watcher.killed(pid))
186 children.durable('killer', killer.run())
187 children.destruct(() => killer.destroy())
188
189 const watcher = new Watcher(children.durable('watcher'), buffer => {
190 return fnv(0, buffer, 0, buffer.length)
191 }, path.join(tmp, 'publish'))
192
193 countdown.destruct(() => watcher.drain())
194
195 const collector = new Collector
196
197 // watcher.on('notice', notice => console.log(notice))
198
199 watcher.on('data', data => collector.data(data))
200
201 const argv = arguable.argv.slice()
202 const main = argv.shift()
203
204 collector.on('data', data => {
205 const pid = data.pid
206 switch (data.body.method) {
207 case 'start': {
208 const sidecar = processes.spawn('node', [
209 path.join(__dirname, 'sidecar.bin.js'),
210 '--processor', processor,
211 '--supervisor', process.pid,
212 '--tmp', tmp,
213 '--child', pid,
214 '--main', main
215 ], {
216 stdio: [ 'ignore', 'inherit', 'inherit', 'ipc' ]
217 })
218 function message (message) {
219 assert.equal(`${message.module}:${message.method}`, 'prolific:receiving')
220 printer.say('receiving', message)
221 cubbyhole.set(message.child, true)
222 }
223 sidecar.on('message', message)
224 sidecar.on('exit', () => sidecar.removeListener('message', message))
225 countdown.increment()
226 sidecars[pid] = sidecar
227 children.ephemeral([ 'sidecar', sidecar.pid ], supervise.sidecar(sidecar, pid))
228 }
229 break
230 case 'say': {
231 printer.push(data.body)
232 }
233 break
234 case 'eos': {
235 // TODO No need to `killer.purge()`, we can absolutely remove
236 // the pid from the `Killer` here.
237 destructible.ephemeral('exit', (async () => {
238 killer.exited(pid)
239 const sidecar = sidecars[pid]
240 const [ code, signal ] = await exit
241 /*
242 sidecar.send({
243 module: 'prolific',
244 method: 'synchronous',
245 body: { method: 'exit', code: code, signal: signal }
246 })
247 */
248 sidecar.send({
249 module: 'prolific',
250 method: 'synchronous',
251 body: null
252 })
253 })())
254 }
255 break
256 default: {
257 killer.exit(pid)
258 const sidecar = sidecars[pid]
259 // **TODO** Wait on callback?
260 sidecar.send({
261 module: 'prolific',
262 method: 'synchronous',
263 body: data.body
264 })
265 }
266 break
267 }
268 })
269
270 printer.say('start', {})
271
272 const stdio = inherit(arguable)
273
274 // TODO Restore inheritance.
275 const child = processes.spawn(main, argv, { stdio: stdio })
276
277 const traps = {}
278 for (const signal of signals) {
279 process.on(signal, traps[signal] = child.kill.bind(child, signal))
280 }
281
282 const exit = once(child, 'exit').promise
283
284 children.ephemeral('close', supervise.child(child))
285
286 // TODO How do we propogate signals?
287 // await arguable.destroyed
288 // destructible.destroy()
289 await children.promise
290 supervisor.destroy()
291 await supervisor.promise
292 const [ code, signal ] = await exit
293 for (const signal in traps) {
294 process.removeListener(signal, traps[signal])
295 }
296 if (signal != null) {
297 setInterval(noop, 1000)
298 setImmediate(() => process.kill(process.pid, signal))
299 }
300 return code
301})