1 | const ldmp = require('@dendra-science/csi-ldmp-client')
|
2 | const moment = require('moment')
|
3 | const {MomentEditor} = require('@dendra-science/utils-moment')
|
4 |
|
5 | function handleRecord (rec) {
|
6 | if (!rec) return
|
7 |
|
8 | const m = this.model
|
9 | const recNbr = rec.recordNumber
|
10 |
|
11 | try {
|
12 |
|
13 |
|
14 |
|
15 | if (typeof recNbr === 'undefined') throw new Error('Record number undefined')
|
16 |
|
17 | if (m.specifyStateAt !== m.stateAt) {
|
18 | this.log.info(`Mach [${m.key}] Rec [${recNbr}: Deferring`)
|
19 | return
|
20 | }
|
21 |
|
22 | const recordDate = moment(rec.timeString).utcOffset(0, true).utc()
|
23 |
|
24 | if (!(recordDate && recordDate.isValid())) throw new Error('Invalid time format')
|
25 |
|
26 | const sourceKey = `${rec.station} ${rec.table}`
|
27 | const source = m.sources[sourceKey]
|
28 |
|
29 | if (!source) throw new Error(`No source found for '${sourceKey}'`)
|
30 |
|
31 |
|
32 |
|
33 | const archiveDate = source.timeEditor ? source.timeEditor.edit(recordDate) : recordDate
|
34 | const id = `csi-${rec.station}-${rec.table}-${archiveDate.format('YYYY-MM-DD-HH-mm')}`
|
35 |
|
36 | this.documentService.create({
|
37 | _id: id,
|
38 | content: rec
|
39 | }).then(() => {
|
40 | return this.client.ack()
|
41 | }).then(() => {
|
42 | if (!m.stamps) m.stamps = {}
|
43 | m.stamps[sourceKey] = recordDate.valueOf()
|
44 | }).catch(err => {
|
45 | this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`)
|
46 | })
|
47 | } catch (err) {
|
48 | this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`)
|
49 | }
|
50 | }
|
51 |
|
52 | export default {
|
53 | client: {
|
54 | guard (m) {
|
55 | return !m.clientError && !m.private.client
|
56 | },
|
57 | execute (m) {
|
58 | return new ldmp.LDMPClient(m.$app.get('clients').ldmp)
|
59 | },
|
60 | assign (m, res) {
|
61 | const log = m.$app.logger
|
62 |
|
63 | log.info(`Mach [${m.key}]: Client ready`)
|
64 |
|
65 | m.private.client = res
|
66 | m.private.client.on('record', handleRecord.bind({
|
67 | client: res,
|
68 | documentService: m.$app.get('connections').jsonArchive.app.service('/documents'),
|
69 | log: m.$app.logger,
|
70 | model: m
|
71 | }))
|
72 | }
|
73 | },
|
74 |
|
75 | connect: require('./tasks/connect').default,
|
76 |
|
77 | connectReset: require('./tasks/connectReset').default,
|
78 |
|
79 | disconnect: require('./tasks/disconnect').default,
|
80 |
|
81 | sources: {
|
82 | guard (m) {
|
83 | return !m.sourcesError &&
|
84 | m.state.sources && (m.state.sources.length > 0) &&
|
85 | (m.sourcesStateAt !== m.stateAt)
|
86 | },
|
87 | execute () { return true },
|
88 | assign (m) {
|
89 | const log = m.$app.logger
|
90 |
|
91 | log.info(`Mach [${m.key}]: Sources ready`)
|
92 |
|
93 | m.sources = m.state.sources.reduce((sources, src) => {
|
94 | if (src.station && src.table) {
|
95 | const sourceKey = `${src.station} ${src.table}`
|
96 | const source = sources[sourceKey] = Object.assign({}, src)
|
97 |
|
98 | if (source.transform && source.transform.time_edit) {
|
99 |
|
100 | source.timeEditor = new MomentEditor(source.transform.time_edit)
|
101 | }
|
102 | }
|
103 |
|
104 | return sources
|
105 | }, {})
|
106 |
|
107 | m.sourcesStateAt = m.stateAt
|
108 | }
|
109 | },
|
110 |
|
111 | specs: {
|
112 | guard (m) {
|
113 | return !m.specsError &&
|
114 | (m.sourcesStateAt === m.stateAt) &&
|
115 | (m.specsStateAt !== m.stateAt)
|
116 | },
|
117 | execute (m) {
|
118 | const specs = []
|
119 |
|
120 | for (let sourceKey of Object.keys(m.sources)) {
|
121 | const {options, station, table} = m.sources[sourceKey]
|
122 | const spec = Object.assign({
|
123 | station,
|
124 | table
|
125 | }, options)
|
126 |
|
127 | if (!spec.start_option) {
|
128 | if (m.state.stamps && m.state.stamps[sourceKey]) {
|
129 | const timeStamp = moment(m.state.stamps[sourceKey]).utc()
|
130 | spec.time_stamp = timeStamp.format('YYYY MM DD HH:mm:ss.SS')
|
131 | spec.start_option = 'at-time'
|
132 | } else {
|
133 | spec.start_option = 'at-oldest'
|
134 | }
|
135 | }
|
136 |
|
137 | specs.push(spec)
|
138 | }
|
139 |
|
140 | return specs
|
141 | },
|
142 | assign (m, res) {
|
143 | const log = m.$app.logger
|
144 |
|
145 | log.info(`Mach [${m.key}]: Specs ready`)
|
146 |
|
147 | m.specs = res
|
148 | m.specsStateAt = m.stateAt
|
149 | }
|
150 | },
|
151 |
|
152 | specify: require('./tasks/specify').default,
|
153 |
|
154 | stateAt: require('./tasks/stateAt').default,
|
155 |
|
156 | stateStamps: {
|
157 | guard (m) {
|
158 | return !m.stateStampsReady
|
159 | },
|
160 | execute () { return true },
|
161 | assign (m) {
|
162 | m.state.stamps = Object.assign({}, m.stamps)
|
163 | }
|
164 | }
|
165 | }
|