UNPKG

15.3 kBJavaScriptView Raw
1"use strict"
2
3// var Promise = require('es6-promise').Promise;
4// require('es6-promise').polyfill();
5var Promise = require("bluebird");
6
7var co = require('co'),
8 f = require('util').format,
9 mkdirp = require('mkdirp'),
10 rimraf = require('rimraf'),
11 Logger = require('./logger'),
12 EventEmitter = require('events'),
13 CoreServer = require('mongodb-core').Server,
14 spawn = require('child_process').spawn;
15
16var clone = function(o) {
17 var obj = {}; for(var name in o) obj[name] = o[name]; return obj;
18}
19
20class Server extends EventEmitter {
21 constructor(binary, options, clientOptions) {
22 super();
23 options = options || {};
24 this.options = clone(options);
25
26 // Server state
27 this.state = 'stopped';
28
29 // Create logger instance
30 this.logger = Logger('Server', options);
31
32 // Unpack default runtime information
33 this.binary = binary || 'mongod';
34 // Current process
35 this.process = null;
36 // Current command
37 this.command = null;
38 // Credentials store
39 this.credentials = [];
40 // Additional internal client options
41 this.clientOptions = clientOptions || {};
42
43 // Default values for host and port if not set
44 this.options.bind_ip = typeof this.options.bind_ip == 'string' ? this.options.bind_ip : 'localhost';
45 this.options.port = typeof this.options.port == 'number' ? this.options.port : 27017;
46 }
47
48 get config() {
49 return clone(this.options);
50 }
51
52 get host() {
53 return this.options.bind_ip;
54 }
55
56 get port() {
57 return this.options.port;
58 }
59
60 get name() {
61 return f('%s:%s', this.options.bind_ip, this.options.port);
62 }
63
64 discover() {
65 var self = this;
66
67 return new Promise(function(resolve, reject) {
68 co(function*() {
69 var proc = spawn(self.binary, ['--version']);
70
71 // Do we have a valid proc
72 if(!proc || !proc.stdout || !proc.stderr) {
73 return reject(new Error(f('failed to start [%s --version]', self.binary)));
74 }
75
76 // Variables receiving data
77 var stdout = '';
78 var stderr = '';
79 // Get the stdout
80 proc.stdout.on('data', function(data) { stdout += data; });
81 // Get the stderr
82 proc.stderr.on('data', function(data) { stderr += data; });
83 // Got an error
84 proc.on('error', function(err) { reject(err); });
85 // Process terminated
86 proc.on('close', function(code) {
87 // Perform version match
88 var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/)
89
90 // Check if we have ssl
91 var sslMatch = stdout.match(/ssl/i)
92
93 // Resolve the server version
94 resolve({
95 version: versionMatch.toString().split('.').map(function(x) {
96 return parseInt(x, 10);
97 }),
98 ssl: sslMatch != null
99 })
100 });
101 }).catch(reject);
102 });
103 }
104
105 instance(credentials, options) {
106 var self = this;
107 options = options || {};
108 options = clone(options);
109
110 return new Promise(function(resolve, reject) {
111 co(function*() {
112 // Copy the basic options
113 var opt = clone(self.clientOptions);
114 opt.host = self.options.bind_ip;
115 opt.port = self.options.port;
116 opt.connectionTimeout = 5000;
117 opt.socketTimeout = 5000;
118 opt.pool = 1;
119
120 // Ensure we only connect once and emit any error caught
121 opt.reconnect = false;
122 opt.emitError = true;
123
124 // Create an instance
125 var s = new CoreServer(opt);
126 s.on('error', function(err) {
127 reject(err);
128 });
129
130 s.on('close', function(err) {
131 reject(err);
132 });
133
134 s.on('timeout', function(err) {
135 reject(err);
136 });
137
138 s.on('connect', function(_server) {
139 // Do we have credentials
140 var authenticate = function(_server, _credentials, _callback) {
141 if(!_credentials) return _callback();
142 // Perform authentication using provided
143 // credentials for this command
144 _server.auth(
145 _credentials.provider,
146 _credentials.db,
147 _credentials.user,
148 _credentials.password, _callback);
149 }
150
151 // Perform any necessary authentication
152 authenticate(_server, credentials, function(err) {
153 if(err) return reject(err);
154 resolve(_server);
155 });
156 });
157
158 // Connect
159 s.connect();
160 });
161 });
162 }
163
164 executeCommand(ns, command, credentials, options) {
165 var self = this;
166 options = options || {};
167 options = clone(options);
168
169 return new Promise(function(resolve, reject) {
170 co(function*() {
171 // Copy the basic options
172 var opt = clone(self.clientOptions);
173 opt.host = self.options.bind_ip;
174 opt.port = self.options.port;
175 opt.connectionTimeout = 5000;
176 opt.socketTimeout = 0;
177 opt.pool = 1;
178
179 // Ensure we only connect once and emit any error caught
180 opt.reconnect = false;
181 opt.emitError = true;
182
183 // Execute command
184 var executeCommand = function(_ns, _command, _credentials, _server) {
185 // Do we have credentials
186 var authenticate = function(_server, _credentials, _callback) {
187 if(!_credentials) return _callback();
188 // Perform authentication using provided
189 // credentials for this command
190 _server.auth(
191 _credentials.provider,
192 _credentials.db,
193 _credentials.user,
194 _credentials.password, _callback);
195 }
196
197 authenticate(_server, _credentials, function(err) {
198 if(err && options.ignoreError) return resolve({ok:1});
199 // If we had an error return
200 if(err) {
201 _server.destroy();
202 return reject(err);
203 }
204
205 // Execute command
206 _server.command(_ns, _command, function(err, r) {
207 // Destroy the connection
208 _server.destroy();
209 // Return an error
210 if(err && options.ignoreError) return resolve({ok:1});
211 if(err) return reject(err);
212 // Return the ismaster command
213 resolve(r.result);
214 });
215 });
216 }
217
218 // Create an instance
219 var s = new CoreServer(opt);
220
221 s.on('error', function(err) {
222 if(options.ignoreError) return resolve({ok:1});
223 if(options.reExecuteOnError) {
224 options.reExecuteOnError = false;
225 return executeCommand(ns, command, credentials, s);
226 }
227
228 reject(err);
229 });
230
231 s.on('close', function(err) {
232 if(options.ignoreError) return resolve({ok:1});
233 if(options.reExecuteOnError) {
234 options.reExecuteOnError = false;
235 return executeCommand(ns, command, credentials, s);
236 }
237
238 reject(err);
239 });
240
241 s.on('timeout', function(err) {
242 if(options.ignoreError) return resolve({ok:1});
243 reject(err);
244 });
245
246 s.on('connect', function(_server) {
247 executeCommand(ns, command, credentials, _server);
248 });
249
250 // Connect
251 s.connect();
252 }).catch(function(err) {
253 if(options.ignoreError) return resolve({ok:1});
254 reject(err);
255 });
256 });
257 }
258
259 start() {
260 var self = this;
261
262 return new Promise(function(resolve, reject) {
263 co(function*() {
264 // Get the version numbers
265 var result = yield self.discover();
266 var version = result.version;
267
268 // All errors found during validation
269 var errors = [];
270
271 // Ensure basic parameters
272 if(!self.options.dbpath) {
273 errors.push(new Error('dbpath is required'));
274 }
275
276 // Do we have any errors
277 if(errors.length > 0) return reject(errors);
278
279 // Figure out what special options we need to pass into the boot script
280 // Removing any non-compatible parameters etc.
281 if(version[0] == 3 && version[1] >= 0 && version[1] <= 2) {
282 } else if(version[0] == 3 && version[1] >= 2) {
283 } else if(version[0] == 2 && version[1] <= 6) {
284 }
285
286 // Merge in all the options
287 var options = clone(self.options);
288
289 // Build command options list
290 var commandOptions = [];
291
292 // Do we have a 2.2 server, then we don't support setParameter
293 if(version[0] == 2 && version[1] == 2) {
294 delete options['setParameter'];
295 }
296
297 // Go over all the options
298 for(var name in options) {
299 if(options[name] == null) {
300 commandOptions.push(f('--%s', name));
301 } else if(Array.isArray(options[name])) {
302 // We have an array of a specific option f.ex --setParameter
303 for(var i = 0; i < options[name].length; i++) {
304 var o = options[name][i];
305
306 if(o == null) {
307 commandOptions.push(f('--%s', name));
308 } else {
309 commandOptions.push(f('--%s=%s', name, options[name][i]));
310 }
311 }
312 } else {
313 commandOptions.push(f('--%s=%s', name, options[name]));
314 }
315 }
316
317 // Command line
318 var commandLine = f('%s %s', self.binary, commandOptions.join(' '));
319 // Emit start event
320 self.emit('state', {
321 event: 'start', topology: 'server', cmd: commandLine, options: self.options
322 });
323
324 if(self.logger.isInfo()) {
325 self.logger.info(f('started mongod with [%s]', commandLine));
326 }
327
328 // Spawn a mongod process
329 self.process = spawn(self.binary, commandOptions);
330
331 // Variables receiving data
332 var stdout = '';
333 var stderr = '';
334
335 // Get the stdout
336 self.process.stdout.on('data', function(data) {
337 stdout += data.toString();
338 self.emit('state', {
339 event: 'stdout', topology: 'server', stdout: data.toString(), options: self.options
340 });
341
342 //
343 // Only emit event at start
344 if(self.state == 'stopped') {
345 if(stdout.indexOf('waiting for connections') != -1
346 || stdout.indexOf('connection accepted') != -1) {
347 // Mark state as running
348 self.state = 'running';
349
350 // Emit start event
351 self.emit('state', {
352 event: 'running', topology: 'server', cmd: commandLine, options: self.options
353 });
354
355 // Resolve
356 resolve();
357 }
358 }
359 });
360
361 // Get the stderr
362 self.process.stderr.on('data', function(data) { stderr += data; });
363
364 // Got an error
365 self.process.on('error', function(err) {
366 self.emit('state', {
367 event: 'sterr', topology: 'server', stdout: stdout, sterr: sterr.toString(), options: self.options
368 });
369 reject(new Error({error:error, stdout: stdout, stderr: stderr}));
370 });
371
372 // Process terminated
373 self.process.on('close', function(code) {
374 if((self.state == 'stopped' && stdout == '') || (code != 0)) {
375 return reject(new Error(f('failed to start mongod with options %s\n%s', commandOptions, stdout)))
376 }
377
378 self.state = 'stopped';
379 });
380 }).catch(reject);
381 });
382 }
383
384 /*
385 * Retrieve the ismaster for this server
386 */
387 ismaster() {
388 var self = this;
389
390 return new Promise(function(resolve, reject) {
391 co(function*() {
392 // Copy the basic options
393 var opt = clone(self.clientOptions);
394 opt.host = self.options.bind_ip;
395 opt.port = self.options.port;
396 opt.connectionTimeout = 5000;
397 opt.socketTimeout = 5000;
398 opt.pool = 1;
399
400 // Ensure we only connect once and emit any error caught
401 opt.reconnect = false;
402 opt.emitError = true;
403
404 // Create an instance
405 var s = new CoreServer(opt);
406 // Add listeners
407 s.on('error', function(err) {
408 reject(err);
409 });
410
411 s.on('close', function(err) {
412 reject(err);
413 });
414
415 s.on('timeout', function(err) {
416 reject(err);
417 });
418
419 s.on('connect', function(_server) {
420 _server.command('system.$cmd', {
421 ismaster: true
422 }, function(err, r) {
423 // Destroy the connection
424 _server.destroy();
425 // Return an error
426 if(err) return reject(err);
427 // Return the ismaster command
428 resolve(r.result);
429 });
430 });
431
432 // Connect
433 s.connect();
434 }).catch(reject);
435 });
436 }
437
438 /*
439 * Purge the db directory
440 */
441 purge() {
442 var self = this;
443
444 return new Promise(function(resolve, reject) {
445 co(function*() {
446 try {
447 // Delete the dbpath
448 rimraf.sync(self.options.dbpath);
449 } catch(err) {}
450
451 try {
452 // Re-Create the directory
453 mkdirp.sync(self.options.dbpath);
454 } catch(err) {}
455
456 // Return
457 resolve();
458 }).catch(reject);
459 });
460 }
461
462 stop(signal) {
463 var self = this;
464 signal = typeof signal == 'number' ? signals[signal] : signals['15'];
465 return new Promise(function(resolve, reject) {
466 co(function*() {
467 // No process, just resolve
468 if(!self.process || self.state == 'stopped') {
469 // Set process to stopped
470 self.state = 'stopped';
471 // Return
472 return resolve();
473 }
474
475 // Wait for service to stop
476 self.process.on('close', function() {
477 // Set process to stopped
478 self.state = 'stopped';
479 // Return
480 resolve();
481 });
482
483 // Terminate the process
484 self.process.kill(signal);
485 }).catch(reject);
486 });
487 }
488
489 restart(purge) {
490 var self = this;
491
492 return new Promise(function(resolve, reject) {
493 co(function*() {
494 // Attempt to stop the server
495 yield self.stop();
496 // Do we wish to purge the directory ?
497 if(purge) yield self.purge();
498 // Start process
499 yield self.start();
500 resolve();
501 }).catch(reject);
502 });
503 }
504}
505
506module.exports = Server;
507
508// SIGHUP 1 Term Hangup detected on controlling terminal
509// or death of controlling process
510// SIGINT 2 Term Interrupt from keyboard
511// SIGQUIT 3 Core Quit from keyboard
512// SIGILL 4 Core Illegal Instruction
513// SIGABRT 6 Core Abort signal from abort(3)
514// SIGFPE 8 Core Floating point exception
515// SIGKILL 9 Term Kill signal
516// SIGSEGV 11 Core Invalid memory reference
517// SIGPIPE 13 Term Broken pipe: write to pipe with no readers
518// SIGALRM 14 Term Timer signal from alarm(2)
519// SIGTERM 15 Term Termination signal
520// Signal map
521var signals = {
522 1: 'SIGHUP',
523 2: 'SIGINT',
524 3: 'SIGQUIT',
525 4: 'SIGABRT',
526 6: 'SIGABRT',
527 8: 'SIGFPE',
528 9: 'SIGKILL',
529 11: 'SIGSEGV',
530 13: 'SIGPIPE',
531 14: 'SIGALRM',
532 15: 'SIGTERM',
533 17: 'SIGSTOP',
534 19: 'SIGSTOP',
535 23: 'SIGSTOP'
536};