UNPKG

19.1 kBJavaScriptView Raw
1"use strict";
2
/**
 * Babel class helper: installs property descriptors on a constructor.
 *
 * Each descriptor is an object carrying a `key` plus the usual
 * `Object.defineProperty` fields (`value`, `get`, `set`, ...). Descriptors are
 * normalised in place before being defined: `enumerable` defaults to false,
 * `configurable` is forced to true and data descriptors are made writable.
 *
 * @param {Function} Constructor  Constructor function to decorate.
 * @param {Array<Object>} [protoProps]  Descriptors defined on `Constructor.prototype`.
 * @param {Array<Object>} [staticProps] Descriptors defined on `Constructor` itself.
 * @returns {Function} The same `Constructor`, for chaining.
 */
var _createClass = (function () {
  // Define every descriptor in `props` on `target`.
  function defineProperties(target, props) {
    for (var i = 0; i < props.length; i++) {
      var descriptor = props[i];
      descriptor.enumerable = descriptor.enumerable || false;
      descriptor.configurable = true;
      // Only data descriptors may carry `writable`; accessors must not.
      if ("value" in descriptor) {
        descriptor.writable = true;
      }
      Object.defineProperty(target, descriptor.key, descriptor);
    }
  }

  return function (Constructor, protoProps, staticProps) {
    if (protoProps) {
      defineProperties(Constructor.prototype, protoProps);
    }
    if (staticProps) {
      defineProperties(Constructor, staticProps);
    }
    return Constructor;
  };
})();
4
/**
 * Babel class helper: guards a constructor against being invoked without `new`.
 *
 * @param {*} instance The `this` value the constructor was invoked with.
 * @param {Function} Constructor The constructor being guarded.
 * @throws {TypeError} If `instance` is not an instance of `Constructor`.
 */
function _classCallCheck(instance, Constructor) {
  if (!(instance instanceof Constructor)) {
    throw new TypeError("Cannot call a class as a function");
  }
}
6
7var co = require('co'),
8 f = require('util').format,
9 mkdirp = require('mkdirp'),
10 rimraf = require('rimraf'),
11 Logger = require('./logger'),
12 CoreServer = require('mongodb-core').Server,
13 spawn = require('child_process').spawn;
14
/**
 * Create a shallow copy of an object.
 *
 * Every enumerable property reachable through `for...in` (own and inherited)
 * is copied by reference onto a fresh plain object. `null` and `undefined`
 * produce an empty object.
 *
 * @param {Object} [o] Object to copy.
 * @returns {Object} A new plain object holding the same property values.
 */
var clone = function clone(o) {
  var obj = {};
  for (var name in o) {
    obj[name] = o[name];
  }
  return obj;
};
20
/**
 * Manager for a single `mongos` proxy process.
 *
 * Wraps spawning, stopping and restarting the binary, and running one-off
 * commands against it through a short-lived `mongodb-core` connection.
 * Every public method returns a Promise.
 */
var Mongos = (function () {
  /**
   * @param {string} [binary='mongos'] Binary name or path to spawn.
   * @param {Object} [options] Options turned into `--name[=value]` command
   *   line flags by `start()`. `bind_ip` defaults to 'localhost' and `port`
   *   to 27017 when not given as a string / number respectively. The same
   *   object is also handed to the logger factory.
   * @param {Object} [clientOptions] Extra options merged into every
   *   `CoreServer` connection opened by this manager.
   */
  function Mongos(binary, options, clientOptions) {
    _classCallCheck(this, Mongos);

    options = options || {};
    // Work on a copy so the defaults set below never leak into the caller's object
    this.options = clone(options);

    // Server state ('stopped' or 'running')
    this.state = 'stopped';

    // Create logger instance
    this.logger = Logger('Mongos', options);

    // Unpack default runtime information
    this.binary = binary || 'mongos';
    // Current process
    this.process = null;
    // Current command
    this.command = null;
    // Credentials store
    this.credentials = [];
    // Additional internal client options
    this.clientOptions = clientOptions || {};

    // Default values for host and port if not set
    this.options.bind_ip = typeof this.options.bind_ip == 'string' ? this.options.bind_ip : 'localhost';
    this.options.port = typeof this.options.port == 'number' ? this.options.port : 27017;
  }

  _createClass(Mongos, [{
    /**
     * Run `<binary> --version` and parse its standard output.
     *
     * @returns {Promise<{version: number[], ssl: boolean}>} `version` is the
     *   first `x.y.z` found on stdout split into integers; `ssl` is true when
     *   stdout mentions "ssl" (case-insensitive). Rejects when the process
     *   cannot be spawned or no version number is found in its output.
     */
    key: 'discover',
    value: function discover() {
      var self = this;

      return new Promise(function (resolve, reject) {
        co(regeneratorRuntime.mark(function _callee() {
          var proc, stdout, stderr;
          return regeneratorRuntime.wrap(function _callee$(_context) {
            while (1) {
              switch (_context.prev = _context.next) {
                case 0:
                  proc = spawn(self.binary, ['--version']);

                  // Variables receiving data
                  stdout = '';
                  stderr = '';

                  // Get the stdout
                  proc.stdout.on('data', function (data) {
                    stdout += data;
                  });
                  // Get the stderr
                  proc.stderr.on('data', function (data) {
                    stderr += data;
                  });
                  // Got an error
                  proc.on('error', function (err) {
                    reject(err);
                  });
                  // Process terminated
                  proc.on('close', function (code) {
                    // Perform version match
                    var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/);

                    // No version in the output (e.g. the binary failed to run):
                    // reject instead of throwing from inside the event handler.
                    if (versionMatch == null) {
                      return reject(new Error(f('failed to determine version of [%s] (exit code %s): %s', self.binary, code, stderr)));
                    }

                    // Check if we have ssl
                    var sslMatch = stdout.match(/ssl/i);

                    // Resolve the server version
                    resolve({
                      version: versionMatch.toString().split('.').map(function (x) {
                        return parseInt(x, 10);
                      }),
                      ssl: sslMatch != null
                    });
                  });

                case 7:
                case 'end':
                  return _context.stop();
              }
            }
          }, _callee, this);
        })).catch(reject);
      });
    }
  }, {
    /**
     * Open a one-shot connection to this mongos, optionally authenticate,
     * run a single command and destroy the connection.
     *
     * @param {string} ns Namespace passed to `server.command`.
     * @param {Object} command Command document.
     * @param {Object} [credentials] When given, `provider`, `db`, `user` and
     *   `password` are passed to `server.auth` before the command runs.
     * @param {Object} [options]
     * @param {boolean} [options.ignoreError] Resolve with `{ ok: 1 }` instead
     *   of rejecting on any error.
     * @param {boolean} [options.reExecuteOnError] Retry the command once when
     *   the connection emits `error` or `close`.
     * @returns {Promise<Object>} The command's `result`, or `{ ok: 1 }` when
     *   an error was ignored.
     */
    key: 'executeCommand',
    value: function executeCommand(ns, command, credentials, options) {
      var self = this;
      options = options || {};
      // Copy: reExecuteOnError is flipped below and must not touch the caller's object
      options = clone(options);

      return new Promise(function (resolve, reject) {
        co(regeneratorRuntime.mark(function _callee2() {
          var opt, executeCommand, s;
          return regeneratorRuntime.wrap(function _callee2$(_context2) {
            while (1) {
              switch (_context2.prev = _context2.next) {
                case 0:
                  // Copy the basic options
                  opt = clone(self.clientOptions);

                  opt.host = self.options.bind_ip;
                  opt.port = self.options.port;
                  opt.connectionTimeout = 5000;
                  opt.socketTimeout = 5000;
                  opt.pool = 1;

                  // Ensure we only connect once and emit any error caught
                  opt.reconnect = false;
                  opt.emitError = true;

                  // Execute command
                  executeCommand = function executeCommand(_ns, _command, _credentials, _server) {
                    // Do we have credentials
                    var authenticate = function authenticate(_server, _credentials, _callback) {
                      if (!_credentials) return _callback();
                      // Perform authentication using provided
                      // credentials for this command
                      _server.auth(_credentials.provider, _credentials.db, _credentials.user, _credentials.password, _callback);
                    };

                    authenticate(_server, _credentials, function (err) {
                      // If we had an error release the connection and return
                      if (err) {
                        _server.destroy();
                        if (options.ignoreError) return resolve({ ok: 1 });
                        return reject(err);
                      }
                      // Execute command
                      _server.command(_ns, _command, function (err, r) {
                        // Destroy the connection
                        _server.destroy();
                        // Return an error
                        if (err && options.ignoreError) return resolve({ ok: 1 });
                        if (err) return reject(err);
                        // Return the command result
                        resolve(r.result);
                      });
                    });
                  };

                  // Create an instance
                  s = new CoreServer(opt);

                  s.on('error', function (err) {
                    if (options.ignoreError) return resolve({ ok: 1 });
                    if (options.reExecuteOnError) {
                      // Only retry once
                      options.reExecuteOnError = false;
                      return executeCommand(ns, command, credentials, s);
                    }

                    reject(err);
                  });

                  s.on('close', function (err) {
                    if (options.ignoreError) return resolve({ ok: 1 });
                    if (options.reExecuteOnError) {
                      // Only retry once
                      options.reExecuteOnError = false;
                      return executeCommand(ns, command, credentials, s);
                    }

                    reject(err);
                  });

                  s.on('timeout', function (err) {
                    if (options.ignoreError) return resolve({ ok: 1 });
                    reject(err);
                  });

                  s.on('connect', function (_server) {
                    executeCommand(ns, command, credentials, _server);
                  });

                  // Connect
                  s.connect();

                case 15:
                case 'end':
                  return _context2.stop();
              }
            }
          }, _callee2, this);
        })).catch(reject);
      });
    }
  }, {
    /**
     * Spawn the mongos process using `this.options` as command line flags.
     *
     * `discover()` is awaited first, so a binary that cannot be run rejects
     * before anything is spawned. Options whose value is null/undefined
     * become `--name`; all others become `--name=value`.
     *
     * @returns {Promise<void>} Resolves once stdout contains
     *   "waiting for connections". Rejects when the process emits `error`,
     *   or closes with a non-zero exit code or without having produced any
     *   output while still in the 'stopped' state.
     */
    key: 'start',
    value: function start() {
      var self = this;

      return new Promise(function (resolve, reject) {
        co(regeneratorRuntime.mark(function _callee3() {
          var options, commandOptions, name, commandLine, stdout, stderr;
          return regeneratorRuntime.wrap(function _callee3$(_context3) {
            while (1) {
              switch (_context3.prev = _context3.next) {
                case 0:
                  // Make sure the binary can be executed before spawning it
                  _context3.next = 2;
                  return self.discover();

                case 2:
                  // Merge in all the options
                  options = clone(self.options);

                  // Build command options list
                  commandOptions = [];

                  // Go over all the options
                  for (name in options) {
                    if (options[name] == null) {
                      commandOptions.push(f('--%s', name));
                    } else {
                      commandOptions.push(f('--%s=%s', name, options[name]));
                    }
                  }

                  // Command line (used for logging and error messages only)
                  commandLine = f('%s %s', self.binary, commandOptions.join(' '));

                  if (self.logger.isInfo()) {
                    self.logger.info(f('start mongos server [%s]', commandLine));
                  }

                  // Spawn a mongos process
                  self.process = spawn(self.binary, commandOptions);

                  // Variables receiving data
                  stdout = '';
                  stderr = '';

                  // Get the stdout
                  self.process.stdout.on('data', function (data) {
                    stdout += data.toString();

                    // Only emit event at start
                    if (self.state == 'stopped') {
                      if (stdout.indexOf('waiting for connections') != -1) {
                        if (self.logger.isInfo()) {
                          self.logger.info(f('successfully started mongos proxy [%s]', commandLine));
                        }

                        // Mark state as running
                        self.state = 'running';
                        // Resolve
                        resolve();
                      }
                    }
                  });

                  // Get the stderr
                  self.process.stderr.on('data', function (data) {
                    stderr += data;
                  });

                  // Got an error
                  self.process.on('error', function (err) {
                    self.logger.error(f('failed to start mongos instance [%s]', commandLine));
                    // Keep the underlying error and captured output available to the caller
                    var failure = new Error(f('failed to start mongos instance [%s]: %s', commandLine, err.message));
                    failure.error = err;
                    failure.stdout = stdout;
                    failure.stderr = stderr;
                    reject(failure);
                  });

                  // Process terminated
                  self.process.on('close', function (code) {
                    // Evaluate against the state the process had before it closed
                    var failed = self.state == 'stopped' && stdout == '' || code != 0;

                    // Whatever the reason, the process is no longer running
                    self.state = 'stopped';

                    if (failed) {
                      self.logger.error(f('failed to start mongos instance [%s]', commandLine));
                      return reject(new Error(f('failed to start mongos with options %s', commandOptions)));
                    }
                  });

                case 20:
                case 'end':
                  return _context3.stop();
              }
            }
          }, _callee3, this);
        })).catch(reject);
      });
    }
  }, {
    /**
     * Retrieve the ismaster for this server.
     *
     * Opens a one-shot connection, runs `{ ismaster: true }` against
     * `system.$cmd` and destroys the connection.
     *
     * @returns {Promise<Object>} The command's `result`. Rejects on
     *   connection error, timeout or command failure.
     */
    key: 'ismaster',
    value: function ismaster() {
      var self = this;

      return new Promise(function (resolve, reject) {
        co(regeneratorRuntime.mark(function _callee4() {
          var opt, s;
          return regeneratorRuntime.wrap(function _callee4$(_context4) {
            while (1) {
              switch (_context4.prev = _context4.next) {
                case 0:
                  // Copy the basic options
                  opt = clone(self.clientOptions);

                  opt.host = self.options.bind_ip;
                  opt.port = self.options.port;
                  opt.connectionTimeout = 5000;
                  opt.socketTimeout = 5000;
                  opt.pool = 1;

                  // Ensure we only connect once and emit any error caught
                  opt.reconnect = false;
                  opt.emitError = true;

                  // Create an instance
                  s = new CoreServer(opt);

                  // Add listeners
                  s.on('error', function (err) {
                    reject(err);
                  });

                  s.on('timeout', function (err) {
                    reject(err);
                  });

                  s.on('connect', function (_server) {
                    _server.command('system.$cmd', {
                      ismaster: true
                    }, function (err, r) {
                      // Destroy the connection
                      _server.destroy();
                      // Return an error
                      if (err) return reject(err);
                      // Return the ismaster command
                      resolve(r.result);
                    });
                  });

                  // Connect
                  s.connect();

                case 13:
                case 'end':
                  return _context4.stop();
              }
            }
          }, _callee4, this);
        })).catch(reject);
      });
    }
  }, {
    /**
     * Purge the db directory.
     *
     * Removes `options.dbpath` and re-creates it, synchronously. Both steps
     * are best-effort: failures are deliberately ignored.
     *
     * @returns {Promise<void>} Always resolves.
     */
    key: 'purge',
    value: function purge() {
      var self = this;

      return new Promise(function (resolve, reject) {
        co(regeneratorRuntime.mark(function _callee5() {
          return regeneratorRuntime.wrap(function _callee5$(_context5) {
            while (1) {
              switch (_context5.prev = _context5.next) {
                case 0:
                  try {
                    // Delete the dbpath
                    rimraf.sync(self.options.dbpath);
                  } catch (err) {}

                  try {
                    // Re-Create the directory
                    mkdirp.sync(self.options.dbpath);
                  } catch (err) {}

                  // Return
                  resolve();

                case 3:
                case 'end':
                  return _context5.stop();
              }
            }
          }, _callee5, this);
        })).catch(reject);
      });
    }
  }, {
    /**
     * Terminate the spawned process and wait for it to close.
     *
     * @param {number} [signal] Numeric signal passed to `process.kill`.
     *   Any non-number value falls back to 'SIGTERM'.
     * @returns {Promise<void>} Resolves immediately when no process was ever
     *   spawned, otherwise once the process emits `close`.
     */
    key: 'stop',
    value: function stop(signal) {
      var self = this;
      signal = typeof signal == 'number' ? signal : 'SIGTERM';

      return new Promise(function (resolve, reject) {
        co(regeneratorRuntime.mark(function _callee6() {
          return regeneratorRuntime.wrap(function _callee6$(_context6) {
            while (1) {
              switch (_context6.prev = _context6.next) {
                case 0:
                  // No process, just resolve (checked before anything reads self.process.pid)
                  if (self.process) {
                    _context6.next = 3;
                    break;
                  }

                  return _context6.abrupt('return', resolve());

                case 3:
                  if (self.logger.isInfo()) {
                    self.logger.info(f('stop mongos proxy process with pid [%s]', self.process.pid));
                  }

                  // Wait for service to stop
                  self.process.on('close', function () {
                    if (self.logger.isInfo()) {
                      self.logger.info(f('stopped mongos proxy process with pid [%s]', self.process.pid));
                    }

                    // Set process to stopped
                    self.state = 'stopped';
                    // Return
                    resolve();
                  });

                  // Terminate the process
                  self.process.kill(signal);

                case 5:
                case 'end':
                  return _context6.stop();
              }
            }
          }, _callee6, this);
        })).catch(reject);
      });
    }
  }, {
    /**
     * Stop the process, optionally purge the db directory, then start again.
     *
     * @param {boolean} [purge] When truthy, `purge()` runs between stop and start.
     * @returns {Promise<void>} Resolves once `start()` has resolved; rejects
     *   if any of the steps rejects.
     */
    key: 'restart',
    value: function restart(purge) {
      var self = this;

      return new Promise(function (resolve, reject) {
        co(regeneratorRuntime.mark(function _callee7() {
          return regeneratorRuntime.wrap(function _callee7$(_context7) {
            while (1) {
              switch (_context7.prev = _context7.next) {
                case 0:
                  _context7.next = 2;
                  return self.stop();

                case 2:
                  if (!purge) {
                    _context7.next = 5;
                    break;
                  }

                  _context7.next = 5;
                  return self.purge();

                case 5:
                  _context7.next = 7;
                  return self.start();

                case 7:
                  resolve();

                case 8:
                case 'end':
                  return _context7.stop();
              }
            }
          }, _callee7, this);
        })).catch(reject);
      });
    }
  }, {
    /**
     * `host:port` identifier built from `options.bind_ip` and `options.port`.
     */
    key: 'name',
    get: function get() {
      return f('%s:%s', this.options.bind_ip, this.options.port);
    }
  }]);

  return Mongos;
})();
536
537module.exports = Mongos;
538
539// SIGHUP 1 Term Hangup detected on controlling terminal
540// or death of controlling process
541// SIGINT 2 Term Interrupt from keyboard
542// SIGQUIT 3 Core Quit from keyboard
543// SIGILL 4 Core Illegal Instruction
544// SIGABRT 6 Core Abort signal from abort(3)
545// SIGFPE 8 Core Floating point exception
546// SIGKILL 9 Term Kill signal
547// SIGSEGV 11 Core Invalid memory reference
548// SIGPIPE 13 Term Broken pipe: write to pipe with no readers
549// SIGALRM 14 Term Timer signal from alarm(2)
550// SIGTERM 15 Term Termination signal
// Signal map: signal number -> signal name, following the table above.
// SIGSTOP appears under 17, 19 and 23 — presumably because its number
// differs between architectures; confirm against signal(7).
var signals = {
  1: 'SIGHUP',
  2: 'SIGINT',
  3: 'SIGQUIT',
  4: 'SIGILL',
  6: 'SIGABRT',
  8: 'SIGFPE',
  9: 'SIGKILL',
  11: 'SIGSEGV',
  13: 'SIGPIPE',
  14: 'SIGALRM',
  15: 'SIGTERM',
  17: 'SIGSTOP',
  19: 'SIGSTOP',
  23: 'SIGSTOP'
};