1 | _ = require 'lodash'
|
2 | async = require 'async'
|
3 | mongojs = require 'mongojs'
|
4 | Redis = require 'ioredis'
|
5 | request = require 'request'
|
6 | MeshbluHttp = require 'meshblu-http'
|
7 |
|
# Only INTERVAL_REDIS_URI is validated up front; the other connection
# settings are passed to their drivers as-is.
unless process.env.INTERVAL_REDIS_URI?
  throw new Error("INTERVAL_REDIS_URI is required")

# `client` receives the per-flow node config hashes (see updateInterval).
client = new Redis(process.env.REDIS_URI, {dropBufferSupport: true})
# `intervalClient` receives the interval/uuid and interval/token keys.
intervalClient = new Redis(process.env.INTERVAL_REDIS_URI, {dropBufferSupport: true})
# Mongo handle restricted to the `instances` collection.
database = mongojs(process.env.MONGODB_URI, ['instances'])
datastore = database.instances
|
14 |
|
# Migrates the flow behind one "<flowId>-stop" instance record.
#
# Steps, each feeding the next:
#   1. load the flow record that shares the stop record's instanceId
#   2. verify the flow's Meshblu credentials (flows without usable
#      credentials are skipped, not treated as failures)
#   3. create one device per distinct interval-style node (convertToDevice)
#   4. write node config + device credentials to Redis (updateInterval)
#   5. add the new devices to the flow's sendWhitelist (updatePermissions)
#   6. save the modified flowData and stamp both Mongo records with
#      `intervalDeviceMigration`
#
# record   - stop record; `flowId` and `instanceId` are read from it.
# callback - (error) ->; invoked without an error when the flow is skipped.
convertFlow = (record, callback) =>
  {flowId, instanceId} = record
  # Anchored so only the trailing suffix is stripped, matching the /-stop$/
  # pattern used to select these records.
  flowId = flowId.replace /-stop$/, ''
  console.log {flowId}
  datastore.findOne {flowId, instanceId}, (error, flowRecord) =>
    return callback error if error?
    # No matching flow record: nothing to migrate for this stop record.
    unless flowRecord?.flowData?
      console.error "No flow record found for #{flowId}/#{instanceId}, skipping"
      return callback()

    try
      flowData = JSON.parse flowRecord.flowData
    catch parseError
      return callback parseError

    # Fall back to {} so a flow without an engine-output node is skipped by
    # the guard below instead of throwing during destructuring.
    {uuid, token} = flowData['engine-output']?.config ? {}
    return callback() unless uuid? and token?

    meshbluHttp = new MeshbluHttp {uuid, token, hostname: 'meshblu-http.octoblu.com'}
    meshbluHttp.whoami (error) =>
      # Credentials Meshblu rejects cannot be migrated; skip the flow.
      return callback() if error?

      intervals = getIntervals flowData
      # One device per node id, even if the id appears more than once.
      uniqIntervals = _.uniqBy intervals, 'id'
      async.map uniqIntervals, async.apply(convertToDevice, uuid, token), (error, data) =>
        return callback error if error?
        # convertToDevice yields nothing for nodes it does not convert.
        data = _.compact data
        async.each data, async.apply(updateInterval, flowId, instanceId, intervals), (error) =>
          # Stop here on a Redis failure so the flow is not stamped as migrated.
          return callback error if error?
          updatePermissions {uuid, token, data}, (error) =>
            return callback error if error?
            # updateInterval mutated the node configs inside flowData, so the
            # whole document is written back.
            updateMongoFlow {flowId, instanceId, flowData}, (error) =>
              return callback error if error?
              update =
                $set:
                  intervalDeviceMigration: Date.now()
              datastore.update flowId: "#{flowId}-stop", update, callback
|
42 |
|
# Writes the serialized `flowData` back to the instance record identified by
# {flowId, instanceId} and stamps it with the current time in
# `intervalDeviceMigration`.
#
# callback - passed straight to datastore.update.
updateMongoFlow = ({flowId, instanceId, flowData}, callback) =>
  selector = {flowId, instanceId}
  changes = {
    $set: {
      flowData: JSON.stringify(flowData)
      intervalDeviceMigration: Date.now()
    }
  }
  datastore.update selector, changes, callback
|
49 |
|
# Adds the `uuid` of every entry in `data` to the `sendWhitelist` of the
# Meshblu device `uuid`, authenticating with that device's own credentials.
#
# callback - passed straight to meshbluHttp.updateDangerously.
updatePermissions = ({uuid, token, data}, callback) =>
  newDeviceUuids = _.map data, 'uuid'
  whitelistUpdate = {$addToSet: {sendWhitelist: {$each: newDeviceUuids}}}

  meshblu = new MeshbluHttp {uuid, token, hostname: 'meshblu-http.octoblu.com'}
  meshblu.updateDangerously uuid, whitelistUpdate, callback
|
60 |
|
# Points every node whose id equals `intervalDevice.nodeId` at its new device
# and stores the node config plus the device credentials in Redis.
#
# flowId, instanceId - used to build the Redis hash name, field and keys.
# intervals          - interval-style node configs of the flow (duplicates by
#                      id included); matching entries are mutated in place.
# intervalDevice     - {nodeId, uuid, token} as yielded by convertToDevice.
# callback           - (error) ->; called once all matching nodes are stored.
updateInterval = (flowId, instanceId, intervals, intervalDevice, callback) =>
  matchingIntervals = _.filter intervals, id: intervalDevice.nodeId
  async.each matchingIntervals, (interval, done) =>
    # convertToDevice yields {nodeId, uuid, token}; it has no `deviceId`
    # property, so the new device's uuid is what identifies it here.
    interval.deviceId = intervalDevice.uuid
    nodeId = interval.id

    client.hset flowId, "#{instanceId}/#{nodeId}/config", JSON.stringify(interval), (error) =>
      return done error if error?

      # Flat key/value list for MSET.
      redisData = [
        "interval/uuid/#{flowId}/#{nodeId}"
        intervalDevice.uuid
        "interval/token/#{flowId}/#{nodeId}"
        intervalDevice.token
      ]
      intervalClient.mset redisData, done
  , callback
|
78 |
|
# Creates a dedicated device for `interval` when its `deviceId` equals the
# hard-coded uuid below (presumably the previously shared interval device -
# confirm). Any other node yields nothing.
#
# uuid, token - credentials forwarded to createFlowDevice.
# callback    - (error, {nodeId, uuid, token}) ->; called with no arguments
#               when the node is not converted.
convertToDevice = (uuid, token, interval, callback) =>
  if interval.deviceId isnt '765bd3a4-546d-45e6-a62f-1157281083f0'
    return callback()

  createFlowDevice {uuid, token, nodeId: interval.id}, (error, response) =>
    if error?
      return callback error
    created =
      nodeId: interval.id
      uuid: response.uuid
      token: response.token
    callback null, created
|
84 |
|
# POSTs to the interval service to create the intervals resource for `nodeId`,
# using `uuid`/`token` as HTTP basic-auth credentials.
#
# callback - (error, body) ->; `body` is the parsed JSON response. A status
#            code that is not below 300 is reported as an Error.
createFlowDevice = ({uuid, token, nodeId}, callback) =>
  requestOptions = {
    uri: "https://interval.octoblu.com/nodes/#{nodeId}/intervals"
    json: true
    auth: {username: uuid, password: token}
  }

  request.post requestOptions, (error, response, body) =>
    if error?
      return callback error
    succeeded = response.statusCode < 300
    unless succeeded
      return callback new Error "Bad response: #{response.statusCode}"
    callback null, body
|
97 |
|
# Returns the `config` objects of every flowData entry whose config `class`
# is one of the interval-style classes listed below. The returned objects are
# the same instances held by `flowData`, not copies.
getIntervals = (flowData) =>
  intervalClasses = [
    'interval'
    'schedule'
    'throttle'
    'debounce'
    'delay'
    'leading-edge-debounce'
  ]
  configs = _.map _.values(flowData), 'config'
  isIntervalConfig = (config) => _.includes intervalClasses, config?.class
  _.filter configs, isIntervalConfig
|
102 |
|
# Stop records that carry no `intervalDeviceMigration` stamp yet.
query = {
  flowId: /-stop$/
  intervalDeviceMigration: {$eq: null}
}

# Ends the process with status 0 once every record is handled; an error is
# rethrown.
onMigrationDone = (error) =>
  if error?
    throw error
  process.exit 0

# Converts the fetched records one at a time, in order.
onRecordsLoaded = (error, records) =>
  if error?
    throw error
  async.eachSeries records, convertFlow, onMigrationDone

# At most 1000 records are processed per run.
cursor = datastore.find(query).limit 1000, onRecordsLoaded
|