UNPKG

4.36 kBJavaScriptView Raw
1const ldmp = require('@dendra-science/csi-ldmp-client')
2const moment = require('moment')
3const {MomentEditor} = require('@dendra-science/utils-moment')
4
5function handleRecord (rec) {
6 if (!rec) return
7
8 const m = this.model
9 const recNbr = rec.recordNumber
10
11 try {
12 //
13 // Begin standard record validation
14 // TODO: Move to helper
15 if (typeof recNbr === 'undefined') throw new Error('Record number undefined')
16
17 if (m.specifyStateAt !== m.stateAt) {
18 this.log.info(`Mach [${m.key}] Rec [${recNbr}: Deferring`)
19 return
20 }
21
22 const recordDate = moment(rec.timeString).utcOffset(0, true).utc()
23
24 if (!(recordDate && recordDate.isValid())) throw new Error('Invalid time format')
25
26 const sourceKey = `${rec.station} ${rec.table}`
27 const source = m.sources[sourceKey]
28
29 if (!source) throw new Error(`No source found for '${sourceKey}'`)
30 // End standard record validation
31 //
32
33 const archiveDate = source.timeEditor ? source.timeEditor.edit(recordDate) : recordDate
34 const id = `csi-${rec.station}-${rec.table}-${archiveDate.format('YYYY-MM-DD-HH-mm')}`
35
36 this.documentService.create({
37 _id: id,
38 content: rec
39 }).then(() => {
40 return this.client.ack()
41 }).then(() => {
42 if (!m.stamps) m.stamps = {}
43 m.stamps[sourceKey] = recordDate.valueOf()
44 }).catch(err => {
45 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`)
46 })
47 } catch (err) {
48 this.log.error(`Mach [${m.key}] Rec [${recNbr}: ${err.message}`)
49 }
50}
51
52export default {
53 client: {
54 guard (m) {
55 return !m.clientError && !m.private.client
56 },
57 execute (m) {
58 return new ldmp.LDMPClient(m.$app.get('clients').ldmp)
59 },
60 assign (m, res) {
61 const log = m.$app.logger
62
63 log.info(`Mach [${m.key}]: Client ready`)
64
65 m.private.client = res
66 m.private.client.on('record', handleRecord.bind({
67 client: res,
68 documentService: m.$app.get('connections').jsonArchive.app.service('/documents'),
69 log: m.$app.logger,
70 model: m
71 }))
72 }
73 },
74
75 connect: require('./tasks/connect').default,
76
77 connectReset: require('./tasks/connectReset').default,
78
79 disconnect: require('./tasks/disconnect').default,
80
81 sources: {
82 guard (m) {
83 return !m.sourcesError &&
84 m.state.sources && (m.state.sources.length > 0) &&
85 (m.sourcesStateAt !== m.stateAt)
86 },
87 execute () { return true },
88 assign (m) {
89 const log = m.$app.logger
90
91 log.info(`Mach [${m.key}]: Sources ready`)
92
93 m.sources = m.state.sources.reduce((sources, src) => {
94 if (src.station && src.table) {
95 const sourceKey = `${src.station} ${src.table}`
96 const source = sources[sourceKey] = Object.assign({}, src)
97
98 if (source.transform && source.transform.time_edit) {
99 // Create a MomentEditor instance for adjusting timestamps
100 source.timeEditor = new MomentEditor(source.transform.time_edit)
101 }
102 }
103
104 return sources
105 }, {})
106
107 m.sourcesStateAt = m.stateAt
108 }
109 },
110
111 specs: {
112 guard (m) {
113 return !m.specsError &&
114 (m.sourcesStateAt === m.stateAt) &&
115 (m.specsStateAt !== m.stateAt)
116 },
117 execute (m) {
118 const specs = []
119
120 for (let sourceKey of Object.keys(m.sources)) {
121 const {options, station, table} = m.sources[sourceKey]
122 const spec = Object.assign({
123 station,
124 table
125 }, options)
126
127 if (!spec.start_option) {
128 if (m.state.stamps && m.state.stamps[sourceKey]) {
129 const timeStamp = moment(m.state.stamps[sourceKey]).utc()
130 spec.time_stamp = timeStamp.format('YYYY MM DD HH:mm:ss.SS')
131 spec.start_option = 'at-time'
132 } else {
133 spec.start_option = 'at-oldest'
134 }
135 }
136
137 specs.push(spec)
138 }
139
140 return specs
141 },
142 assign (m, res) {
143 const log = m.$app.logger
144
145 log.info(`Mach [${m.key}]: Specs ready`)
146
147 m.specs = res
148 m.specsStateAt = m.stateAt
149 }
150 },
151
152 specify: require('./tasks/specify').default,
153
154 stateAt: require('./tasks/stateAt').default,
155
156 stateStamps: {
157 guard (m) {
158 return !m.stateStampsReady
159 },
160 execute () { return true },
161 assign (m) {
162 m.state.stamps = Object.assign({}, m.stamps)
163 }
164 }
165}