UNPKG

4.17 kBJavaScriptView Raw
1/*
2 ___ usage ___ en_US ___
3 usage: node sidecar.bin.js
4
5 -p, --processor <string> processor path
6 -s, --supervisor <string> pid of supervisor
7 -t, --tmp <string> path of temporary directory
8 -c, --child <string> monitored pid
9 -m, --main <string> source of child process
10 --help display this message
11
12 ___ . ___
13*/
14require('arguable')(module, {
15 $trap: { SIGINT: 'swallow', SIGTERM: 'swallow' },
16 process: process
17}, async (arguable) => {
18 arguable.required('processor', 'supervisor')
19
20 const Destructible = require('destructible')
21 const destructible = new Destructible('sidecar')
22
23 // Node.js API.
24 const assert = require('assert')
25
26 const Consolidator = require('prolific.consolidator')
27
28 const Logger = require('./logger')
29
30 const tmp = arguable.ultimate.tmp
31
32 const logger = new Logger(destructible.durable('logger'), Date, tmp, process.pid, 1000)
33 logger.say('sidecar.start', { tmp })
34
35 const _logger = require('prolific.logger').create('prolific')
36 function memoryUsage () { _logger.notice('memory', process.memoryUsage()) }
37 memoryUsage()
38 const interval = setInterval(memoryUsage, 1000)
39 destructible.destruct(() => clearInterval(interval))
40
41 const Processor = require('./processor', arguable.ultimate.main)
42
43 const processor = new Processor(arguable.ultimate.processor, arguable.ultimate.main, logger)
44
45 const Queue = require('avenue')
46 const queue = new Queue
47
48 const processors = new Queue().shifter().paired
49
50 async function update (socket) {
51 for await (const processor of processors.shifter.iterator()) {
52 logger.say('sidecar.triage', { processor })
53 socket.write(JSON.stringify({ method: 'triage', processor }) + '\n')
54 }
55 }
56
57 destructible.destruct(() => processors.shifter.destroy())
58
59 processor.on('processor', (processor) => {
60 logger.say('sidecar.processor', { processor })
61 processors.queue.push(processor)
62 })
63
64 // Listen to our asynchronous pipe.
65 const consolidator = new Consolidator(queue)
66
67 destructible.durable('process', queue.shifter().pump(chunk => processor.process(chunk)))
68 destructible.ephemeral('configure', processor.configure())
69
70 function message (message, socket) {
71 switch (`${message.module}:${message.method}`) {
72 case 'prolific:socket':
73 const listeners = [ 'end', 'close' ]
74 const closed = () => {
75 for (const listener of listeners) {
76 socket.removeListener(listener, closed)
77 }
78 arguable.options.process.send({
79 module: 'prolific',
80 method: 'closed',
81 child: +arguable.ultimate.child
82 })
83 }
84 logger.say('sidecar.socket', { message, socket: !! socket })
85 if (socket == null) {
86 listeners.length = 0
87 closed()
88 } else {
89 for (const listener of listeners) {
90 socket.on(listener, closed)
91 }
92 socket.on('error', error => {
93 logger.say('socket.error', { stack: error.stack })
94 })
95 destructible.ephemeral('read', update(socket))
96 destructible.ephemeral('asynchronous', consolidator.asynchronous(socket, socket))
97 }
98 break
99 case 'prolific:synchronous':
100 logger.say('sidecar.synchronous', { message })
101 consolidator.synchronous(message.body)
102 break
103 }
104 }
105 arguable.options.process.on('message', message)
106 destructible.destruct(() => arguable.options.process.removeListener('message', message))
107
108 // Let the supervisor know that we're ready. It will send our asynchronous
109 // pipe down to the monitored process.
110 arguable.options.process.send({
111 module: 'prolific',
112 method: 'receiving',
113 child: +arguable.ultimate.child
114 })
115
116 await destructible.destructed
117
118 return 0
119})