UNPKG

4.47 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6const ldmp = require('@dendra-science/csi-ldmp-client');
7const moment = require('moment');
8const { MomentEditor } = require('@dendra-science/utils-moment');
9
10function handleRecord(rec) {
11 if (!rec) return;
12
13 const m = this.model;
14 const recNbr = rec.recordNumber;
15
16 try {
17 //
18 // Begin standard record validation
19 // TODO: Move to helper
20 if (typeof recNbr === 'undefined') throw new Error('Record number undefined');
21
22 if (m.specifyStateAt !== m.stateAt) {
23 this.log.info(`Mach [${m.key}] Rec [${recNbr}: Deferring`);
24 return;
25 }
26
27 const recordDate = moment(rec.timeString).utcOffset(0, true).utc();
28
29 if (!(recordDate && recordDate.isValid())) throw new Error('Invalid time format');
30
31 const sourceKey = `${rec.station} ${rec.table}`;
32 const source = m.sources[sourceKey];
33
34 if (!source) throw new Error(`No source found for '${sourceKey}'`);
35 // End standard record validation
36 //
37
38 const archiveDate = source.timeEditor ? source.timeEditor.edit(recordDate) : recordDate;
39 const id = `csi-${rec.station}-${rec.table}-${archiveDate.format('YYYY-MM-DD-HH-mm')}`;
40
41 this.documentService.create({
42 _id: id,
43 content: rec
44 }).then(() => {
45 return this.client.ack();
46 }).then(() => {
47 if (!m.stamps) m.stamps = {};
48 m.stamps[sourceKey] = recordDate.valueOf();
49 }).catch(err => {
50 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`);
51 });
52 } catch (err) {
53 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`);
54 }
55}
56
57exports.default = {
58 client: {
59 guard(m) {
60 return !m.clientError && !m.private.client;
61 },
62 execute(m) {
63 return new ldmp.LDMPClient(m.$app.get('clients').ldmp);
64 },
65 assign(m, res) {
66 const log = m.$app.logger;
67
68 log.info(`Mach [${m.key}]: Client ready`);
69
70 m.private.client = res;
71 m.private.client.on('record', handleRecord.bind({
72 client: res,
73 documentService: m.$app.get('connections').jsonArchive.app.service('/documents'),
74 log: m.$app.logger,
75 model: m
76 }));
77 }
78 },
79
80 connect: require('./tasks/connect').default,
81
82 connectReset: require('./tasks/connectReset').default,
83
84 disconnect: require('./tasks/disconnect').default,
85
86 sources: {
87 guard(m) {
88 return !m.sourcesError && m.state.sources && m.state.sources.length > 0 && m.sourcesStateAt !== m.stateAt;
89 },
90 execute() {
91 return true;
92 },
93 assign(m) {
94 const log = m.$app.logger;
95
96 log.info(`Mach [${m.key}]: Sources ready`);
97
98 m.sources = m.state.sources.reduce((sources, src) => {
99 if (src.station && src.table) {
100 const sourceKey = `${src.station} ${src.table}`;
101 const source = sources[sourceKey] = Object.assign({}, src);
102
103 if (source.transform && source.transform.time_edit) {
104 // Create a MomentEditor instance for adjusting timestamps
105 source.timeEditor = new MomentEditor(source.transform.time_edit);
106 }
107 }
108
109 return sources;
110 }, {});
111
112 m.sourcesStateAt = m.stateAt;
113 }
114 },
115
116 specs: {
117 guard(m) {
118 return !m.specsError && m.sourcesStateAt === m.stateAt && m.specsStateAt !== m.stateAt;
119 },
120 execute(m) {
121 const specs = [];
122
123 for (let sourceKey of Object.keys(m.sources)) {
124 const { options, station, table } = m.sources[sourceKey];
125 const spec = Object.assign({
126 station,
127 table
128 }, options);
129
130 if (!spec.start_option) {
131 if (m.state.stamps && m.state.stamps[sourceKey]) {
132 const timeStamp = moment(m.state.stamps[sourceKey]).utc();
133 spec.time_stamp = timeStamp.format('YYYY MM DD HH:mm:ss.SS');
134 spec.start_option = 'at-time';
135 } else {
136 spec.start_option = 'at-oldest';
137 }
138 }
139
140 specs.push(spec);
141 }
142
143 return specs;
144 },
145 assign(m, res) {
146 const log = m.$app.logger;
147
148 log.info(`Mach [${m.key}]: Specs ready`);
149
150 m.specs = res;
151 m.specsStateAt = m.stateAt;
152 }
153 },
154
155 specify: require('./tasks/specify').default,
156
157 stateAt: require('./tasks/stateAt').default,
158
159 stateStamps: {
160 guard(m) {
161 return !m.stateStampsReady;
162 },
163 execute() {
164 return true;
165 },
166 assign(m) {
167 m.state.stamps = Object.assign({}, m.stamps);
168 }
169 }
170};
\No newline at end of file