UNPKG

3.54 kBJavaScriptView Raw
/*
    ___ usage ___ en_US ___
    usage: node sidecar.bin.js

        -p, --processor <string> processor path
        -s, --supervisor <string> pid of supervisor
        -t, --tmp <string> path of temporary directory
        -c, --child <string> monitored pid
        -m, --main <string> source of child process
        --help display this message

    ___ . ___
*/
// Sidecar entry point. Wires a `Processor` to two inputs coming from the
// parent process over IPC: a socket handle (the "asynchronous" path) and
// inline message bodies (the "synchronous" path). Both are handed to a
// `Consolidator` that feeds a queue which is pumped into
// `processor.process()`. Values emitted by the processor's `'processor'`
// event are written back to the socket as newline-delimited JSON `triage`
// messages. The program resolves to exit code 0 once `destructible.promise`
// settles.
require('arguable')(module, {
    // NOTE(review): 'swallow' presumably makes Arguable trap and ignore these
    // signals so that shutdown is driven by the supervisor -- confirm against
    // Arguable's `$trap` documentation.
    $trap: { SIGINT: 'swallow', SIGTERM: 'swallow' },
    process: process
}, async (arguable) => {
    // Only `processor` and `supervisor` are mandatory. `tmp`, `child` and
    // `main` are read below without validation; a missing `child` is sent
    // to the parent as `NaN` because of the unary `+` coercion.
    arguable.required('processor', 'supervisor')

    const Destructible = require('destructible')
    const destructible = new Destructible('sidecar')

    const Consolidator = require('prolific.consolidator')

    const Logger = require('./logger')

    const tmp = arguable.ultimate.tmp

    // The sidecar's own diagnostic logger, keyed by our pid and the
    // temporary directory.
    // NOTE(review): the meaning of the trailing `1000` is defined by
    // `./logger` -- presumably a flush interval in milliseconds; confirm.
    const logger = new Logger(destructible.durable('logger'), Date, tmp, process.pid, 1000)
    logger.say('sidecar.start', { tmp })

    // Report our own memory usage through the `prolific` logger once
    // immediately and then every second until the destructible is destroyed.
    const prolificLogger = require('prolific.logger').create('prolific')
    function memoryUsage () {
        prolificLogger.notice('memory', process.memoryUsage())
    }
    memoryUsage()
    const interval = setInterval(memoryUsage, 1000)
    destructible.destruct(() => clearInterval(interval))

    // `require` takes a single module id; the `main` path is handed to the
    // `Processor` constructor below, not to `require`.
    const Processor = require('./processor')

    const processor = new Processor(arguable.ultimate.processor, arguable.ultimate.main, logger)

    const Queue = require('avenue')
    const queue = new Queue()

    // Paired queue/shifter carrying values from the processor's `'processor'`
    // event to whichever socket is currently being updated.
    const processors = new Queue().shifter().paired

    // Forward every value pushed onto `processors` to the given socket as a
    // single line of JSON. Runs until the shifter's iterator ends, which
    // happens no later than the `processors.shifter.destroy()` registered
    // below.
    async function update (socket) {
        for await (const triage of processors.shifter.iterator()) {
            logger.say('sidecar.triage', { processor: triage })
            socket.write(JSON.stringify({ method: 'triage', processor: triage }) + '\n')
        }
    }

    destructible.destruct(() => processors.shifter.destroy())

    processor.on('processor', (updated) => {
        logger.say('sidecar.processor', { processor: updated })
        processors.queue.push(updated)
    })

    // Listen to our asynchronous pipe.
    const consolidator = new Consolidator(queue)

    destructible.durable('process', queue.shifter().pump(chunk => processor.process(chunk)))
    destructible.ephemeral('configure', processor.configure())

    // IPC message handler. `socket` is the second argument Node.js passes to
    // `'message'` listeners (the handle sent along with the message, if any).
    // Messages that are not `prolific:socket` or `prolific:synchronous` are
    // ignored.
    function onMessage (message, socket) {
        switch (`${message.module}:${message.method}`) {
        case 'prolific:socket':
            logger.say('sidecar.socket', { message, socket: !! socket })
            if (socket != null) {
                // Socket errors are logged only; they do not tear down the
                // sidecar.
                socket.on('error', error => {
                    logger.say('socket.error', { stack: error.stack })
                })
                destructible.ephemeral('read', update(socket))
                // The same socket is passed as both arguments.
                // NOTE(review): presumably input and output streams --
                // confirm against `prolific.consolidator`.
                destructible.ephemeral('asynchronous', consolidator.asynchronous(socket, socket))
            }
            break
        case 'prolific:synchronous':
            logger.say('sidecar.synchronous', { message })
            consolidator.synchronous(message.body)
            break
        }
    }
    arguable.options.process.on('message', onMessage)
    destructible.destruct(() => arguable.options.process.removeListener('message', onMessage))

    // Let the supervisor know that we're ready. It will send our asynchronous
    // pipe down to the monitored process.
    arguable.options.process.send({
        module: 'prolific',
        method: 'receiving',
        child: +arguable.ultimate.child
    })

    await destructible.promise

    return 0
})