UNPKG

26.2 kBJavaScriptView Raw
1"use strict"
2
3var co = require('co'),
4 f = require('util').format,
5 mkdirp = require('mkdirp'),
6 rimraf = require('rimraf'),
7 Server = require('./server'),
8 CoreServer = require('mongodb-core').Server,
9 spawn = require('child_process').spawn;
10
/**
 * Make a shallow copy of an object.
 *
 * Every enumerable property reachable through `for...in` (own and inherited)
 * is copied by reference onto a fresh plain object. `null`/`undefined` yield
 * an empty object.
 *
 * @param {object} o The object to copy.
 * @returns {object} A new plain object holding the same property values.
 */
var clone = function(o) {
  var copy = {};

  for(var key in o) {
    copy[key] = o[key];
  }

  return copy;
};
14
/**
 * Create a promise that resolves (with no value) once the delay has elapsed.
 *
 * @param {number} ms Delay in milliseconds, handed straight to setTimeout.
 * @returns {Promise} Resolves after the timer fires; it never rejects.
 */
var waitMS = function(ms) {
  return new Promise(function(done) {
    // setTimeout invokes the callback without arguments, so the promise
    // resolves with undefined.
    setTimeout(done, ms);
  });
};
22
/**
 * Error message used whenever an operation needs a previously recorded
 * replicaset configuration and none has been stored yet.
 */
var NO_CONFIGURATION_MESSAGE = 'no configurations exist yet, did you start the replicaset?';

/**
 * Compare two election ids without throwing when either side is missing.
 *
 * @param {object} [a] An election id exposing an `equals` method.
 * @param {object} [b] The election id to compare against.
 * @returns {boolean} true only when both ids exist and `a.equals(b)` is truthy.
 */
var sameElection = function(a, b) {
  return a != null && b != null && !!a.equals(b);
};

/**
 * Collect the managers whose ismaster document satisfies a predicate.
 *
 * @param {ConfigServers} topology The topology whose managers are inspected.
 * @param {function} predicate Receives the ismaster document, returns a truthy value to select the manager.
 * @returns {Promise} Resolves with the array of selected managers.
 */
var selectManagers = function(topology, predicate) {
  return co(function*() {
    var selected = [];

    // Queried one at a time, in manager order
    for(var i = 0; i < topology.managers.length; i++) {
      var ismaster = yield topology.managers[i].ismaster();
      if(predicate(ismaster)) selected.push(topology.managers[i]);
    }

    return selected;
  });
};

/**
 * Send a replSetReconfig command to the current primary and, on success,
 * record the configuration as the latest known one.
 *
 * @param {ConfigServers} topology The topology being reconfigured.
 * @param {object} config The complete configuration document to apply.
 * @param {boolean} force Value of the command's `force` field.
 * @param {object} [credentials] Credentials forwarded to executeCommand.
 * @returns {Promise} Rejects when no primary is found or the command reports ok == 0.
 */
var executeReconfig = function(topology, config, force, credentials) {
  return co(function*() {
    var primary = yield topology.primary();
    if(!primary) throw new Error('no primary available');

    var result = yield primary.executeCommand('admin.$cmd', {
      replSetReconfig: config, force: force
    }, credentials);

    if(result && result.ok == 0) {
      throw new Error(f('failed to execute replSetReconfig with configuration [%s]', JSON.stringify(config)));
    }

    // Push new configuration to list
    topology.configurations.push(config);
  });
};

/**
 * Poll a manager until the topology looks stable after a reconfiguration.
 *
 * The loop finishes when one of the following is observed:
 *  - the polled server is primary under a different election id, in which
 *    case waitForPrimary() is awaited first;
 *  - the polled server is a secondary/arbiter reporting the known election id;
 *  - the polled server is primary/secondary/arbiter and one election cycle
 *    wait has already been spent.
 * Any error raised while polling is treated as "not ready yet": the loop
 * waits retryWaitMS and tries again, so this promise does not reject.
 *
 * @param {ConfigServers} topology The topology being observed.
 * @param {function} locateManager Returns a promise for the manager to poll (or a falsy value when none is available yet).
 * @returns {Promise} Resolves with no value once a stable state was seen.
 */
var waitForStableState = function(topology, locateManager) {
  return co(function*() {
    var waitedForElectionCycle = false;

    while(true) {
      try {
        var manager = yield locateManager();
        if(!manager) {
          yield waitMS(topology.retryWaitMS);
          continue;
        }

        var ismaster = yield manager.ismaster();
        var isMember = ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly;
        var knownElection = sameElection(topology.electionId, ismaster.electionId);

        if(ismaster.ismaster && ismaster.electionId && !knownElection) {
          // Did we cause a new election
          yield topology.waitForPrimary();
          return;
        } else if((ismaster.secondary || ismaster.arbiterOnly)
          && ismaster.electionId && knownElection) {
          return;
        } else if(isMember && !waitedForElectionCycle) {
          // Wait for an election cycle to have passed
          waitedForElectionCycle = true;
          yield waitMS(topology.electionCycleWaitMS);
        } else if(isMember) {
          return;
        } else {
          yield waitMS(topology.retryWaitMS);
        }
      } catch(err) {
        // Deliberate best-effort: the server may be unreachable for a while
        yield waitMS(topology.retryWaitMS);
      }
    }
  });
};

/**
 * Manages a group of server processes that are all started with the
 * `configsvr` option.
 *
 * NOTE(review): the constructor strips `replSet` from every node while several
 * methods below issue replicaset commands (replSetStepDown, replSetReconfig,
 * ...). Those methods only make sense if the servers are part of a replicaset;
 * confirm the intended usage with the callers.
 */
class ConfigServers {
  /**
   * @param {string} [binary='mongod'] Executable used to start every server.
   * @param {object[]} nodes Non-empty list of per-server settings (`port`, `bind_ip`, ...).
   * @param {object} [options] Options forwarded to every Server instance.
   * @param {number} [options.electionCycleWaitMS=31000] Time to wait for a full election cycle.
   * @param {number} [options.retryWaitMS=5000] Time to wait before retrying a failed probe.
   * @throws {Error} When `nodes` is not a non-empty array.
   */
  constructor(binary, nodes, options) {
    options = options || {};
    // Save the default passed in parameters
    this.nodes = nodes;
    this.options = clone(options);

    // Ensure we have a list of nodes
    if(!Array.isArray(this.nodes) || this.nodes.length == 0) {
      throw new Error('a list of nodes must be passed in');
    }

    // Server state
    this.state = 'stopped';

    // Unpack default runtime information
    this.binary = binary || 'mongod';

    // Wait times
    this.electionCycleWaitMS = typeof this.options.electionCycleWaitMS == 'number'
      ? this.options.electionCycleWaitMS : 31000;
    this.retryWaitMS = typeof this.options.retryWaitMS == 'number'
      ? this.options.retryWaitMS : 5000;

    // Remove the values from the options
    delete this.options['electionCycleWaitMS'];
    delete this.options['retryWaitMS'];

    // State read by the replicaset helpers; initialised here so they never
    // dereference undefined
    this.configurations = [];
    this.electionId = null;
    this.lastKnownPrimary = null;

    // Self reference
    var self = this;

    // Create server managers for each node
    this.managers = this.nodes.map(function(x) {
      var opts = clone(x);
      delete opts['logpath'];
      delete opts['replSet'];

      // Add the needed config server option (presumably a null value is
      // rendered by Server as a value-less flag - confirm in ./server)
      if(!opts.configsvr) opts.configsvr = null;

      // The untouched caller options (not this.options) are handed to Server
      return new Server(self.binary, opts, options);
    });
  }

  /**
   * Run `<binary> --version` and parse its standard output.
   * @method
   * @returns {Promise} Resolves with `{version: number[], ssl: boolean}`; rejects when the
   * process cannot be spawned or no `x.y.z` version string is printed.
   */
  discover() {
    var self = this;

    // spawn is event based, so this is a genuine callback-to-promise adapter
    return new Promise(function(resolve, reject) {
      var proc = spawn(self.binary, ['--version']);
      // Variables receiving data
      var stdout = '';
      var stderr = '';
      // Get the stdout
      proc.stdout.on('data', function(data) { stdout += data; });
      // Get the stderr
      proc.stderr.on('data', function(data) { stderr += data; });
      // Got an error
      proc.on('error', reject);
      // Process terminated
      proc.on('close', function(code) {
        // Perform version match
        var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/);

        // Reject instead of throwing inside the event handler, where the
        // exception could not be caught by the caller
        if(!versionMatch) {
          return reject(new Error(f('failed to detect the version of [%s] (exit code %s): %s',
            self.binary, code, stderr.trim())));
        }

        // Resolve the server version
        resolve({
          version: versionMatch[0].split('.').map(function(x) {
            return parseInt(x, 10);
          }),
          // Check if we have ssl
          ssl: /ssl/i.test(stdout)
        });
      });
    });
  }

  /**
   * Start every server, one after the other. Does nothing when already running.
   * @method
   * @returns {Promise}
   */
  start() {
    var self = this;

    return co(function*() {
      // We are already running, just return
      if(self.state == 'running') return;

      // Boot all the servers
      for(var i = 0; i < self.managers.length; i++) {
        yield self.managers[i].start();
      }

      // Set the state to running
      self.state = 'running';
    });
  }

  /**
   * Return members url
   * @method
   * @returns {String} Comma separated `host:port` list built from the nodes passed to the
   * constructor; `bind_ip` defaults to `localhost`.
   */
  url() {
    return this.nodes.map(function(x) {
      return f('%s:%s', x.bind_ip || 'localhost', x.port);
    }).join(',');
  }

  /**
   * Locate the current primary
   * @method
   * @returns {Promise} Resolves with the first manager whose ismaster document has a truthy
   * `ismaster` field, or with undefined when there is none.
   */
  primary() {
    var self = this;

    return co(function*() {
      for(var i = 0; i < self.managers.length; i++) {
        var ismaster = yield self.managers[i].ismaster();
        if(ismaster.ismaster) return self.managers[i];
      }
    });
  }

  /**
   * Locate all the arbiters
   * @method
   * @returns {Promise} Resolves with the managers reporting `arbiterOnly`.
   */
  arbiters() {
    return selectManagers(this, function(ismaster) {
      return ismaster.arbiterOnly;
    });
  }

  /**
   * Locate all the secondaries
   * @method
   * @returns {Promise} Resolves with the managers reporting `secondary`.
   */
  secondaries() {
    return selectManagers(this, function(ismaster) {
      return ismaster.secondary;
    });
  }

  /**
   * Block until we have a new primary available
   * @method
   * @returns {Promise} Resolves with the manager of a primary elected under a new election id.
   * When a primary is still reporting the already known election id after one election cycle
   * wait, the promise resolves with no value.
   */
  waitForPrimary() {
    var self = this;
    var waitedForElectionCycle = false;

    return co(function*() {
      // Keep going until we have a new primary
      while(true) {
        for(var i = 0; i < self.managers.length; i++) {
          try {
            var ismaster = yield self.managers[i].ismaster();

            // Do we have an electionId and ismaster
            if(ismaster.electionId
              && ismaster.ismaster
              && !sameElection(ismaster.electionId, self.electionId)) {
              // We have a new primary
              self.electionId = ismaster.electionId;
              self.lastKnownPrimary = ismaster.me;
              // Return the manager
              return self.managers[i];
            } else if(ismaster.ismaster && !waitedForElectionCycle) {
              // Allow a full election cycle to pass
              yield waitMS(self.electionCycleWaitMS);
              waitedForElectionCycle = true;
            } else if(ismaster.ismaster && waitedForElectionCycle) {
              return;
            }
          } catch(err) {
            // The server could not be probed, retry later
            yield waitMS(self.retryWaitMS);
          }
        }

        // Wait for second and retry detection
        yield waitMS(1000);
      }
    });
  }

  /**
   * Step down the primary server
   * @method
   * @param {boolean} [returnImmediately=false] Return immediately after executing stepdown, otherwise block until new primary is available.
   * @param {object} [options] Extra fields mixed into the replSetStepDown command.
   * @param {number} [options.stepDownSecs=60] The number of seconds to wait before stepping down primary.
   * @param {number} [options.secondaryCatchUpPeriodSecs=null] The number of seconds that the mongod will wait for an electable secondary to catch up to the primary.
   * @param {boolean} [options.force=false] A boolean that determines whether the primary steps down if no electable and up-to-date secondary exists within the wait period.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  stepDownPrimary(returnImmediately, options, credentials) {
    var self = this;
    options = options || {};

    return co(function*() {
      var extras = clone(options);

      // Step down command
      var command = {
        replSetStepDown: typeof extras.stepDownSecs == 'number'
          ? extras.stepDownSecs
          : 60
      };

      // Remove stepDownSecs
      delete extras['stepDownSecs'];
      // Mix in any other options
      for(var name in extras) {
        command[name] = extras[name];
      }

      // Locate the current primary
      var manager = yield self.primary();
      if(manager == null) {
        throw new Error('no primary found in the replicaset');
      }

      try {
        yield manager.executeCommand('admin.$cmd', command, credentials);
      } catch(err) {
        // We got an error back from the command, if successful the socket is
        // closed, so only a reported ok == 0 counts as a failure
        if(err.ok == 0) {
          throw new Error('failed to step down primary');
        }
      }

      // Do we need to return immediately
      if(returnImmediately) return;

      // We want to wait for a new primary to appear
      yield self.waitForPrimary();
    });
  }

  /**
   * Get the current replicaset configuration
   * @method
   * @param {object} manager The server manager the replSetGetConfig command is executed against.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise} Resolves with the `config` field of the command result.
   */
  configuration(manager, credentials) {
    return co(function*() {
      var result = yield manager.executeCommand('admin.$cmd', {
        replSetGetConfig: true
      }, credentials);

      if(result && result.ok == 0) {
        throw new Error(f('failed to execute replSetGetConfig against server [%s]', manager.name));
      }

      return result.config;
    });
  }

  /**
   * Set a new configuration
   * @method
   * @param {object} config The configuration JSON object; it is copied and its version is set to the last recorded version + 1.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return right after the command succeeded, otherwise block until a stable state is observed.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise} Resolves with no value.
   */
  reconfigure(config, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    // Default returnImmediately to false
    var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
    // Default force to false
    var force = typeof options.force == 'boolean' ? options.force : false;

    return co(function*() {
      // The new version is derived from the last known configuration
      if(self.configurations.length == 0) {
        throw new Error(NO_CONFIGURATION_MESSAGE);
      }

      var lastConfig = self.configurations[self.configurations.length - 1];

      // Clone the passed in configuration (including member objects) so the
      // caller's document is never modified
      var nextConfig = clone(config);
      nextConfig.members = nextConfig.members.map(function(x) {
        return clone(x);
      });

      // Update the version to the latest + 1
      nextConfig.version = lastConfig.version + 1;

      // Reconfigure the replicaset
      yield executeReconfig(self, nextConfig, force, credentials);

      // If we want to return immediately do so now
      if(returnImmediately) return;

      // Wait for the primary to get in a stable state
      yield waitForStableState(self, function() {
        return self.primary();
      });
    });
  }

  /**
   * Adds a new member to the replicaset
   * @method
   * @param {object} node Member settings (`arbiter`, `buildIndexes`, `hidden`, `priority`, `tags`, `slaveDelay`, `votes`) plus `node.options`, the settings used to boot the mongod process.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return right after the command succeeded, otherwise block until a stable state is observed.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise} Resolves with the Server instance of the new member.
   */
  addMember(node, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    // Default returnImmediately to false
    var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
    // Default force to false
    var force = typeof options.force == 'boolean' ? options.force : false;

    return co(function*() {
      // Checked before anything is booted so a failure does not leave a
      // freshly started process behind
      if(self.configurations.length == 0) {
        throw new Error(NO_CONFIGURATION_MESSAGE);
      }

      // Clone the top level settings
      var settings = clone(node);
      // Clone the boot settings and remove the logpath
      var opts = clone(settings.options);
      delete opts['logpath'];

      // Add the needed replicaset options
      opts.replSet = self.options.replSet;

      // Create a new server instance
      var server = new Server(self.binary, opts, self.options);

      // Purge the directory
      yield server.purge();

      // Boot the instance
      yield server.start();

      // Locate max id
      var max = 0;

      // Grab the current configuration and clone it (including member object)
      var config = clone(self.configurations[self.configurations.length - 1]);
      config.members = config.members.map(function(x) {
        max = x._id > max ? x._id : max;
        return clone(x);
      });

      // The boot settings are not part of the member description
      delete settings['options'];

      // Create the member, using the same default host as url()
      var member = {
        _id: max + 1,
        host: f('%s:%s', opts.bind_ip || 'localhost', opts.port)
      };

      // Did we specify any special options
      if(settings.arbiter) member.arbiterOnly = true;
      // `builIndexes` is the historical misspelling, still honoured
      if(settings.buildIndexes || settings.builIndexes) member.buildIndexes = true;
      if(settings.hidden) member.hidden = true;
      if(typeof settings.priority == 'number') member.priority = settings.priority;
      if(settings.tags) member.tags = settings.tags;
      if(settings.slaveDelay) member.slaveDelay = settings.slaveDelay;
      if(settings.votes) member.votes = settings.votes;

      // Add to the list of members
      config.members.push(member);
      // Update the configuration version
      config.version = config.version + 1;

      // Reconfigure the replicaset
      yield executeReconfig(self, config, force, credentials);

      // Add manager to list of managers
      self.managers.push(server);

      // If we want to return immediately do so now
      if(returnImmediately) return server;

      // Wait for the new server to get in a stable state
      yield waitForStableState(self, function() {
        return Promise.resolve(server);
      });

      return server;
    });
  }

  /**
   * Remove a member from the set and stop its server
   * @method
   * @param {object} node The server manager that we wish to remove from the set.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return right after the command succeeded, otherwise block until a stable state is observed.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise} Resolves with no value once the node has been stopped.
   */
  removeMember(node, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    // Default returnImmediately to false
    var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
    // Default force to false
    var force = typeof options.force == 'boolean' ? options.force : false;

    return co(function*() {
      if(self.configurations.length == 0) {
        throw new Error(NO_CONFIGURATION_MESSAGE);
      }

      // Grab the current configuration, drop the member and clone the rest
      var config = clone(self.configurations[self.configurations.length - 1]);
      config.members = config.members.filter(function(x) {
        return x.host != node.name;
      }).map(function(x) {
        return clone(x);
      });

      // Update the configuration version
      config.version = config.version + 1;

      // Reconfigure the replicaset
      yield executeReconfig(self, config, force, credentials);

      // Remove from the list of managers
      self.managers = self.managers.filter(function(x) {
        return x.name != node.name;
      });

      // Unless asked to return immediately, wait for a stable primary
      if(!returnImmediately) {
        yield waitForStableState(self, function() {
          return self.primary();
        });
      }

      // Shut down node on every successful path
      yield node.stop();
    });
  }

  /**
   * Put a secondary into maintenance mode or bring it back out of it
   * @method
   * @param {boolean} value true to enter maintenance mode, false to leave it.
   * @param {object} node The server manager the replSetMaintenance command is executed against.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] When leaving maintenance mode, return without waiting for the server to report secondary again.
   * @param {number} [options.maxRetries=30] Number of probes, one second apart, before giving up waiting for the server to come back as secondary.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  maintenance(value, node, options, credentials) {
    options = options || {returnImmediately:false};

    // Default returnImmediately to false
    var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
    var maxRetries = typeof options.maxRetries == 'number' ? options.maxRetries : 30;

    return co(function*() {
      // Establish if the node is a secondary
      var ismaster = yield node.ismaster();

      // Ensure we only call the operation on a server in the right mode
      if(value == true && !ismaster.secondary) {
        throw new Error(f('the server at %s is not a secondary', node.name));
      } else if(value == false && (ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly)) {
        throw new Error(f('the server at %s is not in maintenance mode', node.name));
      }

      // Execute the command against the node
      var result = yield node.executeCommand('admin.$cmd', {
        replSetMaintenance: value
      }, credentials);

      // Return the error
      if(result && result.ok == 0) {
        throw new Error(f('failed to execute replSetMaintenance for server [%s]', node.name));
      }

      // Entering maintenance mode never waits; leaving it only waits when
      // the caller did not ask to return immediately
      if((value == false && returnImmediately) || value == true) return;

      // Did we pull the server back from maintenance mode
      for(var remaining = maxRetries; remaining > 0; remaining--) {
        // Wait for 1000 ms before figuring out if the node is back
        yield waitMS(1000);

        // Is it back to secondary state
        ismaster = yield node.ismaster();
        if(ismaster.secondary) return;
      }

      throw new Error(f('server %s failed to come back as a secondary after %s milliseconds waiting', node.name, (maxRetries*1000)));
    });
  }

  /**
   * Stop every server, one after the other, and mark the topology as stopped.
   * @method
   * @returns {Promise}
   */
  stop() {
    var self = this;

    return co(function*() {
      for(var i = 0; i < self.managers.length; i++) {
        yield self.managers[i].stop();
      }

      // Allows a later start() to boot the servers again
      self.state = 'stopped';
    });
  }

  /**
   * Stop every server and start it again.
   * @method
   * @returns {Promise}
   */
  restart() {
    var self = this;

    return co(function*() {
      yield self.stop();
      yield self.start();
    });
  }

  /**
   * Call purge() on every server manager, one after the other.
   * @method
   * @returns {Promise}
   */
  purge() {
    var self = this;

    return co(function*() {
      // Purge all directories
      for(var i = 0; i < self.managers.length; i++) {
        yield self.managers[i].purge();
      }
    });
  }
}
782
783module.exports = ConfigServers;
\No newline at end of file