UNPKG

12.3 kBJavaScriptView Raw
1"use strict"
2
3var co = require('co'),
4 f = require('util').format,
5 mkdirp = require('mkdirp'),
6 rimraf = require('rimraf'),
7 Logger = require('./logger'),
8 CoreServer = require('mongodb-core').Server,
9 spawn = require('child_process').spawn;
10
11var clone = function(o) {
12 var obj = {}; for(var name in o) obj[name] = o[name]; return obj;
13}
14
15class Mongos {
16 constructor(binary, options, clientOptions) {
17 options = options || {};
18 this.options = clone(options);
19
20 // Server state
21 this.state = 'stopped';
22
23 // Create logger instance
24 this.logger = Logger('Mongos', options);
25
26 // Unpack default runtime information
27 this.binary = binary || 'mongos';
28 // Current process
29 this.process = null;
30 // Current command
31 this.command = null;
32 // Credentials store
33 this.credentials = [];
34 // Additional internal client options
35 this.clientOptions = clientOptions || {};
36
37 // Default values for host and port if not set
38 this.options.bind_ip = typeof this.options.bind_ip == 'string' ? this.options.bind_ip : 'localhost';
39 this.options.port = typeof this.options.port == 'number' ? this.options.port : 27017;
40 }
41
42 get name() {
43 return f('%s:%s', this.options.bind_ip, this.options.port);
44 }
45
46 discover() {
47 var self = this;
48
49 return new Promise(function(resolve, reject) {
50 co(function*() {
51 var proc = spawn(self.binary, ['--version']);
52 // Variables receiving data
53 var stdout = '';
54 var stderr = '';
55 // Get the stdout
56 proc.stdout.on('data', function(data) { stdout += data; });
57 // Get the stderr
58 proc.stderr.on('data', function(data) { stderr += data; });
59 // Got an error
60 proc.on('error', function(err) { reject(err); });
61 // Process terminated
62 proc.on('close', function(code) {
63 // Perform version match
64 var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/)
65
66 // Check if we have ssl
67 var sslMatch = stdout.match(/ssl/i)
68
69 // Resolve the server version
70 resolve({
71 version: versionMatch.toString().split('.').map(function(x) {
72 return parseInt(x, 10);
73 }),
74 ssl: sslMatch != null
75 });
76 });
77 }).catch(reject);
78 });
79 }
80
81 executeCommand(ns, command, credentials, options) {
82 var self = this;
83 options = options || {};
84 options = clone(options);
85
86 return new Promise(function(resolve, reject) {
87 co(function*() {
88 // Copy the basic options
89 var opt = clone(self.clientOptions);
90 opt.host = self.options.bind_ip;
91 opt.port = self.options.port;
92 opt.connectionTimeout = 5000;
93 opt.socketTimeout = 5000;
94 opt.pool = 1;
95
96 // Ensure we only connect once and emit any error caught
97 opt.reconnect = false;
98 opt.emitError = true;
99
100 // Execute command
101 var executeCommand = function(_ns, _command, _credentials, _server) {
102 // Do we have credentials
103 var authenticate = function(_server, _credentials, _callback) {
104 if(!_credentials) return _callback();
105 // Perform authentication using provided
106 // credentials for this command
107 _server.auth(
108 _credentials.provider,
109 _credentials.db,
110 _credentials.user,
111 _credentials.password, _callback);
112 }
113
114 authenticate(_server, _credentials, function(err) {
115 if(err && options.ignoreError) return resolve({ok:1});
116 // If we had an error return
117 if(err) {
118 _server.destroy();
119 return reject(err);
120 }
121 // Execute command
122 _server.command(_ns, _command, function(err, r) {
123 // Destroy the connection
124 _server.destroy();
125 // Return an error
126 if(err && options.ignoreError) return resolve({ok:1});
127 if(err) return reject(err);
128 // Return the ismaster command
129 resolve(r.result);
130 });
131 });
132 }
133
134 // Create an instance
135 var s = new CoreServer(opt);
136
137 s.on('error', function(err) {
138 if(options.ignoreError) return resolve({ok:1});
139 if(options.reExecuteOnError) {
140 options.reExecuteOnError = false;
141 return executeCommand(ns, command, credentials, s);
142 }
143
144 reject(err);
145 });
146
147 s.on('close', function(err) {
148 if(options.ignoreError) return resolve({ok:1});
149 if(options.reExecuteOnError) {
150 options.reExecuteOnError = false;
151 return executeCommand(ns, command, credentials, s);
152 }
153
154 reject(err);
155 });
156
157 s.on('timeout', function(err) {
158 if(options.ignoreError) return resolve({ok:1});
159 reject(err);
160 });
161
162 s.on('connect', function(_server) {
163 executeCommand(ns, command, credentials, _server);
164 });
165
166 // Connect
167 s.connect();
168 }).catch(reject);
169 });
170 }
171
172 start() {
173 var self = this;
174
175 return new Promise(function(resolve, reject) {
176 co(function*() {
177 // Get the version numbers
178 var result = yield self.discover();
179 var version = result.version;
180
181 // All errors found during validation
182 var errors = [];
183
184 // Do we have any errors
185 if(errors.length > 0) return reject(errors);
186
187 // Figure out what special options we need to pass into the boot script
188 // Removing any non-compatible parameters etc.
189 if(version[0] == 3 && version[1] >= 0 && version[1] <= 2) {
190 } else if(version[0] == 3 && version[1] >= 2) {
191 } else if(version[0] == 2 && version[1] <= 6) {
192 }
193
194 // Merge in all the options
195 var options = clone(self.options);
196
197 // Build command options list
198 var commandOptions = [];
199
200 // Go over all the options
201 for(var name in options) {
202 if(options[name] == null) {
203 commandOptions.push(f('--%s', name));
204 } else {
205 commandOptions.push(f('--%s=%s', name, options[name]));
206 }
207 }
208
209 // Command line
210 var commandLine = f('%s %s', self.binary, commandOptions.join(' '));
211
212 if(self.logger.isInfo()) {
213 self.logger.info(f('start mongos server [%s]', commandLine));
214 }
215
216 // console.log("---------------- start server")
217 // console.dir(commandLine)
218
219 // Spawn a mongos process
220 self.process = spawn(self.binary, commandOptions);
221
222 // Variables receiving data
223 var stdout = '';
224 var stderr = '';
225
226 // Get the stdout
227 self.process.stdout.on('data', function(data) {
228 stdout += data.toString();
229 // console.log(stdout.toString())
230 //
231 // Only emit event at start
232 if(self.state == 'stopped') {
233 if(stdout.indexOf('waiting for connections') != -1) {
234 if(self.logger.isInfo()) {
235 self.logger.info(f('successfully started mongos proxy [%s]', commandLine));
236 }
237
238 // Mark state as running
239 self.state = 'running';
240 // Resolve
241 resolve();
242 }
243 }
244 });
245
246 // Get the stderr
247 self.process.stderr.on('data', function(data) { stderr += data; });
248
249 // Got an error
250 self.process.on('error', function(err) {
251 self.logger.error(f('failed to start mongos instance [%s]', commandLine));
252 reject(new Error({error:error, stdout: stdout, stderr: stderr}));
253 });
254
255 // Process terminated
256 self.process.on('close', function(code) {
257 if(self.state == 'stopped' && stdout == '' || (code != 0)) {
258 self.logger.error(f('failed to start mongos instance [%s]', commandLine));
259 return reject(new Error(f('failed to start mongos with options %s', commandOptions)))
260 }
261
262 self.state = 'stopped';
263 });
264 }).catch(reject);
265 });
266 }
267
268 /*
269 * Retrieve the ismaster for this server
270 */
271 ismaster() {
272 var self = this;
273
274 return new Promise(function(resolve, reject) {
275 co(function*() {
276 // Copy the basic options
277 var opt = clone(self.clientOptions);
278 opt.host = self.options.bind_ip;
279 opt.port = self.options.port;
280 opt.connectionTimeout = 5000;
281 opt.socketTimeout = 5000;
282 opt.pool = 1;
283
284 // Ensure we only connect once and emit any error caught
285 opt.reconnect = false;
286 opt.emitError = true;
287
288 // Create an instance
289 var s = new CoreServer(opt);
290 // Add listeners
291 s.on('error', function(err) {
292 reject(err);
293 });
294
295 s.on('timeout', function(err) {
296 reject(err);
297 });
298
299 s.on('connect', function(_server) {
300 _server.command('system.$cmd', {
301 ismaster: true
302 }, function(err, r) {
303 // Destroy the connection
304 _server.destroy();
305 // Return an error
306 if(err) return callback(err);
307 // Return the ismaster command
308 resolve(r.result);
309 });
310 });
311
312 // Connect
313 s.connect();
314 }).catch(reject);
315 });
316 }
317
318 /*
319 * Purge the db directory
320 */
321 purge() {
322 var self = this;
323
324 return new Promise(function(resolve, reject) {
325 co(function*() {
326 try {
327 // Delete the dbpath
328 rimraf.sync(self.options.dbpath);
329 } catch(err) {}
330
331 try {
332 // Re-Create the directory
333 mkdirp.sync(self.options.dbpath);
334 } catch(err) {}
335
336 // Return
337 resolve();
338 }).catch(reject);
339 });
340 }
341
342 stop(signal) {
343 var self = this;
344 signal = typeof signal == 'number' ? signal : signals.SIGTERM;
345
346 return new Promise(function(resolve, reject) {
347 co(function*() {
348 if(self.logger.isInfo()) {
349 self.logger.info(f('stop mongos proxy process with pid [%s]', self.process.pid));
350 }
351
352 // No process, just resolve
353 if(!self.process) return resolve();
354
355 // Wait for service to stop
356 self.process.on('close', function() {
357 if(self.logger.isInfo()) {
358 self.logger.info(f('stopped mongos proxy process with pid [%s]', self.process.pid));
359 }
360
361 // Set process to stopped
362 self.state = 'stopped';
363 // Return
364 resolve();
365 });
366
367 // Terminate the process
368 self.process.kill(signal);
369 }).catch(reject);
370 });
371 }
372
373 restart(purge) {
374 var self = this;
375
376 return new Promise(function(resolve, reject) {
377 co(function*() {
378 // Attempt to stop the server
379 yield self.stop();
380 // Do we wish to purge the directory ?
381 if(purge) yield self.purge();
382 // Start process
383 yield self.start();
384 resolve();
385 }).catch(reject);
386 });
387 }
388}
389
390module.exports = Mongos;
391
392// SIGHUP 1 Term Hangup detected on controlling terminal
393// or death of controlling process
394// SIGINT 2 Term Interrupt from keyboard
395// SIGQUIT 3 Core Quit from keyboard
396// SIGILL 4 Core Illegal Instruction
397// SIGABRT 6 Core Abort signal from abort(3)
398// SIGFPE 8 Core Floating point exception
399// SIGKILL 9 Term Kill signal
400// SIGSEGV 11 Core Invalid memory reference
401// SIGPIPE 13 Term Broken pipe: write to pipe with no readers
402// SIGALRM 14 Term Timer signal from alarm(2)
403// SIGTERM 15 Term Termination signal
404// Signal map
405var signals = {
406 1: 'SIGHUP',
407 2: 'SIGINT',
408 3: 'SIGQUIT',
409 4: 'SIGABRT',
410 6: 'SIGABRT',
411 8: 'SIGFPE',
412 9: 'SIGKILL',
413 11: 'SIGSEGV',
414 13: 'SIGPIPE',
415 14: 'SIGALRM',
416 15: 'SIGTERM',
417 17: 'SIGSTOP',
418 19: 'SIGSTOP',
419 23: 'SIGSTOP'
420};