UNPKG

9.7 kBtext/coffeescriptView Raw
1debug = require("debug")("consul-elected")
2
3Watch = require "watch-for-path"
4
5fs = require "fs"
6os = require "os"
7cp = require "child_process"
8request = require "request"
9
10args = require("yargs")
11 .usage("Usage: $0 -s [server] -k [key] -c [command]")
12 .alias
13 server: 's'
14 key: 'k'
15 command: 'c'
16 .demand(['key','command'])
17 .default
18 server: "localhost:8500"
19 flapping: 30
20 .describe
21 server: "Consul server"
22 key: "Key for leader election"
23 command: "Command to run when elected"
24 cwd: "Working directory for command"
25 watch: "File to watch for restarts"
26 restart: "Restart command if watched path changes"
27 verbose: "Turn on debugging"
28 .boolean(['restart','verbose'])
29 .argv
30
31if args.verbose
32 (require "debug").enable('consul-elected')
33 debug = require("debug")("consul-elected")
34
35#----------
36
37class ConsulElected extends require("events").EventEmitter
38 constructor: (@server,@key,@command) ->
39 @base_url = "http://#{@server}/v1"
40
41 @session = null
42 @is_leader = false
43
44 @process = null
45
46 @_lastIndex = null
47 @_monitoring = false
48 @_terminating = false
49
50 # create a debounced function for calling restart, so that we don't
51 # trigger multiple times in a row. This would just be _.debounce,
52 # but bringing underscore in for one thing seemed silly
53
54 # Set a slightly more readable title
55 @_updateTitle()
56
57 if args.watch
58 debug "Setting a watch on #{ args.watch } before starting up."
59 new Watch args.watch, (err) =>
60 throw err if err
61
62 debug "Found #{ args.watch }. Starting up."
63
64 if args.restart
65 # now set a normal watch on the now-existant path, so that we
66 # can restart if it changes
67 @_w = fs.watch args.watch, (evt,file) =>
68 debug "fs.watch fired for #{args.watch} (#{evt})"
69 @emit "_restart"
70
71 last_m = null
72 @_wi = setInterval =>
73 fs.stat args.watch, (err,stats) =>
74 if err
75 return false
76
77 if last_m
78 if Number(stats.mtime) != last_m
79 debug "Polling found change in #{args.watch}."
80 @emit "_restart"
81 last_m = Number(stats.mtime)
82
83 else
84 last_m = Number(stats.mtime)
85
86 , 1000
87
88 # path now exists...
89 @_startUp()
90
91 last_restart = null
92 @on "_restart", =>
93 cur_t = Number(new Date)
94 if @process? && (!last_restart || cur_t - last_restart > 1200)
95 last_restart = cur_t
96 # send a kill, then let our normal exit code handle the restart
97 debug "Triggering restart after watched file change."
98 @process.p.kill()
99
100 else
101 @_startUp()
102
103 #----------
104
105 _updateTitle: ->
106 process.title = "consul-elected (#{ if @process then "Running" else "Waiting" })(#{@command})"
107
108 #----------
109
110 _startUp: ->
111 @_createSession (err,id) =>
112 if err
113 console.error "Failed to create session: #{err}"
114 process.exit(1)
115
116 @session = id
117
118 debug "Session ID is #{@session}"
119
120 return false if @_terminating
121
122 @_monitorKey()
123
124 #----------
125
126 _attemptKeyAcquire: (cb) ->
127 debug "Attempting to acquire leadership"
128 request.put
129 url: "#{@base_url}/kv/#{@key}"
130 body: { hostname:os.hostname(), pid:process.pid }
131 json: true
132 qs: { acquire:@session }
133 , (err,resp,body) =>
134 throw err if err
135
136 return false if @_terminating
137
138 if body == true
139 # We got the lock
140 @is_leader = true
141 debug "I am now the leader."
142 @_runCommand()
143 cb?()
144
145 else
146 # We did not get the lock
147 @is_leader = false
148 debug "Did not get leader lock."
149 @_stopCommand()
150 cb?()
151
152 #----------
153
154 _monitorKey: ->
155 if @_monitoring
156 return false
157
158 debug "Starting key monitor request."
159 @_monitoring = true
160
161 # on our first lookup, we won't yet have @_lastIndex and we'll just
162 # want an answer back right away to see if there is an existing leader
163
164 opts =
165 if @_lastIndex
166 wait: '10m'
167 index: @_lastIndex
168 else
169 null
170
171 request.get
172 url: "#{@base_url}/kv/#{@key}"
173 qs: opts
174 json: true
175 , (err,resp,body) =>
176 # FIXME: What should we be doing here?
177 throw err if err
178
179 return false if @_terminating
180
181 if resp.headers['x-consul-index']
182 @_lastIndex = resp.headers['x-consul-index']
183 debug "Last index is now #{ @_lastIndex }"
184 else
185 # if we don't get an index, there's probably something wrong
186 # with our poll attempt. just retry that.
187 @_monitoring = false
188 @_monitorKey()
189 return false
190
191 @_monitoring = false
192
193 if body && body[0]?.Session
194 # there is a leader... poll again
195 debug "Leader is #{ if body[0].Session == @session then "Me" else body[0].Session }. Polling again."
196 @_monitorKey()
197
198 # if we're the leader, make sure our process is still healthy
199 if body[0].Session == @session
200 if !@process
201 # not sure how we would arrive here...
202 debug "I am the leader, but I have no process. How so?"
203 @_runCommand()
204
205 else if @process?.stopping
206 # setting this to false will cause the process to restart
207 # after it exits
208 debug "Resetting process.stopping state since poll says I am the leader."
209 @process.stopping = false
210
211 else
212 # no leader... jump in
213 @_attemptKeyAcquire =>
214 @_monitorKey()
215
216 #----------
217
218 _runCommand: ->
219 debug "Should start command: #{@command}"
220
221 if @process
222 # FIXME: this is to remove old process information, but should we make
223 # sure it's actually dead here? or put a bullet in it?
224
225 @process.p.removeAllListeners()
226 @process.p = null
227
228 uptime = Number(new Date) - @process.start
229 debug "Command uptime was #{ Math.floor(uptime / 1000) } seconds."
230
231 opts = {}
232 if args.cwd
233 opts.cwd = args.cwd
234
235 cmd = @command.split(" ")
236
237 @process = p:null, start:Number(new Date), stopping:false
238 @process.p = cp.spawn cmd[0], cmd[1..], opts
239
240 @process.p.stderr.pipe(process.stderr)
241
242 @_updateTitle()
243
244 @process.p.on "error", (err) =>
245 debug "Command got error: #{err}"
246 @_runCommand() if !@process.stopping
247
248 @process.p.on "exit", (code,signal) =>
249 debug "Command exited: #{code} || #{signal}"
250 @_runCommand() if !@process.stopping
251
252 #----------
253
254 _stopCommand: ->
255 debug "Should stop command: #{@command}"
256
257 if @process
258 @process.stopping = true
259 @process.p.once "exit", =>
260 debug "Command is stopped."
261 @process = null
262
263 @_updateTitle()
264
265 @process.p.kill()
266 else
267 debug "Stop called with no process running?"
268
269 #----------
270
271 _createSession: (cb) ->
272 debug "Sending session request"
273 request.put
274 url: "#{@base_url}/session/create"
275 body: { Name:"#{os.hostname()}-#{@key}" }
276 json: true
277 , (err,resp,body) =>
278 cb err, body?.ID
279
280 #----------
281
282 terminate: (cb) ->
283 @_terminating = true
284
285 @_stopCommand() if @process
286
287 destroySession = =>
288 if @session
289 # terminate our session
290 request.put
291 url: "#{@base_url}/session/destroy/#{@session}"
292 , (err,resp,body) =>
293 debug "Session destroy gave status of #{ resp.statusCode }"
294 cb()
295 else
296 cb()
297
298 # give up our lock if we have one
299 if @is_leader
300 request.put
301 url: "#{@base_url}/kv/#{@key}"
302 qs: { release:@session }
303 , (err,resp,body) =>
304 debug "Release leadership gave status of #{ resp.statusCode }"
305 destroySession()
306 else
307 destroySession()
308
309#----------
310
311elected = new ConsulElected args.server, args.key, args.command
312
313_handleExit = ->
314 elected.terminate ->
315 debug "Consul Elected exiting."
316 process.exit()
317
318process.on 'SIGINT', _handleExit
319process.on 'SIGTERM', _handleExit
320