UNPKG

5.85 kBtext/coffeescriptView Raw
1debug = (require 'debug')('octoblu-flow-canary:canary')
2request = require 'request'
3async = require 'async'
4uuid = require 'uuid'
5_ = require 'lodash'
6Stats = require './stats'
7Slack = require './slack'
8
# Canary periodically fetches the flows owned by the canary account from the
# Octoblu API, POSTs to every trigger node found in those flows, keeps timing
# history through the injected/constructed Stats object and forwards the
# current stats to the Slack notifier.
class Canary

  # Options (all optional, mainly useful for injecting test doubles):
  #   Date  - object providing `now()`; defaults to the global Date.
  #   stats - Stats-like object; defaults to a new Stats.
  #   slack - Slack-like object; defaults to a new Slack.
  #
  # Throws an Error when OCTOBLU_CANARY_UUID or OCTOBLU_CANARY_TOKEN is not
  # set in the environment.
  #
  # Side effect: starts an interval timer that runs processUpdateInterval
  # every CANARY_UPDATE_INTERVAL milliseconds. The timer handle is kept on
  # @updateIntervalHandle so callers can clearInterval it.
  constructor: ({@Date, @stats, @slack}={}) ->
    # Reads an integer from the environment. The radix is explicit so that a
    # value such as "0x10" is never interpreted as hexadecimal; a missing,
    # unparsable or zero value falls back to the default.
    envInt = (name, fallback) ->
      Number.parseInt(process.env[name], 10) or fallback

    @OCTOBLU_CANARY_UUID = process.env.OCTOBLU_CANARY_UUID
    @OCTOBLU_CANARY_TOKEN = process.env.OCTOBLU_CANARY_TOKEN
    @OCTOBLU_API_HOST = process.env.OCTOBLU_API_HOST or 'https://app.octoblu.com'
    @OCTOBLU_TRIGGER_HOST = process.env.OCTOBLU_TRIGGER_HOST or 'https://triggers.octoblu.com'

    @CANARY_RESTART_FLOWS_MAX_TIME = envInt 'CANARY_RESTART_FLOWS_MAX_TIME', 1000*60*5
    @CANARY_UPDATE_INTERVAL = envInt 'CANARY_UPDATE_INTERVAL', 1000*60
    @CANARY_HEALTH_CHECK_MAX_DIFF = envInt 'CANARY_HEALTH_CHECK_MAX_DIFF', 1000*2
    @CANARY_DATA_HISTORY_SIZE = envInt 'CANARY_DATA_HISTORY_SIZE', 5
    @CANARY_ERROR_HISTORY_SIZE = envInt 'CANARY_ERROR_HISTORY_SIZE', 20

    unless @OCTOBLU_CANARY_UUID and @OCTOBLU_CANARY_TOKEN
      throw new Error 'Canary UUID or token not defined'

    # Credentials are sent to the API host as cookies.
    @jar = request.jar()
    @jar.setCookie request.cookie("meshblu_auth_uuid=#{@OCTOBLU_CANARY_UUID}"), @OCTOBLU_API_HOST
    @jar.setCookie request.cookie("meshblu_auth_token=#{@OCTOBLU_CANARY_TOKEN}"), @OCTOBLU_API_HOST

    @flows = []
    @Date ?= Date

    @stats ?= new Stats {@flows, @Date, @CANARY_UPDATE_INTERVAL, @CANARY_HEALTH_CHECK_MAX_DIFF}
    @slack ?= new Slack {@CANARY_UPDATE_INTERVAL, @CANARY_HEALTH_CHECK_MAX_DIFF}

    @updateIntervalHandle = setInterval @processUpdateInterval, @CANARY_UPDATE_INTERVAL

  # Returns whatever the stats object reports as its current stats.
  getStats: =>
    @stats.getCurrentStats()

  # Returns whatever the stats object reports as its passing state.
  getPassing: =>
    @stats.getPassing()

  # One polling cycle: refresh the flow list, let the stats object clean up
  # and update, post to all triggers, then send Slack notifications.
  # `callback` is handed to the Slack notifier and defaults to a no-op.
  processUpdateInterval: (callback=->) =>
    # NOTE: an error from getFlows is deliberately not inspected here; the
    # cycle continues with whatever @flows currently holds.
    @getFlows =>
      @stats.cleanupFlowStats(@flows)
      @stats.updateStats()

      # @restartFailedFlows =>
      @postTriggers =>
        @slack.sendSlackNotifications @stats.getCurrentStats(), callback

  # Records that a message arrived from the given flow. Once two arrival
  # times exist, the gap between the two most recent ones is stored in
  # `timeDiffs`; if the stats object does not consider that gap passing, a
  # failure entry is stored as well (bounded by CANARY_ERROR_HISTORY_SIZE).
  messageFromFlow: (flowId) =>
    flowInfo = @stats.getFlowById(flowId)
    @unshiftData flowInfo, 'messageTime', @Date.now()
    return if flowInfo.messageTime.length < 2

    # Index 0 is the newest entry because unshiftData prepends.
    @unshiftData flowInfo, 'timeDiffs', flowInfo.messageTime[0] - flowInfo.messageTime[1]
    return if @stats.passingTimeDiff flowInfo.timeDiffs[0]

    failure =
      time: flowInfo.messageTime[1]
      timeDiff: flowInfo.timeDiffs[0]
    @unshiftData flowInfo, 'failures', failure, @CANARY_ERROR_HISTORY_SIZE

  # Refreshes the flow list and then starts every flow, one after another.
  # `callback` is invoked without arguments once the series has finished.
  startAllFlows: (callback=->) =>
    @getFlows =>
      flowStarters = _.map @flows, (flow) => @curryStartFlow(flow.flowId)
      async.series flowStarters, =>
        debug 'all flows started'
        callback()

  # Fetches GET /api/flows, stores the parsed result on @flows and lets the
  # stats object register each flow. Calls back with (error) on a request
  # failure, an empty body or an unparsable body, otherwise with
  # (null, flows). On any failure @flows keeps its previous value.
  getFlows: (callback) =>
    @requestOctobluUrl 'GET', '/api/flows', (error, body) =>
      return callback error if error?
      return callback new Error 'body is undefined' unless body

      # JSON.parse throws on malformed input; without this guard the
      # exception would escape from the request callback uncaught.
      try
        parsedFlows = JSON.parse body
      catch parseError
        debug "unable to parse flows response: #{parseError.message}"
        return callback parseError

      @flows = parsedFlows
      _.each @flows, (flow) =>
        @stats.setFlowNames(flow)
      callback null, @flows

  # Returns a list of {flowId, triggerId} for every node of type
  # 'operation:trigger' in the currently known flows.
  getTriggers: =>
    triggers = []
    _.each @flows, (flow) =>
      triggerNodes = _.filter flow.nodes, (node) -> node.type == 'operation:trigger'
      _.each triggerNodes, (node) =>
        triggers.push { flowId: flow.flowId, triggerId: node.id }
        debug " - TRIGGER: #{flow.name} : #{node.name} (#{node.id})"
    return triggers

  # Starts, in series, every flow whose `currentTimeDiff` exceeds
  # CANARY_RESTART_FLOWS_MAX_TIME. `callback` is passed to async.series.
  restartFailedFlows: (callback=->) =>
    debug 'restarting failed flows'
    flowStarters = []
    _.each _.keys(@stats.getFlows()), (flowUuid) =>
      flowInfo = @stats.getFlowById(flowUuid)
      if flowInfo.currentTimeDiff > @CANARY_RESTART_FLOWS_MAX_TIME
        debug "restarting failed flow #{flowUuid}"
        flowStarters.push @curryStartFlow flowUuid
    async.series flowStarters, callback

  # Records the current time for every trigger and POSTs to the trigger
  # service. Errors from individual posts are not propagated to `callback`.
  postTriggers: (callback=->) =>
    debug 'posting triggers'
    async.each @getTriggers(), (trigger, done) =>
      flowInfo = @stats.getFlowById(trigger.flowId)
      triggerInfo = flowInfo.triggerTime ?= {}
      @unshiftData triggerInfo, trigger.triggerId, @Date.now()
      # Swallow the per-trigger error so one failed post does not abort the rest.
      @postTriggerService trigger, => done()
    , callback

  # Returns a function suitable for async.series that, after a 3 second
  # delay, POSTs to the flow's instance endpoint, records a start time and
  # calls its callback without arguments.
  curryStartFlow: (flowUuid) =>
    return (callback=->) =>
      # FIXME:
      #   Q: Remove the delay - why is it needed?
      #   A: Nanocyte-flow-deploy-service wants love.
      debug "starting #{flowUuid}"
      _.delay =>
        # NOTE(review): the start time is recorded even when `error` is set -
        # confirm that this is intended.
        @requestOctobluUrl 'POST', "/api/flows/#{flowUuid}/instance", (error, body) =>
          debug "started #{flowUuid} body: #{body}"
          flowInfo = @stats.getFlowById(flowUuid)
          @unshiftData flowInfo, 'startTime', @Date.now()
          callback()
      , 3000

  # Sends an authenticated request (cookie jar attached) to the API host.
  # Each request carries a freshly generated `deploymentUuid` header.
  requestOctobluUrl: (method, path, callback) =>
    url = "#{@OCTOBLU_API_HOST}#{path}"
    headers = deploymentUuid: "flow-canary-#{uuid.v4()}"
    @sendRequest {headers, method, url, @jar}, callback

  # POSTs to the trigger host for the given {flowId, triggerId}. No cookie
  # jar is attached to this request.
  postTriggerService: (trigger, callback=->) =>
    url = "#{@OCTOBLU_TRIGGER_HOST}/flows/#{trigger.flowId}/triggers/#{trigger.triggerId}"
    @sendRequest {method: 'POST', url}, callback

  # Performs the HTTP request. An empty body is normalised to undefined.
  # A transport error or a status code >= 400 is recorded through
  # @stats.setCanaryErrors and reported to `callback` as an Error whose
  # message is "[status] METHOD url"; otherwise calls back with (null, body).
  sendRequest: (options, callback) =>
    debug "#{options?.method} request #{options?.url}"
    request options, (error, response, body) =>
      body = undefined if _.isEmpty body
      urlInfo = "[#{response?.statusCode}] #{options?.method} #{options?.url}"
      debug urlInfo
      if error? or response?.statusCode >= 400
        @stats.setCanaryErrors {
          url: urlInfo
          body: body
          error: error?.message
          time: @Date.now()
        }, @CANARY_ERROR_HISTORY_SIZE
        return callback new Error urlInfo
      callback null, body

  # Prepends `data` to the list at obj[prop] (creating the list if needed)
  # and keeps only the first `trimSize` entries, so index 0 is always the
  # most recent value. Returns the trimmed list.
  unshiftData: (obj, prop, data, trimSize=@CANARY_DATA_HISTORY_SIZE) =>
    obj[prop] ?= []
    obj[prop].unshift data
    obj[prop] = obj[prop].slice 0, trimSize

module.exports = Canary
153module.exports = Canary