1 | 'use strict';
|
2 |
|
3 | Object.defineProperty(exports, "__esModule", {
|
4 | value: true
|
5 | });
|
6 | const ldmp = require('@dendra-science/csi-ldmp-client');
|
7 | const moment = require('moment');
|
8 | const { MomentEditor } = require('@dendra-science/utils-moment');
|
9 |
|
10 | function handleRecord(rec) {
|
11 | if (!rec) return;
|
12 |
|
13 | const m = this.model;
|
14 | const recNbr = rec.recordNumber;
|
15 |
|
16 | try {
|
17 |
|
18 |
|
19 |
|
20 | if (typeof recNbr === 'undefined') throw new Error('Record number undefined');
|
21 |
|
22 | if (m.specifyStateAt !== m.stateAt) {
|
23 | this.log.info(`Mach [${m.key}] Rec [${recNbr}: Deferring`);
|
24 | return;
|
25 | }
|
26 |
|
27 | const recordDate = moment(rec.timeString).utcOffset(0, true).utc();
|
28 |
|
29 | if (!(recordDate && recordDate.isValid())) throw new Error('Invalid time format');
|
30 |
|
31 | const sourceKey = `${rec.station} ${rec.table}`;
|
32 | const source = m.sources[sourceKey];
|
33 |
|
34 | if (!source) throw new Error(`No source found for '${sourceKey}'`);
|
35 |
|
36 |
|
37 |
|
38 | const archiveDate = source.timeEditor ? source.timeEditor.edit(recordDate) : recordDate;
|
39 | const id = `csi-${rec.station}-${rec.table}-${archiveDate.format('YYYY-MM-DD-HH-mm')}`;
|
40 |
|
41 | this.documentService.create({
|
42 | _id: id,
|
43 | content: rec
|
44 | }).then(() => {
|
45 | return this.client.ack();
|
46 | }).then(() => {
|
47 | if (!m.stamps) m.stamps = {};
|
48 | m.stamps[sourceKey] = recordDate.valueOf();
|
49 | }).catch(err => {
|
50 | this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`);
|
51 | });
|
52 | } catch (err) {
|
53 | this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`);
|
54 | }
|
55 | }
|
56 |
|
57 | exports.default = {
|
58 | client: {
|
59 | guard(m) {
|
60 | return !m.clientError && !m.private.client;
|
61 | },
|
62 | execute(m) {
|
63 | return new ldmp.LDMPClient(m.$app.get('clients').ldmp);
|
64 | },
|
65 | assign(m, res) {
|
66 | const log = m.$app.logger;
|
67 |
|
68 | log.info(`Mach [${m.key}]: Client ready`);
|
69 |
|
70 | m.private.client = res;
|
71 | m.private.client.on('record', handleRecord.bind({
|
72 | client: res,
|
73 | documentService: m.$app.get('connections').jsonArchive.app.service('/documents'),
|
74 | log: m.$app.logger,
|
75 | model: m
|
76 | }));
|
77 | }
|
78 | },
|
79 |
|
80 | connect: require('./tasks/connect').default,
|
81 |
|
82 | connectReset: require('./tasks/connectReset').default,
|
83 |
|
84 | disconnect: require('./tasks/disconnect').default,
|
85 |
|
86 | sources: {
|
87 | guard(m) {
|
88 | return !m.sourcesError && m.state.sources && m.state.sources.length > 0 && m.sourcesStateAt !== m.stateAt;
|
89 | },
|
90 | execute() {
|
91 | return true;
|
92 | },
|
93 | assign(m) {
|
94 | const log = m.$app.logger;
|
95 |
|
96 | log.info(`Mach [${m.key}]: Sources ready`);
|
97 |
|
98 | m.sources = m.state.sources.reduce((sources, src) => {
|
99 | if (src.station && src.table) {
|
100 | const sourceKey = `${src.station} ${src.table}`;
|
101 | const source = sources[sourceKey] = Object.assign({}, src);
|
102 |
|
103 | if (source.transform && source.transform.time_edit) {
|
104 |
|
105 | source.timeEditor = new MomentEditor(source.transform.time_edit);
|
106 | }
|
107 | }
|
108 |
|
109 | return sources;
|
110 | }, {});
|
111 |
|
112 | m.sourcesStateAt = m.stateAt;
|
113 | }
|
114 | },
|
115 |
|
116 | specs: {
|
117 | guard(m) {
|
118 | return !m.specsError && m.sourcesStateAt === m.stateAt && m.specsStateAt !== m.stateAt;
|
119 | },
|
120 | execute(m) {
|
121 | const specs = [];
|
122 |
|
123 | for (let sourceKey of Object.keys(m.sources)) {
|
124 | const { options, station, table } = m.sources[sourceKey];
|
125 | const spec = Object.assign({
|
126 | station,
|
127 | table
|
128 | }, options);
|
129 |
|
130 | if (!spec.start_option) {
|
131 | if (m.state.stamps && m.state.stamps[sourceKey]) {
|
132 | const timeStamp = moment(m.state.stamps[sourceKey]).utc();
|
133 | spec.time_stamp = timeStamp.format('YYYY MM DD HH:mm:ss.SS');
|
134 | spec.start_option = 'at-time';
|
135 | } else {
|
136 | spec.start_option = 'at-oldest';
|
137 | }
|
138 | }
|
139 |
|
140 | specs.push(spec);
|
141 | }
|
142 |
|
143 | return specs;
|
144 | },
|
145 | assign(m, res) {
|
146 | const log = m.$app.logger;
|
147 |
|
148 | log.info(`Mach [${m.key}]: Specs ready`);
|
149 |
|
150 | m.specs = res;
|
151 | m.specsStateAt = m.stateAt;
|
152 | }
|
153 | },
|
154 |
|
155 | specify: require('./tasks/specify').default,
|
156 |
|
157 | stateAt: require('./tasks/stateAt').default,
|
158 |
|
159 | stateStamps: {
|
160 | guard(m) {
|
161 | return !m.stateStampsReady;
|
162 | },
|
163 | execute() {
|
164 | return true;
|
165 | },
|
166 | assign(m) {
|
167 | m.state.stamps = Object.assign({}, m.stamps);
|
168 | }
|
169 | }
|
170 | }; |
\ | No newline at end of file |