UNPKG

8.91 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6
7function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
8
9const ldmp = require('@dendra-science/csi-ldmp-client');
10const moment = require('moment');
11const request = require('request');
12const { MomentEditor } = require('@dendra-science/utils-moment');
13
14function handleRecord(rec) {
15 if (!rec) return;
16
17 const m = this.model;
18 const recNbr = rec.recordNumber;
19
20 try {
21 //
22 // Begin standard record validation
23 // TODO: Move to helper
24 if (typeof recNbr === 'undefined') throw new Error('Record number undefined');
25
26 if (m.specifyStateAt !== m.stateAt) {
27 this.log.info(`Mach [${m.key}] Rec [${recNbr}: Deferring`);
28 return;
29 }
30
31 const recordDate = moment(rec.timeString).utcOffset(0, true).utc();
32
33 if (!(recordDate && recordDate.isValid())) throw new Error('Invalid time format');
34
35 const sourceKey = `${rec.station} ${rec.table}`;
36 const source = m.sources[sourceKey];
37
38 if (!source) throw new Error(`No source found for '${sourceKey}'`);
39 // End standard record validation
40 //
41
42 /*
43 Construct the following in order to write a point to InfluxDB:
44 1. The fieldSet
45 2. The measurementTagSet
46 3. The time (in milliseconds)
47 4. The Line Protocol buffer string (buf)
48 */
49
50 // Allow for static fields to be specified for every point
51 const row = Object.assign({}, source.load.fields, rec.fields && rec.fields.reduce((obj, field) => {
52 if (field.name) obj[field.name.replace(/\W+/g, '_')] = field.value;
53 return obj;
54 }, {}));
55 const fieldSet = Object.keys(row).filter(key => {
56 return typeof row[key] === 'number' && !Number.isNaN(row[key]);
57 }).map(key => {
58 return `${key}=${row[key]}`;
59 });
60
61 if (fieldSet.length === 0) throw new Error('Nothing to write');
62
63 const measurementTagSet = [...source.measurementTagSet];
64
65 // Allow for time-based tags to be specified for every point
66 const tagMoment = source.tagTimeEditor ? source.tagTimeEditor.edit(recordDate) : recordDate;
67 const timeTags = source.load.time_tags;
68 if (timeTags) Object.keys(timeTags).forEach(key => measurementTagSet.push(`${key}=${tagMoment.format(timeTags[key])}`));
69
70 const time = source.timeEditor ? source.timeEditor.edit(recordDate).valueOf() : recordDate.valueOf();
71
72 const buf = Buffer.from(`${measurementTagSet.join(',')} ${fieldSet.join(',')} ${time}\n`);
73
74 const requestOpts = {
75 body: buf,
76 method: 'POST',
77 qs: {
78 db: source.load.database,
79 precision: 'ms'
80 },
81 url: `${this.influxUrl}/write`
82 };
83
84 request(requestOpts, (err, response) => {
85 if (err) {
86 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`);
87 } else if (response.statusCode !== 204) {
88 this.log.error(`Mach [${m.key}] Rec [${recNbr}: Non-success status code ${response.statusCode}`);
89 } else {
90 this.client.ack().catch(err2 => {
91 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err2.message}`);
92 });
93 }
94 });
95 } catch (err) {
96 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`);
97 }
98}
99
100exports.default = {
101 client: {
102 guard(m) {
103 return !m.clientError && !m.private.client;
104 },
105 execute(m) {
106 return new ldmp.LDMPClient(m.$app.get('clients').ldmp);
107 },
108 assign(m, res) {
109 const log = m.$app.logger;
110
111 log.info(`Mach [${m.key}]: Client ready`);
112
113 m.private.client = res;
114 m.private.client.on('record', handleRecord.bind({
115 client: res,
116 influxUrl: m.$app.get('apis').influxDB.url,
117 log: m.$app.logger,
118 model: m
119 }));
120 }
121 },
122
123 connect: require('./tasks/connect').default,
124
125 connectReset: require('./tasks/connectReset').default,
126
127 database: {
128 guard(m) {
129 return !m.databaseError && m.sourcesStateAt === m.stateAt && m.databaseStateAt !== m.stateAt;
130 },
131 execute(m) {
132 const log = m.$app.logger;
133 const influxUrl = m.$app.get('apis').influxDB.url;
134 const databases = [...new Set(Object.keys(m.sources).map(key => m.sources[key].load.database))];
135 const requestOpts = {
136 method: 'POST',
137 qs: {
138 q: databases.map(db => `CREATE DATABASE "${db}"`).join(';')
139 },
140 url: `${influxUrl}/query`
141 };
142
143 log.info(`Mach [${m.key}]: Creating database(s): ${databases.join(', ')}`);
144
145 return new Promise((resolve, reject) => {
146 request(requestOpts, (err, response) => err ? reject(err) : resolve(response));
147 }).then(response => {
148 if (response.statusCode !== 200) throw new Error(`Non-success status code ${response.statusCode}`);
149
150 return true;
151 }).catch(err => {
152 log.error(`Mach [${m.key}]: ${err.message}`);
153 throw err;
154 });
155 },
156 assign(m) {
157 m.databaseStateAt = m.stateAt;
158 }
159 },
160
161 disconnect: require('./tasks/disconnect').default,
162
163 sources: {
164 guard(m) {
165 return !m.sourcesError && m.state.sources && m.state.sources.length > 0 && m.sourcesStateAt !== m.stateAt;
166 },
167 execute() {
168 return true;
169 },
170 assign(m) {
171 const log = m.$app.logger;
172
173 log.info(`Mach [${m.key}]: Sources ready`);
174
175 m.sources = m.state.sources.reduce((sources, src) => {
176 if (src.station && src.table && src.load && src.load.database && src.load.measurement) {
177 const sourceKey = `${src.station} ${src.table}`;
178 const source = sources[sourceKey] = Object.assign({}, src);
179
180 // Prepare the leftmost parts of the Line Protocol string for loading
181 const measurementTagSet = source.measurementTagSet = [source.load.measurement];
182
183 // Allow for static tags to be specified for every point
184 const tags = source.load.tags;
185 if (tags) Object.keys(tags).forEach(key => measurementTagSet.push(`${key}=${tags[key]}`));
186
187 const transform = source.transform;
188 if (transform) {
189 // Create MomentEditor instances for adjusting timestamps
190 if (transform.time_edit) source.timeEditor = new MomentEditor(transform.time_edit);
191 if (transform.reverse_time_edit) source.reverseTimeEditor = new MomentEditor(transform.reverse_time_edit);
192 if (transform.tag_time_edit) source.tagTimeEditor = new MomentEditor(transform.tag_time_edit);
193 }
194 }
195
196 return sources;
197 }, {});
198
199 m.sourcesStateAt = m.stateAt;
200 }
201 },
202
203 specs: {
204 guard(m) {
205 return !m.specsError && m.databaseStateAt === m.stateAt && m.specsStateAt !== m.stateAt;
206 },
207 execute(m) {
208 return _asyncToGenerator(function* () {
209 const influxUrl = m.$app.get('apis').influxDB.url;
210 const specs = [];
211
212 for (let sourceKey of Object.keys(m.sources)) {
213 const { options, station, table, load, reverseTimeEditor } = m.sources[sourceKey];
214 const spec = Object.assign({
215 station,
216 table
217 }, options);
218
219 if (!spec.start_option) {
220 const requestOpts = {
221 method: 'POST',
222 qs: {
223 db: load.database,
224 q: `SELECT * FROM "${load.measurement}" ORDER BY time DESC LIMIT 1`
225 },
226 url: `${influxUrl}/query`
227 };
228 const response = yield new Promise(function (resolve, reject) {
229 request(requestOpts, function (err, resp) {
230 return err ? reject(err) : resolve(resp);
231 });
232 });
233
234 if (response.statusCode !== 200) throw new Error(`Non-success status code ${response.statusCode}`);
235
236 const body = JSON.parse(response.body);
237
238 try {
239 const recordDate = moment(body.results[0].series[0].values[0][0]).utc();
240 const timeStamp = reverseTimeEditor ? reverseTimeEditor.edit(recordDate) : recordDate;
241 spec.time_stamp = timeStamp.format('YYYY MM DD HH:mm:ss.SS');
242 spec.start_option = 'at-time';
243 } catch (e) {
244 spec.start_option = 'at-oldest';
245 }
246 }
247
248 specs.push(spec);
249 }
250
251 return specs;
252 })();
253 },
254 assign(m, res) {
255 const log = m.$app.logger;
256
257 log.info(`Mach [${m.key}]: Specs ready`);
258
259 m.specs = res;
260 m.specsStateAt = m.stateAt;
261 }
262 },
263
264 specify: require('./tasks/specify').default,
265
266 stateAt: require('./tasks/stateAt').default
267};
\No newline at end of file