UNPKG

10.9 kBJavaScriptView Raw
1(function() {
// Module-level bindings, plus the CoffeeScript-style inheritance helper.
//
// __extends(child, parent) copies parent's own enumerable properties onto
// child, links child.prototype to parent.prototype through a throwaway
// constructor (so parent's constructor body is not run), records
// parent.prototype as child.__super__, and returns child.
var ConsulElected, Watch, args, cp, debug, elected, fs, os, request, _handleExit,
  __hasProp = {}.hasOwnProperty,
  __extends = function(child, parent) {
    var name;

    function Surrogate() {
      this.constructor = child;
    }

    for (name in parent) {
      if (__hasProp.call(parent, name)) {
        child[name] = parent[name];
      }
    }

    Surrogate.prototype = parent.prototype;
    child.prototype = new Surrogate();
    child.__super__ = parent.prototype;
    return child;
  };
5
6 debug = require("debug")("consul-elected");
7
8 Watch = require("watch-for-path");
9
10 fs = require("fs");
11
12 os = require("os");
13
14 cp = require("child_process");
15
16 request = require("request");
17
// Command-line definition. `key` and `command` are required; `server` falls
// back to "localhost:8500". A `flapping` default of 30 is also registered,
// although nothing in the visible code reads args.flapping.
args = (function(yargs) {
  var aliases = {
    server: 's',
    key: 'k',
    command: 'c'
  };
  var defaults = {
    server: "localhost:8500",
    flapping: 30
  };
  var descriptions = {
    server: "Consul server",
    key: "Key for leader election",
    command: "Command to run when elected",
    cwd: "Working directory for command",
    watch: "File to watch for restarts",
    restart: "Restart command if watched path changes",
    verbose: "Turn on debugging"
  };

  return yargs
    .usage("Usage: $0 -s [server] -k [key] -c [command]")
    .alias(aliases)
    .demand(['key', 'command'])
    .default(defaults)
    .describe(descriptions)
    .boolean(['restart', 'verbose'])
    .argv;
})(require("yargs"));
34
// --verbose: enable the "consul-elected" debug namespace and rebuild the
// `debug` logger afterwards (presumably so the new logger picks up the
// enabled state; the earlier one was created before enable() ran).
if (args.verbose) {
  (function(debugModule) {
    debugModule.enable('consul-elected');
    debug = debugModule("consul-elected");
  })(require("debug"));
}
39
40 ConsulElected = (function(_super) {
41 __extends(ConsulElected, _super);
42
/**
 * Builds the election wrapper and kicks off start-up.
 *
 * Without --watch, start-up (`_startUp`) begins immediately. With --watch,
 * start-up is deferred until the Watch helper reports the path; when --restart
 * is also set, the path is then observed both with fs.watch and with a
 * one-second fs.stat poll, and any change emits an internal "_restart" event.
 * The "_restart" listener kills the running child (at most once per 1200 ms);
 * the child's own exit handler is what starts it again.
 *
 * @param {string} server  Consul address as host:port, used to build base_url.
 * @param {string} key     KV key used for the leader lock.
 * @param {string} command Command line to run while this instance is leader.
 */
function ConsulElected(server, key, command) {
  var self = this;

  // Run the base (EventEmitter) constructor so emitter state is initialised
  // explicitly rather than relying on Node creating it lazily on first use.
  _super.call(this);

  this.server = server;
  this.key = key;
  this.command = command;
  this.base_url = "http://" + this.server + "/v1";
  this.session = null;      // Consul session ID once created
  this.is_leader = false;   // result of the last acquire attempt
  this.process = null;      // { p: ChildProcess, start: ms, stopping: bool } while running
  this._lastIndex = null;   // last x-consul-index seen by the key monitor
  this._monitoring = false; // true while a monitor request is in flight
  this._terminating = false;
  this._updateTitle();

  if (!args.watch) {
    this._startUp();
    return;
  }

  debug("Setting a watch on " + args.watch + " before starting up.");
  new Watch(args.watch, function(err) {
    var lastMtime, lastRestart;
    if (err) {
      throw err;
    }
    debug("Found " + args.watch + ". Starting up.");

    if (args.restart) {
      self._w = fs.watch(args.watch, function(evt, file) {
        debug("fs.watch fired for " + args.watch + " (" + evt + ")");
        return self.emit("_restart");
      });

      // Polling runs alongside fs.watch; the first successful stat only
      // records a baseline mtime, later ones emit on any difference.
      lastMtime = null;
      self._wi = setInterval(function() {
        return fs.stat(args.watch, function(statErr, stats) {
          var mtime;
          if (statErr) {
            // Best effort: skip this tick if the path cannot be stat'ed.
            return false;
          }
          mtime = Number(stats.mtime);
          if (lastMtime && mtime !== lastMtime) {
            debug("Polling found change in " + args.watch + ".");
            self.emit("_restart");
          }
          lastMtime = mtime;
          return lastMtime;
        });
      }, 1000);
    }

    self._startUp();

    lastRestart = null;
    return self.on("_restart", function() {
      var now = Number(new Date);
      // Both watchers can report the same change; ignore repeats that arrive
      // within 1200 ms of the last restart.
      if ((self.process != null) && (!lastRestart || now - lastRestart > 1200)) {
        lastRestart = now;
        debug("Triggering restart after watched file change.");
        return self.process.p.kill();
      }
    });
  });
}
104
/**
 * Sets process.title to reflect whether a child command record exists
 * ("Running") or not ("Waiting"), followed by the configured command.
 *
 * @returns {string} The title string that was assigned.
 */
ConsulElected.prototype._updateTitle = function() {
  var state = this.process ? "Running" : "Waiting";
  var title = "consul-elected (" + state + ")(" + this.command + ")";
  process.title = title;
  return title;
};
108
/**
 * Creates the Consul session and, once it exists, starts monitoring the key.
 *
 * The process exits with status 1 when session creation reports an error or
 * yields no session ID. Continuing without an ID would send later acquire
 * requests with an undefined session, so the lock would not be tied to this
 * instance.
 *
 * @returns {*} Whatever `_createSession` returns.
 */
ConsulElected.prototype._startUp = function() {
  var self = this;
  return this._createSession(function(err, id) {
    if (err || !id) {
      console.error("Failed to create session: " + (err || "no session ID in Consul response"));
      process.exit(1);
      // process.exit does not return in production; this keeps the failure
      // path from falling through if exit is stubbed.
      return false;
    }
    self.session = id;
    debug("Session ID is " + self.session);
    if (self._terminating) {
      // terminate() was called while the session request was in flight.
      return false;
    }
    return self._monitorKey();
  });
};
125
/**
 * Tries to take the leader lock by PUTting this host's name and pid to the
 * key with `acquire` set to the current session.
 *
 * A response body of exactly `true` is treated as having won: the command is
 * started. Anything else is treated as having lost: the command is stopped.
 * Request errors are rethrown. Nothing happens if termination has begun.
 *
 * @param {Function} [cb] Called with no arguments after the win/lose handling.
 * @returns {*} Whatever `request.put` returns.
 */
ConsulElected.prototype._attemptKeyAcquire = function(cb) {
  var self = this;
  var requestOptions;

  debug("Attempting to acquire leadership");

  requestOptions = {
    url: "" + this.base_url + "/kv/" + this.key,
    body: {
      hostname: os.hostname(),
      pid: process.pid
    },
    json: true,
    qs: {
      acquire: this.session
    }
  };

  return request.put(requestOptions, function(err, resp, body) {
    var won;
    if (err) {
      throw err;
    }
    if (self._terminating) {
      return false;
    }

    won = body === true;
    self.is_leader = won;
    if (won) {
      debug("I am now the leader.");
      self._runCommand();
    } else {
      debug("Did not get leader lock.");
      self._stopCommand();
    }

    if (typeof cb === "function") {
      return cb();
    }
    return void 0;
  });
};
160
/**
 * Issues one GET for the lock key and reacts to the answer, then re-arms
 * itself. Only one monitor request is kept in flight at a time.
 *
 * Once an index has been seen, the request carries `index` and `wait: '10m'`
 * (Consul's blocking-query parameters). In the response callback:
 *  - a request error is rethrown;
 *  - nothing happens if termination has begun;
 *  - a response without an `x-consul-index` header is retried after a short
 *    delay (an immediate retry would spin against a server that keeps
 *    answering without the header);
 *  - if the key has a Session, monitoring resumes at once, and when that
 *    session is ours the child command is (re)started if missing, or its
 *    `stopping` flag is cleared;
 *  - otherwise an acquire attempt is made, and monitoring resumes after it.
 *
 * @returns {false|*} false if a monitor request is already in flight,
 *   otherwise whatever `request.get` returns.
 */
ConsulElected.prototype._monitorKey = function() {
  var self = this;
  var RETRY_DELAY_MS = 1000;
  var query;

  if (this._monitoring) {
    return false;
  }
  debug("Starting key monitor request.");
  this._monitoring = true;

  query = this._lastIndex ? {
    wait: '10m',
    index: this._lastIndex
  } : null;

  return request.get({
    url: "" + this.base_url + "/kv/" + this.key,
    qs: query,
    json: true
  }, function(err, resp, body) {
    var entry, holder, index;
    if (err) {
      throw err;
    }
    if (self._terminating) {
      return false;
    }

    index = resp.headers['x-consul-index'];
    if (!index) {
      self._monitoring = false;
      debug("No x-consul-index in response. Retrying in " + RETRY_DELAY_MS + "ms.");
      setTimeout(function() {
        if (!self._terminating) {
          self._monitorKey();
        }
      }, RETRY_DELAY_MS);
      return false;
    }

    self._lastIndex = index;
    debug("Last index is now " + self._lastIndex);
    self._monitoring = false;

    entry = body ? body[0] : null;
    holder = entry != null ? entry.Session : void 0;

    if (!holder) {
      // Nobody holds the lock: try to take it, then keep watching.
      return self._attemptKeyAcquire(function() {
        return self._monitorKey();
      });
    }

    debug("Leader is " + (holder === self.session ? "Me" : holder) + ". Polling again.");
    self._monitorKey();

    if (holder === self.session) {
      if (!self.process) {
        debug("I am the leader, but I have no process. How so?");
        return self._runCommand();
      }
      if (self.process.stopping) {
        debug("Resetting process.stopping state since poll says I am the leader.");
        self.process.stopping = false;
        return false;
      }
    }
  });
};
214
/**
 * Spawns the configured command and keeps it running.
 *
 * Any previous child record is detached first (its listeners are removed; it
 * is not killed here). The command string is split on single spaces into
 * executable and arguments, and `args.cwd`, when set, becomes the child's
 * working directory. The child's stderr is forwarded to this process's
 * stderr. If the child errors or exits while its record is still current and
 * not marked `stopping`, the command is started again.
 *
 * @returns {*} The value returned by registering the "exit" listener.
 */
ConsulElected.prototype._runCommand = function() {
  var self = this;
  var argv, child, opts, record, uptime;

  debug("Should start command: " + this.command);

  if (this.process) {
    this.process.p.removeAllListeners();
    this.process.p = null;
    uptime = Number(new Date) - this.process.start;
    debug("Command uptime was " + (Math.floor(uptime / 1000)) + " seconds.");
  }

  opts = {};
  if (args.cwd) {
    opts.cwd = args.cwd;
  }

  argv = this.command.split(" ");
  record = {
    p: null,
    start: Number(new Date),
    stopping: false
  };
  this.process = record;

  child = cp.spawn(argv[0], argv.slice(1), opts);
  record.p = child;
  child.stderr.pipe(process.stderr);

  // stdout is a pipe by default and nothing here reads it. Per Node's
  // child_process documentation an unread pipe can fill up and block the
  // child, so drain it; the output is discarded, as it effectively was before.
  if (child.stdout) {
    child.stdout.resume();
  }

  this._updateTitle();

  // Restart only when this child is still the current one and no stop was
  // requested. This also avoids dereferencing a record that _stopCommand has
  // already cleared.
  function restartIfWanted() {
    if (self.process === record && !record.stopping) {
      return self._runCommand();
    }
  }

  child.on("error", function(err) {
    debug("Command got error: " + err);
    return restartIfWanted();
  });

  return child.on("exit", function(code, signal) {
    debug("Command exited: " + code + " || " + signal);
    return restartIfWanted();
  });
};
254
/**
 * Asks the running child command to stop.
 *
 * Marks the current process record as `stopping` (so the exit handler
 * installed by `_runCommand` does not restart it), sends the default kill
 * signal, and clears `this.process` once the child reports "exit".
 *
 * The record is only cleared if it is still the current one. If the
 * `stopping` flag was reset in the meantime and the child was restarted on
 * exit, `this.process` already points at the replacement, which must not be
 * forgotten.
 *
 * @returns {*} The result of `kill()`, or of the debug call when there is
 *   nothing to stop.
 */
ConsulElected.prototype._stopCommand = function() {
  var self = this;
  var record;

  debug("Should stop command: " + this.command);

  record = this.process;
  if (!record) {
    return debug("Stop called with no process running?");
  }

  record.stopping = true;
  record.p.once("exit", function() {
    debug("Command is stopped.");
    if (self.process === record) {
      self.process = null;
    }
    return self._updateTitle();
  });
  return record.p.kill();
};
271
/**
 * Requests a new Consul session named "<hostname>-<key>".
 *
 * @param {Function} cb Called as cb(err, id). `id` is the `ID` field of the
 *   response body. If the request itself succeeded but the body carries no
 *   ID (for example an error reply), `err` is an Error describing that,
 *   rather than a silent (null, undefined).
 * @returns {*} Whatever `request.put` returns.
 */
ConsulElected.prototype._createSession = function(cb) {
  debug("Sending session request");
  return request.put({
    url: "" + this.base_url + "/session/create",
    body: {
      Name: "" + (os.hostname()) + "-" + this.key
    },
    json: true
  }, function(err, resp, body) {
    var id = body != null ? body.ID : void 0;
    if (!err && !id) {
      err = new Error("Consul did not return a session ID (HTTP status " + (resp ? resp.statusCode : "unknown") + ")");
    }
    return cb(err, id);
  });
};
286
/**
 * Shuts the instance down: flags termination, stops the child command if one
 * is running, releases the leader lock if held, destroys the Consul session
 * if one exists, and finally calls `cb`.
 *
 * Failures of the release/destroy requests are logged and otherwise ignored,
 * so `cb` is always reached. Previously a failed request left `resp`
 * undefined and reading `resp.statusCode` threw before `cb` ran.
 *
 * @param {Function} cb Called with no arguments when shutdown work is done.
 * @returns {*} The result of the first request issued, or of `cb`.
 */
ConsulElected.prototype.terminate = function(cb) {
  var self = this;

  // Logs the outcome of a shutdown request without assuming a response exists.
  function report(action, err, resp) {
    if (err || !resp) {
      debug(action + " failed: " + err);
    } else {
      debug(action + " gave status of " + resp.statusCode);
    }
  }

  function destroySession() {
    if (!self.session) {
      return cb();
    }
    return request.put({
      url: "" + self.base_url + "/session/destroy/" + self.session
    }, function(err, resp, body) {
      report("Session destroy", err, resp);
      return cb();
    });
  }

  this._terminating = true;
  if (this.process) {
    this._stopCommand();
  }

  if (!this.is_leader) {
    return destroySession();
  }

  return request.put({
    url: "" + this.base_url + "/kv/" + this.key,
    qs: {
      release: this.session
    }
  }, function(err, resp, body) {
    report("Release leadership", err, resp);
    return destroySession();
  });
};
323
324 return ConsulElected;
325
326 })(require("events").EventEmitter);
327
// Create the single election instance from the parsed command-line options.
elected = new ConsulElected(args.server, args.key, args.command);

// Signal handler: run the instance's shutdown sequence, then exit the process.
_handleExit = function() {
  return elected.terminate(function() {
    debug("Consul Elected exiting.");
    return process.exit();
  });
};

// SIGINT and SIGTERM both go through the same shutdown path.
['SIGINT', 'SIGTERM'].forEach(function(signalName) {
  process.on(signalName, _handleExit);
});
340
341}).call(this);