1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | require('arguable')(module, {
|
15 | $trap: { SIGINT: 'swallow', SIGTERM: 'swallow' },
|
16 | process: process
|
17 | }, async (arguable) => {
|
18 | arguable.required('processor', 'supervisor')
|
19 |
|
20 | const Destructible = require('destructible')
|
21 | const destructible = new Destructible('sidecar')
|
22 |
|
23 |
|
24 | const assert = require('assert')
|
25 |
|
26 | const Consolidator = require('prolific.consolidator')
|
27 |
|
28 | const Logger = require('./logger')
|
29 |
|
30 | const tmp = arguable.ultimate.tmp
|
31 |
|
32 | const logger = new Logger(destructible.durable('logger'), Date, tmp, process.pid, 1000)
|
33 | logger.say('sidecar.start', { tmp })
|
34 |
|
35 | const _logger = require('prolific.logger').create('prolific')
|
36 | function memoryUsage () { _logger.notice('memory', process.memoryUsage()) }
|
37 | memoryUsage()
|
38 | const interval = setInterval(memoryUsage, 1000)
|
39 | destructible.destruct(() => clearInterval(interval))
|
40 |
|
41 | const Processor = require('./processor', arguable.ultimate.main)
|
42 |
|
43 | const processor = new Processor(arguable.ultimate.processor, arguable.ultimate.main, logger)
|
44 |
|
45 | const Queue = require('avenue')
|
46 | const queue = new Queue
|
47 |
|
48 | const processors = new Queue().shifter().paired
|
49 |
|
50 | async function update (socket) {
|
51 | for await (const processor of processors.shifter.iterator()) {
|
52 | logger.say('sidecar.triage', { processor })
|
53 | socket.write(JSON.stringify({ method: 'triage', processor }) + '\n')
|
54 | }
|
55 | }
|
56 |
|
57 | destructible.destruct(() => processors.shifter.destroy())
|
58 |
|
59 | processor.on('processor', (processor) => {
|
60 | logger.say('sidecar.processor', { processor })
|
61 | processors.queue.push(processor)
|
62 | })
|
63 |
|
64 |
|
65 | const consolidator = new Consolidator(queue)
|
66 |
|
67 | destructible.durable('process', queue.shifter().pump(chunk => processor.process(chunk)))
|
68 | destructible.ephemeral('configure', processor.configure())
|
69 |
|
70 | function message (message, socket) {
|
71 | switch (`${message.module}:${message.method}`) {
|
72 | case 'prolific:socket':
|
73 | logger.say('sidecar.socket', { message, socket: !! socket })
|
74 | if (socket != null) {
|
75 | socket.on('error', error => {
|
76 | logger.say('socket.error', { stack: error.stack })
|
77 | })
|
78 | destructible.ephemeral('read', update(socket))
|
79 | destructible.ephemeral('asynchronous', consolidator.asynchronous(socket, socket))
|
80 | }
|
81 | break
|
82 | case 'prolific:synchronous':
|
83 | logger.say('sidecar.synchronous', { message })
|
84 | consolidator.synchronous(message.body)
|
85 | break
|
86 | }
|
87 | }
|
88 | arguable.options.process.on('message', message)
|
89 | destructible.destruct(() => arguable.options.process.removeListener('message', message))
|
90 |
|
91 |
|
92 |
|
93 | arguable.options.process.send({
|
94 | module: 'prolific',
|
95 | method: 'receiving',
|
96 | child: +arguable.ultimate.child
|
97 | })
|
98 |
|
99 | await destructible.promise
|
100 |
|
101 | return 0
|
102 | })
|