UNPKG

12.7 kBJavaScriptView Raw
1"use strict"
2
3var co = require('co'),
4 f = require('util').format,
5 mkdirp = require('mkdirp'),
6 rimraf = require('rimraf'),
7 Logger = require('./logger'),
8 EventEmitter = require('events'),
9 CoreServer = require('mongodb-core').Server,
10 spawn = require('child_process').spawn;
11
12var Promise = require("bluebird");
13
14var clone = function(o) {
15 var obj = {}; for(var name in o) obj[name] = o[name]; return obj;
16}
17
18class Mongos extends EventEmitter {
19 constructor(binary, options, clientOptions) {
20 super();
21 // Ensure options are correct
22 options = options || {};
23 this.options = clone(options);
24
25 // Server state
26 this.state = 'stopped';
27
28 // Create logger instance
29 this.logger = Logger('Mongos', options);
30
31 // Unpack default runtime information
32 this.binary = binary || 'mongos';
33 // Current process
34 this.process = null;
35 // Current command
36 this.command = null;
37 // Credentials store
38 this.credentials = [];
39 // Additional internal client options
40 this.clientOptions = clientOptions || {};
41
42 // Default values for host and port if not set
43 this.options.bind_ip = typeof this.options.bind_ip == 'string' ? this.options.bind_ip : 'localhost';
44 this.options.port = typeof this.options.port == 'number' ? this.options.port : 27017;
45 }
46
47 get name() {
48 return f('%s:%s', this.options.bind_ip, this.options.port);
49 }
50
51 discover() {
52 var self = this;
53
54 return new Promise(function(resolve, reject) {
55 co(function*() {
56 var proc = spawn(self.binary, ['--version']);
57 // Variables receiving data
58 var stdout = '';
59 var stderr = '';
60 // Get the stdout
61 proc.stdout.on('data', function(data) { stdout += data; });
62 // Get the stderr
63 proc.stderr.on('data', function(data) { stderr += data; });
64 // Got an error
65 proc.on('error', function(err) { reject(err); });
66 // Process terminated
67 proc.on('close', function(code) {
68 // Perform version match
69 var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/)
70
71 // Check if we have ssl
72 var sslMatch = stdout.match(/ssl/i)
73
74 // Resolve the server version
75 resolve({
76 version: versionMatch.toString().split('.').map(function(x) {
77 return parseInt(x, 10);
78 }),
79 ssl: sslMatch != null
80 });
81 });
82 }).catch(reject);
83 });
84 }
85
86 executeCommand(ns, command, credentials, options) {
87 var self = this;
88 options = options || {};
89 options = clone(options);
90
91 return new Promise(function(resolve, reject) {
92 co(function*() {
93 // Copy the basic options
94 var opt = clone(self.clientOptions);
95 opt.host = self.options.bind_ip;
96 opt.port = self.options.port;
97 opt.connectionTimeout = 5000;
98 opt.socketTimeout = 5000;
99 opt.pool = 1;
100
101 // Ensure we only connect once and emit any error caught
102 opt.reconnect = false;
103 opt.emitError = true;
104
105 // Execute command
106 var executeCommand = function(_ns, _command, _credentials, _server) {
107 // Do we have credentials
108 var authenticate = function(_server, _credentials, _callback) {
109 if(!_credentials) return _callback();
110 // Perform authentication using provided
111 // credentials for this command
112 _server.auth(
113 _credentials.provider,
114 _credentials.db,
115 _credentials.user,
116 _credentials.password, _callback);
117 }
118
119 authenticate(_server, _credentials, function(err) {
120 if(err && options.ignoreError) return resolve({ok:1});
121 // If we had an error return
122 if(err) {
123 _server.destroy();
124 return reject(err);
125 }
126 // Execute command
127 _server.command(_ns, _command, function(err, r) {
128 // Destroy the connection
129 _server.destroy();
130 // Return an error
131 if(err && options.ignoreError) return resolve({ok:1});
132 if(err) return reject(err);
133 // Return the ismaster command
134 resolve(r.result);
135 });
136 });
137 }
138
139 // Create an instance
140 var s = new CoreServer(opt);
141
142 s.on('error', function(err) {
143 if(options.ignoreError) return resolve({ok:1});
144 if(options.reExecuteOnError) {
145 options.reExecuteOnError = false;
146 return executeCommand(ns, command, credentials, s);
147 }
148
149 reject(err);
150 });
151
152 s.on('close', function(err) {
153 if(options.ignoreError) return resolve({ok:1});
154 if(options.reExecuteOnError) {
155 options.reExecuteOnError = false;
156 return executeCommand(ns, command, credentials, s);
157 }
158
159 reject(err);
160 });
161
162 s.on('timeout', function(err) {
163 if(options.ignoreError) return resolve({ok:1});
164 reject(err);
165 });
166
167 s.on('connect', function(_server) {
168 executeCommand(ns, command, credentials, _server);
169 });
170
171 // Connect
172 s.connect();
173 }).catch(reject);
174 });
175 }
176
177 start() {
178 var self = this;
179
180 return new Promise(function(resolve, reject) {
181 co(function*() {
182 // Get the version numbers
183 var result = yield self.discover();
184 var version = result.version;
185
186 // All errors found during validation
187 var errors = [];
188
189 // Do we have any errors
190 if(errors.length > 0) return reject(errors);
191
192 // Figure out what special options we need to pass into the boot script
193 // Removing any non-compatible parameters etc.
194 if(version[0] == 3 && version[1] >= 0 && version[1] <= 2) {
195 } else if(version[0] == 3 && version[1] >= 2) {
196 } else if(version[0] == 2 && version[1] <= 6) {
197 }
198
199 // Merge in all the options
200 var options = clone(self.options);
201
202 // Build command options list
203 var commandOptions = [];
204
205 // Go over all the options
206 for(var name in options) {
207 if(options[name] == null) {
208 commandOptions.push(f('--%s', name));
209 } else {
210 commandOptions.push(f('--%s=%s', name, options[name]));
211 }
212 }
213
214 // Command line
215 var commandLine = f('%s %s', self.binary, commandOptions.join(' '));
216
217 if(self.logger.isInfo()) {
218 self.logger.info(f('start mongos server [%s]', commandLine));
219 }
220
221 // Emit start event
222 self.emit('state', {
223 event: 'start', topology: 'mongos', cmd: commandLine, options: self.options
224 });
225
226 // Spawn a mongos process
227 self.process = spawn(self.binary, commandOptions);
228
229 // Variables receiving data
230 var stdout = '';
231 var stderr = '';
232
233 // Get the stdout
234 self.process.stdout.on('data', function(data) {
235 stdout += data.toString();
236 self.emit('state', {
237 event: 'stdout', topology: 'mongos', stdout: data.toString(), options: self.options
238 });
239
240 //
241 // Only emit event at start
242 if(self.state == 'stopped') {
243 if(stdout.indexOf('waiting for connections') != -1) {
244 if(self.logger.isInfo()) {
245 self.logger.info(f('successfully started mongos proxy [%s]', commandLine));
246 }
247
248 // Emit running state
249 self.emit('state', {
250 event: 'running', topology: 'mongos', cmd: commandLine, options: self.options
251 });
252
253 // Mark state as running
254 self.state = 'running';
255 // Resolve
256 resolve();
257 }
258 }
259 });
260
261 // Get the stderr
262 self.process.stderr.on('data', function(data) { stderr += data; });
263
264 // Got an error
265 self.process.on('error', function(err) {
266 self.emit('state', {
267 event: 'sterr', topology: 'mongos', sterr: sterr.toString(), options: self.options
268 });
269 self.logger.error(f('failed to start mongos instance [%s]', commandLine));
270 reject(new Error({error:error, stdout: stdout, stderr: stderr}));
271 });
272
273 // Process terminated
274 self.process.on('close', function(code) {
275 if(self.state == 'stopped' && stdout == '' || (code != 0)) {
276 self.logger.error(f('failed to start mongos instance [%s]', commandLine));
277 return reject(new Error(f('failed to start mongos with options %s', commandOptions)))
278 }
279
280 self.state = 'stopped';
281 });
282 }).catch(reject);
283 });
284 }
285
286 /*
287 * Retrieve the ismaster for this server
288 */
289 ismaster() {
290 var self = this;
291
292 return new Promise(function(resolve, reject) {
293 co(function*() {
294 // Copy the basic options
295 var opt = clone(self.clientOptions);
296 opt.host = self.options.bind_ip;
297 opt.port = self.options.port;
298 opt.connectionTimeout = 5000;
299 opt.socketTimeout = 5000;
300 opt.pool = 1;
301
302 // Ensure we only connect once and emit any error caught
303 opt.reconnect = false;
304 opt.emitError = true;
305
306 // Create an instance
307 var s = new CoreServer(opt);
308 // Add listeners
309 s.on('error', function(err) {
310 reject(err);
311 });
312
313 s.on('timeout', function(err) {
314 reject(err);
315 });
316
317 s.on('connect', function(_server) {
318 _server.command('system.$cmd', {
319 ismaster: true
320 }, function(err, r) {
321 // Destroy the connection
322 _server.destroy();
323 // Return an error
324 if(err) return callback(err);
325 // Return the ismaster command
326 resolve(r.result);
327 });
328 });
329
330 // Connect
331 s.connect();
332 }).catch(reject);
333 });
334 }
335
336 /*
337 * Purge the db directory
338 */
339 purge() {
340 var self = this;
341
342 return new Promise(function(resolve, reject) {
343 co(function*() {
344 try {
345 // Delete the dbpath
346 rimraf.sync(self.options.dbpath);
347 } catch(err) {}
348
349 try {
350 // Re-Create the directory
351 mkdirp.sync(self.options.dbpath);
352 } catch(err) {}
353
354 // Return
355 resolve();
356 }).catch(reject);
357 });
358 }
359
360 stop(signal) {
361 var self = this;
362 signal = typeof signal == 'number' ? signal : signals.SIGTERM;
363
364 return new Promise(function(resolve, reject) {
365 co(function*() {
366 if(self.logger.isInfo()) {
367 self.logger.info(f('stop mongos proxy process with pid [%s]', self.process.pid));
368 }
369
370 // No process, just resolve
371 if(!self.process) return resolve();
372
373 // Wait for service to stop
374 self.process.on('close', function() {
375 if(self.logger.isInfo()) {
376 self.logger.info(f('stopped mongos proxy process with pid [%s]', self.process.pid));
377 }
378
379 // Set process to stopped
380 self.state = 'stopped';
381 // Return
382 resolve();
383 });
384
385 // Terminate the process
386 self.process.kill(signal);
387 }).catch(reject);
388 });
389 }
390
391 restart(purge) {
392 var self = this;
393
394 return new Promise(function(resolve, reject) {
395 co(function*() {
396 // Attempt to stop the server
397 yield self.stop();
398 // Do we wish to purge the directory ?
399 if(purge) yield self.purge();
400 // Start process
401 yield self.start();
402 resolve();
403 }).catch(reject);
404 });
405 }
406}
407
408module.exports = Mongos;
409
410// SIGHUP 1 Term Hangup detected on controlling terminal
411// or death of controlling process
412// SIGINT 2 Term Interrupt from keyboard
413// SIGQUIT 3 Core Quit from keyboard
414// SIGILL 4 Core Illegal Instruction
415// SIGABRT 6 Core Abort signal from abort(3)
416// SIGFPE 8 Core Floating point exception
417// SIGKILL 9 Term Kill signal
418// SIGSEGV 11 Core Invalid memory reference
419// SIGPIPE 13 Term Broken pipe: write to pipe with no readers
420// SIGALRM 14 Term Timer signal from alarm(2)
421// SIGTERM 15 Term Termination signal
422// Signal map
423var signals = {
424 1: 'SIGHUP',
425 2: 'SIGINT',
426 3: 'SIGQUIT',
427 4: 'SIGABRT',
428 6: 'SIGABRT',
429 8: 'SIGFPE',
430 9: 'SIGKILL',
431 11: 'SIGSEGV',
432 13: 'SIGPIPE',
433 14: 'SIGALRM',
434 15: 'SIGTERM',
435 17: 'SIGSTOP',
436 19: 'SIGSTOP',
437 23: 'SIGSTOP'
438};