1 | 'use strict';
|
2 |
|
3 | Object.defineProperty(exports, "__esModule", {
|
4 | value: true
|
5 | });
|
6 |
|
/**
 * Wraps a generator function so that calling it returns a Promise.
 *
 * Each value the generator yields is treated as a promise: when it fulfills,
 * the fulfillment value is sent back into the generator via `next`; when it
 * rejects, the reason is thrown into the generator via `throw`. The returned
 * Promise resolves with the generator's return value, or rejects with any
 * error that escapes the generator.
 *
 * @param {Function} fn - Generator function to drive.
 * @returns {Function} Function that forwards `this` and its arguments to `fn`
 *   and returns a Promise for the generator's final result.
 */
function _asyncToGenerator(fn) {
  return function (...args) {
    // The generator object is created eagerly, outside the Promise executor.
    const iterator = fn.apply(this, args);

    return new Promise((resolve, reject) => {
      const advance = (method, input) => {
        let result;
        let yielded;

        try {
          result = iterator[method](input);
          yielded = result.value;
        } catch (failure) {
          reject(failure);
          return;
        }

        if (result.done) {
          resolve(yielded);
          return;
        }

        // Wait for the yielded value, then resume the generator accordingly.
        Promise.resolve(yielded).then(
          (settled) => {
            advance('next', settled);
          },
          (reason) => {
            advance('throw', reason);
          }
        );
      };

      advance('next');
    });
  };
}
|
8 |
|
9 | const ldmp = require('@dendra-science/csi-ldmp-client');
|
10 | const moment = require('moment');
|
11 | const request = require('request');
|
12 | const { MomentEditor } = require('@dendra-science/utils-moment');
|
13 |
|
/**
 * Handles one 'record' event from the LDMP client by writing the record to
 * InfluxDB and acknowledging it on success.
 *
 * Must be invoked with `this` bound to `{ client, influxUrl, log, model }`.
 *
 * Processing steps:
 *  - Records are skipped (and logged as deferred) while the model's
 *    `specifyStateAt` does not match its `stateAt`.
 *  - The source is looked up in `model.sources` under `"<station> <table>"`.
 *  - Static fields from `source.load.fields` are merged with the record's
 *    fields; only finite numeric values are written.
 *  - The line is POSTed to `<influxUrl>/write` with millisecond precision, and
 *    the record is acknowledged only when the response status is 204.
 *
 * Failures during processing are logged through `this.log` rather than thrown.
 *
 * @param {Object} rec - Record from the client; reads `recordNumber`,
 *   `timeString`, `station`, `table` and `fields` (array of `{ name, value }`).
 * @returns {undefined}
 */
function handleRecord(rec) {
  if (!rec) return;

  const m = this.model;
  const recNbr = rec.recordNumber;
  // Shared log prefix; the closing bracket after the record number was missing before.
  const prefix = `Mach [${m.key}] Rec [${recNbr}]`;

  try {
    if (typeof recNbr === 'undefined') throw new Error('Record number undefined');

    if (m.specifyStateAt !== m.stateAt) {
      this.log.info(`${prefix}: Deferring`);
      return;
    }

    // utcOffset(0, true) keeps the parsed wall-clock time and relabels it as UTC.
    const recordDate = moment(rec.timeString).utcOffset(0, true).utc();

    if (!(recordDate && recordDate.isValid())) throw new Error('Invalid time format');

    const sourceKey = `${rec.station} ${rec.table}`;
    const source = m.sources[sourceKey];

    if (!source) throw new Error(`No source found for '${sourceKey}'`);

    // Record fields override the source's static fields; runs of non-word
    // characters in field names are collapsed to underscores.
    const recFields = rec.fields && rec.fields.reduce((obj, field) => {
      if (field.name) obj[field.name.replace(/\W+/g, '_')] = field.value;
      return obj;
    }, {});
    const row = Object.assign({}, source.load.fields, recFields);

    // Keep only finite numbers: NaN and +/-Infinity would otherwise be
    // serialised as literal text in the write body.
    const fieldSet = Object.keys(row)
      .filter(key => Number.isFinite(row[key]))
      .map(key => `${key}=${row[key]}`);

    if (fieldSet.length === 0) throw new Error('Nothing to write');

    // Copy so the per-record time tags never leak into the shared source definition.
    const measurementTagSet = [...source.measurementTagSet];

    const tagMoment = source.tagTimeEditor ? source.tagTimeEditor.edit(recordDate) : recordDate;
    const timeTags = source.load.time_tags;
    if (timeTags) {
      Object.keys(timeTags).forEach(key => {
        measurementTagSet.push(`${key}=${tagMoment.format(timeTags[key])}`);
      });
    }

    const time = source.timeEditor ? source.timeEditor.edit(recordDate).valueOf() : recordDate.valueOf();

    const buf = Buffer.from(`${measurementTagSet.join(',')} ${fieldSet.join(',')} ${time}\n`);

    const requestOpts = {
      body: buf,
      method: 'POST',
      qs: {
        db: source.load.database,
        precision: 'ms'
      },
      url: `${this.influxUrl}/write`
    };

    request(requestOpts, (err, response) => {
      if (err) {
        this.log.error(`${prefix}: ${err.message}`);
      } else if (response.statusCode !== 204) {
        this.log.error(`${prefix}: Non-success status code ${response.statusCode}`);
      } else {
        // Acknowledge only after the write has been accepted.
        this.client.ack().catch(err2 => {
          this.log.error(`${prefix}: ${err2.message}`);
        });
      }
    });
  } catch (err) {
    this.log.error(`${prefix}: ${err.message}`);
  }
}
|
99 |
|
100 | exports.default = {
|
101 | client: {
|
102 | guard(m) {
|
103 | return !m.clientError && !m.private.client;
|
104 | },
|
105 | execute(m) {
|
106 | return new ldmp.LDMPClient(m.$app.get('clients').ldmp);
|
107 | },
|
108 | assign(m, res) {
|
109 | const log = m.$app.logger;
|
110 |
|
111 | log.info(`Mach [${m.key}]: Client ready`);
|
112 |
|
113 | m.private.client = res;
|
114 | m.private.client.on('record', handleRecord.bind({
|
115 | client: res,
|
116 | influxUrl: m.$app.get('apis').influxDB.url,
|
117 | log: m.$app.logger,
|
118 | model: m
|
119 | }));
|
120 | }
|
121 | },
|
122 |
|
123 | connect: require('./tasks/connect').default,
|
124 |
|
125 | connectReset: require('./tasks/connectReset').default,
|
126 |
|
127 | database: {
|
128 | guard(m) {
|
129 | return !m.databaseError && m.sourcesStateAt === m.stateAt && m.databaseStateAt !== m.stateAt;
|
130 | },
|
131 | execute(m) {
|
132 | const log = m.$app.logger;
|
133 | const influxUrl = m.$app.get('apis').influxDB.url;
|
134 | const databases = [...new Set(Object.keys(m.sources).map(key => m.sources[key].load.database))];
|
135 | const requestOpts = {
|
136 | method: 'POST',
|
137 | qs: {
|
138 | q: databases.map(db => `CREATE DATABASE "${db}"`).join(';')
|
139 | },
|
140 | url: `${influxUrl}/query`
|
141 | };
|
142 |
|
143 | log.info(`Mach [${m.key}]: Creating database(s): ${databases.join(', ')}`);
|
144 |
|
145 | return new Promise((resolve, reject) => {
|
146 | request(requestOpts, (err, response) => err ? reject(err) : resolve(response));
|
147 | }).then(response => {
|
148 | if (response.statusCode !== 200) throw new Error(`Non-success status code ${response.statusCode}`);
|
149 |
|
150 | return true;
|
151 | }).catch(err => {
|
152 | log.error(`Mach [${m.key}]: ${err.message}`);
|
153 | throw err;
|
154 | });
|
155 | },
|
156 | assign(m) {
|
157 | m.databaseStateAt = m.stateAt;
|
158 | }
|
159 | },
|
160 |
|
161 | disconnect: require('./tasks/disconnect').default,
|
162 |
|
163 | sources: {
|
164 | guard(m) {
|
165 | return !m.sourcesError && m.state.sources && m.state.sources.length > 0 && m.sourcesStateAt !== m.stateAt;
|
166 | },
|
167 | execute() {
|
168 | return true;
|
169 | },
|
170 | assign(m) {
|
171 | const log = m.$app.logger;
|
172 |
|
173 | log.info(`Mach [${m.key}]: Sources ready`);
|
174 |
|
175 | m.sources = m.state.sources.reduce((sources, src) => {
|
176 | if (src.station && src.table && src.load && src.load.database && src.load.measurement) {
|
177 | const sourceKey = `${src.station} ${src.table}`;
|
178 | const source = sources[sourceKey] = Object.assign({}, src);
|
179 |
|
180 |
|
181 | const measurementTagSet = source.measurementTagSet = [source.load.measurement];
|
182 |
|
183 |
|
184 | const tags = source.load.tags;
|
185 | if (tags) Object.keys(tags).forEach(key => measurementTagSet.push(`${key}=${tags[key]}`));
|
186 |
|
187 | const transform = source.transform;
|
188 | if (transform) {
|
189 |
|
190 | if (transform.time_edit) source.timeEditor = new MomentEditor(transform.time_edit);
|
191 | if (transform.reverse_time_edit) source.reverseTimeEditor = new MomentEditor(transform.reverse_time_edit);
|
192 | if (transform.tag_time_edit) source.tagTimeEditor = new MomentEditor(transform.tag_time_edit);
|
193 | }
|
194 | }
|
195 |
|
196 | return sources;
|
197 | }, {});
|
198 |
|
199 | m.sourcesStateAt = m.stateAt;
|
200 | }
|
201 | },
|
202 |
|
203 | specs: {
|
204 | guard(m) {
|
205 | return !m.specsError && m.databaseStateAt === m.stateAt && m.specsStateAt !== m.stateAt;
|
206 | },
|
207 | execute(m) {
|
208 | return _asyncToGenerator(function* () {
|
209 | const influxUrl = m.$app.get('apis').influxDB.url;
|
210 | const specs = [];
|
211 |
|
212 | for (let sourceKey of Object.keys(m.sources)) {
|
213 | const { options, station, table, load, reverseTimeEditor } = m.sources[sourceKey];
|
214 | const spec = Object.assign({
|
215 | station,
|
216 | table
|
217 | }, options);
|
218 |
|
219 | if (!spec.start_option) {
|
220 | const requestOpts = {
|
221 | method: 'POST',
|
222 | qs: {
|
223 | db: load.database,
|
224 | q: `SELECT * FROM "${load.measurement}" ORDER BY time DESC LIMIT 1`
|
225 | },
|
226 | url: `${influxUrl}/query`
|
227 | };
|
228 | const response = yield new Promise(function (resolve, reject) {
|
229 | request(requestOpts, function (err, resp) {
|
230 | return err ? reject(err) : resolve(resp);
|
231 | });
|
232 | });
|
233 |
|
234 | if (response.statusCode !== 200) throw new Error(`Non-success status code ${response.statusCode}`);
|
235 |
|
236 | const body = JSON.parse(response.body);
|
237 |
|
238 | try {
|
239 | const recordDate = moment(body.results[0].series[0].values[0][0]).utc();
|
240 | const timeStamp = reverseTimeEditor ? reverseTimeEditor.edit(recordDate) : recordDate;
|
241 | spec.time_stamp = timeStamp.format('YYYY MM DD HH:mm:ss.SS');
|
242 | spec.start_option = 'at-time';
|
243 | } catch (e) {
|
244 | spec.start_option = 'at-oldest';
|
245 | }
|
246 | }
|
247 |
|
248 | specs.push(spec);
|
249 | }
|
250 |
|
251 | return specs;
|
252 | })();
|
253 | },
|
254 | assign(m, res) {
|
255 | const log = m.$app.logger;
|
256 |
|
257 | log.info(`Mach [${m.key}]: Specs ready`);
|
258 |
|
259 | m.specs = res;
|
260 | m.specsStateAt = m.stateAt;
|
261 | }
|
262 | },
|
263 |
|
264 | specify: require('./tasks/specify').default,
|
265 |
|
266 | stateAt: require('./tasks/stateAt').default
|
267 | }; |
\ | No newline at end of file |