1 | _ = require 'lodash'
|
2 | path = require 'path'
|
3 | glob = require 'glob'
|
4 | MeshbluHTTP = require 'meshblu-http'
|
5 | moment = require 'moment'
|
6 | debug = require('debug')('slurry-core:configure-handler')
|
7 |
|
8 | THIRTY_SECONDS = 30 * 1000
|
9 |
|
10 | class ConfigureHandler
|
11 | constructor: ({ @slurrySpreader, @defaultConfiguration, @configurationsPath, @meshbluConfig }={}) ->
|
12 | throw new Error 'ConfigureHandler requires configurationsPath' unless @configurationsPath?
|
13 | throw new Error 'ConfigureHandler requires slurrySpreader' unless @slurrySpreader?
|
14 | @configurations = @_getConfigurations()
|
15 | @_slurryStreams = {}
|
16 | @slurrySpreader.on 'create', @_onSlurryCreate
|
17 | @slurrySpreader.on 'destroy', @_onSlurryDestroy
|
18 | @slurrySpreader.on 'onlineUntil', @_updateOnlineUntil
|
19 |
|
20 | configureSchema: (callback) =>
|
21 | callback null, @_configureSchemaFromConfigurations @configurations
|
22 |
|
23 | formSchema: (callback) =>
|
24 | callback null, @_formSchemaFromConfigurations @configurations
|
25 |
|
26 | onConfigure: ({auth, userDeviceUuid, encrypted, config}, callback) =>
|
27 | selectedConfiguration = config.schemas?.selected?.configure ? @defaultConfiguration ? 'Default'
|
28 | slurry = {
|
29 | auth
|
30 | selectedConfiguration
|
31 | encrypted
|
32 | config
|
33 | uuid: userDeviceUuid
|
34 | }
|
35 | return @slurrySpreader.remove(slurry, callback) if config.slurry?.disabled
|
36 | @slurrySpreader.add slurry, callback
|
37 |
|
38 | _configureSchemaFromConfigurations: (configurations) =>
|
39 | _.mapValues configurations, @_configureSchemaFromJob
|
40 |
|
41 | _configureSchemaFromJob: (job, key) =>
|
42 | configure = _.cloneDeep job.configure
|
43 | _.set configure, 'x-form-schema.angular', "configure.#{key}.angular"
|
44 | slurryProp = _.get configure, 'properties.slurry'
|
45 | newSlurryProp = _.merge slurryProp, @_generateConfigureSlurry()
|
46 | _.set configure, 'properties.slurry', newSlurryProp
|
47 | configure.required = _.union ['metadata'], configure.required
|
48 | return configure
|
49 |
|
50 | _destroySlurry: ({ uuid }) =>
|
51 | slurryStream = @_slurryStreams[uuid]
|
52 | return unless slurryStream?
|
53 | slurryStream.removeListener 'close', slurryStream.__slurryOnClose
|
54 | slurryStream.removeListener 'error', slurryStream.__slurryOnError
|
55 | delete slurryStream.__slurryOnClose
|
56 | throw new Error 'slurryStream must implement destroy method' unless _.isFunction slurryStream?.destroy
|
57 | slurryStream.destroy()
|
58 | delete @_slurryStreams[uuid]
|
59 |
|
60 | _onSlurryCreate: (slurry) =>
|
61 | {
|
62 | uuid
|
63 | selectedConfiguration
|
64 | config
|
65 | encrypted
|
66 | auth
|
67 | } = slurry
|
68 | slurryConfiguration = @configurations[selectedConfiguration]
|
69 | return unless slurryConfiguration?
|
70 |
|
71 | @_destroySlurry { uuid }
|
72 | return if config.slurry?.disabled
|
73 |
|
74 | slurryConfiguration.action {encrypted, auth, userDeviceUuid: uuid}, config, (error, slurryStream) =>
|
75 | @_updateStatusDeviceWithError {auth, userDeviceUuid: uuid, error} if error?
|
76 |
|
77 | if error?
|
78 | console.error 'slurryConfiguration.action Error', error.stack
|
79 | @slurrySpreader.delay {uuid, timeout:THIRTY_SECONDS}, _.noop if error.shouldRetry
|
80 | @slurrySpreader.remove {uuid}, _.noop unless error.shouldRetry
|
81 | @slurrySpreader.close {uuid}, _.noop
|
82 | return
|
83 |
|
84 | return @_onSlurryDelay slurry unless slurryStream?
|
85 |
|
86 | slurryStream.__slurryOnClose = =>
|
87 | @_onSlurryClose slurry
|
88 |
|
89 | slurryStream.__slurryOnError = (error) =>
|
90 | console.error 'Slurry onError', error.stack
|
91 | @slurrySpreader.remove {uuid}, _.noop unless error.shouldRetry
|
92 | @_updateStatusDeviceWithError {auth, userDeviceUuid: uuid, error}
|
93 | @_destroySlurry slurry
|
94 |
|
95 | slurryStream.__slurryOnDelay = (error, timeout=THIRTY_SECONDS) =>
|
96 | throw new Error 'parameter "error" must pass _.isError' unless _.isError error
|
97 | console.error 'Slurry onDelay', error.stack
|
98 | @_updateStatusDeviceWithError {auth, userDeviceUuid: uuid, error}
|
99 | @_onSlurryDelay {uuid, timeout}
|
100 |
|
101 | throw new Error 'slurryStream must implement on method' unless _.isFunction slurryStream?.on
|
102 | slurryStream.on 'close', slurryStream.__slurryOnClose
|
103 | slurryStream.on 'error', slurryStream.__slurryOnError
|
104 | slurryStream.on 'delay', slurryStream.__slurryOnDelay
|
105 | @_slurryStreams[uuid] = slurryStream
|
106 |
|
107 | _onSlurryDelay: ({uuid, timeout}) =>
|
108 | @_destroySlurry { uuid }
|
109 | @slurrySpreader.delay {uuid, timeout}, (error) =>
|
110 | @_slurryStreams[uuid].destroy?()
|
111 | return console.error error.stack if error?
|
112 |
|
113 | _onSlurryDestroy: ({ uuid }) =>
|
114 | @_destroySlurry { uuid }
|
115 |
|
116 | _onSlurryClose: (slurry) =>
|
117 | @slurrySpreader.close slurry, _.noop
|
118 |
|
119 | _formSchemaFromConfigurations: (configurations) =>
|
120 | return {
|
121 | configure: _.mapValues configurations, 'form'
|
122 | }
|
123 |
|
124 | _generateConfigureSlurry: =>
|
125 | return {
|
126 | type: 'object'
|
127 | required: ['disabled']
|
128 | properties:
|
129 | disabled:
|
130 | type: 'boolean'
|
131 | title: 'Disabled'
|
132 | description: 'Disable streaming'
|
133 | }
|
134 |
|
135 | _getConfigurations: =>
|
136 | dirnames = glob.sync path.join(@configurationsPath, '/*/')
|
137 | configurations = {}
|
138 | _.each dirnames, (dir) =>
|
139 | key = _.upperFirst _.camelCase path.basename dir
|
140 | try
|
141 | configurations[key] = require dir
|
142 | catch error
|
143 | console.error error.stack
|
144 |
|
145 | return configurations
|
146 |
|
147 | _updateOnlineUntil: ({slurry, onlineUntil}) =>
|
148 | debug '_updateOnlineUntil', slurry?.uuid, onlineUntil
|
149 | {auth, config} = slurry
|
150 | {statusDevice} = config
|
151 | meshblu = new MeshbluHTTP _.defaults auth, @meshbluConfig
|
152 | meshblu.update statusDevice, {
|
153 | 'status.onlineUntil': onlineUntil
|
154 | }, (error) => console.error error.stack if error?
|
155 |
|
156 | _hasStatusDeviceRef: (config) =>
|
157 | return config?.status?.$ref?
|
158 |
|
159 | _updateStatusDeviceWithError: ({auth, userDeviceUuid, error}, callback=_.noop) =>
|
160 | debug '_updateStatusDeviceWithError', userDeviceUuid, error
|
161 |
|
162 | meshblu = new MeshbluHTTP _.defaults auth, @meshbluConfig
|
163 | meshblu.device userDeviceUuid, (newError, {statusDevice}={}) =>
|
164 | debug "_updateStatusDeviceWithError:statusDevice #{newError?.message} #{userDeviceUuid}/#{statusDevice}" if newError?
|
165 | return callback newError if newError?
|
166 | return callback() unless statusDevice?
|
167 | update =
|
168 | $push:
|
169 | 'status.errors':
|
170 | $each: [
|
171 | date: moment.utc().format()
|
172 | code: error.code ? 500
|
173 | message: error.message
|
174 | ]
|
175 | $slice: -99
|
176 | meshblu.updateDangerously statusDevice, update, callback
|
177 |
|
178 | module.exports = ConfigureHandler
|