UNPKG

8.17 kBJavaScriptView Raw
1const ldmp = require('@dendra-science/csi-ldmp-client')
2const moment = require('moment')
3const request = require('request')
4const {MomentEditor} = require('@dendra-science/utils-moment')
5
6function handleRecord (rec) {
7 if (!rec) return
8
9 const m = this.model
10 const recNbr = rec.recordNumber
11
12 try {
13 //
14 // Begin standard record validation
15 // TODO: Move to helper
16 if (typeof recNbr === 'undefined') throw new Error('Record number undefined')
17
18 if (m.specifyStateAt !== m.stateAt) {
19 this.log.info(`Mach [${m.key}] Rec [${recNbr}: Deferring`)
20 return
21 }
22
23 const recordDate = moment(rec.timeString).utcOffset(0, true).utc()
24
25 if (!(recordDate && recordDate.isValid())) throw new Error('Invalid time format')
26
27 const sourceKey = `${rec.station} ${rec.table}`
28 const source = m.sources[sourceKey]
29
30 if (!source) throw new Error(`No source found for '${sourceKey}'`)
31 // End standard record validation
32 //
33
34 /*
35 Construct the following in order to write a point to InfluxDB:
36 1. The fieldSet
37 2. The measurementTagSet
38 3. The time (in milliseconds)
39 4. The Line Protocol buffer string (buf)
40 */
41
42 // Allow for static fields to be specified for every point
43 const row = Object.assign({}, source.load.fields, rec.fields && rec.fields.reduce((obj, field) => {
44 if (field.name) obj[field.name.replace(/\W+/g, '_')] = field.value
45 return obj
46 }, {}))
47 const fieldSet = Object.keys(row).filter(key => {
48 return (typeof row[key] === 'number') && !Number.isNaN(row[key])
49 }).map(key => {
50 return `${key}=${row[key]}`
51 })
52
53 if (fieldSet.length === 0) throw new Error('Nothing to write')
54
55 const measurementTagSet = [...source.measurementTagSet]
56
57 // Allow for time-based tags to be specified for every point
58 const tagMoment = source.tagTimeEditor ? source.tagTimeEditor.edit(recordDate) : recordDate
59 const timeTags = source.load.time_tags
60 if (timeTags) Object.keys(timeTags).forEach(key => measurementTagSet.push(`${key}=${tagMoment.format(timeTags[key])}`))
61
62 const time = source.timeEditor ? source.timeEditor.edit(recordDate).valueOf() : recordDate.valueOf()
63
64 const buf = Buffer.from(`${measurementTagSet.join(',')} ${fieldSet.join(',')} ${time}\n`)
65
66 const requestOpts = {
67 body: buf,
68 method: 'POST',
69 qs: {
70 db: source.load.database,
71 precision: 'ms'
72 },
73 url: `${this.influxUrl}/write`
74 }
75
76 request(requestOpts, (err, response) => {
77 if (err) {
78 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`)
79 } else if (response.statusCode !== 204) {
80 this.log.error(`Mach [${m.key}] Rec [${recNbr}: Non-success status code ${response.statusCode}`)
81 } else {
82 this.client.ack().catch(err2 => {
83 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err2.message}`)
84 })
85 }
86 })
87 } catch (err) {
88 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`)
89 }
90}
91
92export default {
93 client: {
94 guard (m) {
95 return !m.clientError && !m.private.client
96 },
97 execute (m) {
98 return new ldmp.LDMPClient(m.$app.get('clients').ldmp)
99 },
100 assign (m, res) {
101 const log = m.$app.logger
102
103 log.info(`Mach [${m.key}]: Client ready`)
104
105 m.private.client = res
106 m.private.client.on('record', handleRecord.bind({
107 client: res,
108 influxUrl: m.$app.get('apis').influxDB.url,
109 log: m.$app.logger,
110 model: m
111 }))
112 }
113 },
114
115 connect: require('./tasks/connect').default,
116
117 connectReset: require('./tasks/connectReset').default,
118
119 database: {
120 guard (m) {
121 return !m.databaseError &&
122 (m.sourcesStateAt === m.stateAt) &&
123 (m.databaseStateAt !== m.stateAt)
124 },
125 execute (m) {
126 const log = m.$app.logger
127 const influxUrl = m.$app.get('apis').influxDB.url
128 const databases = [...new Set(Object.keys(m.sources).map(key => m.sources[key].load.database))]
129 const requestOpts = {
130 method: 'POST',
131 qs: {
132 q: databases.map(db => `CREATE DATABASE "${db}"`).join(';')
133 },
134 url: `${influxUrl}/query`
135 }
136
137 log.info(`Mach [${m.key}]: Creating database(s): ${databases.join(', ')}`)
138
139 return new Promise((resolve, reject) => {
140 request(requestOpts, (err, response) => err ? reject(err) : resolve(response))
141 }).then(response => {
142 if (response.statusCode !== 200) throw new Error(`Non-success status code ${response.statusCode}`)
143
144 return true
145 }).catch(err => {
146 log.error(`Mach [${m.key}]: ${err.message}`)
147 throw err
148 })
149 },
150 assign (m) {
151 m.databaseStateAt = m.stateAt
152 }
153 },
154
155 disconnect: require('./tasks/disconnect').default,
156
157 sources: {
158 guard (m) {
159 return !m.sourcesError &&
160 m.state.sources && (m.state.sources.length > 0) &&
161 (m.sourcesStateAt !== m.stateAt)
162 },
163 execute () { return true },
164 assign (m) {
165 const log = m.$app.logger
166
167 log.info(`Mach [${m.key}]: Sources ready`)
168
169 m.sources = m.state.sources.reduce((sources, src) => {
170 if (src.station && src.table && src.load && src.load.database && src.load.measurement) {
171 const sourceKey = `${src.station} ${src.table}`
172 const source = sources[sourceKey] = Object.assign({}, src)
173
174 // Prepare the leftmost parts of the Line Protocol string for loading
175 const measurementTagSet = source.measurementTagSet = [source.load.measurement]
176
177 // Allow for static tags to be specified for every point
178 const tags = source.load.tags
179 if (tags) Object.keys(tags).forEach(key => measurementTagSet.push(`${key}=${tags[key]}`))
180
181 const transform = source.transform
182 if (transform) {
183 // Create MomentEditor instances for adjusting timestamps
184 if (transform.time_edit) source.timeEditor = new MomentEditor(transform.time_edit)
185 if (transform.reverse_time_edit) source.reverseTimeEditor = new MomentEditor(transform.reverse_time_edit)
186 if (transform.tag_time_edit) source.tagTimeEditor = new MomentEditor(transform.tag_time_edit)
187 }
188 }
189
190 return sources
191 }, {})
192
193 m.sourcesStateAt = m.stateAt
194 }
195 },
196
197 specs: {
198 guard (m) {
199 return !m.specsError &&
200 (m.databaseStateAt === m.stateAt) &&
201 (m.specsStateAt !== m.stateAt)
202 },
203 async execute (m) {
204 const influxUrl = m.$app.get('apis').influxDB.url
205 const specs = []
206
207 for (let sourceKey of Object.keys(m.sources)) {
208 const {options, station, table, load, reverseTimeEditor} = m.sources[sourceKey]
209 const spec = Object.assign({
210 station,
211 table
212 }, options)
213
214 if (!spec.start_option) {
215 const requestOpts = {
216 method: 'POST',
217 qs: {
218 db: load.database,
219 q: `SELECT * FROM "${load.measurement}" ORDER BY time DESC LIMIT 1`
220 },
221 url: `${influxUrl}/query`
222 }
223 const response = await new Promise((resolve, reject) => {
224 request(requestOpts, (err, resp) => err ? reject(err) : resolve(resp))
225 })
226
227 if (response.statusCode !== 200) throw new Error(`Non-success status code ${response.statusCode}`)
228
229 const body = JSON.parse(response.body)
230
231 try {
232 const recordDate = moment(body.results[0].series[0].values[0][0]).utc()
233 const timeStamp = reverseTimeEditor ? reverseTimeEditor.edit(recordDate) : recordDate
234 spec.time_stamp = timeStamp.format('YYYY MM DD HH:mm:ss.SS')
235 spec.start_option = 'at-time'
236 } catch (e) {
237 spec.start_option = 'at-oldest'
238 }
239 }
240
241 specs.push(spec)
242 }
243
244 return specs
245 },
246 assign (m, res) {
247 const log = m.$app.logger
248
249 log.info(`Mach [${m.key}]: Specs ready`)
250
251 m.specs = res
252 m.specsStateAt = m.stateAt
253 }
254 },
255
256 specify: require('./tasks/specify').default,
257
258 stateAt: require('./tasks/stateAt').default
259}