1 | debug = require("debug")("consul-elected")
|
2 |
|
3 | Watch = require "watch-for-path"
|
4 |
|
5 | fs = require "fs"
|
6 | os = require "os"
|
7 | cp = require "child_process"
|
8 | request = require "request"
|
9 |
|
10 | args = require("yargs")
|
11 | .usage("Usage: $0 -s [server] -k [key] -c [command]")
|
12 | .alias
|
13 | server: 's'
|
14 | key: 'k'
|
15 | command: 'c'
|
16 | .demand(['key','command'])
|
17 | .default
|
18 | server: "localhost:8500"
|
19 | flapping: 30
|
20 | .describe
|
21 | server: "Consul server"
|
22 | key: "Key for leader election"
|
23 | command: "Command to run when elected"
|
24 | cwd: "Working directory for command"
|
25 | watch: "File to watch for restarts"
|
26 | restart: "Restart command if watched path changes"
|
27 | verbose: "Turn on debugging"
|
28 | .boolean(['restart','verbose'])
|
29 | .argv
|
30 |
|
31 | if args.verbose
|
32 | (require "debug").enable('consul-elected')
|
33 | debug = require("debug")("consul-elected")
|
34 |
|
35 |
|
36 |
|
37 | class ConsulElected extends require("events").EventEmitter
|
38 | constructor: (@server,@key,@command) ->
|
39 | @base_url = "http://#{@server}/v1"
|
40 |
|
41 | @session = null
|
42 | @is_leader = false
|
43 |
|
44 | @process = null
|
45 |
|
46 | @_lastIndex = null
|
47 | @_monitoring = false
|
48 | @_terminating = false
|
49 |
|
50 |
|
51 |
|
52 |
|
53 |
|
54 |
|
55 | @_updateTitle()
|
56 |
|
57 | if args.watch
|
58 | debug "Setting a watch on #{ args.watch } before starting up."
|
59 | new Watch args.watch, (err) =>
|
60 | throw err if err
|
61 |
|
62 | debug "Found #{ args.watch }. Starting up."
|
63 |
|
64 | if args.restart
|
65 |
|
66 |
|
67 | @_w = fs.watch args.watch, (evt,file) =>
|
68 | debug "fs.watch fired for #{args.watch} (#{evt})"
|
69 | @emit "_restart"
|
70 |
|
71 | last_m = null
|
72 | @_wi = setInterval =>
|
73 | fs.stat args.watch, (err,stats) =>
|
74 | if err
|
75 | return false
|
76 |
|
77 | if last_m
|
78 | if Number(stats.mtime) != last_m
|
79 | debug "Polling found change in #{args.watch}."
|
80 | @emit "_restart"
|
81 | last_m = Number(stats.mtime)
|
82 |
|
83 | else
|
84 | last_m = Number(stats.mtime)
|
85 |
|
86 | , 1000
|
87 |
|
88 |
|
89 | @_startUp()
|
90 |
|
91 | last_restart = null
|
92 | @on "_restart", =>
|
93 | cur_t = Number(new Date)
|
94 | if @process? && (!last_restart || cur_t - last_restart > 1200)
|
95 | last_restart = cur_t
|
96 |
|
97 | debug "Triggering restart after watched file change."
|
98 | @process.p.kill()
|
99 |
|
100 | else
|
101 | @_startUp()
|
102 |
|
103 |
|
104 |
|
105 | _updateTitle: ->
|
106 | process.title = "consul-elected (#{ if @process then "Running" else "Waiting" })(#{@command})"
|
107 |
|
108 |
|
109 |
|
110 | _startUp: ->
|
111 | @_createSession (err,id) =>
|
112 | if err
|
113 | console.error "Failed to create session: #{err}"
|
114 | process.exit(1)
|
115 |
|
116 | @session = id
|
117 |
|
118 | debug "Session ID is #{@session}"
|
119 |
|
120 | return false if @_terminating
|
121 |
|
122 | @_monitorKey()
|
123 |
|
124 |
|
125 |
|
126 | _attemptKeyAcquire: (cb) ->
|
127 | debug "Attempting to acquire leadership"
|
128 | request.put
|
129 | url: "#{@base_url}/kv/#{@key}"
|
130 | body: { hostname:os.hostname(), pid:process.pid }
|
131 | json: true
|
132 | qs: { acquire:@session }
|
133 | , (err,resp,body) =>
|
134 | throw err if err
|
135 |
|
136 | return false if @_terminating
|
137 |
|
138 | if body == true
|
139 |
|
140 | @is_leader = true
|
141 | debug "I am now the leader."
|
142 | @_runCommand()
|
143 | cb?()
|
144 |
|
145 | else
|
146 |
|
147 | @is_leader = false
|
148 | debug "Did not get leader lock."
|
149 | @_stopCommand()
|
150 | cb?()
|
151 |
|
152 |
|
153 |
|
154 | _monitorKey: ->
|
155 | if @_monitoring
|
156 | return false
|
157 |
|
158 | debug "Starting key monitor request."
|
159 | @_monitoring = true
|
160 |
|
161 |
|
162 |
|
163 |
|
164 | opts =
|
165 | if @_lastIndex
|
166 | wait: '10m'
|
167 | index: @_lastIndex
|
168 | else
|
169 | null
|
170 |
|
171 | request.get
|
172 | url: "#{@base_url}/kv/#{@key}"
|
173 | qs: opts
|
174 | json: true
|
175 | , (err,resp,body) =>
|
176 |
|
177 | throw err if err
|
178 |
|
179 | return false if @_terminating
|
180 |
|
181 | if resp.headers['x-consul-index']
|
182 | @_lastIndex = resp.headers['x-consul-index']
|
183 | debug "Last index is now #{ @_lastIndex }"
|
184 | else
|
185 |
|
186 |
|
187 | @_monitoring = false
|
188 | @_monitorKey()
|
189 | return false
|
190 |
|
191 | @_monitoring = false
|
192 |
|
193 | if body && body[0]?.Session
|
194 |
|
195 | debug "Leader is #{ if body[0].Session == @session then "Me" else body[0].Session }. Polling again."
|
196 | @_monitorKey()
|
197 |
|
198 |
|
199 | if body[0].Session == @session
|
200 | if !@process
|
201 |
|
202 | debug "I am the leader, but I have no process. How so?"
|
203 | @_runCommand()
|
204 |
|
205 | else if @process?.stopping
|
206 |
|
207 |
|
208 | debug "Resetting process.stopping state since poll says I am the leader."
|
209 | @process.stopping = false
|
210 |
|
211 | else
|
212 |
|
213 | @_attemptKeyAcquire =>
|
214 | @_monitorKey()
|
215 |
|
216 |
|
217 |
|
218 | _runCommand: ->
|
219 | debug "Should start command: #{@command}"
|
220 |
|
221 | if @process
|
222 |
|
223 |
|
224 |
|
225 | @process.p.removeAllListeners()
|
226 | @process.p = null
|
227 |
|
228 | uptime = Number(new Date) - @process.start
|
229 | debug "Command uptime was #{ Math.floor(uptime / 1000) } seconds."
|
230 |
|
231 | opts = {}
|
232 | if args.cwd
|
233 | opts.cwd = args.cwd
|
234 |
|
235 | cmd = @command.split(" ")
|
236 |
|
237 | @process = p:null, start:Number(new Date), stopping:false
|
238 | @process.p = cp.spawn cmd[0], cmd[1..], opts
|
239 |
|
240 | @process.p.stderr.pipe(process.stderr)
|
241 |
|
242 | @_updateTitle()
|
243 |
|
244 | @process.p.on "error", (err) =>
|
245 | debug "Command got error: #{err}"
|
246 | @_runCommand() if !@process.stopping
|
247 |
|
248 | @process.p.on "exit", (code,signal) =>
|
249 | debug "Command exited: #{code} || #{signal}"
|
250 | @_runCommand() if !@process.stopping
|
251 |
|
252 |
|
253 |
|
254 | _stopCommand: ->
|
255 | debug "Should stop command: #{@command}"
|
256 |
|
257 | if @process
|
258 | @process.stopping = true
|
259 | @process.p.once "exit", =>
|
260 | debug "Command is stopped."
|
261 | @process = null
|
262 |
|
263 | @_updateTitle()
|
264 |
|
265 | @process.p.kill()
|
266 | else
|
267 | debug "Stop called with no process running?"
|
268 |
|
269 |
|
270 |
|
271 | _createSession: (cb) ->
|
272 | debug "Sending session request"
|
273 | request.put
|
274 | url: "#{@base_url}/session/create"
|
275 | body: { Name:"#{os.hostname()}-#{@key}" }
|
276 | json: true
|
277 | , (err,resp,body) =>
|
278 | cb err, body?.ID
|
279 |
|
280 |
|
281 |
|
282 | terminate: (cb) ->
|
283 | @_terminating = true
|
284 |
|
285 | @_stopCommand() if @process
|
286 |
|
287 | destroySession = =>
|
288 | if @session
|
289 |
|
290 | request.put
|
291 | url: "#{@base_url}/session/destroy/#{@session}"
|
292 | , (err,resp,body) =>
|
293 | debug "Session destroy gave status of #{ resp.statusCode }"
|
294 | cb()
|
295 | else
|
296 | cb()
|
297 |
|
298 |
|
299 | if @is_leader
|
300 | request.put
|
301 | url: "#{@base_url}/kv/#{@key}"
|
302 | qs: { release:@session }
|
303 | , (err,resp,body) =>
|
304 | debug "Release leadership gave status of #{ resp.statusCode }"
|
305 | destroySession()
|
306 | else
|
307 | destroySession()
|
308 |
|
309 |
|
310 |
|
311 | elected = new ConsulElected args.server, args.key, args.command
|
312 |
|
313 | _handleExit = ->
|
314 | elected.terminate ->
|
315 | debug "Consul Elected exiting."
|
316 | process.exit()
|
317 |
|
318 | process.on 'SIGINT', _handleExit
|
319 | process.on 'SIGTERM', _handleExit
|
320 |
|