1 | _ = require 'lodash'
|
2 | uuid = require 'uuid'
|
3 | async = require 'async'
|
4 | request = require 'request'
|
5 | OctobluRaven = require 'octoblu-raven'
|
6 | debug = require('debug')('octoblu-flow-canary:canary')
|
7 | Stats = require './stats'
|
8 | Slack = require './slack'
|
9 |
|
# Canary periodically fetches the canary account's flows from the Octoblu API,
# fires every trigger node in those flows, records timing data for the messages
# the flows send back, and forwards the current stats to Slack.
#
# Collaborators may be injected through the constructor options (useful in
# tests); any that are omitted are created with their defaults:
#   Date         - object providing `now()`; defaults to the global Date
#   stats        - Stats instance holding per-flow bookkeeping
#   slack        - Slack notifier
#   octobluRaven - error reporter exposing `reportError`
class Canary
  # Reads configuration from the environment, prepares the cookie jar used to
  # authenticate against the API host, and schedules `processUpdateInterval`
  # to run every CANARY_UPDATE_INTERVAL milliseconds.
  #
  # Throws an Error when OCTOBLU_CANARY_UUID or OCTOBLU_CANARY_TOKEN is unset.
  constructor: ({@Date,@stats,@slack,@octobluRaven}={})->
    @OCTOBLU_CANARY_UUID = process.env.OCTOBLU_CANARY_UUID
    @OCTOBLU_CANARY_TOKEN = process.env.OCTOBLU_CANARY_TOKEN
    @OCTOBLU_API_HOST = process.env.OCTOBLU_API_HOST or 'https://api.octoblu.com'
    @OCTOBLU_TRIGGER_HOST = process.env.OCTOBLU_TRIGGER_HOST or 'https://triggers.octoblu.com'

    # parseInt yields NaN for unset/garbage values, so `or` supplies the default.
    # The radix is explicit so the value is always read as decimal.
    @CANARY_RESTART_FLOWS_MAX_TIME = Number.parseInt(process.env.CANARY_RESTART_FLOWS_MAX_TIME, 10) or 1000*60*5
    @CANARY_UPDATE_INTERVAL = Number.parseInt(process.env.CANARY_UPDATE_INTERVAL, 10) or 1000*60
    @CANARY_HEALTH_CHECK_MAX_DIFF = Number.parseInt(process.env.CANARY_HEALTH_CHECK_MAX_DIFF, 10) or 1000*2
    @CANARY_DATA_HISTORY_SIZE = Number.parseInt(process.env.CANARY_DATA_HISTORY_SIZE, 10) or 5
    @CANARY_ERROR_HISTORY_SIZE = Number.parseInt(process.env.CANARY_ERROR_HISTORY_SIZE, 10) or 20

    unless @OCTOBLU_CANARY_UUID and @OCTOBLU_CANARY_TOKEN
      throw new Error 'Canary UUID or token not defined'

    # Credentials travel as cookies scoped to the API host.
    @jar = request.jar()
    @jar.setCookie request.cookie("meshblu_auth_uuid=#{@OCTOBLU_CANARY_UUID}"), @OCTOBLU_API_HOST
    @jar.setCookie request.cookie("meshblu_auth_token=#{@OCTOBLU_CANARY_TOKEN}"), @OCTOBLU_API_HOST

    @flows = []
    @Date ?= Date

    # Only create a reporter when none was injected; an unconditional
    # assignment here would discard the constructor option.
    @octobluRaven ?= new OctobluRaven

    @stats ?= new Stats {@flows,@Date,@CANARY_UPDATE_INTERVAL,@CANARY_HEALTH_CHECK_MAX_DIFF}
    @slack ?= new Slack {@CANARY_UPDATE_INTERVAL,@CANARY_HEALTH_CHECK_MAX_DIFF}

    setInterval @processUpdateInterval, @CANARY_UPDATE_INTERVAL

  # Returns whatever `stats.getCurrentStats()` reports.
  getStats: =>
    @stats.getCurrentStats()

  # Returns whatever `stats.getPassing()` reports.
  getPassing: =>
    @stats.getPassing()

  # One update cycle: refresh the flow list, refresh the stats, post every
  # trigger, then send the current stats to Slack.
  #
  # callback - optional `(error)`; defaults to a no-op so this method can be
  #            handed directly to setInterval. Any failure is reported through
  #            `_reportError` before being passed on.
  processUpdateInterval: (callback=->) =>
    @getFlows (error) =>
      return @_reportError error, callback if error?
      @stats.cleanupFlowStats(@flows)
      @stats.updateStats()

      @postTriggers (error) =>
        return @_reportError error, callback if error?
        @slack.sendSlackNotifications @stats.getCurrentStats(), (error) =>
          return @_reportError error, callback if error?
          callback null

  # Records the arrival of a message emitted by a flow.
  #
  # Keeps a bounded history of arrival times, raw messages and the gap between
  # consecutive arrivals; when `stats.passingTimeDiff` rejects the latest gap,
  # a failure entry is recorded as well.
  #
  # message - object expected to carry `fromUuid` (the flow id); messages
  #           without one (including null/undefined) are ignored.
  messageFromFlow: (message) =>
    debug 'got message from flow', message
    flowId = message?.fromUuid
    return unless flowId?
    flowInfo = @stats.getFlowById(flowId)
    @unshiftData flowInfo, 'messageTime', @Date.now()
    @unshiftData flowInfo, 'messages', message
    # A time difference needs at least two arrivals.
    return if flowInfo.messageTime.length < 2
    @unshiftData flowInfo, 'timeDiffs', flowInfo.messageTime[0] - flowInfo.messageTime[1]
    return if @stats.passingTimeDiff flowInfo.timeDiffs[0]
    @unshiftData flowInfo, 'failures',
      time: flowInfo.messageTime[1]
      timeDiff: flowInfo.timeDiffs[0]
    , @CANARY_ERROR_HISTORY_SIZE

  # Refreshes the flow list and starts every flow, one after another.
  #
  # callback - `(error)`; called without arguments on success.
  startAllFlows: (callback) =>
    @getFlows (error) =>
      return callback error if error?
      flowStarters = _.map @flows, (flow) => @curryStartFlow(flow.flowId)
      async.series flowStarters, (error) =>
        return callback error if error?
        debug 'all flows started'
        callback()

  # Fetches `/api/flows`, stores the parsed result in `@flows`, and passes each
  # flow to `stats.setFlowNames`.
  #
  # callback - `(error, flows)`. Receives an Error when the request fails, the
  #            body is empty, or the body is not valid JSON.
  getFlows: (callback) =>
    @requestOctobluUrl 'GET', '/api/flows', (error, body) =>
      return callback error if error?
      return callback new Error 'body is undefined' if _.isEmpty body
      # JSON.parse throws on malformed input; inside this async callback an
      # uncaught throw would escape every caller, so convert it to an error.
      # Only the parse is guarded so exceptions raised by `callback` itself
      # are not swallowed.
      try
        flows = JSON.parse body
      catch parseError
        return callback parseError
      @flows = flows
      _.each @flows, (flow) =>
        @stats.setFlowNames(flow)
        # Explicit return: lodash stops iterating when the iteratee returns
        # false, and CoffeeScript would otherwise return setFlowNames' result.
        return
      callback null, @flows

  # Collects every node of type 'operation:trigger' across `@flows`.
  #
  # Returns an array of `{flowId, triggerId}` objects.
  getTriggers: =>
    triggers = []
    _.each @flows, (flow) =>
      triggerNodes = _.filter flow.nodes, { type: 'operation:trigger' }
      _.each triggerNodes, (node) =>
        triggers.push { flowId: flow.flowId, triggerId: node.id }
        debug " - TRIGGER: #{flow.name} : #{node.name} (#{node.id})"
    return triggers

  # Restarts, in series, every flow known to stats whose `currentTimeDiff`
  # exceeds CANARY_RESTART_FLOWS_MAX_TIME.
  #
  # callback - passed straight to `async.series`.
  restartFailedFlows: (callback) =>
    debug 'restarting failed flows'
    flowStarters = []
    _.each _.keys(@stats.getFlows()), (flowUuid) =>
      flowInfo = @stats.getFlowById(flowUuid)
      if flowInfo.currentTimeDiff > @CANARY_RESTART_FLOWS_MAX_TIME
        debug "restarting failed flow #{flowUuid}"
        flowStarters.push @curryStartFlow flowUuid
    async.series flowStarters, callback

  # Posts to every trigger returned by `getTriggers`, recording the post time
  # per trigger id under the flow's `triggerTime` map.
  #
  # Individual trigger failures are logged and do not abort the batch.
  #
  # callback - invoked by `async.each` once all triggers have been attempted.
  postTriggers: (callback) =>
    debug 'posting triggers'
    async.each @getTriggers(), (trigger, next) =>
      flowInfo = @stats.getFlowById(trigger.flowId)
      triggerInfo = flowInfo.triggerTime ?= {}
      @unshiftData triggerInfo, trigger.triggerId, @Date.now()
      @postTriggerService trigger, (error) =>
        # Best effort: log and continue so one bad trigger cannot stop the rest.
        console.error error if error?
        next()
    , callback

  # Builds a task function suitable for `async.series` that starts one flow.
  #
  # The returned function waits 3000 ms, POSTs to the flow's instance endpoint,
  # and on success records the start time in the flow's `startTime` history.
  #
  # flowUuid - id of the flow to start.
  curryStartFlow: (flowUuid) =>
    return (callback) =>
      debug "starting #{flowUuid}"
      _.delay =>
        @requestOctobluUrl 'POST', "/api/flows/#{flowUuid}/instance", (error, body) =>
          return @_reportError error, callback if error?
          debug "started #{flowUuid} body: #{body}"
          flowInfo = @stats.getFlowById(flowUuid)
          @unshiftData flowInfo, 'startTime', @Date.now()
          callback()
      , 3000

  # Sends an authenticated request (cookie jar attached) to the API host.
  #
  # method   - HTTP method name.
  # path     - path appended to OCTOBLU_API_HOST.
  # callback - `(error, body)`, as delivered by `sendRequest`.
  requestOctobluUrl: (method, path, callback) =>
    url = "#{@OCTOBLU_API_HOST}#{path}"
    # A fresh deploymentUuid header value is generated for every request.
    headers = deploymentUuid: "flow-canary-#{uuid.v4()}"
    @sendRequest {headers, method, url, @jar}, callback

  # POSTs to the trigger host for one `{flowId, triggerId}` pair. No cookie jar
  # is attached to this request.
  postTriggerService: (trigger, callback) =>
    url = "#{@OCTOBLU_TRIGGER_HOST}/flows/#{trigger.flowId}/triggers/#{trigger.triggerId}"
    @sendRequest {method:'POST', url}, callback

  # Performs an HTTP request and normalises the outcome.
  #
  # A transport error or a status code >= 400 is treated as a failure: details
  # are stored through `stats.setCanaryErrors`, attached to the error as
  # `error.info`, and reported through `_reportError`.
  #
  # options  - options object handed to `request`.
  # callback - `(error, body)`; `body` is undefined when the response is empty.
  sendRequest: (options, callback) =>
    debug "#{options?.method} request #{options?.url}"
    request options, (error, response, body) =>
      body = undefined if _.isEmpty body
      urlInfo = "[#{response?.statusCode}] #{options?.method} #{options?.url}"
      debug urlInfo
      if error? or response?.statusCode >= 400
        info = {
          url: urlInfo
          body: body
          error: error?.message ? error
          time: @Date.now()
        }
        @stats.setCanaryErrors info, @CANARY_ERROR_HISTORY_SIZE
        # HTTP-level failures arrive without an error object; synthesise one.
        error = new Error urlInfo unless error?
        error.info = info
        return @_reportError error, callback
      callback null, body

  # Prepends `data` to the array at `obj[prop]` (creating it if absent) and
  # trims the array to at most `trimSize` entries, newest first.
  #
  # trimSize - defaults to CANARY_DATA_HISTORY_SIZE.
  unshiftData: (obj, prop, data, trimSize=@CANARY_DATA_HISTORY_SIZE) =>
    obj[prop] ?= []
    obj[prop].unshift data
    obj[prop] = obj[prop].slice 0, trimSize

  # Reports `error` to the raven reporter (when present), logs it to stderr,
  # then hands it to `callback`.
  _reportError: (error, callback) =>
    @octobluRaven?.reportError error
    console.error error?.stack ? error
    callback error
|
175 |
|
176 | module.exports = Canary
|