1 | // Node.js API.
|
2 | const assert = require('assert')
|
3 | const path = require('path')
|
4 | const events = require('events')
|
5 |
|
6 | const Reconfigurator = require('reconfigure')
|
7 | const BufferConfigurator = require('reconfigure/buffer')
|
8 | const sink = require('prolific.sink')
|
9 | const Evaluator = require('prolific.evaluator')
|
10 |
|
11 | const Destructible = require('destructible')
|
12 |
|
13 | // Construct a processor that will reload its source from the given source and
|
14 | // call the given function with the new in-process Acceptor source. NOTE: THIS
|
15 | // IS OUT OF DATE. The in-process Acceptor source will be routed to the
|
16 | // child process we're monitoring and injected into it so that the first round
|
17 | // of filtering will happen in-process, saving copying time.
|
18 | //
|
19 | // The given destructible is used to manage the lifecycle of the pipeline and
|
20 | // its processors.
|
21 |
|
22 | //
|
class Processor extends events.EventEmitter {
    // Create a processor driven by the configuration file at `source`.
    //
    // `source` is resolved to an absolute path and watched through a
    // `Reconfigurator`. `main` is passed through to `Evaluator.resolve`.
    // `logger` must provide `say(label, properties)`; it is used to report
    // errors thrown while processing entries. `_Date` supplies `now()` for the
    // timestamp of entries arriving through `sink.json` and defaults to the
    // global `Date`.
    //
    // Side effect: assigns `sink.json` on the `prolific.sink` module object so
    // that entries written through it are fed to this processor.
    constructor (source, main, logger, _Date = Date) {
        super()
        this.destroyed = false

        this._logger = logger

        this._destructible = new Destructible('prolific/processor')
        this._destructible.destruct(() => this.destroyed = true)

        const file = this._file = path.resolve(source)

        // `_version` numbers configurations as they are loaded. `_versions`
        // queues loaded configurations until the matching `version` batch
        // arrives in `process`.
        this._version = 0
        this._versions = []

        // Until `configure` installs the first real processor, batches are
        // held in the backlog instead of being processed.
        this._backlog = []
        sink.json = (level, qualifier, label, body, system) => {
            assert(typeof level == 'string')
            this._process([{ when: _Date.now(), level: level, qualifier, label, body, system }])
        }
        this._processor = {
            previous: () => {},
            process: entries => this._backlog.push(entries)
        }

        this._reconfigurator = new Reconfigurator(file, new class extends BufferConfigurator {
            // Evaluate the configuration source in `buffer` into a triage
            // function and a processor. A definition whose `process()` yields a
            // bare function is wrapped as `{ process }` so that callers can
            // always invoke `processor.process(entries)`.
            async configure (buffer) {
                const source = buffer.toString()
                const resolved = Evaluator.resolve(file, main)
                const definition = Evaluator.create(source, resolved)
                const triage = definition.triage()
                const process = await definition.process()
                const processor = typeof process == 'function' ? { process } : process
                return { buffer, source, triage, processor, resolved }
            }
            // Delegate to the base class using the raw buffer kept in the
            // previous configuration rather than the configuration object.
            reload (previous, buffer) {
                return super.reload(previous.buffer, buffer)
            }
        })
        this._reconfigurator.on('error', this.emit.bind(this, 'error'))
        // Version number expected in the next `version` batch.
        this._nextVersion = 0

        this._destructible.destruct(() => this._reconfigurator.destroy())
    }

    // Destroy the underlying destructible, which marks this processor as
    // destroyed and destroys the reconfigurator.
    destroy () {
        this._destructible.destroy()
    }

    // Load the first configuration, replay anything held in the backlog
    // through it, then keep staging new configurations for as long as the
    // reconfigurator yields them. Emits `'processor'` with `{ version, source,
    // resolved }` for every configuration staged. The returned promise settles
    // when the reconfigurator's async iterator ends.
    async configure () {
        const { source, resolved, triage, processor } = await this._reconfigurator.shift()
        // Triage is applied here only for this initial wrapper. The wrapper
        // installed by a `version` batch in `process` does not triage.
        this._processor = {
            previous: this._processor.previous,
            process: entries => {
                return processor.process(entries.filter(entry => {
                    return triage(entry.level, entry.qualifier, entry.label, entry.body, entry.system)
                }))
            }
        }
        // Drain the backlog as we replay it so the buffered batches are not
        // retained for the life of the process.
        for (const entries of this._backlog.splice(0)) {
            await this._process(entries)
        }
        // Nothing precedes the first configuration, so its hook is a no-op.
        this._stage({ source, resolved, processor }, () => {})
        for await (const configuration of this._reconfigurator) {
            this._stage(configuration, this._previous)
        }
    }

    // Queue a loaded configuration under the next version number, remember how
    // to shut its processor down, and announce it with a `'processor'` event.
    //
    // `previous` is stored with the version and becomes the active
    // `_processor.previous` hook once the version is activated in `process`.
    _stage ({ source, resolved, processor }, previous) {
        const version = this._version++
        this._versions.push({ previous, version, processor })
        // This must be a function because `process` invokes it with
        // `previous.call(null, null)`. The earlier code stored the Node global
        // `process` object here, which is not callable and raised a TypeError
        // on a later version bump or at end of stream. Shutting down through
        // the optional `destroy` method matches how pending versions are shut
        // down at end of stream.
        this._previous = async () => {
            if (processor.destroy) {
                await processor.destroy()
            }
        }
        this.emit('processor', { version, source, resolved })
    }

    // Run `entries` through the current processor. Errors thrown or rejected
    // by the processor are reported through the logger as `process.error`
    // rather than propagated.
    async _process (entries) {
        try {
            await this._processor.process(entries)
        } catch (error) {
            this._logger.say('process.error', { stack: error.stack })
        }
    }

    // Handle one batch. Recognized batches are:
    //
    //  * `{ method: 'version', version }` activates the next queued
    //    configuration, whose number must match `version`.
    //  * `{ method: 'entries', entries }` processes an array of entries.
    //  * `{ method: 'exit' }` is accepted and ignored.
    //  * `null` means end of stream: a `panic`-level `eos` entry is
    //    processed and everything is shut down.
    //
    // Any other value, `undefined` included, is silently ignored. Asserts that
    // the processor has not been destroyed.
    //
    // Async because if we have a version bump we have to wait for the
    // previous processor to drain to start the new one, however we are not
    // processing per message, we are getting chunks of messages and those
    // are processed synchronously.
    async process (batch) {
        assert(!this.destroyed)
        // TODO Here we could wrap in a try/catch and on error we could revert
        // to a fallback. Then the triage and processor would be rebuilt, maybe
        // we hash the source so we don't use it again, we'd have to apply
        // triage in the monitor as if it were startup, so we might be filtering
        // incorrectly, the fallback might crash because it is missing messages
        // (but it shouldn't do that because messages do go missing for many
        // reasons) but better than crashing and it will even out when the child
        // gets the new triage shortly, or...
        //
        // We just drop messages until the reverted triage is in place.
        //
        // Which keeps the program running when it would have crashed.
        //
        // But, then, if you really want to fuss, maybe the previous processor
        // depended on a message emitted at program start, so if the user is not
        // designing for processing a possibly inconsistent stream of messages,
        // we're not going to be able to fix that for them here.
        //
        // Stretch goal, though.
        //
        // Could we get more performance if processors handled messages in a
        // chunked fashion, send an entire array of entries?
        //
        // We could set it up so that we flag our version somewhere earlier and
        // this method is called only with sync message processing.
        //
        // TODO Okay, we're already batching things into arrays, so we can have
        // a special function in the shuttle that forces a batch of any existing
        // messages, then creates a single entry array with the control message.
        // Those messages can be detected at deserialization and a sync process
        // function can be called, we have an async control function for version
        // bumps, now processors handle entries in batches, which means that
        // they don't have to worry about chunking posts to 3rd parties, they're
        // already kind of chunked.
        switch (batch && batch.method) {
        case 'version': {
                const version = this._versions.shift()
                assert(this._nextVersion == version.version)
                assert(this._nextVersion == batch.version)
                this._nextVersion++
                const processor = version.processor
                // NOTE(review): this runs the hook carried by the wrapper that
                // is being replaced, not `version.previous`, so a processor is
                // shut down one activation after it was superseded. Confirm
                // whether that lag is intended.
                await this._processor.previous.call(null, null)
                this._processor = {
                    previous: version.previous,
                    process: entries => processor.process(entries)
                }
            }
            break
        case 'entries': {
                await this._process(batch.entries)
            }
            break
        case 'exit': {
            }
            break
        case null: {
                // TODO Normalize context.
                // NOTE(review): timestamped with `sink.Date`, not the `_Date`
                // given to the constructor. Confirm that is intended.
                await this._process([{
                    when: sink.Date.now(),
                    level: 'panic',
                    qualifier: 'prolific',
                    label: 'eos',
                    body: {},
                    system: sink.properties
                }])
                await this._processor.previous.call(null, null)
                // Configurations that were loaded but never activated still get
                // a chance to release their resources.
                for (const version of this._versions) {
                    if (version.processor.destroy) {
                        await version.processor.destroy()
                    }
                }
                this._destructible.destroy()
                await this._destructible.destructed
            }
            break
        }
    }
}
|
190 |
|
191 | module.exports = Processor
|