// Source listing retrieved from UNPKG (37 kB, JavaScript).
"use strict";

// Module dependencies. Several of these (mkdirp, rimraf, CoreServer) are not
// referenced in this file's visible code but are kept so that nothing relying
// on them being loaded changes behaviour.
var co = require('co'),
  f = require('util').format,
  mkdirp = require('mkdirp'),
  rimraf = require('rimraf'),
  Logger = require('./logger'),
  Server = require('./server'),
  EventEmitter = require('events'),
  CoreServer = require('mongodb-core').Server,
  spawn = require('child_process').spawn;

// Bluebird replaces the native Promise for every promise created in this file.
var Promise = require("bluebird");
14
/**
 * Create a shallow copy of an object.
 *
 * Every enumerable property (own and inherited) is copied by reference onto a
 * fresh plain object. `null`/`undefined` input yields an empty object.
 *
 * @param {object} [o] The object to copy.
 * @returns {object} A new plain object holding the same property values.
 */
var clone = function(o) {
  var obj = {};

  for(var name in o) {
    obj[name] = o[name];
  }

  return obj;
};
18
/**
 * Return a promise that resolves (with no value) after `ms` milliseconds.
 *
 * @param {number} ms Delay in milliseconds, passed straight to setTimeout.
 * @returns {Promise} Resolved once the timer fires; never rejected.
 */
var waitMS = function(ms) {
  return new Promise(function(resolve) {
    setTimeout(function() {
      resolve();
    }, ms);
  });
};
26
/**
 * Manages a MongoDB replica set made up of mongod processes, one `Server`
 * manager per configured node.
 *
 * Every 'state' event emitted by a server manager is re-emitted on this
 * instance; `start()` additionally emits 'state' events with `event: 'start'`
 * and `event: 'running'`.
 */
class ReplSet extends EventEmitter {
  /**
   * @param {string} [binary='mongod'] The mongod binary used for every node.
   * @param {object[]} nodes Non-empty list of node descriptors. Each has an
   *   `options` object (mongod options such as `bind_ip` and `port`) plus
   *   optional member flags (`arbiter`, `priority`, `tags`, ...).
   * @param {object} options Replica set level options.
   * @param {string} options.replSet Name of the replica set (required).
   * @param {object} [options.configSettings] Stored as the `settings` of the
   *   generated replica set configuration.
   * @param {number} [options.electionCycleWaitMS=31000] Time to wait for a
   *   full election cycle to pass.
   * @param {number} [options.retryWaitMS=5000] Time to wait before retrying
   *   after a failed status check.
   * @throws {Error} If `nodes` is not a non-empty array or `replSet` is unset.
   */
  constructor(binary, nodes, options) {
    super();
    options = options || {};
    var self = this;

    // Save the passed in parameters (options are shallow-copied so the
    // deletes below never touch the caller's object)
    this.nodes = nodes;
    this.options = clone(options);

    // Create logger instance
    this.logger = Logger('ReplSet', options);

    // Special settings for the replica set configuration document are moved
    // out of the internal options
    if(this.options.configSettings) {
      this.configSettings = this.options.configSettings;
      delete this.options['configSettings'];
    }

    // Ensure we have a list of nodes
    if(!Array.isArray(this.nodes) || this.nodes.length == 0) {
      throw new Error('a list of nodes must be passed in');
    }

    // Ensure we have set basic options
    if(!options.replSet) throw new Error('replSet must be set');

    // Server state
    this.state = 'stopped';

    // Unpack default runtime information
    this.binary = binary || 'mongod';

    // Wait times
    this.electionCycleWaitMS = typeof this.options.electionCycleWaitMS == 'number'
      ? this.options.electionCycleWaitMS
      : 31000;
    this.retryWaitMS = typeof this.options.retryWaitMS == 'number'
      ? this.options.retryWaitMS
      : 5000;

    // The wait times are not server options, remove them
    delete this.options['electionCycleWaitMS'];
    delete this.options['retryWaitMS'];

    // Basic config settings for replicaset
    this.version = 1;
    this.replSet = options.replSet;

    // History of every configuration applied to the set (latest last)
    this.configurations = [];

    // Last seen electionId
    this.electionId = null;

    // Create a server manager for each node
    this.managers = this.nodes.map(function(x) {
      var opts = clone(x.options);
      delete opts['logpath'];

      // Add the needed replicaset options
      opts.replSet = options.replSet;

      var server = new Server(self.binary, opts, options);
      server.on('state', function(state) {
        self.emit('state', state);
      });

      return server;
    });
  }

  /**
   * Run `<binary> --version` and extract version and ssl support.
   * @method
   * @returns {Promise} Resolves with `{version: [major, minor, patch], ssl: boolean}`.
   *   Rejects if the process cannot be spawned or prints no x.y.z version.
   */
  discover() {
    var self = this;

    return new Promise(function(resolve, reject) {
      var proc = spawn(self.binary, ['--version']);
      // Variables receiving data
      var stdout = '';
      var stderr = '';

      proc.stdout.on('data', function(data) { stdout += data; });
      proc.stderr.on('data', function(data) { stderr += data; });

      // Got an error (for example the binary does not exist)
      proc.on('error', function(err) { reject(err); });

      // Process terminated
      proc.on('close', function() {
        var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/);

        // Without this guard a missing version would throw inside the event
        // handler, where nothing can catch it. Rejecting again after an
        // 'error' event has already rejected is a no-op.
        if(!versionMatch) {
          return reject(new Error(f('unable to determine the version of [%s] from its --version output [%s]'
            , self.binary, stderr.trim())));
        }

        // Final result
        var result = {
          version: versionMatch[0].split('.').map(function(x) {
            return parseInt(x, 10);
          }),
          ssl: /ssl/i.test(stdout)
        };

        if(self.logger.isInfo()) {
          self.logger.info(f('mongod discovery returned %s', JSON.stringify(result)));
        }

        // Resolve the server version
        resolve(result);
      });
    });
  }

  /**
   * Boot every server, initiate the replica set and block until it has one
   * primary plus the expected number of secondaries and arbiters.
   * @method
   * @returns {Promise}
   */
  start() {
    var self = this;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // We are already running, just return
        if(self.state == 'running') return resolve();

        // Emit start event
        self.emit('state', {
          event: 'start', topology: 'replSet', nodes: self.nodes, options: self.options,
        });

        // Get the version information
        var discovery = yield self.discover();

        // Boot all the servers
        for(var i = 0; i < self.managers.length; i++) {
          yield self.managers[i].start();
        }

        // Generate the configuration used to initiate the set
        var config = generateConfiguration(self.replSet, self.version, self.nodes, self.configSettings);

        if(self.logger.isInfo()) {
          self.logger.info(f('initialize replicaset with config %s', JSON.stringify(config)));
        }

        // Command errors are ignored for mongod 2.0 - 2.6
        var ignoreError = discovery.version[0] == 2 && discovery.version[1] <= 6;

        // Pick the first manager and execute replicaset configuration
        var initiated = yield self.managers[0].executeCommand('admin.$cmd', {
          replSetInitiate: config
        }, null, { ignoreError: ignoreError });

        // Did the command fail, error out
        if(initiated && initiated.ok == 0) {
          return reject(new Error(f('failed to initialize replicaset with config %s', JSON.stringify(config))));
        }

        // Push configuration to the history
        self.configurations.push(config);

        // Count the number of expected arbiters
        var numberOfArbiters = self.nodes.filter(function(x) {
          return x.arbiter;
        }).length;

        // Now monitor until we have all the servers in a healthy state
        while(true) {
          // Wait for 1000 ms before trying again
          yield waitMS(1000);

          // Get the replicaset status
          var status;
          try {
            status = yield self.managers[0].executeCommand('admin.$cmd', {
              replSetGetStatus: true
            });
          } catch(err) {
            console.log(err)
            continue;
          }

          // Monitoring state
          var state = { primaries: 0, secondaries: 0, arbiters: 0 };

          // Sum up the healthy members by role (a reply without a member
          // list simply counts as "nothing healthy yet")
          var members = (status && status.members) || [];
          members.forEach(function(member) {
            if(member.health != 1) return;
            if(member.state == 1) state.primaries = state.primaries + 1;
            if(member.state == 2) state.secondaries = state.secondaries + 1;
            if(member.state == 7) state.arbiters = state.arbiters + 1;
          });

          if(self.logger.isInfo()) {
            self.logger.info(f('replicaset current state %s', JSON.stringify(state)));
          }

          // Validate the state
          if(state.primaries == 1
            && state.arbiters == numberOfArbiters
            && state.secondaries == (self.nodes.length - numberOfArbiters - 1)) {
            break;
          }
        }

        // Wait for the primary to appear in ismaster result
        yield self.waitForPrimary();
        // Get the last seen election Id
        var ismaster = yield self.managers[0].ismaster();
        // Save the current election Id if it exists
        self.electionId = ismaster.electionId;
        self.lastKnownPrimary = ismaster.me;

        // Emit running event
        self.emit('state', {
          event: 'running', topology: 'replSet', nodes: self.nodes, options: self.options,
        });

        // We have a stable replicaset
        resolve();
      }).catch(reject);
    });
  }

  /**
   * Locate the primary server manager
   * @method
   * @returns {Promise} Resolves with the manager whose ismaster reports
   *   `ismaster: true`; rejects if no manager does.
   */
  primary() {
    var self = this;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // Go over all the managers
        for(var i = 0; i < self.managers.length; i++) {
          var ismaster = yield self.managers[i].ismaster();

          if(ismaster.ismaster) {
            return resolve(self.managers[i]);
          }
        }

        reject(new Error('no primary server found in set'));
      }).catch(reject);
    });
  }

  /**
   * Return add shard url
   * @method
   * @returns {String} `<replSet name>/<host:port,host:port,...>`
   */
  shardUrl() {
    return f('%s/%s', this.replSet, this.url());
  }

  /**
   * Return members url
   * @method
   * @returns {String} Comma separated `host:port` list of all configured
   *   nodes; a node without `bind_ip` is listed as `localhost`.
   */
  url() {
    var members = this.nodes.map(function(x) {
      return f('%s:%s', x.options.bind_ip || 'localhost', x.options.port);
    });

    return f('%s', members.join(','));
  }

  /**
   * Locate all the arbiters
   * @method
   * @returns {Promise} Resolves with the managers reporting `arbiterOnly`.
   */
  arbiters() {
    var self = this;

    return new Promise(function(resolve, reject) {
      co(function*() {
        var arbiters = [];

        // Go over all the managers
        for(var i = 0; i < self.managers.length; i++) {
          var ismaster = yield self.managers[i].ismaster();
          if(ismaster.arbiterOnly) arbiters.push(self.managers[i]);
        }

        resolve(arbiters);
      }).catch(reject);
    });
  }

  /**
   * Locate all the secondaries
   * @method
   * @returns {Promise} Resolves with the managers that are secondaries and
   *   are not listed among the passives.
   */
  secondaries() {
    var self = this;

    return new Promise(function(resolve, reject) {
      co(function*() {
        var secondaries = [];

        // Go over all the managers
        for(var i = 0; i < self.managers.length; i++) {
          var ismaster = yield self.managers[i].ismaster();
          // A secondary counts unless it is listed as a passive
          var isPassive = !!ismaster.passives
            && ismaster.passives.indexOf(ismaster.me) != -1;

          if(ismaster.secondary && !isPassive) {
            secondaries.push(self.managers[i]);
          }
        }

        resolve(secondaries);
      }).catch(reject);
    });
  }

  /**
   * Locate all the passives
   * @method
   * @returns {Promise} Resolves with the managers that are secondaries and
   *   are listed among the passives.
   */
  passives() {
    var self = this;

    return new Promise(function(resolve, reject) {
      co(function*() {
        var passives = [];

        // Go over all the managers
        for(var i = 0; i < self.managers.length; i++) {
          var ismaster = yield self.managers[i].ismaster();

          if(ismaster.secondary
            && ismaster.passives
            && ismaster.passives.indexOf(ismaster.me) != -1) {
            passives.push(self.managers[i]);
          }
        }

        resolve(passives);
      }).catch(reject);
    });
  }

  /**
   * Block until we have a new primary available
   * @method
   * @returns {Promise} Resolves with the manager of the primary. A primary
   *   with a new electionId is accepted at once; a primary with an unchanged
   *   (or missing) electionId is accepted only after one election cycle wait.
   */
  waitForPrimary() {
    var self = this;
    var waitedForElectionCycle = false;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // Keep going until we have a new primary
        while(true) {
          for(var i = 0; i < self.managers.length; i++) {
            try {
              var ismaster = yield self.managers[i].ismaster();

              // Only a primary is of interest
              if(!ismaster.ismaster) continue;

              // A different electionId means a new primary was elected
              if(ismaster.electionId
                && !ismaster.electionId.equals(self.electionId)) {
                self.electionId = ismaster.electionId;
                self.lastKnownPrimary = ismaster.me;
                return resolve(self.managers[i]);
              }

              // Same primary after a full election cycle, accept it
              if(waitedForElectionCycle) {
                return resolve(self.managers[i]);
              }

              // Allow a full election cycle to pass before accepting
              yield waitMS(self.electionCycleWaitMS);
              waitedForElectionCycle = true;
            } catch(err) {
              // The server could not be reached, back off before moving on
              yield waitMS(self.retryWaitMS);
            }
          }

          // Wait for a second and retry detection
          yield waitMS(1000);
        }
      }).catch(reject);
    });
  }

  /**
   * Step down the primary server
   * @method
   * @param {boolean} [returnImmediately=false] Return immediately after executing stepdown, otherwise block until new primary is available.
   * @param {object} [options] Options mixed into the replSetStepDown command.
   * @param {number} [options.stepDownSecs=60] The number of seconds to wait before stepping down primary.
   * @param {number} [options.secondaryCatchUpPeriodSecs=null] The number of seconds that the mongod will wait for an electable secondary to catch up to the primary.
   * @param {boolean} [options.force=false] A boolean that determines whether the primary steps down if no electable and up-to-date secondary exists within the wait period.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  stepDownPrimary(returnImmediately, options, credentials) {
    var self = this;
    options = options || {};

    return new Promise(function(resolve, reject) {
      co(function*() {
        // Step down command (the command name must be the first key)
        var command = {
          replSetStepDown: typeof options.stepDownSecs == 'number'
            ? options.stepDownSecs
            : 60
        };

        // Mix in any other options
        for(var name in options) {
          if(name != 'stepDownSecs') command[name] = options[name];
        }

        // Locate the current primary
        var manager = yield self.primary();
        if(manager == null) {
          return reject(new Error('no primary found in the replicaset'));
        }

        // Execute the step down against the primary
        var result;
        try {
          result = yield manager.executeCommand('admin.$cmd', command, credentials);
        } catch(err) {
          // A successful step down closes the socket, so only an explicit
          // command failure is treated as an error
          if(err.ok == 0) {
            return reject(err);
          }
        }

        // We have an error and the server is >= 3.0
        var discovery = yield self.discover();
        if(discovery.version[0] >= 3 && result && result.ok == 0) {
          return reject(result);
        }

        // Do we need to return immediately
        if(returnImmediately) {
          return resolve();
        }

        // We want to wait for a new primary to appear
        yield self.waitForPrimary();

        // Finish up
        resolve();
      }).catch(reject);
    });
  }

  /**
   * Get the current replicaset configuration
   * @method
   * @param {object} manager The server manager that we wish to use to get the current configuration.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise} Resolves with the configuration document.
   */
  configuration(manager, credentials) {
    var self = this;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // Get the current mongod version
        var discovery = yield self.discover();

        // mongod 3.0.0 or higher supports the replSetGetConfig command
        if(discovery.version[0] >= 3) {
          var result = yield manager.executeCommand('admin.$cmd', {
            replSetGetConfig: true
          }, credentials);

          if(result && result.ok == 0) {
            return reject(new Error(f('failed to execute replSetGetConfig against server [%s]', manager.name)));
          }

          return resolve(result.config);
        }

        // Older servers: read the document from local.system.replset
        var server = yield manager.instance(credentials);
        var cursor = server.cursor('local.system.replset', {
            find: 'local.system.replset'
          , query: {}
          , limit: 1
        });

        // Execute next
        cursor.next(function(err, d) {
          if(err) return reject(err);
          if(!d) return reject(new Error('no replicaset configuration found'));
          resolve(d);
        });
      }).catch(reject);
    });
  }

  /**
   * Set a new configuration
   * @method
   * @param {object} config The configuration JSON object
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return immediately after executing the reconfiguration, otherwise block until the set is stable.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  reconfigure(config, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    // Both flags default to false unless an actual boolean is passed
    var returnImmediately = options.returnImmediately === true;
    var force = options.force === true;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // The new version is derived from the last known configuration
        if(self.configurations.length == 0) {
          return reject(new Error('no configurations exist yet, did you start the replicaset?'));
        }

        var lastConfig = self.configurations[self.configurations.length - 1];

        // Clone the passed in configuration (including member objects)
        config = clone(config);
        config.members = config.members.map(function(x) {
          return clone(x);
        });

        // Update the version to the latest + 1
        config.version = lastConfig.version + 1;

        // Reconfigure the replicaset
        var primary = yield self.primary();
        if(!primary) return reject(new Error('no primary available'));

        // Execute the reconfigure command
        var result = yield primary.executeCommand('admin.$cmd', {
          replSetReconfig: config, force: force
        }, credentials, {ignoreError:true});

        if(result && result.ok == 0) {
          return reject(new Error(f('failed to execute replSetReconfig with configuration [%s]', JSON.stringify(config))))
        }

        // Push new configuration to list
        self.configurations.push(config);

        // If we want to return immediately do so now
        if(returnImmediately) return resolve();

        // Did we already wait for an election cycle
        var waitedForElectionCycle = false;

        // Wait for the set to get in a stable state
        while(true) {
          try {
            var current = yield self.primary();
            if(!current) {
              yield waitMS(self.retryWaitMS);
              continue;
            }

            // Get the current ismaster
            var ismaster = yield current.ismaster();
            var hasRole = ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly;

            if(ismaster.ismaster
              && ismaster.electionId
              && !self.electionId.equals(ismaster.electionId)) {
              // We caused a new election
              yield self.waitForPrimary();
              return resolve();
            } else if((ismaster.secondary || ismaster.arbiterOnly)
              && ismaster.electionId
              && self.electionId.equals(ismaster.electionId)) {
              return resolve();
            } else if(hasRole && !waitedForElectionCycle) {
              // Wait for an election cycle to have passed
              waitedForElectionCycle = true;
              yield waitMS(self.electionCycleWaitMS);
            } else if(hasRole && waitedForElectionCycle) {
              return resolve();
            } else {
              yield waitMS(self.retryWaitMS);
            }
          } catch(err) {
            yield waitMS(self.retryWaitMS);
          }
        }
      }).catch(reject);
    });
  }

  /**
   * Get seed list node configuration
   * @method
   * @param {object} n server manager we want node configuration from
   * @returns {object|null} The entry of `nodes` whose `bind_ip` and `port`
   *   match the manager's host and port, or null when `n` is not a Server
   *   instance or no entry matches.
   */
  serverConfiguration(n) {
    // Only an existing server manager can be mapped back to a node
    if(!(n instanceof Server)) return null;

    for(var i = 0; i < this.nodes.length; i++) {
      var candidate = this.nodes[i];

      if(candidate.options.bind_ip == n.host
        && candidate.options.port == n.port) {
        return candidate;
      }
    }

    return null;
  }

  /**
   * Adds a new member to the replicaset
   * @method
   * @param {object} node All the settings used to boot the mongod process, or an existing server manager whose known node settings are reused.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return immediately after executing the reconfiguration, otherwise block until the new member is in a stable state.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise} Resolves with the server manager of the new member.
   */
  addMember(node, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    // Both flags default to false unless an actual boolean is passed
    var returnImmediately = options.returnImmediately === true;
    var force = options.force === true;

    // Is the node an existing server manager, use the known node settings
    if(node instanceof Server) {
      var known = this.serverConfiguration(node);
      if(known) node = known;
    }

    // Return the promise
    return new Promise(function(resolve, reject) {
      co(function*() {
        // Fail before any server is stopped, purged or started
        if(self.configurations.length == 0) {
          return reject(new Error('no configurations exist yet, did you start the replicaset?'));
        }

        // Clone the top level settings
        node = clone(node);
        // Clone the settings and remove the logpath
        var opts = clone(node.options);
        delete opts['logpath'];

        // Add the needed replicaset options
        opts.replSet = self.options.replSet;

        // Create a new server instance
        var server = new Server(self.binary, opts, self.options);
        server.on('state', function(state) {
          self.emit('state', state);
        });

        // Do we already have a manager for this address, then stop it,
        // purge it and remove it
        var address = f('%s:%s', server.host, server.port);
        var remainingManagers = [];
        var needWaitForPrimary = false;

        for(var i = 0; i < self.managers.length; i++) {
          var manager = self.managers[i];

          if(f('%s:%s', manager.host, manager.port) == address) {
            yield manager.stop();
            yield manager.purge();
            needWaitForPrimary = true;
          } else {
            remainingManagers.push(manager);
          }
        }

        // Set up the managers
        self.managers = remainingManagers;

        // Purge the directory
        yield server.purge();

        // Boot the instance
        yield server.start();

        // Stopping an existing member may have triggered an election
        if(needWaitForPrimary) {
          yield self.waitForPrimary();
        }

        // Grab the current configuration and clone it (including member
        // objects), tracking the highest member id in use
        var max = 0;
        var config = clone(self.configurations[self.configurations.length - 1]);
        config.members = config.members.map(function(x) {
          max = x._id > max ? x._id : max;
          return clone(x);
        });

        // Create the member
        var member = {
          _id: max + 1,
          host: f('%s:%s', opts.bind_ip || 'localhost', opts.port),
        };

        // Did we specify any special options
        if(node.arbiter) member.arbiterOnly = true;
        if(typeof node.buildIndexes == 'boolean') {
          member.buildIndexes = node.buildIndexes;
        } else if(node.builIndexes) {
          // Legacy misspelled option name, kept for backward compatibility
          member.buildIndexes = true;
        }
        if(node.hidden) member.hidden = true;
        if(typeof node.priority == 'number') member.priority = node.priority;
        if(node.tags) member.tags = node.tags;
        if(node.slaveDelay) member.slaveDelay = node.slaveDelay;
        // votes: 0 is a meaningful value and must not be dropped
        if(typeof node.votes == 'number' || node.votes) member.votes = node.votes;

        // Add to the list of members
        config.members.push(member);
        // Update the configuration version
        config.version = config.version + 1;

        // Reconfigure the replicaset
        var primary = yield self.primary();
        if(!primary) return reject(new Error('no primary available'));

        // Execute the reconfigure command
        var result = yield primary.executeCommand('admin.$cmd', {
          replSetReconfig: config, force: force
        }, credentials);

        if(result && result.ok == 0) {
          return reject(new Error(f('failed to execute replSetReconfig with configuration [%s]', JSON.stringify(config))))
        }

        // Push new configuration to list
        self.configurations.push(config);

        // Add manager to list of managers
        self.managers.push(server);

        // If we want to return immediately do so now
        if(returnImmediately) return resolve(server);

        // Did we already wait for an election cycle
        var waitedForElectionCycle = false;

        // Wait for the server to get in a stable state
        while(true) {
          try {
            // Get the ismaster for this server
            var ismaster = yield server.ismaster();
            var hasRole = ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly;

            if(ismaster.ismaster
              && ismaster.electionId
              && !self.electionId.equals(ismaster.electionId)) {
              // We caused a new election
              yield self.waitForPrimary();
              return resolve(server);
            } else if((ismaster.secondary || ismaster.arbiterOnly)
              && ismaster.electionId
              && self.electionId.equals(ismaster.electionId)) {
              return resolve(server);
            } else if(hasRole && !waitedForElectionCycle) {
              // Wait for an election cycle to have passed
              waitedForElectionCycle = true;
              yield waitMS(self.electionCycleWaitMS);
            } else if(hasRole && waitedForElectionCycle) {
              // Wait for a primary to appear
              yield self.waitForPrimary();
              return resolve(server);
            } else {
              yield waitMS(self.retryWaitMS);
            }
          } catch(err) {
            yield waitMS(self.retryWaitMS);
          }
        }
      }).catch(reject);
    });
  }

  /**
   * Remove a member from the set
   * @method
   * @param {object} node The server manager that we wish to remove from the set.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] Return as soon as the member is stopped, otherwise block until a primary is available.
   * @param {boolean} [options.force=false] Force the server reconfiguration
   * @param {boolean} [options.skipWait=false] Accepted for compatibility; currently has no effect.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  removeMember(node, options, credentials) {
    options = options || {returnImmediately:false};
    var self = this;

    // Both flags default to false unless an actual boolean is passed
    var returnImmediately = options.returnImmediately === true;
    var force = options.force === true;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // The new configuration is derived from the last known one
        if(self.configurations.length == 0) {
          return reject(new Error('no configurations exist yet, did you start the replicaset?'));
        }

        // Clone the current configuration, leaving out the removed member
        var config = clone(self.configurations[self.configurations.length - 1]);
        config.members = config.members.filter(function(x) {
          return x.host != node.name;
        }).map(function(x) {
          return clone(x);
        });

        // Update the configuration version
        config.version = config.version + 1;

        // Reconfigure the replicaset
        var primary = yield self.primary();
        if(!primary) return reject(new Error('no primary available'));

        // Execute the reconfigure command (the reply is not inspected)
        yield primary.executeCommand('admin.$cmd', {
          replSetReconfig: config, force: force
        }, credentials, {ignoreError:true});

        // Push new configuration to list
        self.configurations.push(config);

        // Shut down node
        yield node.stop();

        // If we want to return immediately do so now
        if(returnImmediately) return resolve();

        // Wait for a primary to appear
        yield self.waitForPrimary();

        return resolve();
      }).catch(reject);
    });
  }

  /**
   * Put a secondary into maintenance mode or bring it back out of it
   * @method
   * @param {boolean} value true to enter maintenance mode, false to leave it.
   * @param {object} node The server manager the command is executed against.
   * @param {object} [options] Any options for the operation.
   * @param {boolean} [options.returnImmediately=false] When leaving maintenance mode, return right after the command instead of waiting for the server to report secondary again.
   * @param {number} [options.maxRetries=30] Number of one second retries before giving up on the server coming back as secondary.
   * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
   * @returns {Promise}
   */
  maintenance(value, node, options, credentials) {
    options = options || {returnImmediately:false};

    // Default returnImmediately to false and maxRetries to 30
    var returnImmediately = options.returnImmediately === true;
    var maxRetries = typeof options.maxRetries == 'number' ? options.maxRetries : 30;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // Establish the current role of the node
        var ismaster = yield node.ismaster();

        // Ensure we only call the operation on a server in the right mode
        if(value == true && !ismaster.secondary) {
          return reject(new Error(f('the server at %s is not a secondary', node.name)));
        } else if(value == false && (ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly)) {
          return reject(new Error(f('the server at %s is not in maintenance mode', node.name)));
        }

        // Execute the command against the node
        var result = yield node.executeCommand('admin.$cmd', {
          replSetMaintenance: value
        }, credentials);

        // Return the error
        if(result && result.ok == 0) {
          return reject(new Error(f('failed to execute replSetMaintenance for server [%s]', node.name)));
        }

        // Entering maintenance mode never waits; leaving it only waits
        // when not asked to return immediately
        if(value == true || (value == false && returnImmediately)) {
          return resolve();
        }

        // Poll once a second until the server reports secondary again
        for(var remaining = maxRetries; remaining > 0; remaining--) {
          yield waitMS(1000);

          var current = yield node.ismaster();
          if(current.secondary) {
            return resolve();
          }
        }

        reject(new Error(f('server %s failed to come back as a secondary after %s milliseconds waiting', node.name, (maxRetries*1000))));
      }).catch(reject);
    });
  }

  /**
   * Stop every server manager, one after the other
   * @method
   * @param {number} [signal=9] Signal number, translated to a name via the
   *   module's signal map. Anything that is not a number selects signal 9.
   * @returns {Promise}
   */
  stop(signal) {
    var self = this;
    signal = typeof signal == 'number' ? signals[signal] : signals[9];

    return new Promise(function(resolve, reject) {
      co(function*() {
        for(var i = 0; i < self.managers.length; i++) {
          yield self.managers[i].stop(signal);
        }

        resolve();
      }).catch(reject);
    });
  }

  /**
   * Stop all servers, purge their directories and start the set from scratch
   * @method
   * @param {number} [signal=9] Signal number used to stop the servers.
   * @param {object} [options] Any options for the operation.
   * @param {number} [options.waitMS] Milliseconds to wait between stopping and purging.
   * @returns {Promise}
   */
  restart(signal, options) {
    var self = this;
    options = options || {};

    return new Promise(function(resolve, reject) {
      co(function*() {
        // Stop the servers (stop() performs the signal translation; an
        // unknown number falls through as undefined, a known one maps to
        // the same name as before)
        yield self.stop(signal);

        // Do we wait after stop
        if(typeof options.waitMS == 'number') {
          yield waitMS(options.waitMS);
        }

        // Purge directories
        yield self.purge();

        // Clean out the configuration history
        self.configurations = [];

        // Restart the servers
        yield self.start();
        resolve();
      }).catch(function(e) {
        console.log(e.stack)
        reject(e);
      });
    });
  }

  /**
   * Purge the directories of every server manager, one after the other
   * @method
   * @returns {Promise}
   */
  purge() {
    var self = this;

    return new Promise(function(resolve, reject) {
      co(function*() {
        // Purge all directories
        for(var i = 0; i < self.managers.length; i++) {
          yield self.managers[i].purge();
        }

        resolve();
      }).catch(reject);
    });
  }
}
1082
/**
 * Generate the replicaset configuration document
 *
 * Members get sequential ids starting at 1, in the order of `nodes`.
 *
 * @param {string} _id Name of the replica set.
 * @param {number} version Configuration version.
 * @param {object[]} nodes Node descriptors; `options.bind_ip` (falling back
 *   to 'localhost') and `options.port` form the member host. The flags
 *   `arbiter`, `buildIndexes`, `hidden`, `priority`, `tags`, `slaveDelay` and
 *   `votes` are copied onto the member when set.
 * @param {object} [settings] Stored as `settings` on the configuration.
 * @returns {object} `{_id, version, members[, settings]}`
 */
var generateConfiguration = function(_id, version, nodes, settings) {
  // Generate members
  var members = nodes.map(function(node, i) {
    var member = {
      _id: i + 1,
      // Same fallback as ReplSet#url so a missing bind_ip never yields
      // an "undefined:<port>" host
      host: f('%s:%s', node.options.bind_ip || 'localhost', node.options.port),
    };

    // Did we specify any special options
    if(node.arbiter) member.arbiterOnly = true;
    if(typeof node.buildIndexes == 'boolean') {
      member.buildIndexes = node.buildIndexes;
    } else if(node.builIndexes) {
      // Legacy misspelled option name, kept for backward compatibility
      member.buildIndexes = true;
    }
    if(node.hidden) member.hidden = true;
    if(typeof node.priority == 'number') member.priority = node.priority;
    if(node.tags) member.tags = node.tags;
    if(node.slaveDelay) member.slaveDelay = node.slaveDelay;
    // votes: 0 is a meaningful value and must not be dropped
    if(typeof node.votes == 'number' || node.votes) member.votes = node.votes;

    return member;
  });

  // Configuration passed back
  var configuration = {
    _id: _id, version: version, members: members
  };

  if(settings) {
    configuration.settings = settings;
  }

  return configuration;
};
1121
// The ReplSet class is the module's only export.
module.exports = ReplSet;
1123
// Signal numbers and their default action:
//
//   SIGHUP    1  Term  Hangup detected on controlling terminal
//                      or death of controlling process
//   SIGINT    2  Term  Interrupt from keyboard
//   SIGQUIT   3  Core  Quit from keyboard
//   SIGILL    4  Core  Illegal Instruction
//   SIGABRT   6  Core  Abort signal from abort(3)
//   SIGFPE    8  Core  Floating point exception
//   SIGKILL   9  Term  Kill signal
//   SIGSEGV  11  Core  Invalid memory reference
//   SIGPIPE  13  Term  Broken pipe: write to pipe with no readers
//   SIGALRM  14  Term  Timer signal from alarm(2)
//   SIGTERM  15  Term  Termination signal
//
// SIGSTOP is mapped from three numbers (17, 19 and 23), presumably because
// its number differs between platforms.
//
// Signal map: signal number -> signal name
var signals = {
  1: 'SIGHUP',
  2: 'SIGINT',
  3: 'SIGQUIT',
  4: 'SIGILL',
  6: 'SIGABRT',
  8: 'SIGFPE',
  9: 'SIGKILL',
  11: 'SIGSEGV',
  13: 'SIGPIPE',
  14: 'SIGALRM',
  15: 'SIGTERM',
  17: 'SIGSTOP',
  19: 'SIGSTOP',
  23: 'SIGSTOP'
};