UNPKG

6.74 kBtext/coffeescriptView Raw
# Third-party dependencies.
_            = require 'lodash'
uuid         = require 'uuid'
async        = require 'async'
request      = require 'request'
OctobluRaven = require 'octoblu-raven'
debug        = require('debug')('octoblu-flow-canary:canary')

# Project-local collaborators.
Stats = require './stats'
Slack = require './slack'
9
# Canary lists the flows visible to the canary account, posts to every
# trigger node found in them on a fixed interval, and records the time of
# each message a flow sends back. The collected timings are kept in a Stats
# object and forwarded to Slack at the end of every update cycle.
class Canary
  # Reads an integer setting from the environment. Returns `fallback` when
  # the variable is unset, is not a base-10 integer, or parses to 0 (every
  # one of those yields a falsy value on the left of `or`).
  envInt = (name, fallback) ->
    Number.parseInt(process.env[name], 10) or fallback

  # Builds a canary and starts its periodic update loop.
  #
  # Options (all optional; they exist so collaborators can be injected):
  #   Date         - object providing `now()`; defaults to the global Date.
  #   stats        - Stats instance; one is created when omitted.
  #   slack        - Slack instance; one is created when omitted.
  #   octobluRaven - object providing `reportError(error)`; a new
  #                  OctobluRaven is created when omitted.
  #
  # Environment:
  #   OCTOBLU_CANARY_UUID / OCTOBLU_CANARY_TOKEN - required credentials.
  #   OCTOBLU_API_HOST, OCTOBLU_TRIGGER_HOST     - service base URLs.
  #   CANARY_RESTART_FLOWS_MAX_TIME, CANARY_UPDATE_INTERVAL,
  #   CANARY_HEALTH_CHECK_MAX_DIFF               - integers (the defaults are
  #                                                written as ms products).
  #   CANARY_DATA_HISTORY_SIZE, CANARY_ERROR_HISTORY_SIZE - history lengths.
  #
  # Throws an Error when the UUID or the token is missing.
  constructor: ({@Date,@stats,@slack,@octobluRaven}={})->
    @OCTOBLU_CANARY_UUID  = process.env.OCTOBLU_CANARY_UUID
    @OCTOBLU_CANARY_TOKEN = process.env.OCTOBLU_CANARY_TOKEN
    @OCTOBLU_API_HOST     = process.env.OCTOBLU_API_HOST or 'https://api.octoblu.com'
    @OCTOBLU_TRIGGER_HOST = process.env.OCTOBLU_TRIGGER_HOST or 'https://triggers.octoblu.com'

    @CANARY_RESTART_FLOWS_MAX_TIME = envInt 'CANARY_RESTART_FLOWS_MAX_TIME', 1000*60*5
    @CANARY_UPDATE_INTERVAL        = envInt 'CANARY_UPDATE_INTERVAL', 1000*60
    @CANARY_HEALTH_CHECK_MAX_DIFF  = envInt 'CANARY_HEALTH_CHECK_MAX_DIFF', 1000*2
    @CANARY_DATA_HISTORY_SIZE      = envInt 'CANARY_DATA_HISTORY_SIZE', 5
    @CANARY_ERROR_HISTORY_SIZE     = envInt 'CANARY_ERROR_HISTORY_SIZE', 20

    unless @OCTOBLU_CANARY_UUID and @OCTOBLU_CANARY_TOKEN
      throw new Error 'Canary UUID or token not defined'

    # Credentials travel as cookies scoped to the API host.
    @jar = request.jar()
    @jar.setCookie request.cookie("meshblu_auth_uuid=#{@OCTOBLU_CANARY_UUID}"), @OCTOBLU_API_HOST
    @jar.setCookie request.cookie("meshblu_auth_token=#{@OCTOBLU_CANARY_TOKEN}"), @OCTOBLU_API_HOST

    @flows = []
    @Date ?= Date

    # Honour an injected reporter, exactly like @stats and @slack below;
    # only build a default when none was supplied.
    @octobluRaven ?= new OctobluRaven

    # NOTE(review): Stats is handed the initial @flows array, but getFlows
    # later rebinds @flows to a freshly parsed value - confirm Stats does not
    # depend on sharing this reference.
    @stats ?= new Stats {@flows,@Date,@CANARY_UPDATE_INTERVAL,@CANARY_HEALTH_CHECK_MAX_DIFF}
    @slack ?= new Slack {@CANARY_UPDATE_INTERVAL,@CANARY_HEALTH_CHECK_MAX_DIFF}

    setInterval @processUpdateInterval, @CANARY_UPDATE_INTERVAL

  # Returns whatever @stats.getCurrentStats() reports.
  getStats: =>
    @stats.getCurrentStats()

  # Returns whatever @stats.getPassing() reports.
  getPassing: =>
    @stats.getPassing()

  # Runs one update cycle: refresh the flow list, clean up and update the
  # stats, post every trigger, then send the Slack notifications.
  #
  # callback(error) is invoked once, with null on success. It defaults to a
  # no-op because setInterval invokes this method without arguments. Every
  # failure is routed through _reportError before reaching the callback.
  processUpdateInterval: (callback=->) =>
    @getFlows (error) =>
      return @_reportError error, callback if error?
      @stats.cleanupFlowStats(@flows)
      @stats.updateStats()

      # @restartFailedFlows =>
      @postTriggers (error) =>
        return @_reportError error, callback if error?
        @slack.sendSlackNotifications @stats.getCurrentStats(), (error) =>
          return @_reportError error, callback if error?
          callback null

  # Records a message received from a flow.
  #
  # Stores the arrival time and the message itself, and from the second
  # message onwards the gap between the two most recent arrivals. When
  # @stats.passingTimeDiff rejects that gap, a failure entry is stored too
  # (bounded by CANARY_ERROR_HISTORY_SIZE). Messages without a `fromUuid`,
  # including a null/undefined message, are ignored.
  messageFromFlow: (message) =>
    debug 'got message from flow', message
    flowId = message?.fromUuid
    return unless flowId?
    # NOTE(review): assumes getFlowById always returns an object for this id
    # - confirm against Stats.
    flowInfo = @stats.getFlowById(flowId)
    @unshiftData flowInfo, 'messageTime', @Date.now()
    @unshiftData flowInfo, 'messages', message
    return if flowInfo.messageTime.length < 2
    @unshiftData flowInfo, 'timeDiffs', flowInfo.messageTime[0] - flowInfo.messageTime[1]
    return if @stats.passingTimeDiff flowInfo.timeDiffs[0]
    failure =
      time: flowInfo.messageTime[1]
      timeDiff: flowInfo.timeDiffs[0]
    @unshiftData flowInfo, 'failures', failure, @CANARY_ERROR_HISTORY_SIZE

  # Refreshes the flow list and then starts every flow, one after another.
  # callback(error) receives the first error, or nothing on success.
  startAllFlows: (callback) =>
    @getFlows (error) =>
      return callback error if error?
      flowStarters = _.map @flows, (flow) => @curryStartFlow flow.flowId
      async.series flowStarters, (error) =>
        return callback error if error?
        debug 'all flows started'
        callback()

  # Fetches /api/flows, stores the parsed result in @flows and passes each
  # flow to @stats.setFlowNames.
  #
  # callback(error, flows). Errors: a request failure, an empty body
  # ('body is undefined'), or a body that is not valid JSON. @flows is left
  # untouched in all three cases.
  getFlows: (callback) =>
    @requestOctobluUrl 'GET', '/api/flows', (error, body) =>
      return callback error if error?
      return callback new Error 'body is undefined' if _.isEmpty body
      # JSON.parse throws on malformed input. This runs inside an async
      # callback, so an uncaught throw would escape every caller - turn it
      # into a normal callback error instead. Only the parse is guarded so
      # that exceptions raised by `callback` itself are never swallowed.
      try
        flows = JSON.parse body
      catch parseError
        return callback parseError
      @flows = flows
      _.each @flows, (flow) =>
        @stats.setFlowNames(flow)
      callback null, @flows

  # Returns a list of {flowId, triggerId} pairs, one for every node of type
  # 'operation:trigger' in the currently known flows.
  getTriggers: =>
    triggers = []
    _.each @flows, (flow) =>
      triggerNodes = _.filter flow.nodes, { type: 'operation:trigger' }
      _.each triggerNodes, (node) =>
        triggers.push { flowId: flow.flowId, triggerId: node.id }
        debug " - TRIGGER: #{flow.name} : #{node.name} (#{node.id})"
    return triggers

  # Starts, in series, every flow known to @stats whose currentTimeDiff
  # exceeds CANARY_RESTART_FLOWS_MAX_TIME. `callback` is handed straight to
  # async.series. Currently only referenced from a commented-out call in
  # processUpdateInterval.
  restartFailedFlows: (callback) =>
    debug 'restarting failed flows'
    flowStarters = []
    _.each _.keys(@stats.getFlows()), (flowUuid) =>
      flowInfo = @stats.getFlowById(flowUuid)
      if flowInfo.currentTimeDiff > @CANARY_RESTART_FLOWS_MAX_TIME
        debug "restarting failed flow #{flowUuid}"
        flowStarters.push @curryStartFlow flowUuid
    async.series flowStarters, callback

  # Posts to every trigger returned by getTriggers, recording the post time
  # under the owning flow's `triggerTime[triggerId]` history.
  #
  # Best effort: a failed post is written to stderr and does not fail the
  # batch, so `callback` is never handed an error from here.
  postTriggers: (callback) =>
    debug 'posting triggers'
    postOne = (trigger, next) =>
      flowInfo = @stats.getFlowById(trigger.flowId)
      triggerInfo = flowInfo.triggerTime ?= {}
      @unshiftData triggerInfo, trigger.triggerId, @Date.now()
      @postTriggerService trigger, (error) =>
        console.error error if error?
        next()
    async.each @getTriggers(), postOne, callback

  # Returns an async-style task `(callback) ->` that starts the given flow
  # after a fixed 3 second delay and records the start time in the flow's
  # `startTime` history. Failures go through _reportError.
  curryStartFlow: (flowUuid) =>
    return (callback) =>
      # FIXME:
      #   Q: Remove the delay - why is it needed?
      #   A: Nanocyte-flow-deploy-service wants love.
      debug "starting #{flowUuid}"
      startFlow = =>
        @requestOctobluUrl 'POST', "/api/flows/#{flowUuid}/instance", (error, body) =>
          return @_reportError error, callback if error?
          debug "started #{flowUuid} body: #{body}"
          flowInfo = @stats.getFlowById(flowUuid)
          @unshiftData flowInfo, 'startTime', @Date.now()
          callback()
      _.delay startFlow, 3000

  # Sends an authenticated request (cookie jar) to the API host. Each call
  # carries a freshly generated `deploymentUuid` header.
  # callback(error, body) - see sendRequest.
  requestOctobluUrl: (method, path, callback) =>
    url = "#{@OCTOBLU_API_HOST}#{path}"
    headers = deploymentUuid: "flow-canary-#{uuid.v4()}"
    @sendRequest {headers, method, url, @jar}, callback

  # POSTs to the trigger service URL for one {flowId, triggerId} pair. No
  # cookie jar or extra headers are sent.
  # callback(error, body) - see sendRequest.
  postTriggerService: (trigger, callback) =>
    url = "#{@OCTOBLU_TRIGGER_HOST}/flows/#{trigger.flowId}/triggers/#{trigger.triggerId}"
    @sendRequest {method:'POST', url}, callback

  # Performs an HTTP request with the `request` library.
  #
  # callback(error, body): an empty body is normalised to undefined. A
  # transport error or a status code >= 400 counts as a failure: it is
  # stored via @stats.setCanaryErrors, attached to the error as `error.info`
  # and passed through _reportError. When the failure is status-only, an
  # Error whose message is the "[status] METHOD url" summary is created.
  sendRequest: (options, callback) =>
    debug "#{options?.method} request #{options?.url}"
    request options, (error, response, body) =>
      body = undefined if _.isEmpty body
      urlInfo = "[#{response?.statusCode}] #{options?.method} #{options?.url}"
      debug urlInfo
      failed = error? or response?.statusCode >= 400
      return callback null, body unless failed
      info =
        url: urlInfo
        body: body
        error: error?.message ? error
        time: @Date.now()
      @stats.setCanaryErrors info, @CANARY_ERROR_HISTORY_SIZE
      error = new Error urlInfo unless error?
      error.info = info
      @_reportError error, callback

  # Prepends `data` to the array at obj[prop] (creating it when absent) and
  # keeps only the first `trimSize` entries, i.e. the most recent ones.
  # Returns the trimmed array.
  unshiftData: (obj, prop, data, trimSize=@CANARY_DATA_HISTORY_SIZE) =>
    obj[prop] ?= []
    obj[prop].unshift data
    obj[prop] = obj[prop].slice 0, trimSize

  # Hands the error to the configured reporter (when there is one), writes
  # its stack - or the value itself when it has none - to stderr, then
  # forwards it to `callback`.
  _reportError: (error, callback) =>
    @octobluRaven?.reportError error
    console.error error?.stack ? error
    callback error

module.exports = Canary