UNPKG

7.82 kBJavaScriptView Raw
1// Node.js API.
2const assert = require('assert')
3const path = require('path')
4const events = require('events')
5
6const Reconfigurator = require('reconfigure')
7const BufferConfigurator = require('reconfigure/buffer')
8const sink = require('prolific.sink')
9const Evaluator = require('prolific.evaluator')
10
11const Destructible = require('destructible')
12
13// Construct a processor that will reload it's source from the given source and
14// call the given function with the new in-process Acceptor soure THIS IS OUT OF
15// DATE. The in-process Acceptor source oof, blah, blah will be routed to the
16// child process we're monitoring and injected into it so that the first round
17// of filtering will happen in-process saving copying time.
18//
19// The given destructible is used to manage the lifecycle of the pipeline and
20// its processors.
21
22//
23class Processor extends events.EventEmitter {
24 constructor (source, main, logger, _Date = Date) {
25 super()
26 this.destroyed = false
27
28 this._logger = logger
29
30 this._destructible = new Destructible('prolific/processor')
31 this._destructible.destruct(() => this.destroyed = true)
32
33 const file = this._file = path.resolve(source)
34
35 this._version = 0
36 this._versions = []
37
38 this._backlog = []
39 sink.json = (level, qualifier, label, body, system) => {
40 assert(typeof level == 'string')
41 this._process([{ when: _Date.now(), level: level, qualifier, label, body, system }])
42 }
43 this._processor = {
44 previous: () => {},
45 process: entries => this._backlog.push(entries)
46 }
47
48 const self = this
49 this._reconfigurator = new Reconfigurator(file, new class extends BufferConfigurator {
50 async configure (buffer) {
51 const source = buffer.toString()
52 const resolved = Evaluator.resolve(file, main)
53 const definition = Evaluator.create(source, resolved)
54 const triage = definition.triage()
55 const process = await definition.process()
56 const processor = typeof process == 'function' ? { process } : process
57 return { buffer, source, triage, processor, resolved }
58 }
59 reload (previous, buffer) {
60 return super.reload(previous.buffer, buffer)
61 }
62 })
63 this._reconfigurator.on('error', this.emit.bind(this, 'error'))
64 this._nextVersion = 0
65
66 this._destructible.destruct(() => this._reconfigurator.destroy())
67 }
68
69 destroy () {
70 this._destructible.destroy()
71 }
72
73 async configure () {
74 const { source, resolved, triage, processor } = await this._reconfigurator.shift()
75 this._processor = {
76 previous: this._processor.previous,
77 process: entries => {
78 return processor.process(entries.filter(entry => {
79 return triage(entry.level, entry.qualifier, entry.label, entry.body, entry.system)
80 }))
81 }
82 }
83 for (const entries of this._backlog) {
84 await this._process(entries)
85 }
86 const version = this._version++
87 this._versions.push({ previous: () => {}, version, processor })
88 this._previous = process
89 this.emit('processor', { version, source, resolved })
90 for await (const { resolved, source, processor } of this._reconfigurator) {
91 const version = this._version++
92 this._versions.push({ previous: this._previous, version, processor })
93 this._previous = process
94 this.emit('processor', { version, source, resolved })
95 }
96 }
97
98 async _process (entries) {
99 try {
100 await this._processor.process(entries)
101 } catch (error) {
102 this._logger.say('process.error', { stack: error.stack })
103 }
104 }
105
106 // Async because if we have a version bump we have to wait for the
107 // previous processor to drain to start the new one, however we are not
108 // processing per message, we are getting chunks of messages and those
109 // are processed synchronously.
110 async process (batch) {
111 assert(!this.destroyed)
112 // TODO Here we could wrap in a try/catch and on we could revert to a
113 // fallback. The the triage and processor would be rebuilt, maybe we
114 // hash the source so we don't use it again, we'd have to apply triage
115 // in the monitor as if it where startup, so we might be filtering
116 // incorrectly, the fallback might crash because it is missing messages
117 // (but it shouldn't do that because messages do go missing for many
118 // reasons) but better than crashing and it will even out when the child
119 // gets the new triage shorty, or...
120 //
121 // We just drop messages until the reverted triage is in place.
122 //
123 // Which keeps the program running when it would have crashed.
124 //
125 // But, then, if you really want to fuss, maybe the previous processor
126 // depended on a message emitted at program start, so if the user is not
127 // designing for processing a possibly inconsistent stream of messages,
128 // we're not going to be able to fix that for them here.
129 //
130 // Stretch goal, though.
131 //
132 // Could we get more performance if processors handled messages in a
133 // chunked fashion, send an entire array of entries?
134 //
135 // We could set it up so that we flag our version somewhere earlier and
136 // this method is called only with sync message processing.
137 //
138 // TODO Okay, we're already batching things into arrays, so we can have
139 // a special function in the shuttle that forces a batch of any existing
140 // messages, then creates a single entry array with the control message.
141 // Those messages can be detected at deserialization and a sync process
142 // function can be called, we have an ansyc control function for version
143 // bumps, now processors handle entries in batches, which means that
144 // they don't have to worry about chunking posts to 3rd parties, they're
145 // already kind of chunked.
146 switch (batch && batch.method) {
147 case 'version': {
148 const version = this._versions.shift()
149 assert(this._nextVersion == version.version)
150 assert(this._nextVersion == batch.version)
151 this._nextVersion++
152 const processor = version.processor
153 await this._processor.previous.call(null, null)
154 this._processor = {
155 previous: version.previous,
156 process: entries => processor.process(entries)
157 }
158 }
159 break
160 case 'entries': {
161 await this._process(batch.entries)
162 }
163 break
164 case 'exit': {
165 }
166 break
167 case null: {
168 // TODO Normalize context.
169 await this._process([{
170 when: sink.Date.now(),
171 level: 'panic',
172 qualifier: 'prolific',
173 label: 'eos',
174 body: {},
175 system: sink.properties
176 }])
177 await this._processor.previous.call(null, null)
178 for (const version of this._versions) {
179 if (version.processor.destroy) {
180 await version.processor.destroy()
181 }
182 }
183 this._destructible.destroy()
184 await this._destructible.promise
185 }
186 break
187 }
188 }
189}
190
191module.exports = Processor