1 | const ldmp = require('@dendra-science/csi-ldmp-client')
|
2 | const moment = require('moment')
|
3 | const request = require('request')
|
4 | const {MomentEditor} = require('@dendra-science/utils-moment')
|
5 |
|
6 | function handleRecord (rec) {
|
7 | if (!rec) return
|
8 |
|
9 | const m = this.model
|
10 | const recNbr = rec.recordNumber
|
11 |
|
12 | try {
|
13 |
|
14 |
|
15 |
|
16 | if (typeof recNbr === 'undefined') throw new Error('Record number undefined')
|
17 |
|
18 | if (m.specifyStateAt !== m.stateAt) {
|
19 | this.log.info(`Mach [${m.key}] Rec [${recNbr}: Deferring`)
|
20 | return
|
21 | }
|
22 |
|
23 | const recordDate = moment(rec.timeString).utcOffset(0, true).utc()
|
24 |
|
25 | if (!(recordDate && recordDate.isValid())) throw new Error('Invalid time format')
|
26 |
|
27 | const sourceKey = `${rec.station} ${rec.table}`
|
28 | const source = m.sources[sourceKey]
|
29 |
|
30 | if (!source) throw new Error(`No source found for '${sourceKey}'`)
|
31 |
|
32 |
|
33 |
|
34 | |
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 | const row = Object.assign({}, source.load.fields, rec.fields && rec.fields.reduce((obj, field) => {
|
44 | if (field.name) obj[field.name.replace(/\W+/g, '_')] = field.value
|
45 | return obj
|
46 | }, {}))
|
47 | const fieldSet = Object.keys(row).filter(key => {
|
48 | return (typeof row[key] === 'number') && !Number.isNaN(row[key])
|
49 | }).map(key => {
|
50 | return `${key}=${row[key]}`
|
51 | })
|
52 |
|
53 | if (fieldSet.length === 0) throw new Error('Nothing to write')
|
54 |
|
55 | const measurementTagSet = [...source.measurementTagSet]
|
56 |
|
57 |
|
58 | const tagMoment = source.tagTimeEditor ? source.tagTimeEditor.edit(recordDate) : recordDate
|
59 | const timeTags = source.load.time_tags
|
60 | if (timeTags) Object.keys(timeTags).forEach(key => measurementTagSet.push(`${key}=${tagMoment.format(timeTags[key])}`))
|
61 |
|
62 | const time = source.timeEditor ? source.timeEditor.edit(recordDate).valueOf() : recordDate.valueOf()
|
63 |
|
64 | const buf = Buffer.from(`${measurementTagSet.join(',')} ${fieldSet.join(',')} ${time}\n`)
|
65 |
|
66 | const requestOpts = {
|
67 | body: buf,
|
68 | method: 'POST',
|
69 | qs: {
|
70 | db: source.load.database,
|
71 | precision: 'ms'
|
72 | },
|
73 | url: `${this.influxUrl}/write`
|
74 | }
|
75 |
|
76 | request(requestOpts, (err, response) => {
|
77 | if (err) {
|
78 | this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`)
|
79 | } else if (response.statusCode !== 204) {
|
80 | this.log.error(`Mach [${m.key}] Rec [${recNbr}: Non-success status code ${response.statusCode}`)
|
81 | } else {
|
82 | this.client.ack().catch(err2 => {
|
83 | this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err2.message}`)
|
84 | })
|
85 | }
|
86 | })
|
87 | } catch (err) {
|
88 | this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`)
|
89 | }
|
90 | }
|
91 |
|
92 | export default {
|
93 | client: {
|
94 | guard (m) {
|
95 | return !m.clientError && !m.private.client
|
96 | },
|
97 | execute (m) {
|
98 | return new ldmp.LDMPClient(m.$app.get('clients').ldmp)
|
99 | },
|
100 | assign (m, res) {
|
101 | const log = m.$app.logger
|
102 |
|
103 | log.info(`Mach [${m.key}]: Client ready`)
|
104 |
|
105 | m.private.client = res
|
106 | m.private.client.on('record', handleRecord.bind({
|
107 | client: res,
|
108 | influxUrl: m.$app.get('apis').influxDB.url,
|
109 | log: m.$app.logger,
|
110 | model: m
|
111 | }))
|
112 | }
|
113 | },
|
114 |
|
115 | connect: require('./tasks/connect').default,
|
116 |
|
117 | connectReset: require('./tasks/connectReset').default,
|
118 |
|
119 | database: {
|
120 | guard (m) {
|
121 | return !m.databaseError &&
|
122 | (m.sourcesStateAt === m.stateAt) &&
|
123 | (m.databaseStateAt !== m.stateAt)
|
124 | },
|
125 | execute (m) {
|
126 | const log = m.$app.logger
|
127 | const influxUrl = m.$app.get('apis').influxDB.url
|
128 | const databases = [...new Set(Object.keys(m.sources).map(key => m.sources[key].load.database))]
|
129 | const requestOpts = {
|
130 | method: 'POST',
|
131 | qs: {
|
132 | q: databases.map(db => `CREATE DATABASE "${db}"`).join(';')
|
133 | },
|
134 | url: `${influxUrl}/query`
|
135 | }
|
136 |
|
137 | log.info(`Mach [${m.key}]: Creating database(s): ${databases.join(', ')}`)
|
138 |
|
139 | return new Promise((resolve, reject) => {
|
140 | request(requestOpts, (err, response) => err ? reject(err) : resolve(response))
|
141 | }).then(response => {
|
142 | if (response.statusCode !== 200) throw new Error(`Non-success status code ${response.statusCode}`)
|
143 |
|
144 | return true
|
145 | }).catch(err => {
|
146 | log.error(`Mach [${m.key}]: ${err.message}`)
|
147 | throw err
|
148 | })
|
149 | },
|
150 | assign (m) {
|
151 | m.databaseStateAt = m.stateAt
|
152 | }
|
153 | },
|
154 |
|
155 | disconnect: require('./tasks/disconnect').default,
|
156 |
|
157 | sources: {
|
158 | guard (m) {
|
159 | return !m.sourcesError &&
|
160 | m.state.sources && (m.state.sources.length > 0) &&
|
161 | (m.sourcesStateAt !== m.stateAt)
|
162 | },
|
163 | execute () { return true },
|
164 | assign (m) {
|
165 | const log = m.$app.logger
|
166 |
|
167 | log.info(`Mach [${m.key}]: Sources ready`)
|
168 |
|
169 | m.sources = m.state.sources.reduce((sources, src) => {
|
170 | if (src.station && src.table && src.load && src.load.database && src.load.measurement) {
|
171 | const sourceKey = `${src.station} ${src.table}`
|
172 | const source = sources[sourceKey] = Object.assign({}, src)
|
173 |
|
174 |
|
175 | const measurementTagSet = source.measurementTagSet = [source.load.measurement]
|
176 |
|
177 |
|
178 | const tags = source.load.tags
|
179 | if (tags) Object.keys(tags).forEach(key => measurementTagSet.push(`${key}=${tags[key]}`))
|
180 |
|
181 | const transform = source.transform
|
182 | if (transform) {
|
183 |
|
184 | if (transform.time_edit) source.timeEditor = new MomentEditor(transform.time_edit)
|
185 | if (transform.reverse_time_edit) source.reverseTimeEditor = new MomentEditor(transform.reverse_time_edit)
|
186 | if (transform.tag_time_edit) source.tagTimeEditor = new MomentEditor(transform.tag_time_edit)
|
187 | }
|
188 | }
|
189 |
|
190 | return sources
|
191 | }, {})
|
192 |
|
193 | m.sourcesStateAt = m.stateAt
|
194 | }
|
195 | },
|
196 |
|
197 | specs: {
|
198 | guard (m) {
|
199 | return !m.specsError &&
|
200 | (m.databaseStateAt === m.stateAt) &&
|
201 | (m.specsStateAt !== m.stateAt)
|
202 | },
|
203 | async execute (m) {
|
204 | const influxUrl = m.$app.get('apis').influxDB.url
|
205 | const specs = []
|
206 |
|
207 | for (let sourceKey of Object.keys(m.sources)) {
|
208 | const {options, station, table, load, reverseTimeEditor} = m.sources[sourceKey]
|
209 | const spec = Object.assign({
|
210 | station,
|
211 | table
|
212 | }, options)
|
213 |
|
214 | if (!spec.start_option) {
|
215 | const requestOpts = {
|
216 | method: 'POST',
|
217 | qs: {
|
218 | db: load.database,
|
219 | q: `SELECT * FROM "${load.measurement}" ORDER BY time DESC LIMIT 1`
|
220 | },
|
221 | url: `${influxUrl}/query`
|
222 | }
|
223 | const response = await new Promise((resolve, reject) => {
|
224 | request(requestOpts, (err, resp) => err ? reject(err) : resolve(resp))
|
225 | })
|
226 |
|
227 | if (response.statusCode !== 200) throw new Error(`Non-success status code ${response.statusCode}`)
|
228 |
|
229 | const body = JSON.parse(response.body)
|
230 |
|
231 | try {
|
232 | const recordDate = moment(body.results[0].series[0].values[0][0]).utc()
|
233 | const timeStamp = reverseTimeEditor ? reverseTimeEditor.edit(recordDate) : recordDate
|
234 | spec.time_stamp = timeStamp.format('YYYY MM DD HH:mm:ss.SS')
|
235 | spec.start_option = 'at-time'
|
236 | } catch (e) {
|
237 | spec.start_option = 'at-oldest'
|
238 | }
|
239 | }
|
240 |
|
241 | specs.push(spec)
|
242 | }
|
243 |
|
244 | return specs
|
245 | },
|
246 | assign (m, res) {
|
247 | const log = m.$app.logger
|
248 |
|
249 | log.info(`Mach [${m.key}]: Specs ready`)
|
250 |
|
251 | m.specs = res
|
252 | m.specsStateAt = m.stateAt
|
253 | }
|
254 | },
|
255 |
|
256 | specify: require('./tasks/specify').default,
|
257 |
|
258 | stateAt: require('./tasks/stateAt').default
|
259 | }
|