// UNPKG
//
// 33.8 kB · JavaScript · View Raw
"use strict";

// Module dependencies. The line-number prefixes that were fused onto these
// statements have been removed; every original require is kept.
var co = require('co'),
  f = require('util').format,
  mkdirp = require('mkdirp'),
  rimraf = require('rimraf'),
  Logger = require('./logger'),
  Server = require('./server'),
  CoreServer = require('mongodb-core').Server,
  spawn = require('child_process').spawn;
11
/**
 * Shallow copy of an object: every enumerable property reachable through
 * `for...in` (inherited ones included) is assigned onto a fresh plain object.
 * A null or undefined argument yields an empty object.
 */
var clone = function(o) {
  var copy = {};
  var key;

  for(key in o) {
    copy[key] = o[key];
  }

  return copy;
};
15
/**
 * Promise based sleep.
 * @param {number} ms Delay handed to setTimeout.
 * @returns {Promise} resolves (without a value) once the timer fires.
 */
var waitMS = function(ms) {
  return new Promise(function(done) {
    setTimeout(done, ms);
  });
};
23
/**
 * Manages a MongoDB replicaset: one Server manager is created per node, the
 * set is initiated through the first manager and every configuration that
 * was applied is kept in `configurations`.
 */
class ReplSet {
  /**
   * @param {string} [binary='mongod'] Binary used for every member and for version discovery.
   * @param {object[]} nodes Non-empty list of node descriptions. `options` on each entry holds the server settings; the other fields (arbiter, priority, tags, ...) describe the replicaset member.
   * @param {object} options Shared options, also handed to the Logger and to every Server.
   * @param {string} options.replSet Name of the replicaset (required).
   * @param {object} [options.configSettings] Used as the `settings` document of the generated configuration.
   * @param {number} [options.electionCycleWaitMS=31000] Time to wait for a full election cycle.
   * @param {number} [options.retryWaitMS=5000] Time to wait before retrying after a failure.
   * @throws {Error} when `nodes` is not a non-empty array or `options.replSet` is not set.
   */
  constructor(binary, nodes, options) {
    options = options || {};

    // Keep the nodes as passed in, but work on a copy of the options so the
    // deletes below never touch the caller's object
    this.nodes = nodes;
    this.options = clone(options);

    this.logger = Logger('ReplSet', options);

    // configSettings belongs in the replicaset configuration document, so it
    // is moved out of the internal options
    if(this.options.configSettings) {
      this.configSettings = this.options.configSettings;
      delete this.options['configSettings'];
    }

    if(!Array.isArray(this.nodes) || this.nodes.length == 0) {
      throw new Error('a list of nodes must be passed in');
    }

    if(!options.replSet) throw new Error('replSet must be set');

    // Set to 'running' by start() and back to 'stopped' by stop()
    this.state = 'stopped';
    this.binary = binary || 'mongod';

    // Wait times, kept on the instance and removed from the options
    this.electionCycleWaitMS = typeof this.options.electionCycleWaitMS == 'number'
      ? this.options.electionCycleWaitMS : 31000;
    this.retryWaitMS = typeof this.options.retryWaitMS == 'number'
      ? this.options.retryWaitMS : 5000;
    delete this.options['electionCycleWaitMS'];
    delete this.options['retryWaitMS'];

    // Basic replicaset state
    this.version = 1;
    this.replSet = options.replSet;
    this.configurations = [];
    this.electionId = null;

    // One server manager per node, never with a logpath and always with the
    // replicaset name
    var self = this;
    this.managers = this.nodes.map(function(x) {
      var opts = clone(x.options);
      delete opts['logpath'];
      opts.replSet = options.replSet;
      return new Server(self.binary, opts, options);
    });
  }

  /**
   * Run `<binary> --version` and parse its output.
   * @method
   * @returns {Promise} resolves with `{version: [major, minor, patch], ssl: boolean}`; rejects when the process cannot be spawned or prints no x.y.z version.
   */
  discover() {
    var self = this;

    // A plain promise is enough here: the result arrives through process
    // events, there is nothing to yield on
    return new Promise(function(resolve, reject) {
      var proc = spawn(self.binary, ['--version']);
      var stdout = '';
      var stderr = '';

      proc.stdout.on('data', function(data) { stdout += data; });
      proc.stderr.on('data', function(data) { stderr += data; });
      proc.on('error', reject);

      proc.on('close', function(code) {
        var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/);

        // Reject instead of dereferencing a null match; an exception thrown
        // from this event handler would not reach the promise
        if(!versionMatch) {
          return reject(new Error(f('failed to discover the version of [%s], exit code %s: %s'
            , self.binary, code, stderr)));
        }

        var result = {
          version: versionMatch[0].split('.').map(function(x) {
            return parseInt(x, 10);
          }),
          ssl: /ssl/i.test(stdout)
        };

        if(self.logger.isInfo()) {
          self.logger.info(f('mongod discovery returned %s', JSON.stringify(result)));
        }

        resolve(result);
      });
    });
  }

  /**
   * Boot every server, initiate the replicaset through the first manager and
   * block until one primary plus the expected number of secondaries and
   * arbiters report healthy. Does nothing when the set is already running.
   * @method
   * @returns {Promise}
   */
  start() {
    var self = this;

    return co(function*() {
      if(self.state == 'running') return;

      var discovery = yield self.discover();

      // Boot the servers one after the other
      for(var i = 0; i < self.managers.length; i++) {
        yield self.managers[i].start();
      }

      var config = generateConfiguration(self.replSet, self.version, self.nodes, self.configSettings);

      if(self.logger.isInfo()) {
        self.logger.info(f('initialize replicaset with config %s', JSON.stringify(config)));
      }

      // Servers reporting 2.x with x <= 6 run the command with ignoreError
      var ignoreError = discovery.version[0] == 2 && discovery.version[1] <= 6;

      var initiated = yield self.managers[0].executeCommand('admin.$cmd', {
        replSetInitiate: config
      }, null, { ignoreError: ignoreError });

      if(initiated.ok == 0) {
        throw new Error(f('failed to initialize replicaset with config %s', JSON.stringify(config)));
      }

      self.configurations.push(config);

      // Everything that is not an arbiter or the primary must be a secondary
      var expectedArbiters = self.nodes.filter(function(x) { return x.arbiter; }).length;
      var expectedSecondaries = self.nodes.length - expectedArbiters - 1;

      // Poll the status until the whole set is healthy
      while(true) {
        // Wait one second between attempts
        yield waitMS(1000);

        var status;
        try {
          status = yield self.managers[0].executeCommand('admin.$cmd', {
            replSetGetStatus: true
          });
        } catch(err) {
          // Status not available yet, try again
          continue;
        }

        // Count healthy members by state (1 primary, 2 secondary, 7 arbiter)
        var state = { primaries: 0, secondaries: 0, arbiters: 0 };
        status.members.forEach(function(member) {
          if(member.health != 1) return;
          if(member.state == 2) state.secondaries = state.secondaries + 1;
          if(member.state == 1) state.primaries = state.primaries + 1;
          if(member.state == 7) state.arbiters = state.arbiters + 1;
        });

        if(self.logger.isInfo()) {
          self.logger.info(f('replicaset current state %s', JSON.stringify(state)));
        }

        if(state.primaries == 1
          && state.arbiters == expectedArbiters
          && state.secondaries == expectedSecondaries) {
          break;
        }
      }

      // Remember the election the set came up with
      var ismaster = yield self.managers[0].ismaster();
      self.electionId = ismaster.electionId;
      self.lastKnownPrimary = ismaster.me;

      // We have a stable replicaset
      self.state = 'running';
    });
  }

  /**
   * Locate the primary server manager
   * @method
   * @returns {Promise} resolves with the manager reporting ismaster (the last one in the list when several do); rejects when none does.
   */
  primary() {
    var self = this;

    return co(function*() {
      var found = yield selectManagers(self.managers, function(ismaster) {
        return ismaster.ismaster;
      });

      if(found.length == 0) throw new Error('no primary server found in set');
      return found[found.length - 1];
    });
  }

  /**
   * Return add shard url
   * @method
   * return {String} `<replSet>/<host:port,...>`
   */
  shardUrl() {
    return f('%s/%s', this.replSet, this.url());
  }

  /**
   * Return members url
   * @method
   * return {String} comma separated `host:port` list, host defaulting to localhost
   */
  url() {
    return this.nodes.map(function(x) {
      return f('%s:%s', x.options.bind_ip || 'localhost', x.options.port);
    }).join(',');
  }

  /**
   * Locate all the arbiters
   * @method
   * @returns {Promise} resolves with the managers reporting arbiterOnly.
   */
  arbiters() {
    return selectManagers(this.managers, function(ismaster) {
      return ismaster.arbiterOnly;
    });
  }

  /**
   * Locate all the secondaries
   * @method
   * @returns {Promise} resolves with the managers reporting secondary that are not listed in their own passives list.
   */
  secondaries() {
    return selectManagers(this.managers, function(ismaster) {
      return ismaster.secondary
        && (!ismaster.passives || ismaster.passives.indexOf(ismaster.me) == -1);
    });
  }

  /**
   * Locate all the passives
   * @method
   * @returns {Promise} resolves with the managers reporting secondary that are listed in their own passives list.
   */
  passives() {
    return selectManagers(this.managers, function(ismaster) {
      return ismaster.secondary
        && ismaster.passives
        && ismaster.passives.indexOf(ismaster.me) != -1;
    });
  }

  /**
   * Block until we have a new primary available
   * @method
   * @returns {Promise} resolves with the manager of the primary: either one with a new electionId, or the unchanged primary after one full election cycle was waited out.
   */
  waitForPrimary() {
    var self = this;

    return co(function*() {
      var waitedForElectionCycle = false;

      while(true) {
        for(var i = 0; i < self.managers.length; i++) {
          try {
            var ismaster = yield self.managers[i].ismaster();
            if(!ismaster.ismaster) continue;

            // A primary with another electionId means a new election happened
            if(ismaster.electionId
              && !ismaster.electionId.equals(self.electionId)) {
              self.electionId = ismaster.electionId;
              self.lastKnownPrimary = ismaster.me;
              return self.managers[i];
            }

            // Same primary even after a full election cycle: it is here to stay
            if(waitedForElectionCycle) return self.managers[i];

            // Give the set one full election cycle before settling
            yield waitMS(self.electionCycleWaitMS);
            waitedForElectionCycle = true;
          } catch(err) {
            // Server not reachable right now, back off and keep going
            yield waitMS(self.retryWaitMS);
          }
        }

        // Wait a second and retry detection
        yield waitMS(1000);
      }
    });
  }

  /**
   * Step down the primary server
   * @method
   * @param {boolean} [returnImmediately=false] Return immediately after executing stepdown, otherwise block until new primary is available.
   * @param {object} [options] Every field besides stepDownSecs is mixed into the replSetStepDown command.
   * @param {number} [options.stepDownSecs=60] The number of seconds to wait before stepping down primary.
   * @param {number} [options.secondaryCatchUpPeriodSecs=null] The number of seconds that the mongod will wait for an electable secondary to catch up to the primary.
   * @param {boolean} [options.force=false] A boolean that determines whether the primary steps down if no electable and up-to-date secondary exists within the wait period.
   * @param {object} [credentials] Passed through to executeCommand.
   * @returns {Promise}
   */
  stepDownPrimary(returnImmediately, options, credentials) {
    var self = this;
    options = clone(options || {});

    return co(function*() {
      var command = {
        replSetStepDown: typeof options.stepDownSecs == 'number'
          ? options.stepDownSecs
          : 60
      };

      // Mix in any other options
      delete options['stepDownSecs'];
      for(var name in options) {
        command[name] = options[name];
      }

      // primary() rejects when there is no primary
      var manager = yield self.primary();

      var result;
      try {
        result = yield manager.executeCommand('admin.$cmd', command, credentials);
      } catch(err) {
        // A successful step down closes the socket, so only errors that
        // carry an explicit ok: 0 count as failures
        if(err.ok == 0) throw err;
      }

      // A result with ok: 0 is only treated as an error for servers >= 3.0
      if(result && result.ok == 0) {
        var discovery = yield self.discover();
        if(discovery.version[0] >= 3) throw result;
      }

      if(returnImmediately) return;

      // We want to wait for a new primary to appear
      yield self.waitForPrimary();
    });
  }

  /**
   * Get the current replicaset configuration
   * @method
   * @param {object} manager The server manager the configuration is read from.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise} resolves with the configuration document.
   */
  configuration(manager, credentials) {
    var self = this;

    return co(function*() {
      var discovery = yield self.discover();

      // Version 3.0.0 or higher: ask the server with replSetGetConfig
      if(discovery.version[0] >= 3) {
        var result = yield manager.executeCommand('admin.$cmd', {
          replSetGetConfig: true
        }, credentials);

        if(result && result.ok == 0) {
          throw new Error(f('failed to execute replSetGetConfig against server [%s]', manager.name));
        }

        return result.config;
      }

      // Older servers: read the document from local.system.replset
      var server = yield manager.instance(credentials);
      var cursor = server.cursor('local.system.replset', {
          find: 'local.system.replset'
        , query: {}
        , limit: 1
      });

      return new Promise(function(resolve, reject) {
        cursor.next(function(err, d) {
          if(err) return reject(err);
          if(!d) return reject(new Error('no replicaset configuration found'));
          resolve(d);
        });
      });
    });
  }

  /**
   * Set a new configuration
   * @method
   * @param {object} config The configuration JSON object; it is copied, and the version is set to the last applied version + 1.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return right after the command, otherwise block until the set is stable again.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  reconfigure(config, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
    var force = typeof options.force == 'boolean' ? options.force : false;

    return co(function*() {
      var lastConfig = self.configurations[self.configurations.length - 1];
      if(!lastConfig) {
        throw new Error('no configurations exist yet, did you start the replicaset?');
      }

      // Work on a copy (member objects included)
      var newConfig = clone(config);
      newConfig.members = newConfig.members.map(function(x) {
        return clone(x);
      });
      newConfig.version = lastConfig.version + 1;

      // primary() rejects when there is no primary
      var primary = yield self.primary();

      var result = yield primary.executeCommand('admin.$cmd', {
        replSetReconfig: newConfig, force: force
      }, credentials, {ignoreError:true});

      if(result && result.ok == 0) {
        throw new Error(f('failed to execute replSetReconfig with configuration [%s]', JSON.stringify(newConfig)));
      }

      self.configurations.push(newConfig);

      if(returnImmediately) return;

      var waitedForElectionCycle = false;

      // Wait for the set to get back into a stable state
      while(true) {
        try {
          var current = yield self.primary();
          var ismaster = yield current.ismaster();

          var isMember = ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly;
          var sameElection = sameElectionId(self.electionId, ismaster.electionId);

          // The reconfiguration caused a new election
          if(ismaster.ismaster && ismaster.electionId && !sameElection) {
            yield self.waitForPrimary();
            return;
          }

          // Election unchanged, or still a member after a full election cycle
          if((ismaster.secondary || ismaster.arbiterOnly) && sameElection) return;
          if(isMember && waitedForElectionCycle) return;

          if(isMember) {
            // Wait for an election cycle to have passed
            waitedForElectionCycle = true;
            yield waitMS(self.electionCycleWaitMS);
          } else {
            yield waitMS(self.retryWaitMS);
          }
        } catch(err) {
          // No primary or server not reachable, retry later
          yield waitMS(self.retryWaitMS);
        }
      }
    });
  }

  /**
   * Look up the node description a server manager belongs to
   * @method
   * @param {object} n server manager we want node configuration from
   * @returns {object} the entry of `nodes` whose bind_ip and port match the manager's host and port, or null when `n` is not a Server or no entry matches.
   */
  serverConfiguration(n) {
    if(!(n instanceof Server)) return null;

    for(var i = 0; i < this.nodes.length; i++) {
      var candidate = this.nodes[i];
      if(candidate.options.bind_ip == n.host
        && candidate.options.port == n.port) {
        return candidate;
      }
    }

    return null;
  }

  /**
   * Adds a new member to the replicaset
   * @method
   * @param {object} node All the settings used to boot the mongod process, or a server manager whose node description is already known.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return right after the reconfiguration, otherwise block until the set is stable again.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise} resolves with the server manager of the new member.
   */
  addMember(node, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
    var force = typeof options.force == 'boolean' ? options.force : false;

    // A server manager is swapped for its node description when one is known
    node = this.serverConfiguration(node) || node;

    return co(function*() {
      // Check this first so no server is booted for a set that never started
      if(self.configurations.length == 0) {
        throw new Error('no configurations exist yet, did you start the replicaset?');
      }

      // Copies, so the node passed in is left untouched
      var settings = clone(node);
      var opts = clone(settings.options);
      delete opts['logpath'];
      opts.replSet = self.options.replSet;

      // Boot a fresh instance
      var server = new Server(self.binary, opts, self.options);
      yield server.purge();
      yield server.start();

      // Copy the last configuration (member objects included) and find the
      // highest member id in use
      var max = 0;
      var config = clone(self.configurations[self.configurations.length - 1]);
      config.members = config.members.map(function(x) {
        max = x._id > max ? x._id : max;
        return clone(x);
      });

      var member = {
        _id: max + 1,
        host: f('%s:%s', opts.bind_ip || 'localhost', opts.port),
      };

      // Member settings. `builIndexes` is the historic misspelling and is
      // still accepted next to `buildIndexes`
      if(settings.arbiter) member.arbiterOnly = true;
      if(settings.buildIndexes || settings.builIndexes) member.buildIndexes = true;
      if(settings.hidden) member.hidden = true;
      if(typeof settings.priority == 'number') member.priority = settings.priority;
      if(settings.tags) member.tags = settings.tags;
      if(settings.slaveDelay) member.slaveDelay = settings.slaveDelay;
      if(settings.votes) member.votes = settings.votes;

      config.members.push(member);
      config.version = config.version + 1;

      // primary() rejects when there is no primary
      var primary = yield self.primary();

      var result = yield primary.executeCommand('admin.$cmd', {
        replSetReconfig: config, force: force
      }, credentials);

      if(result && result.ok == 0) {
        throw new Error(f('failed to execute replSetReconfig with configuration [%s]', JSON.stringify(config)));
      }

      self.configurations.push(config);
      self.managers.push(server);

      if(returnImmediately) return server;

      var waitedForElectionCycle = false;

      // Wait for the new server to get into a stable state
      while(true) {
        try {
          var ismaster = yield server.ismaster();

          var isMember = ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly;
          var sameElection = sameElectionId(self.electionId, ismaster.electionId);

          // Adding the member caused a new election
          if(ismaster.ismaster && ismaster.electionId && !sameElection) {
            yield self.waitForPrimary();
            return server;
          }

          if((ismaster.secondary || ismaster.arbiterOnly) && sameElection) return server;

          if(isMember && waitedForElectionCycle) {
            // Wait for a primary to appear
            yield self.waitForPrimary();
            return server;
          }

          if(isMember) {
            // Wait for an election cycle to have passed
            waitedForElectionCycle = true;
            yield waitMS(self.electionCycleWaitMS);
          } else {
            yield waitMS(self.retryWaitMS);
          }
        } catch(err) {
          yield waitMS(self.retryWaitMS);
        }
      }
    });
  }

  /**
   * Remove a member from the set
   * @method
   * @param {object} node The server manager that we wish to remove from the set.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return once the node is stopped, otherwise also block until a primary is available.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {boolean} [options.skipWait=false] Accepted for compatibility; this method does not read it.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  removeMember(node, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
    var force = typeof options.force == 'boolean' ? options.force : false;

    return co(function*() {
      if(self.configurations.length == 0) {
        throw new Error('no configurations exist yet, did you start the replicaset?');
      }

      // Copy the last configuration without the member that is removed
      var config = clone(self.configurations[self.configurations.length - 1]);
      config.members = config.members.map(function(x) {
        return clone(x);
      }).filter(function(x) {
        return x.host != node.name;
      });
      config.version = config.version + 1;

      // primary() rejects when there is no primary
      var primary = yield self.primary();

      // Run with ignoreError; the outcome of the command is not inspected
      yield primary.executeCommand('admin.$cmd', {
        replSetReconfig: config, force: force
      }, credentials, {ignoreError:true});

      self.configurations.push(config);

      self.managers = self.managers.filter(function(x) {
        return x.name != node.name;
      });

      // Shut down node
      yield node.stop();

      // Wait for a primary to appear unless asked not to
      if(!returnImmediately) yield self.waitForPrimary();
    });
  }

  /**
   * Move a secondary in or out of maintenance mode (replSetMaintenance)
   * @method
   * @param {boolean} value true to enter maintenance mode, false to leave it.
   * @param {object} node The server manager the command is executed against.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] When leaving maintenance mode, return right after the command instead of waiting for the node to report secondary.
   * @param {number} [options.maxRetries=30] Number of one second retries before giving up on the server coming back as secondary.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  maintenance(value, node, options, credentials) {
    options = options || {returnImmediately:false};

    var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
    var maxRetries = typeof options.maxRetries == 'number' ? options.maxRetries : 30;

    return co(function*() {
      var ismaster = yield node.ismaster();

      // Only a secondary can enter maintenance mode, and only a node that
      // reports none of the regular roles can leave it
      if(value == true && !ismaster.secondary) {
        throw new Error(f('the server at %s is not a secondary', node.name));
      } else if(value == false && (ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly)) {
        throw new Error(f('the server at %s is not in maintenance mode', node.name));
      }

      var result = yield node.executeCommand('admin.$cmd', {
        replSetMaintenance: value
      }, credentials);

      if(result && result.ok == 0) {
        throw new Error(f('failed to execute replSetMaintenance for server [%s]', node.name));
      }

      // Entering maintenance mode, or leaving it without waiting around
      if(value == true || returnImmediately) return;

      // Check once a second whether the node is back as a secondary
      for(var remaining = maxRetries; remaining > 0; remaining--) {
        yield waitMS(1000);

        var current = yield node.ismaster();
        if(current.secondary) return;
      }

      throw new Error(f('server %s failed to come back as a secondary after %s milliseconds waiting', node.name, (maxRetries*1000)));
    });
  }

  /**
   * Stop every server manager, one after the other
   * @method
   * @returns {Promise}
   */
  stop() {
    var self = this;

    return co(function*() {
      for(var i = 0; i < self.managers.length; i++) {
        yield self.managers[i].stop();
      }

      // Allow start() to bring the set up again
      self.state = 'stopped';
    });
  }

  /**
   * Stop the servers, purge them and start the replicaset again
   * @method
   * @returns {Promise}
   */
  restart() {
    var self = this;

    return co(function*() {
      yield self.stop();
      yield self.purge();
      yield self.start();
    });
  }

  /**
   * Call purge on every server manager, one after the other
   * @method
   * @returns {Promise}
   */
  purge() {
    var self = this;

    return co(function*() {
      for(var i = 0; i < self.managers.length; i++) {
        yield self.managers[i].purge();
      }
    });
  }
}

/*
 * Resolve with the managers whose ismaster response satisfies the predicate.
 * The managers are queried one at a time, in list order.
 */
function selectManagers(managers, predicate) {
  return co(function*() {
    var selected = [];

    for(var i = 0; i < managers.length; i++) {
      var ismaster = yield managers[i].ismaster();
      if(predicate(ismaster)) selected.push(managers[i]);
    }

    return selected;
  });
}

/*
 * True when both election ids are set and equal. A missing id on either side
 * counts as "not the same" instead of throwing.
 */
function sameElectionId(current, candidate) {
  if(!current || !candidate) return false;
  return current.equals(candidate);
}
1021
/*
 * Generate the replicaset configuration document
 *
 * _id      name of the replicaset
 * version  configuration version
 * nodes    node descriptions; `options.bind_ip` / `options.port` give the
 *          member host (bind_ip falls back to localhost), member ids are
 *          assigned from the position in the list starting at 1
 * settings optional `settings` document, added only when passed in
 *
 * Returns {_id, version, members[, settings]}
 */
var generateConfiguration = function(_id, version, nodes, settings) {
  var members = nodes.map(function(node, index) {
    var member = {
      _id: index + 1,
      host: f('%s:%s', node.options.bind_ip || 'localhost', node.options.port),
    };

    // Member settings. `builIndexes` is the historic misspelling and is
    // still accepted next to `buildIndexes`
    if(node.arbiter) member.arbiterOnly = true;
    if(node.buildIndexes || node.builIndexes) member.buildIndexes = true;
    if(node.hidden) member.hidden = true;
    if(typeof node.priority == 'number') member.priority = node.priority;
    if(node.tags) member.tags = node.tags;
    if(node.slaveDelay) member.slaveDelay = node.slaveDelay;
    if(node.votes) member.votes = node.votes;

    return member;
  });

  var configuration = {
    _id: _id, version: version, members: members
  };

  if(settings) {
    configuration.settings = settings;
  }

  return configuration;
};
1060
// The ReplSet class is the only export of this module
module.exports = ReplSet;