1 | #!/usr/bin/env node
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 | const assert = require('assert')
|
34 | const path = require('path')
|
35 | const url = require('url')
|
36 | const processes = require('child_process')
|
37 | const crypto = require('crypto')
|
38 | const fs = require('fs').promises
|
39 | const net = require('net')
|
40 | const os = require('os')
|
41 |
|
42 | const noop = require('nop')
|
43 |
|
44 | const once = require('prospective/once')
|
45 | const callback = require('prospective/callback')
|
46 |
|
47 | const rimraf = require('rimraf')
|
48 |
|
49 | const Queue = require('avenue')
|
50 |
|
51 | const fnv = require('hash.fnv')
|
52 |
|
53 | const Isochronous = require('isochronous')
|
54 |
|
55 |
|
56 | const Interrupt = require('interrupt').create('prolific')
|
57 |
|
58 |
|
59 | const inherit = require('prolific.inherit')
|
60 |
|
61 |
|
62 | const Collector = require('prolific.collector')
|
63 | const Watcher = require('prolific.watcher')
|
64 |
|
65 | const coalesce = require('extant')
|
66 |
|
67 | const Tmp = require('./tmp')
|
68 | const Killer = require('./killer')
|
69 | const Header = require('./header')
|
70 |
|
71 | const Cubbyhole = require('cubbyhole')
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 |
|
78 |
|
79 |
|
80 |
|
81 |
|
82 |
|
83 |
|
84 |
|
85 |
|
86 |
|
87 |
|
88 |
|
89 |
|
90 |
|
91 | const signals = [ 'SIGINT', 'SIGTERM', 'SIGHUP', 'SIGQUIT', 'SIGABRT', 'SIGUSR2' ]
|
92 |
|
93 | require('arguable')(module, { $trap: false }, async arguable => {
|
94 | arguable.helpIf(arguable.ultimate.help)
|
95 |
|
96 | const processor = arguable.ultimate.processor
|
97 |
|
98 | process.env.PROLIFIC_SUPERVISOR_PROCESS_ID = process.pid
|
99 |
|
100 | const sidecars = {}
|
101 |
|
102 |
|
103 | const Destructible = require('destructible')
|
104 | const destructible = new Destructible(1500, 'prolific')
|
105 |
|
106 | const countdown = destructible.ephemeral('countdown')
|
107 | const children = destructible.ephemeral('children')
|
108 | const supervisor = destructible.durable('supervisor')
|
109 |
|
110 | countdown.increment()
|
111 |
|
112 | const supervise = {
|
113 | sidecar: async (sidecar, pid) => {
|
114 | const [ exitCode, signal ] = await once(sidecar, 'exit').promise
|
115 | Interrupt.assert(exitCode == 0, 'sidecar.exit', {
|
116 | exitCode: exitCode,
|
117 | signal: signal,
|
118 | argv: arguable.argv
|
119 | })
|
120 | delete sidecars[pid]
|
121 | countdown.decrement()
|
122 | },
|
123 | child: async (child) => {
|
124 | await exit
|
125 | countdown.decrement()
|
126 | }
|
127 | }
|
128 |
|
129 | const cubbyhole = new Cubbyhole
|
130 |
|
131 | const Printer = require('./printer')
|
132 |
|
133 | const tmp = await Tmp(coalesce(process.env.TMPDIR, '/tmp'), async () => {
|
134 | const [ bytes ] = await callback(callback => crypto.randomBytes(16, callback))
|
135 | return bytes.toString('hex')
|
136 | }, process.pid)
|
137 | supervisor.destruct(() => callback(callback => rimraf(tmp, callback)))
|
138 |
|
139 | const printer = new Printer(supervisor.durable('printer'), lines => {
|
140 | console.log(lines)
|
141 | }, JSON.stringify, 1000)
|
142 |
|
143 | const sockets = new Queue().shifter().paired
|
144 | children.durable('sockets', sockets.shifter.pump(async socket => {
|
145 | if (socket != null) {
|
146 | const header = await Header(socket)
|
147 | printer.say('header', header)
|
148 | assert.equal(header.method, 'announce', 'announce missing')
|
149 | await cubbyhole.get(header.pid)
|
150 | cubbyhole.remove(header.pid)
|
151 | printer.say('dispatch', { header, destroyed: socket.destroyed })
|
152 |
|
153 |
|
154 | sidecars[header.pid].send({
|
155 | module: 'prolific',
|
156 | method: 'socket'
|
157 | }, socket)
|
158 | }
|
159 | }))
|
160 | children.destruct(() => sockets.shifter.destroy())
|
161 | children.destruct(() => printer.say('destruct.children', {}))
|
162 |
|
163 | const server = net.createServer(socket => {
|
164 | sockets.queue.push(socket)
|
165 | })
|
166 | children.destruct(() => server.close())
|
167 | await callback(callback => server.listen(path.resolve(tmp, 'socket'), callback))
|
168 |
|
169 | await fs.mkdir(path.resolve(tmp, 'stage'))
|
170 | await fs.mkdir(path.resolve(tmp, 'publish'))
|
171 |
|
172 | process.env.PROLIFIC_TMPDIR = tmp
|
173 |
|
174 | const toucher = new Isochronous(60000, true, async () => {
|
175 | const now = Date.now()
|
176 | await fs.utimes(tmp, now, now)
|
177 | await fs.utimes(path.resolve(tmp, 'stage'), now, now)
|
178 | await fs.utimes(path.resolve(tmp, 'publish'), now, now)
|
179 | })
|
180 |
|
181 | children.durable('toucher', toucher.start())
|
182 | children.destruct(() => toucher.stop())
|
183 |
|
184 | const killer = new Killer(100)
|
185 | killer.on('killed', pid => watcher.killed(pid))
|
186 | children.durable('killer', killer.run())
|
187 | children.destruct(() => killer.destroy())
|
188 |
|
189 | const watcher = new Watcher(children.durable('watcher'), buffer => {
|
190 | return fnv(0, buffer, 0, buffer.length)
|
191 | }, path.join(tmp, 'publish'))
|
192 |
|
193 | countdown.destruct(() => watcher.drain())
|
194 |
|
195 | const collector = new Collector
|
196 |
|
197 |
|
198 |
|
199 | watcher.on('data', data => collector.data(data))
|
200 |
|
201 | const argv = arguable.argv.slice()
|
202 | const main = argv.shift()
|
203 |
|
204 | collector.on('data', data => {
|
205 | const pid = data.pid
|
206 | switch (data.body.method) {
|
207 | case 'start': {
|
208 | const sidecar = processes.spawn('node', [
|
209 | path.join(__dirname, 'sidecar.bin.js'),
|
210 | '--processor', processor,
|
211 | '--supervisor', process.pid,
|
212 | '--tmp', tmp,
|
213 | '--child', pid,
|
214 | '--main', main
|
215 | ], {
|
216 | stdio: [ 'ignore', 'inherit', 'inherit', 'ipc' ]
|
217 | })
|
218 | function message (message) {
|
219 | assert.equal(`${message.module}:${message.method}`, 'prolific:receiving')
|
220 | printer.say('receiving', message)
|
221 | cubbyhole.set(message.child, true)
|
222 | }
|
223 | sidecar.on('message', message)
|
224 | sidecar.on('exit', () => sidecar.removeListener('message', message))
|
225 | countdown.increment()
|
226 | sidecars[pid] = sidecar
|
227 | children.ephemeral([ 'sidecar', sidecar.pid ], supervise.sidecar(sidecar, pid))
|
228 | }
|
229 | break
|
230 | case 'say': {
|
231 | printer.push(data.body)
|
232 | }
|
233 | break
|
234 | case 'eos': {
|
235 |
|
236 |
|
237 | destructible.ephemeral('exit', (async () => {
|
238 | killer.exited(pid)
|
239 | const sidecar = sidecars[pid]
|
240 | const [ code, signal ] = await exit
|
241 | |
242 |
|
243 |
|
244 |
|
245 |
|
246 |
|
247 |
|
248 | sidecar.send({
|
249 | module: 'prolific',
|
250 | method: 'synchronous',
|
251 | body: null
|
252 | })
|
253 | })())
|
254 | }
|
255 | break
|
256 | default: {
|
257 | killer.exit(pid)
|
258 | const sidecar = sidecars[pid]
|
259 |
|
260 | sidecar.send({
|
261 | module: 'prolific',
|
262 | method: 'synchronous',
|
263 | body: data.body
|
264 | })
|
265 | }
|
266 | break
|
267 | }
|
268 | })
|
269 |
|
270 | printer.say('start', {})
|
271 |
|
272 | const stdio = inherit(arguable)
|
273 |
|
274 |
|
275 | const child = processes.spawn(main, argv, { stdio: stdio })
|
276 |
|
277 | const traps = {}
|
278 | for (const signal of signals) {
|
279 | process.on(signal, traps[signal] = child.kill.bind(child, signal))
|
280 | }
|
281 |
|
282 | const exit = once(child, 'exit').promise
|
283 |
|
284 | children.ephemeral('close', supervise.child(child))
|
285 |
|
286 |
|
287 |
|
288 |
|
289 | await children.promise
|
290 | supervisor.destroy()
|
291 | await supervisor.promise
|
292 | const [ code, signal ] = await exit
|
293 | for (const signal in traps) {
|
294 | process.removeListener(signal, traps[signal])
|
295 | }
|
296 | if (signal != null) {
|
297 | setInterval(noop, 1000)
|
298 | setImmediate(() => process.kill(process.pid, signal))
|
299 | }
|
300 | return code
|
301 | })
|