UNPKG

14.2 kBJavaScriptView Raw
1"use strict"
2
3var co = require('co'),
4 f = require('util').format,
5 mkdirp = require('mkdirp'),
6 rimraf = require('rimraf'),
7 Logger = require('./logger'),
8 CoreServer = require('mongodb-core').Server,
9 spawn = require('child_process').spawn;
10
/**
 * Create a shallow copy of an object.
 *
 * Every enumerable property reachable through `for...in` (own and inherited)
 * is copied by reference onto a fresh plain object. Passing `null` or
 * `undefined` yields an empty object.
 *
 * @param {Object} o The object to copy.
 * @returns {Object} A new plain object holding the same property values.
 */
var clone = function(o) {
  var copy = {};

  for(var key in o) {
    copy[key] = o[key];
  }

  return copy;
};
14
/**
 * Build the options for a short-lived mongodb-core connection to `server`.
 *
 * Starts from a copy of the server's `clientOptions`, points it at the
 * server's bind_ip/port and forces a single, non-reconnecting connection
 * that emits any error it encounters.
 *
 * @param {Server} server The managed server to connect to.
 * @param {number} socketTimeout Socket timeout in ms (0 is passed through as-is).
 * @returns {Object} Options object for the CoreServer constructor.
 */
function createClientOptions(server, socketTimeout) {
  const opt = clone(server.clientOptions);
  opt.host = server.options.bind_ip;
  opt.port = server.options.port;
  opt.connectionTimeout = 5000;
  opt.socketTimeout = socketTimeout;
  opt.pool = 1;

  // Ensure we only connect once and emit any error caught
  opt.reconnect = false;
  opt.emitError = true;
  return opt;
}

/**
 * Authenticate a connection when credentials are supplied.
 *
 * @param {Object} connection Connected CoreServer instance.
 * @param {Object} [credentials] Object with provider, db, user and password.
 * @param {Function} callback Called with an error if authentication failed;
 *   called with no arguments when there are no credentials.
 */
function authenticate(connection, credentials, callback) {
  if(!credentials) return callback();

  connection.auth(
    credentials.provider,
    credentials.db,
    credentials.user,
    credentials.password, callback);
}

/**
 * Translate an options object into command line arguments.
 *
 * A null/undefined value becomes a bare flag (`--name`), any other value
 * becomes `--name=value`, and an array emits one argument per element
 * (f.ex several `--setParameter` entries).
 *
 * @param {Object} options Option name to value map.
 * @returns {string[]} Arguments in the key order of `options`.
 */
function buildCommandOptions(options) {
  const args = [];

  Object.keys(options).forEach(function(name) {
    const value = options[name];
    const values = Array.isArray(value) ? value : [value];

    values.forEach(function(entry) {
      args.push(entry == null ? f('--%s', name) : f('--%s=%s', name, entry));
    });
  });

  return args;
}

/**
 * Manages a single server process: discovers the binary's version, starts,
 * stops, restarts and purges it, and runs commands against it over
 * short-lived mongodb-core connections.
 */
class Server {
  /**
   * @param {string} [binary='mongod'] Executable to spawn.
   * @param {Object} [options] Command line options for the binary, keyed by
   *   option name. `bind_ip` defaults to 'localhost' and `port` to 27017 when
   *   not given as a string/number respectively. The same object is handed
   *   to the Logger factory.
   * @param {Object} [clientOptions] Extra options merged into every
   *   mongodb-core connection this instance opens.
   */
  constructor(binary, options, clientOptions) {
    options = options || {};
    this.options = clone(options);

    // Server state, either 'stopped' or 'running'
    this.state = 'stopped';

    // Create logger instance
    this.logger = Logger('Server', options);

    // Unpack default runtime information
    this.binary = binary || 'mongod';
    // Current process, null when none is known to be alive
    this.process = null;
    // Current command
    this.command = null;
    // Credentials store
    this.credentials = [];
    // Additional internal client options
    this.clientOptions = clientOptions || {};

    // Default values for host and port if not set
    this.options.bind_ip = typeof this.options.bind_ip == 'string' ? this.options.bind_ip : 'localhost';
    this.options.port = typeof this.options.port == 'number' ? this.options.port : 27017;
  }

  /** @returns {Object} A shallow copy of the server options. */
  get config() {
    return clone(this.options);
  }

  /** @returns {string} The bind_ip the server is configured with. */
  get host() {
    return this.options.bind_ip;
  }

  /** @returns {number} The port the server is configured with. */
  get port() {
    return this.options.port;
  }

  /** @returns {string} The server name in `host:port` form. */
  get name() {
    return f('%s:%s', this.options.bind_ip, this.options.port);
  }

  /**
   * Run `<binary> --version` and parse its output.
   *
   * @returns {Promise<{version: number[], ssl: boolean}>} The first x.y.z
   *   version found on stdout as `[major, minor, patch]`, and whether the
   *   output mentions ssl. Rejects if the process cannot be spawned or no
   *   version number is printed.
   */
  discover() {
    const self = this;

    return new Promise(function(resolve, reject) {
      const proc = spawn(self.binary, ['--version']);
      // Variables receiving data
      let stdout = '';
      let stderr = '';

      proc.stdout.on('data', function(data) { stdout += data; });
      proc.stderr.on('data', function(data) { stderr += data; });

      // Got an error
      proc.on('error', reject);

      // Process terminated
      proc.on('close', function() {
        const versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/);

        // Without this guard a missing version would throw inside the event
        // handler, where nothing could catch it.
        if(!versionMatch) {
          return reject(new Error(f('failed to determine version of %s: %s'
            , self.binary, stderr || stdout)));
        }

        resolve({
          version: versionMatch[0].split('.').map(function(x) {
            return parseInt(x, 10);
          }),
          ssl: /ssl/i.test(stdout)
        });
      });
    });
  }

  /**
   * Open a connection to the server and optionally authenticate it.
   *
   * @param {Object} [credentials] Object with provider, db, user and password.
   * @param {Object} [options] Accepted for API compatibility; currently unused.
   * @returns {Promise<Object>} Resolves with the connected CoreServer; the
   *   caller owns it and must destroy it. Rejects on error, close, timeout or
   *   failed authentication.
   */
  instance(credentials, options) {
    const self = this;

    return new Promise(function(resolve, reject) {
      // Create an instance
      const s = new CoreServer(createClientOptions(self, 5000));
      s.on('error', reject);
      s.on('close', reject);
      s.on('timeout', reject);

      s.on('connect', function(_server) {
        // Perform any necessary authentication
        authenticate(_server, credentials, function(err) {
          if(err) {
            // The caller never receives the connection, so close it here
            _server.destroy();
            return reject(err);
          }

          resolve(_server);
        });
      });

      // Connect
      s.connect();
    });
  }

  /**
   * Connect, optionally authenticate, run a single command and disconnect.
   *
   * @param {string} ns Namespace the command is sent to.
   * @param {Object} command The command document.
   * @param {Object} [credentials] Object with provider, db, user and password.
   * @param {Object} [options]
   * @param {boolean} [options.ignoreError] Resolve with `{ok:1}` instead of
   *   rejecting when anything fails.
   * @param {boolean} [options.reExecuteOnError] Retry the command once when
   *   the connection emits 'error' or 'close'.
   * @returns {Promise<Object>} Resolves with the command's `result`.
   */
  executeCommand(ns, command, credentials, options) {
    const self = this;
    // Work on a copy, reExecuteOnError is flipped below
    options = clone(options || {});

    return new Promise(function(resolve, reject) {
      let s;

      // Settle as failed unless the caller asked to ignore errors
      const fail = function(err) {
        if(options.ignoreError) return resolve({ok:1});
        reject(err);
      };

      // Authenticate and execute the command, always releasing the connection
      const run = function(_server) {
        authenticate(_server, credentials, function(err) {
          if(err) {
            _server.destroy();
            return fail(err);
          }

          // Execute command
          _server.command(ns, command, function(err, r) {
            // Destroy the connection
            _server.destroy();
            if(err) return fail(err);
            // Return the command result
            resolve(r.result);
          });
        });
      };

      // Retry at most once when requested, otherwise fail
      const failOrRetry = function(err) {
        if(!options.ignoreError && options.reExecuteOnError) {
          options.reExecuteOnError = false;
          return run(s);
        }

        fail(err);
      };

      try {
        // Create an instance
        s = new CoreServer(createClientOptions(self, 0));
        s.on('error', failOrRetry);
        s.on('close', failOrRetry);
        s.on('timeout', fail);
        s.on('connect', run);

        // Connect
        s.connect();
      } catch(err) {
        // Synchronous failures honour ignoreError as well
        fail(err);
      }
    });
  }

  /**
   * Spawn the server process and wait until it accepts connections.
   *
   * The binary's version is discovered first; `setParameter` is dropped for
   * 2.2 servers, which do not support it. All remaining options are passed
   * on the command line.
   *
   * @returns {Promise<void>} Resolves once stdout contains 'waiting for
   *   connections' or 'connection accepted'. Rejects with an array of Errors
   *   when validation fails (dbpath missing), or with an Error when the
   *   process cannot be spawned or closes before becoming ready. Spawn and
   *   close errors carry the captured `stdout` and `stderr`.
   */
  start() {
    const self = this;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // Get the version numbers
        const result = yield self.discover();
        const version = result.version;

        // All errors found during validation
        const errors = [];

        // Ensure basic parameters
        if(!self.options.dbpath) {
          errors.push(new Error('dbpath is required'));
        }

        // Do we have any errors
        if(errors.length > 0) return reject(errors);

        // Merge in all the options
        const options = clone(self.options);

        // Do we have a 2.2 server, then we don't support setParameter
        if(version[0] === 2 && version[1] === 2) {
          delete options['setParameter'];
        }

        // Build command options list
        const commandOptions = buildCommandOptions(options);

        if(self.logger.isInfo()) {
          const commandLine = f('%s %s', self.binary, commandOptions.join(' '));
          self.logger.info(f('started mongod with [%s]', commandLine));
        }

        // Spawn a mongod process
        const proc = spawn(self.binary, commandOptions);
        self.process = proc;

        // Variables receiving data
        let stdout = '';
        let stderr = '';

        // Get the stdout
        proc.stdout.on('data', function(data) {
          stdout += data.toString();

          // Only emit event at start
          if(self.state === 'stopped'
            && (stdout.indexOf('waiting for connections') !== -1
              || stdout.indexOf('connection accepted') !== -1)) {
            // Mark state as running
            self.state = 'running';
            resolve();
          }
        });

        // Get the stderr
        proc.stderr.on('data', function(data) { stderr += data; });

        // Got an error
        proc.on('error', function(err) {
          const failure = new Error(f('failed to spawn %s: %s', self.binary, err && err.message));
          failure.error = err;
          failure.stdout = stdout;
          failure.stderr = stderr;
          reject(failure);
        });

        // Process terminated
        proc.on('close', function(code) {
          const exitedBeforeReady = self.state === 'stopped';

          // Whatever the exit code, the process is gone now
          self.state = 'stopped';
          // Forget the dead process so a later stop() does not wait for a
          // 'close' event that will never be emitted again
          if(self.process === proc) self.process = null;

          if(exitedBeforeReady) {
            const failure = new Error(f('failed to start mongod with options %s', commandOptions));
            failure.code = code;
            failure.stdout = stdout;
            failure.stderr = stderr;
            reject(failure);
          }
        });
      }).catch(reject);
    });
  }

  /**
   * Retrieve the ismaster for this server
   *
   * @returns {Promise<Object>} Resolves with the `result` of the ismaster
   *   command; rejects on connection error, close, timeout or command error.
   */
  ismaster() {
    const self = this;

    return new Promise(function(resolve, reject) {
      // Create an instance
      const s = new CoreServer(createClientOptions(self, 5000));
      // Add listeners
      s.on('error', reject);
      s.on('close', reject);
      s.on('timeout', reject);

      s.on('connect', function(_server) {
        _server.command('system.$cmd', {
          ismaster: true
        }, function(err, r) {
          // Destroy the connection
          _server.destroy();
          // Return an error
          if(err) return reject(err);
          // Return the ismaster command
          resolve(r.result);
        });
      });

      // Connect
      s.connect();
    });
  }

  /**
   * Purge the db directory
   *
   * Removes and re-creates `options.dbpath`. Both steps are best effort: a
   * failure is logged at info level (when enabled) and never rejects.
   *
   * @returns {Promise<void>}
   */
  purge() {
    const self = this;

    return new Promise(function(resolve) {
      const dbpath = self.options.dbpath;

      try {
        // Delete the dbpath
        rimraf.sync(dbpath);
      } catch(err) {
        if(self.logger.isInfo()) {
          self.logger.info(f('failed to remove dbpath %s: %s', dbpath, err && err.message));
        }
      }

      try {
        // Re-Create the directory
        mkdirp.sync(dbpath);
      } catch(err) {
        if(self.logger.isInfo()) {
          self.logger.info(f('failed to create dbpath %s: %s', dbpath, err && err.message));
        }
      }

      resolve();
    });
  }

  /**
   * Terminate the server process and wait for it to close.
   *
   * @param {number} [signal] Signal number to send. Any non-number falls
   *   back to SIGTERM.
   * @returns {Promise<void>} Resolves once the process has closed, or
   *   immediately when there is no process to stop.
   */
  stop(signal) {
    const self = this;
    signal = typeof signal == 'number' ? signal : 'SIGTERM';

    return new Promise(function(resolve) {
      const proc = self.process;

      // No process, just resolve
      if(!proc) {
        self.state = 'stopped';
        return resolve();
      }

      // Wait for service to stop
      proc.once('close', function() {
        self.state = 'stopped';
        resolve();
      });

      // Terminate the process
      proc.kill(signal);
    });
  }

  /**
   * Stop the server, optionally purge its dbpath, and start it again.
   *
   * @param {boolean} [purge] Purge the db directory before starting.
   * @returns {Promise<void>} Settles with the outcome of the last step run.
   */
  restart(purge) {
    const self = this;

    return co(function*() {
      // Attempt to stop the server
      yield self.stop();
      // Do we wish to purge the directory ?
      if(purge) yield self.purge();
      // Start process
      yield self.start();
    });
  }
}
478
479module.exports = Server;
480
481// SIGHUP 1 Term Hangup detected on controlling terminal
482// or death of controlling process
483// SIGINT 2 Term Interrupt from keyboard
484// SIGQUIT 3 Core Quit from keyboard
485// SIGILL 4 Core Illegal Instruction
486// SIGABRT 6 Core Abort signal from abort(3)
487// SIGFPE 8 Core Floating point exception
488// SIGKILL 9 Term Kill signal
489// SIGSEGV 11 Core Invalid memory reference
490// SIGPIPE 13 Term Broken pipe: write to pipe with no readers
491// SIGALRM 14 Term Timer signal from alarm(2)
492// SIGTERM 15 Term Termination signal
493// Signal map
// Maps a signal number to its name.
var signals = {
  1: 'SIGHUP',
  2: 'SIGINT',
  3: 'SIGQUIT',
  // 4 is the illegal instruction signal (see the table above), not SIGABRT
  4: 'SIGILL',
  6: 'SIGABRT',
  8: 'SIGFPE',
  9: 'SIGKILL',
  11: 'SIGSEGV',
  13: 'SIGPIPE',
  14: 'SIGALRM',
  15: 'SIGTERM',
  // SIGSTOP appears under three numbers; signal(7) lists 17, 19 and 23 for
  // it depending on the architecture
  17: 'SIGSTOP',
  19: 'SIGSTOP',
  23: 'SIGSTOP'
};