1 | #!/usr/bin/env node
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 | const assert = require('assert')
|
34 | const path = require('path')
|
35 | const url = require('url')
|
36 | const processes = require('child_process')
|
37 | const crypto = require('crypto')
|
38 | const fs = require('fs').promises
|
39 | const net = require('net')
|
40 | const os = require('os')
|
41 |
|
42 | const noop = require('nop')
|
43 |
|
44 | const once = require('prospective/once')
|
45 | const callback = require('prospective/callback')
|
46 |
|
47 | const rimraf = require('rimraf')
|
48 |
|
49 | const Queue = require('avenue')
|
50 |
|
51 | const fnv = require('hash.fnv')
|
52 |
|
53 | const Isochronous = require('isochronous')
|
54 |
|
55 |
|
56 | const Interrupt = require('interrupt').create('prolific')
|
57 |
|
58 |
|
59 | const inherit = require('prolific.inherit')
|
60 |
|
61 |
|
62 | const Collector = require('prolific.collector')
|
63 | const Watcher = require('prolific.watcher')
|
64 |
|
65 | const coalesce = require('extant')
|
66 |
|
67 | const Tmp = require('./tmp')
|
68 | const Killer = require('./killer')
|
69 | const Header = require('./header')
|
70 |
|
71 | const Cubbyhole = require('cubbyhole')
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 |
|
78 |
|
79 |
|
80 |
|
81 |
|
82 |
|
83 |
|
84 |
|
85 |
|
86 |
|
87 |
|
88 |
|
89 |
|
90 |
|
// Signals that, when received by this supervisor process, are relayed to the
// monitored child process (see the `traps` loop near the end of the program).
const signals = [ 'SIGINT', 'SIGTERM', 'SIGHUP', 'SIGQUIT', 'SIGABRT', 'SIGUSR2' ]

// Program body. Spawns the monitored child (`main` plus its arguments), listens
// on a socket inside a private temporary directory, spawns one sidecar process
// per `start` message seen by the collector, and resolves to the monitored
// child's exit code (re-raising the child's terminating signal, if any).
require('arguable')(module, { $trap: false }, async arguable => {
    arguable.helpIf(arguable.ultimate.help)

    const processor = arguable.ultimate.processor

    // Published in the environment before any process is spawned; processes
    // spawned below without an explicit `env` inherit `process.env`.
    process.env.PROLIFIC_SUPERVISOR_PROCESS_ID = process.pid

    // Sidecar processes keyed by the `pid` of the `start` message that caused
    // them to be spawned.
    const sidecars = {}

    const Destructible = require('destructible')
    const destructible = new Destructible(1500, 'prolific')

    const countdown = destructible.ephemeral('countdown')
    const children = destructible.ephemeral('children')
    const supervisor = destructible.durable('supervisor')

    // One count for the monitored child (released in `supervise.child`); each
    // sidecar adds its own count in the `start` handler below.
    countdown.increment()

    const supervise = {
        // Waits for a sidecar to exit, asserts that it exited with code zero,
        // then forgets it and releases its count.
        sidecar: async (sidecar, pid) => {
            const [ exitCode, signal ] = await once(sidecar, 'exit').promise
            Interrupt.assert(exitCode === 0, 'sidecar.exit', {
                exitCode: exitCode,
                signal: signal,
                argv: arguable.argv
            })
            delete sidecars[pid]
            countdown.decrement()
        },
        // Waits for the monitored child to exit and releases its count. The
        // `child` argument is not read; `exit` is the promise declared further
        // down, which exists by the time this function is invoked.
        child: async (child) => {
            await exit
            countdown.decrement()
        }
    }

    const cubbyhole = new Cubbyhole

    const Printer = require('./printer')

    // Temporary directory for this run; the second argument supplies a random
    // 16-byte hex string to `Tmp`.
    const tmp = await Tmp(coalesce(process.env.TMPDIR, '/tmp'), async () => {
        const [ bytes ] = await callback(callback => crypto.randomBytes(16, callback))
        return bytes.toString('hex')
    }, process.pid)
    // Remove the temporary directory when the supervisor is destroyed.
    supervisor.destruct(() => callback(callback => rimraf(tmp, callback)))

    const printer = new Printer(supervisor.durable('printer'), lines => {
        console.log(lines)
    }, JSON.stringify, 1000)

    // Accepted sockets are queued and handled one at a time: read the header,
    // wait on the cubbyhole entry for the announcing pid (set when its sidecar
    // reports `receiving`), then hand the socket to that sidecar over IPC.
    const sockets = new Queue().shifter().paired
    children.durable('sockets', sockets.shifter.pump(async socket => {
        if (socket == null) {
            return
        }
        const header = await Header(socket)
        printer.say('header', { header })
        if (header == null) {
            return
        }
        assert.equal(header.method, 'announce', 'announce missing')
        killers.start.unwatch(header.pid)
        await cubbyhole.get(header.pid)
        cubbyhole.remove(header.pid)
        printer.say('dispatch', { header, destroyed: socket.destroyed })
        sidecars[header.pid].send({ module: 'prolific', method: 'socket' }, socket)
    }))
    children.destruct(() => sockets.shifter.destroy())
    children.destruct(() => printer.say('destruct.children', {}))

    const server = net.createServer(socket => {
        sockets.queue.push(socket)
    })
    children.destruct(() => server.close())
    await callback(callback => server.listen(path.resolve(tmp, 'socket'), callback))

    await fs.mkdir(path.resolve(tmp, 'stage'))
    await fs.mkdir(path.resolve(tmp, 'publish'))

    process.env.PROLIFIC_TMPDIR = tmp

    // Periodically refresh the access and modification times of the temporary
    // directories. `fs.utimes` interprets a bare number as seconds since the
    // epoch, so a `Date` is passed rather than the millisecond `Date.now()`.
    const toucher = new Isochronous(60000, true, async () => {
        const now = new Date()
        await fs.utimes(tmp, now, now)
        await fs.utimes(path.resolve(tmp, 'stage'), now, now)
        await fs.utimes(path.resolve(tmp, 'publish'), now, now)
    })

    children.durable('toucher', toucher.start())
    children.destruct(() => toucher.stop())

    // Watches the `publish` directory; the callback supplies an FNV hash of a
    // buffer to the watcher.
    const watcher = new Watcher(children.durable('watcher'), buffer => {
        return fnv(0, buffer, 0, buffer.length)
    }, path.join(tmp, 'publish'))
    countdown.destruct(() => watcher.drain())

    // Two killers: pids reported `killed` by `start` are passed on to `exit`,
    // and pids reported `killed` by `exit` are reported to the watcher.
    const killers = {
        start: new Killer(100, printer),
        exit: new Killer(100, printer)
    }
    Error.stackTraceLimit = 32
    killers.start.on('killed', killers.exit.watch.bind(killers.exit))
    killers.exit.on('killed', watcher.killed.bind(watcher))
    children.durable([ 'killer', 'start' ], killers.start.run())
    children.durable([ 'killer', 'exit' ], killers.exit.run())
    children.destruct(() => killers.start.destroy())
    children.destruct(() => killers.exit.destroy())

    const collector = new Collector

    watcher.on('data', data => collector.data(data))

    // The first positional argument is the program to run; the remainder are
    // its arguments.
    const argv = arguable.argv.slice()
    const main = argv.shift()

    // Dispatch on the `method` of each message emitted by the collector.
    collector.on('data', data => {
        const pid = data.pid
        switch (data.body.method) {
        case 'start': {
                // Spawn a sidecar for the announcing process. The fourth stdio
                // entry opens the IPC channel used by `send` and `message`.
                const sidecar = processes.spawn('node', [
                    path.join(__dirname, 'sidecar.bin.js'),
                    '--processor', processor,
                    '--supervisor', process.pid,
                    '--tmp', tmp,
                    '--child', pid,
                    '--main', main
                ], {
                    stdio: [ 'ignore', 'inherit', 'inherit', 'ipc' ]
                })
                // Log every sidecar message and react to the two that matter
                // here: `receiving` releases the socket dispatch waiting on the
                // cubbyhole, `closed` starts the exit watch for the child pid.
                const onMessage = message => {
                    printer.say(message.method, message)
                    switch (message.method) {
                    case 'receiving':
                        cubbyhole.set(message.child, true)
                        break
                    case 'closed':
                        killers.exit.watch(message.child)
                        break
                    }
                }
                sidecar.on('message', onMessage)
                sidecar.on('exit', () => sidecar.removeListener('message', onMessage))
                countdown.increment()
                sidecars[pid] = sidecar
                children.ephemeral([ 'sidecar', sidecar.pid ], supervise.sidecar(sidecar, pid))
            }
            break
        case 'say': {
                printer.push(data.body)
            }
            break
        case 'eos': {
                // Once the monitored child has exited, send the sidecar a
                // `synchronous` message with a null body.
                destructible.ephemeral('exit', (async () => {
                    const sidecar = sidecars[pid]
                    await exit
                    sidecar.send({
                        module: 'prolific',
                        method: 'synchronous',
                        body: null
                    })
                })())
            }
            break
        default: {
                // Any other method: begin the exit watch for the pid and forward
                // the body to its sidecar as a `synchronous` message.
                killers.exit.watch(pid)
                const sidecar = sidecars[pid]
                sidecar.send({
                    module: 'prolific',
                    method: 'synchronous',
                    body: data.body
                })
            }
            break
        }
    })

    printer.say('start', {})

    const stdio = inherit(arguable)

    const child = processes.spawn(main, argv, { stdio: stdio })
    killers.start.watch(child.pid)

    // Relay each listed signal to the monitored child, remembering the handlers
    // so they can be removed before this process re-raises a signal on itself.
    const traps = {}
    for (const signal of signals) {
        process.on(signal, traps[signal] = child.kill.bind(child, signal))
    }

    const exit = once(child, 'exit').promise

    children.ephemeral('close', supervise.child(child))

    await children.destructed
    supervisor.destroy()
    await supervisor.destructed
    const [ code, signal ] = await exit
    for (const [ name, trap ] of Object.entries(traps)) {
        process.removeListener(name, trap)
    }
    if (signal != null) {
        // The child was terminated by a signal: send the same signal to this
        // process. The no-op interval is an active timer, presumably there to
        // keep the process running until the signal is delivered.
        setInterval(noop, 1000)
        setImmediate(() => process.kill(process.pid, signal))
    }
    return code
})
|