UNPKG

57 kBJavaScriptView Raw
1"use strict";
2
3var _createClass = (function () { function defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } } return function (Constructor, protoProps, staticProps) { if (protoProps) defineProperties(Constructor.prototype, protoProps); if (staticProps) defineProperties(Constructor, staticProps); return Constructor; }; })();
4
5function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }
6
7var co = require('co'),
8 f = require('util').format,
9 mkdirp = require('mkdirp'),
10 rimraf = require('rimraf'),
11 Logger = require('./logger'),
12 Server = require('./server'),
13 CoreServer = require('mongodb-core').Server,
14 spawn = require('child_process').spawn;
15
16var clone = function clone(o) {
17 var obj = {};for (var name in o) {
18 obj[name] = o[name];
19 }return obj;
20};
21
22var waitMS = function waitMS(ms) {
23 return new Promise(function (resolve, reject) {
24 setTimeout(function () {
25 resolve();
26 }, ms);
27 });
28};
29
30var ReplSet = (function () {
31 function ReplSet(binary, nodes, options) {
32 _classCallCheck(this, ReplSet);
33
34 options = options || {};
35 // Save the default passed in parameters
36 this.nodes = nodes;
37 this.options = clone(options);
38
39 // Create logger instance
40 this.logger = Logger('ReplSet', options);
41
42 // Did we specify special settings for the configuration JSON used
43 // to set up the replicaset (delete it from the internal options after
44 // transferring it to a new variable)
45 if (this.options.configSettings) {
46 this.configSettings = this.options.configSettings;
47 delete this.options['configSettings'];
48 }
49
50 // Ensure we have a list of nodes
51 if (!Array.isArray(this.nodes) || this.nodes.length == 0) {
52 throw new Error('a list of nodes must be passed in');
53 }
54
55 // Ensure we have set basic options
56 if (!options.replSet) throw new Error('replSet must be set');
57
58 // Server state
59 this.state = 'stopped';
60
61 // Unpack default runtime information
62 this.binary = binary || 'mongod';
63
64 // Wait times
65 this.electionCycleWaitMS = typeof this.options.electionCycleWaitMS == 'number' ? this.options.electionCycleWaitMS : 31000;
66 this.retryWaitMS = typeof this.options.retryWaitMS == 'number' ? this.options.retryWaitMS : 5000;
67
68 // Remove the values from the options
69 delete this.options['electionCycleWaitMS'];
70 delete this.options['retryWaitMS'];
71
72 // Self reference
73 var self = this;
74
75 // Basic config settings for replicaset
76 this.version = 1;
77 this.replSet = options.replSet;
78
79 // Contains all the configurations
80 this.configurations = [];
81
82 // Get the current electionId
83 this.electionId = null;
84
85 // Create server managers for each node
86 this.managers = this.nodes.map(function (x) {
87 var opts = clone(x.options);
88 delete opts['logpath'];
89
90 // Add the needed replicaset options
91 opts.replSet = options.replSet;
92
93 // Set server instance
94 var server = new Server(self.binary, opts, options);
95
96 // Create manager
97 return server;
98 });
99 }
100
101 _createClass(ReplSet, [{
102 key: 'discover',
103 value: function discover() {
104 var self = this;
105
106 return new Promise(function (resolve, reject) {
107 co(regeneratorRuntime.mark(function _callee() {
108 var proc, stdout, stderr;
109 return regeneratorRuntime.wrap(function _callee$(_context) {
110 while (1) {
111 switch (_context.prev = _context.next) {
112 case 0:
113 proc = spawn(self.binary, ['--version']);
114 // Variables receiving data
115
116 stdout = '';
117 stderr = '';
118 // Get the stdout
119
120 proc.stdout.on('data', function (data) {
121 stdout += data;
122 });
123 // Get the stderr
124 proc.stderr.on('data', function (data) {
125 stderr += data;
126 });
127 // Got an error
128 proc.on('error', function (err) {
129 reject(err);
130 });
131 // Process terminated
132 proc.on('close', function (code) {
133 // Perform version match
134 var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/);
135
136 // Check if we have ssl
137 var sslMatch = stdout.match(/ssl/i);
138 // Final result
139 var result = {
140 version: versionMatch.toString().split('.').map(function (x) {
141 return parseInt(x, 10);
142 }),
143 ssl: sslMatch != null
144 };
145
146 if (self.logger.isInfo()) {
147 self.logger.info(f('mongod discovery returned %s', JSON.stringify(result)));
148 }
149
150 // Resolve the server version
151 resolve(result);
152 });
153
154 case 7:
155 case 'end':
156 return _context.stop();
157 }
158 }
159 }, _callee, this);
160 })).catch(reject);
161 });
162 }
163 }, {
164 key: 'start',
165 value: function start() {
166 var self = this;
167
168 return new Promise(function (resolve, reject) {
169 co(regeneratorRuntime.mark(function _callee2() {
170 var result, i, config, ignoreError, numberOfArbiters, state, member, ismaster;
171 return regeneratorRuntime.wrap(function _callee2$(_context2) {
172 while (1) {
173 switch (_context2.prev = _context2.next) {
174 case 0:
175 if (!(self.state == 'running')) {
176 _context2.next = 2;
177 break;
178 }
179
180 return _context2.abrupt('return', resolve());
181
182 case 2:
183 _context2.next = 4;
184 return self.discover();
185
186 case 4:
187 result = _context2.sent;
188 i = 0;
189
190 case 6:
191 if (!(i < self.managers.length)) {
192 _context2.next = 12;
193 break;
194 }
195
196 _context2.next = 9;
197 return self.managers[i].start();
198
199 case 9:
200 i++;
201 _context2.next = 6;
202 break;
203
204 case 12:
205
206 // Time to configure the servers by generating the
207 config = generateConfiguration(self.replSet, self.version, self.nodes, self.configSettings);
208
209 if (self.logger.isInfo()) {
210 self.logger.info(f('initialize replicaset with config %s', JSON.stringify(config)));
211 }
212
213 // Ignore Error
214 ignoreError = result.version[0] == 2 && result.version[1] <= 6 ? true : false;
215
216 // Pick the first manager and execute replicaset configuration
217
218 _context2.next = 17;
219 return self.managers[0].executeCommand('admin.$cmd', {
220 replSetInitiate: config
221 }, null, { ignoreError: ignoreError });
222
223 case 17:
224 result = _context2.sent;
225
226 if (!(result.ok == 0)) {
227 _context2.next = 20;
228 break;
229 }
230
231 return _context2.abrupt('return', reject(new Error(f('failed to initialize replicaset with config %s', JSON.stringify(config)))));
232
233 case 20:
234
235 // Push configuration to the history
236 self.configurations.push(config);
237
238 // Waiting
239 numberOfArbiters = 0;
240 // Count the number of expected arbiters
241
242 self.nodes.forEach(function (x) {
243 if (x.arbiter) numberOfArbiters = numberOfArbiters + 1;
244 });
245
246 // Now monitor until we have all the servers in a healthy state
247
248 case 23:
249 if (!true) {
250 _context2.next = 42;
251 break;
252 }
253
254 _context2.next = 26;
255 return waitMS(1000);
256
257 case 26:
258
259 // Monitoring state
260 state = {
261 primaries: 0,
262 secondaries: 0,
263 arbiters: 0
264 };
265
266 // Get the replicaset status
267
268 _context2.prev = 27;
269 _context2.next = 30;
270 return self.managers[0].executeCommand('admin.$cmd', {
271 replSetGetStatus: true
272 });
273
274 case 30:
275 result = _context2.sent;
276 _context2.next = 36;
277 break;
278
279 case 33:
280 _context2.prev = 33;
281 _context2.t0 = _context2['catch'](27);
282 return _context2.abrupt('continue', 23);
283
284 case 36:
285
286 // Sum up expected servers
287 for (i = 0; i < result.members.length; i++) {
288 member = result.members[i];
289
290 if (member.health == 1) {
291 if (member.state == 2) {
292 state.secondaries = state.secondaries + 1;
293 }
294
295 if (member.state == 1) {
296 state.primaries = state.primaries + 1;
297 }
298
299 if (member.state == 7) {
300 state.arbiters = state.arbiters + 1;
301 }
302 }
303 }
304
305 if (self.logger.isInfo()) {
306 self.logger.info(f('replicaset current state %s', JSON.stringify(state)));
307 }
308
309 // Validate the state
310
311 if (!(state.primaries == 1 && state.arbiters == numberOfArbiters && state.secondaries == self.nodes.length - numberOfArbiters - 1)) {
312 _context2.next = 40;
313 break;
314 }
315
316 return _context2.abrupt('break', 42);
317
318 case 40:
319 _context2.next = 23;
320 break;
321
322 case 42:
323 _context2.next = 44;
324 return self.managers[0].ismaster();
325
326 case 44:
327 ismaster = _context2.sent;
328
329 // Save the current election Id if it exists
330 self.electionId = ismaster.electionId;
331 self.lastKnownPrimary = ismaster.me;
332
333 // We have a stable replicaset
334 resolve();
335
336 case 48:
337 case 'end':
338 return _context2.stop();
339 }
340 }
341 }, _callee2, this, [[27, 33]]);
342 })).catch(reject);
343 });
344 }
345
346 /**
347 * Locate the primary server manager
348 * @method
349 * @returns {Promise}
350 */
351
352 }, {
353 key: 'primary',
354 value: function primary() {
355 var self = this;
356
357 return new Promise(function (resolve, reject) {
358 co(regeneratorRuntime.mark(function _callee3() {
359 var manager, i, ismaster;
360 return regeneratorRuntime.wrap(function _callee3$(_context3) {
361 while (1) {
362 switch (_context3.prev = _context3.next) {
363 case 0:
364 manager = null;
365
366 // Go over all the managers
367
368 i = 0;
369
370 case 2:
371 if (!(i < self.managers.length)) {
372 _context3.next = 10;
373 break;
374 }
375
376 _context3.next = 5;
377 return self.managers[i].ismaster();
378
379 case 5:
380 ismaster = _context3.sent;
381
382 if (ismaster.ismaster) manager = self.managers[i];
383
384 case 7:
385 i++;
386 _context3.next = 2;
387 break;
388
389 case 10:
390
391 if (!manager) reject(new Error('no primary server found in set'));
392 resolve(manager);
393
394 case 12:
395 case 'end':
396 return _context3.stop();
397 }
398 }
399 }, _callee3, this);
400 })).catch(reject);
401 });
402 }
403
404 /**
405 * Return add shard url
406 * @method
407 * return {String}
408 */
409
410 }, {
411 key: 'shardUrl',
412 value: function shardUrl() {
413 var members = this.nodes.map(function (x) {
414 return f('%s:%s', x.options.bind_ip || 'localhost', x.options.port);
415 });
416
417 // Generate the url
418 return f('%s/%s', this.replSet, members.join(','));
419 }
420
421 /**
422 * Return members url
423 * @method
424 * return {String}
425 */
426
427 }, {
428 key: 'url',
429 value: function url() {
430 var members = this.nodes.map(function (x) {
431 return f('%s:%s', x.options.bind_ip || 'localhost', x.options.port);
432 });
433
434 // Generate the url
435 return f('%s', members.join(','));
436 }
437
438 /**
439 * Locate all the arbiters
440 * @method
441 * @returns {Promise}
442 */
443
444 }, {
445 key: 'arbiters',
446 value: function arbiters() {
447 var self = this;
448
449 return new Promise(function (resolve, reject) {
450 co(regeneratorRuntime.mark(function _callee4() {
451 var arbiters, i, ismaster;
452 return regeneratorRuntime.wrap(function _callee4$(_context4) {
453 while (1) {
454 switch (_context4.prev = _context4.next) {
455 case 0:
456 arbiters = [];
457
458 // Go over all the managers
459
460 i = 0;
461
462 case 2:
463 if (!(i < self.managers.length)) {
464 _context4.next = 10;
465 break;
466 }
467
468 _context4.next = 5;
469 return self.managers[i].ismaster();
470
471 case 5:
472 ismaster = _context4.sent;
473
474 if (ismaster.arbiterOnly) arbiters.push(self.managers[i]);
475
476 case 7:
477 i++;
478 _context4.next = 2;
479 break;
480
481 case 10:
482
483 resolve(arbiters);
484
485 case 11:
486 case 'end':
487 return _context4.stop();
488 }
489 }
490 }, _callee4, this);
491 })).catch(reject);
492 });
493 }
494
495 /**
496 * Locate all the secondaries
497 * @method
498 * @returns {Promise}
499 */
500
501 }, {
502 key: 'secondaries',
503 value: function secondaries() {
504 var self = this;
505
506 return new Promise(function (resolve, reject) {
507 co(regeneratorRuntime.mark(function _callee5() {
508 var secondaries, i, ismaster;
509 return regeneratorRuntime.wrap(function _callee5$(_context5) {
510 while (1) {
511 switch (_context5.prev = _context5.next) {
512 case 0:
513 secondaries = [];
514
515 // Go over all the managers
516
517 i = 0;
518
519 case 2:
520 if (!(i < self.managers.length)) {
521 _context5.next = 10;
522 break;
523 }
524
525 _context5.next = 5;
526 return self.managers[i].ismaster();
527
528 case 5:
529 ismaster = _context5.sent;
530
531 // Check if we have a secondary but might be a passive
532 if (ismaster.secondary && ismaster.passives && ismaster.passives.indexOf(ismaster.me) == -1) {
533 secondaries.push(self.managers[i]);
534 } else if (ismaster.secondary && !ismaster.passives) {
535 secondaries.push(self.managers[i]);
536 }
537
538 case 7:
539 i++;
540 _context5.next = 2;
541 break;
542
543 case 10:
544
545 resolve(secondaries);
546
547 case 11:
548 case 'end':
549 return _context5.stop();
550 }
551 }
552 }, _callee5, this);
553 })).catch(reject);
554 });
555 }
556
557 /**
558 * Locate all the passives
559 * @method
560 * @returns {Promise}
561 */
562
563 }, {
564 key: 'passives',
565 value: function passives() {
566 var self = this;
567
568 return new Promise(function (resolve, reject) {
569 co(regeneratorRuntime.mark(function _callee6() {
570 var secondaries, i, ismaster;
571 return regeneratorRuntime.wrap(function _callee6$(_context6) {
572 while (1) {
573 switch (_context6.prev = _context6.next) {
574 case 0:
575 secondaries = [];
576
577 // Go over all the managers
578
579 i = 0;
580
581 case 2:
582 if (!(i < self.managers.length)) {
583 _context6.next = 10;
584 break;
585 }
586
587 _context6.next = 5;
588 return self.managers[i].ismaster();
589
590 case 5:
591 ismaster = _context6.sent;
592
593 // Check if we have a secondary but might be a passive
594 if (ismaster.secondary && ismaster.passives && ismaster.passives.indexOf(ismaster.me) != -1) {
595 secondaries.push(self.managers[i]);
596 }
597
598 case 7:
599 i++;
600 _context6.next = 2;
601 break;
602
603 case 10:
604
605 resolve(secondaries);
606
607 case 11:
608 case 'end':
609 return _context6.stop();
610 }
611 }
612 }, _callee6, this);
613 })).catch(reject);
614 });
615 }
616
617 /**
618 * Block until we have a new primary available
619 * @method
620 * @returns {Promise}
621 */
622
623 }, {
624 key: 'waitForPrimary',
625 value: function waitForPrimary() {
626 var self = this;
627 var waitedForElectionCycle = false;
628
629 return new Promise(function (resolve, reject) {
630 co(regeneratorRuntime.mark(function _callee7() {
631 var i, ismaster;
632 return regeneratorRuntime.wrap(function _callee7$(_context7) {
633 while (1) {
634 switch (_context7.prev = _context7.next) {
635 case 0:
636 if (!true) {
637 _context7.next = 34;
638 break;
639 }
640
641 i = 0;
642
643 case 2:
644 if (!(i < self.managers.length)) {
645 _context7.next = 30;
646 break;
647 }
648
649 _context7.prev = 3;
650 _context7.next = 6;
651 return self.managers[i].ismaster();
652
653 case 6:
654 ismaster = _context7.sent;
655
656 if (!(ismaster.electionId && ismaster.ismaster && !ismaster.electionId.equals(self.electionId))) {
657 _context7.next = 13;
658 break;
659 }
660
661 // We have a new primary
662 self.electionId = ismaster.electionId;
663 self.lastKnownPrimary = ismaster.me;
664 // Return the manager
665 return _context7.abrupt('return', resolve(self.managers[i]));
666
667 case 13:
668 if (!(ismaster.ismaster && !waitedForElectionCycle)) {
669 _context7.next = 19;
670 break;
671 }
672
673 _context7.next = 16;
674 return waitMS(self.electionCycleWaitMS);
675
676 case 16:
677 // Set waitedForElectionCycle
678 waitedForElectionCycle = true;
679 _context7.next = 21;
680 break;
681
682 case 19:
683 if (!(ismaster.ismaster && waitedForElectionCycle)) {
684 _context7.next = 21;
685 break;
686 }
687
688 return _context7.abrupt('return', resolve());
689
690 case 21:
691 _context7.next = 27;
692 break;
693
694 case 23:
695 _context7.prev = 23;
696 _context7.t0 = _context7['catch'](3);
697 _context7.next = 27;
698 return waitMS(self.retryWaitMS);
699
700 case 27:
701 i++;
702 _context7.next = 2;
703 break;
704
705 case 30:
706 _context7.next = 32;
707 return waitMS(1000);
708
709 case 32:
710 _context7.next = 0;
711 break;
712
713 case 34:
714 case 'end':
715 return _context7.stop();
716 }
717 }
718 }, _callee7, this, [[3, 23]]);
719 })).catch(reject);
720 });
721 }
722
723 /**
724 * Step down the primary server
725 * @method
726 * @param {boolean} [returnImmediately=false] Return immediately after executing stepdown, otherwise block until new primary is available.
727 * @param {number} [options.stepDownSecs=60] The number of seconds to wait before stepping down primary.
728 * @param {number} [options.secondaryCatchUpPeriodSecs=null] The number of seconds that the mongod will wait for an electable secondary to catch up to the primary.
729 * @param {boolean} [options.force=false] A boolean that determines whether the primary steps down if no electable and up-to-date secondary exists within the wait period.
730 * @returns {Promise}
731 */
732
733 }, {
734 key: 'stepDownPrimary',
735 value: function stepDownPrimary(returnImmediately, options, credentials) {
736 var self = this;
737 options = options || {};
738
739 return new Promise(function (resolve, reject) {
740 co(regeneratorRuntime.mark(function _callee8() {
741 var command, name, manager, result, r;
742 return regeneratorRuntime.wrap(function _callee8$(_context8) {
743 while (1) {
744 switch (_context8.prev = _context8.next) {
745 case 0:
746 options = clone(options);
747
748 // Step down command
749 command = {
750 replSetStepDown: typeof options.stepDownSecs == 'number' ? options.stepDownSecs : 60
751 };
752
753 // Remove stepDownSecs
754
755 delete options['stepDownSecs'];
756 // Mix in any other options
757 for (name in options) {
758 command[name] = options[name];
759 }
760
761 // Locate the current primary
762 _context8.next = 6;
763 return self.primary();
764
765 case 6:
766 manager = _context8.sent;
767
768 if (!(manager == null)) {
769 _context8.next = 9;
770 break;
771 }
772
773 return _context8.abrupt('return', reject(new Error('no primary found in the replicaset')));
774
775 case 9:
776 _context8.prev = 9;
777 _context8.next = 12;
778 return manager.executeCommand('admin.$cmd', command, credentials);
779
780 case 12:
781 result = _context8.sent;
782 _context8.next = 19;
783 break;
784
785 case 15:
786 _context8.prev = 15;
787 _context8.t0 = _context8['catch'](9);
788
789 if (!(_context8.t0.ok == 0)) {
790 _context8.next = 19;
791 break;
792 }
793
794 return _context8.abrupt('return', reject(_context8.t0));
795
796 case 19:
797 _context8.next = 21;
798 return self.discover();
799
800 case 21:
801 r = _context8.sent;
802
803 if (!(r.version[0] >= 3)) {
804 _context8.next = 25;
805 break;
806 }
807
808 if (!(result && result.ok == 0)) {
809 _context8.next = 25;
810 break;
811 }
812
813 return _context8.abrupt('return', reject(result));
814
815 case 25:
816 if (!returnImmediately) {
817 _context8.next = 27;
818 break;
819 }
820
821 return _context8.abrupt('return', resolve());
822
823 case 27:
824 _context8.next = 29;
825 return self.waitForPrimary();
826
827 case 29:
828
829 // Finish up
830 resolve();
831
832 case 30:
833 case 'end':
834 return _context8.stop();
835 }
836 }
837 }, _callee8, this, [[9, 15]]);
838 })).catch(reject);
839 });
840 }
841
842 /**
843 * Get the current replicaset configuration
844 * @method
845 * @param {object} manager The server manager that we wish to remove from the set.
846 * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
847 * @returns {Promise}
848 */
849
850 }, {
851 key: 'configuration',
852 value: function configuration(manager, credentials) {
853 var self = this;
854
855 return new Promise(function (resolve, reject) {
856 co(regeneratorRuntime.mark(function _callee9() {
857 var result, server, cursor;
858 return regeneratorRuntime.wrap(function _callee9$(_context9) {
859 while (1) {
860 switch (_context9.prev = _context9.next) {
861 case 0:
862 _context9.next = 2;
863 return self.discover();
864
865 case 2:
866 result = _context9.sent;
867
868 if (!(result[0] >= 3)) {
869 _context9.next = 12;
870 break;
871 }
872
873 _context9.next = 6;
874 return manager.executeCommand('admin.$cmd', {
875 replSetGetConfig: true
876 }, credentials);
877
878 case 6:
879 result = _context9.sent;
880
881 if (!(result && result.ok == 0)) {
882 _context9.next = 9;
883 break;
884 }
885
886 return _context9.abrupt('return', reject(new Error(f('failed to execute replSetGetConfig against server [%s]', node.name))));
887
888 case 9:
889
890 resolve(result.config);
891 _context9.next = 17;
892 break;
893
894 case 12:
895 _context9.next = 14;
896 return manager.instance(credentials);
897
898 case 14:
899 server = _context9.sent;
900
901 // Get the configuration document
902 cursor = server.cursor('local.system.replset', {
903 find: 'local.system.replset',
904 query: {},
905 limit: 1
906 });
907
908 // Execute next
909
910 cursor.next(function (err, d) {
911 if (err) return reject(err);
912 if (!d) return reject(new Error('no replicaset configuration found'));
913 resolve(d);
914 });
915
916 case 17:
917 case 'end':
918 return _context9.stop();
919 }
920 }
921 }, _callee9, this);
922 })).catch(reject);
923 });
924 }
925
926 /**
927 * Set a new configuration
928 * @method
929 * @param {object} configuration The configuration JSON object
930 * @param {object} [options] Any options for the operation.
931 * @param {boolean} [options.returnImmediately=false] Return immediately after executing stepdown, otherwise block until new primary is available.
932 * @param {boolean} [options.force=false] Force the server reconfiguration
933 * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
934 * @returns {Promise}
935 */
936
937 }, {
938 key: 'reconfigure',
939 value: function reconfigure(config, options, credentials) {
940 options = options || { returnImmediately: false };
941 var self = this;
942
943 // Default returnImmediately to false
944 var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
945 // Default force to false
946 var force = typeof options.force == 'boolean' ? options.force : false;
947
948 return new Promise(function (resolve, reject) {
949 co(regeneratorRuntime.mark(function _callee10() {
950 var lastConfig, primary, result, waitedForElectionCycle, ismaster;
951 return regeneratorRuntime.wrap(function _callee10$(_context10) {
952 while (1) {
953 switch (_context10.prev = _context10.next) {
954 case 0:
955 // Last known config
956 lastConfig = self.configurations[self.configurations.length - 1];
957 // Grab the current configuration and clone it (including member object)
958
959 config = clone(config);
960 config.members = config.members.map(function (x) {
961 return clone(x);
962 });
963
964 // Update the version to the latest + 1
965 config.version = lastConfig.version + 1;
966
967 // Reconfigure the replicaset
968 _context10.next = 6;
969 return self.primary();
970
971 case 6:
972 primary = _context10.sent;
973
974 if (primary) {
975 _context10.next = 9;
976 break;
977 }
978
979 return _context10.abrupt('return', reject(new Error('no primary available')));
980
981 case 9:
982 _context10.next = 11;
983 return primary.executeCommand('admin.$cmd', {
984 replSetReconfig: config, force: force
985 }, credentials, { ignoreError: true });
986
987 case 11:
988 result = _context10.sent;
989
990 if (!(result && result.ok == 0)) {
991 _context10.next = 14;
992 break;
993 }
994
995 return _context10.abrupt('return', reject(new Error(f('failed to execute replSetReconfig with configuration [%s]', JSON.stringify(config)))));
996
997 case 14:
998
999 // Push new configuration to list
1000 self.configurations.push(config);
1001
1002 // If we want to return immediately do so now
1003
1004 if (!returnImmediately) {
1005 _context10.next = 17;
1006 break;
1007 }
1008
1009 return _context10.abrupt('return', resolve(server));
1010
1011 case 17:
1012
1013 // Found a valid state
1014 waitedForElectionCycle = false;
1015
1016 // Wait for the server to get in a stable state
1017
1018 case 18:
1019 if (!true) {
1020 _context10.next = 60;
1021 break;
1022 }
1023
1024 _context10.prev = 19;
1025 _context10.next = 22;
1026 return self.primary();
1027
1028 case 22:
1029 primary = _context10.sent;
1030
1031 if (primary) {
1032 _context10.next = 27;
1033 break;
1034 }
1035
1036 _context10.next = 26;
1037 return waitMS(self.retryWaitMS);
1038
1039 case 26:
1040 return _context10.abrupt('continue', 18);
1041
1042 case 27:
1043 _context10.next = 29;
1044 return primary.ismaster();
1045
1046 case 29:
1047 ismaster = _context10.sent;
1048
1049 if (!(ismaster.ismaster && ismaster.electionId && !self.electionId.equals(ismaster.electionId))) {
1050 _context10.next = 36;
1051 break;
1052 }
1053
1054 _context10.next = 33;
1055 return self.waitForPrimary();
1056
1057 case 33:
1058 return _context10.abrupt('return', resolve());
1059
1060 case 36:
1061 if (!((ismaster.secondary || ismaster.arbiterOnly) && ismaster.electionId && self.electionId.equals(ismaster.electionId))) {
1062 _context10.next = 40;
1063 break;
1064 }
1065
1066 return _context10.abrupt('return', resolve());
1067
1068 case 40:
1069 if (!((ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly) && !waitedForElectionCycle)) {
1070 _context10.next = 46;
1071 break;
1072 }
1073
1074 // Wait for an election cycle to have passed
1075 waitedForElectionCycle = true;
1076 _context10.next = 44;
1077 return waitMS(self.electionCycleWaitMS);
1078
1079 case 44:
1080 _context10.next = 52;
1081 break;
1082
1083 case 46:
1084 if (!((ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly) && waitedForElectionCycle)) {
1085 _context10.next = 50;
1086 break;
1087 }
1088
1089 return _context10.abrupt('return', resolve());
1090
1091 case 50:
1092 _context10.next = 52;
1093 return waitMS(self.retryWaitMS);
1094
1095 case 52:
1096 _context10.next = 58;
1097 break;
1098
1099 case 54:
1100 _context10.prev = 54;
1101 _context10.t0 = _context10['catch'](19);
1102 _context10.next = 58;
1103 return waitMS(self.retryWaitMS);
1104
1105 case 58:
1106 _context10.next = 18;
1107 break;
1108
1109 case 60:
1110
1111 // Should not reach here
1112 reject(new Error(f('failed to successfully set a configuration [%s]', JSON.stringify(config))));
1113
1114 case 61:
1115 case 'end':
1116 return _context10.stop();
1117 }
1118 }
1119 }, _callee10, this, [[19, 54]]);
1120 })).catch(reject);
1121 });
1122 }
1123
1124 /**
1125 * Adds a new member to the replicaset
1126 * @method
1127 * @param {object} node server manager we want node configuration from
1128 * @returns {Promise}
1129 */
1130
1131 }, {
1132 key: 'serverConfiguration',
1133 value: function serverConfiguration(n) {
1134 var node = null;
1135
1136 // Is the node an existing server manager, get the info from the node
1137 if (n instanceof Server) {
1138 // Locate the known node for this server
1139 for (var i = 0; i < this.nodes.length; i++) {
1140 var _n = this.nodes[i];
1141 if (_n.options.bind_ip == n.host && _n.options.port == n.port) {
1142 node = _n;
1143 break;
1144 }
1145 }
1146 }
1147
1148 return node;
1149 }
1150
1151 /**
1152 * Adds a new member to the replicaset
1153 * @method
1154 * @param {object} node All the settings used to boot the mongod process.
1155 * @param {object} [options] Any options for the operation.
1156 * @param {boolean} [options.returnImmediately=false] Return immediately after executing stepdown, otherwise block until new primary is available.
1157 * @param {boolean} [options.force=false] Force the server reconfiguration
1158 * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
1159 * @returns {Promise}
1160 */
1161
1162 }, {
1163 key: 'addMember',
1164 value: function addMember(node, options, credentials) {
1165 options = options || { returnImmediately: false };
1166 var self = this;
1167
1168 // Default returnImmediately to false
1169 var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
1170 // Default force to false
1171 var force = typeof options.force == 'boolean' ? options.force : false;
1172
1173 // Is the node an existing server manager, get the info from the node
1174 if (node instanceof Server) {
1175 // Locate the known node for this server
1176 for (var i = 0; i < this.nodes.length; i++) {
1177 var n = this.nodes[i];
1178 if (n.options.bind_ip == node.host && n.options.port == node.port) {
1179 node = n;
1180 break;
1181 }
1182 }
1183 }
1184
1185 // Return the promise
1186 return new Promise(function (resolve, reject) {
1187 co(regeneratorRuntime.mark(function _callee11() {
1188 var opts, server, max, config, member, primary, result, waitedForElectionCycle, ismaster;
1189 return regeneratorRuntime.wrap(function _callee11$(_context11) {
1190 while (1) {
1191 switch (_context11.prev = _context11.next) {
1192 case 0:
1193 // Clone the top level settings
1194 node = clone(node);
1195 // Clone the settings and remove the logpath
1196 opts = clone(node.options);
1197
1198 delete opts['logpath'];
1199
1200 // Add the needed replicaset options
1201 opts.replSet = self.options.replSet;
1202
1203 // Create a new server instance
1204 server = new Server(self.binary, opts, self.options);
1205
1206 // Purge the directory
1207
1208 _context11.next = 7;
1209 return server.purge();
1210
1211 case 7:
1212 _context11.next = 9;
1213 return server.start();
1214
1215 case 9:
1216 if (!(self.configurations.length == 0)) {
1217 _context11.next = 11;
1218 break;
1219 }
1220
1221 return _context11.abrupt('return', reject(new Error('no configurations exist yet, did you start the replicaset?')));
1222
1223 case 11:
1224
1225 // Locate max id
1226 max = 0;
1227
1228 // Grab the current configuration and clone it (including member object)
1229
1230 config = clone(self.configurations[self.configurations.length - 1]);
1231
1232 config.members = config.members.map(function (x) {
1233 max = x._id > max ? x._id : max;
1234 return clone(x);
1235 });
1236
1237 // Let's add our new server to the configuration
1238 delete node['options'];
1239 // Create the member
1240 member = {
1241 _id: max + 1,
1242 host: f('%s:%s', opts.bind_ip, opts.port)
1243 };
1244
1245 // Did we specify any special options
1246
1247 if (node.arbiter) member.arbiterOnly = true;
1248 if (node.builIndexes) member.buildIndexes = true;
1249 if (node.hidden) member.hidden = true;
1250 if (typeof node.priority == 'number') member.priority = node.priority;
1251 if (node.tags) member.tags = node.tags;
1252 if (node.slaveDelay) member.slaveDelay = node.slaveDelay;
1253 if (node.votes) member.votes = node.votes;
1254
1255 // Add to the list of members
1256 config.members.push(member);
1257 // Update the configuration version
1258 config.version = config.version + 1;
1259
1260 // Reconfigure the replicaset
1261 _context11.next = 27;
1262 return self.primary();
1263
1264 case 27:
1265 primary = _context11.sent;
1266
1267 if (primary) {
1268 _context11.next = 30;
1269 break;
1270 }
1271
1272 return _context11.abrupt('return', reject(new Error('no primary available')));
1273
1274 case 30:
1275 _context11.next = 32;
1276 return primary.executeCommand('admin.$cmd', {
1277 replSetReconfig: config, force: force
1278 }, credentials);
1279
1280 case 32:
1281 result = _context11.sent;
1282
1283 if (!(result && result.ok == 0)) {
1284 _context11.next = 35;
1285 break;
1286 }
1287
1288 return _context11.abrupt('return', reject(new Error(f('failed to execute replSetReconfig with configuration [%s]', JSON.stringify(config)))));
1289
1290 case 35:
1291
1292 // Push new configuration to list
1293 self.configurations.push(config);
1294
1295 // Add manager to list of managers
1296 self.managers.push(server);
1297
1298 // If we want to return immediately do so now
1299
1300 if (!returnImmediately) {
1301 _context11.next = 39;
1302 break;
1303 }
1304
1305 return _context11.abrupt('return', resolve(server));
1306
1307 case 39:
1308
1309 // Found a valid state
1310 waitedForElectionCycle = false;
1311
1312 // Wait for the server to get in a stable state
1313
1314 case 40:
1315 if (!true) {
1316 _context11.next = 77;
1317 break;
1318 }
1319
1320 _context11.prev = 41;
1321 _context11.next = 44;
1322 return server.ismaster();
1323
1324 case 44:
1325 ismaster = _context11.sent;
1326
1327 if (!(ismaster.ismaster && ismaster.electionId && !self.electionId.equals(ismaster.electionId))) {
1328 _context11.next = 51;
1329 break;
1330 }
1331
1332 _context11.next = 48;
1333 return self.waitForPrimary();
1334
1335 case 48:
1336 return _context11.abrupt('return', resolve(server));
1337
1338 case 51:
1339 if (!((ismaster.secondary || ismaster.arbiterOnly) && ismaster.electionId && self.electionId.equals(ismaster.electionId))) {
1340 _context11.next = 55;
1341 break;
1342 }
1343
1344 return _context11.abrupt('return', resolve(server));
1345
1346 case 55:
1347 if (!((ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly) && !waitedForElectionCycle)) {
1348 _context11.next = 61;
1349 break;
1350 }
1351
1352 // Wait for an election cycle to have passed
1353 waitedForElectionCycle = true;
1354 _context11.next = 59;
1355 return waitMS(self.electionCycleWaitMS);
1356
1357 case 59:
1358 _context11.next = 69;
1359 break;
1360
1361 case 61:
1362 if (!((ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly) && waitedForElectionCycle)) {
1363 _context11.next = 67;
1364 break;
1365 }
1366
1367 _context11.next = 64;
1368 return self.waitForPrimary();
1369
1370 case 64:
1371 return _context11.abrupt('return', resolve(server));
1372
1373 case 67:
1374 _context11.next = 69;
1375 return waitMS(self.retryWaitMS);
1376
1377 case 69:
1378 _context11.next = 75;
1379 break;
1380
1381 case 71:
1382 _context11.prev = 71;
1383 _context11.t0 = _context11['catch'](41);
1384 _context11.next = 75;
1385 return waitMS(self.retryWaitMS);
1386
1387 case 75:
1388 _context11.next = 40;
1389 break;
1390
1391 case 77:
1392
1393 // Should not reach here
1394 reject(new Error(f('failed to successfully add a new member with options [%s]', JSON.stringify(node))));
1395
1396 case 78:
1397 case 'end':
1398 return _context11.stop();
1399 }
1400 }
1401 }, _callee11, this, [[41, 71]]);
1402 })).catch(reject);
1403 });
1404 }
1405
1406 /**
1407 * Remove a member from the set
1408 * @method
1409 * @param {object} manager The server manager that we wish to remove from the set.
1410 * @param {object} [options] Any options for the operation.
1411 * @param {boolean} [options.returnImmediately=false] Return immediately after executing stepdown, otherwise block until new primary is available.
1412 * @param {boolean} [options.force=false] Force the server reconfiguration
1413 * @param {boolean} [options.skipWait=false] Skip waiting for the feedback
1414 * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
1415 * @returns {Promise}
1416 */
1417
1418 }, {
1419 key: 'removeMember',
1420 value: function removeMember(node, options, credentials) {
1421 options = options || { returnImmediately: false };
1422 var self = this;
1423
1424 // Default returnImmediately to false
1425 var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
1426 // Default force to false
1427 var force = typeof options.force == 'boolean' ? options.force : false;
1428 // Default skipWait
1429 var skipWait = typeof options.skipWait == 'boolean' ? options.skipWait : false;
1430
1431 return new Promise(function (resolve, reject) {
1432 co(regeneratorRuntime.mark(function _callee12() {
1433 var config, primary, result;
1434 return regeneratorRuntime.wrap(function _callee12$(_context12) {
1435 while (1) {
1436 switch (_context12.prev = _context12.next) {
1437 case 0:
1438 // Grab the current configuration and clone it (including member object)
1439 config = clone(self.configurations[self.configurations.length - 1]);
1440
1441 config.members = config.members.map(function (x) {
1442 return clone(x);
1443 });
1444
1445 // Locate the member and remove it
1446 config.members = config.members.filter(function (x) {
1447 return x.host != node.name;
1448 });
1449
1450 // Update the configuration version
1451 config.version = config.version + 1;
1452
1453 // Reconfigure the replicaset
1454 _context12.next = 6;
1455 return self.primary();
1456
1457 case 6:
1458 primary = _context12.sent;
1459
1460 if (primary) {
1461 _context12.next = 9;
1462 break;
1463 }
1464
1465 return _context12.abrupt('return', reject(new Error('no primary available')));
1466
1467 case 9:
1468 _context12.next = 11;
1469 return primary.executeCommand('admin.$cmd', {
1470 replSetReconfig: config, force: force
1471 }, credentials, { ignoreError: true });
1472
1473 case 11:
1474 result = _context12.sent;
1475
1476 // Push new configuration to list
1477 self.configurations.push(config);
1478
1479 // Remove from the list of managers
1480 self.managers = self.managers.filter(function (x) {
1481 return x.name != node.name;
1482 });
1483
1484 // If we want to return immediately do so now
1485
1486 if (!returnImmediately) {
1487 _context12.next = 18;
1488 break;
1489 }
1490
1491 _context12.next = 17;
1492 return node.stop();
1493
1494 case 17:
1495 return _context12.abrupt('return', resolve());
1496
1497 case 18:
1498 _context12.next = 20;
1499 return node.stop();
1500
1501 case 20:
1502 _context12.next = 22;
1503 return self.waitForPrimary();
1504
1505 case 22:
1506 return _context12.abrupt('return', resolve());
1507
1508 case 23:
1509 case 'end':
1510 return _context12.stop();
1511 }
1512 }
1513 }, _callee12, this);
1514 })).catch(reject);
1515 });
1516 }
1517
1518 /**
1519 * Remove a member from the set
1520 * @method
1521 * @param {object} node The server manager that we wish to remove from the set.
1522 * @param {object} [options] Any options for the operation.
1523 * @param {boolean} [options.returnImmediately=false] Return immediately after executing stepdown, otherwise block until new primary is available.
1524 * @param {boolean} [options.maxRetries=30] Number of retries before giving up for the server to come back as secondary.
1525 * @param {object} [credentials] Credentials needed to perform an admin authenticated command.
1526 * @returns {Promise}
1527 */
1528
1529 }, {
1530 key: 'maintenance',
1531 value: function maintenance(value, node, options, credentials) {
1532 options = options || { returnImmediately: false };
1533 var self = this;
1534
1535 // Default returnImmediately to false
1536 var returnImmediately = typeof options.returnImmediately == 'boolean' ? options.returnImmediately : false;
1537 var maxRetries = typeof options.maxRetries == 'number' ? options.maxRetries : 30;
1538
1539 return new Promise(function (resolve, reject) {
1540 co(regeneratorRuntime.mark(function _callee13() {
1541 var ismaster, result, currentTries;
1542 return regeneratorRuntime.wrap(function _callee13$(_context13) {
1543 while (1) {
1544 switch (_context13.prev = _context13.next) {
1545 case 0:
1546 _context13.next = 2;
1547 return node.ismaster();
1548
1549 case 2:
1550 ismaster = _context13.sent;
1551
1552 if (!(value == true && !ismaster.secondary)) {
1553 _context13.next = 7;
1554 break;
1555 }
1556
1557 return _context13.abrupt('return', reject(new Error(f('the server at %s is not a secondary', node.name))));
1558
1559 case 7:
1560 if (!(value == false && (ismaster.ismaster || ismaster.secondary || ismaster.arbiterOnly))) {
1561 _context13.next = 9;
1562 break;
1563 }
1564
1565 return _context13.abrupt('return', reject(new Error(f('the server at %s is not in maintenance mode', node.name))));
1566
1567 case 9:
1568 _context13.next = 11;
1569 return node.executeCommand('admin.$cmd', {
1570 replSetMaintenance: value
1571 }, credentials);
1572
1573 case 11:
1574 result = _context13.sent;
1575
1576 if (!(result && result.ok == 0)) {
1577 _context13.next = 14;
1578 break;
1579 }
1580
1581 return _context13.abrupt('return', reject(new Error(f('failed to execute replSetMaintenance for server [%s]', node.name))));
1582
1583 case 14:
1584 if (!(value == false && returnImmediately || value == true)) {
1585 _context13.next = 16;
1586 break;
1587 }
1588
1589 return _context13.abrupt('return', resolve());
1590
1591 case 16:
1592
1593 // Max waitTime
1594 currentTries = maxRetries;
1595
1596 // Did we pull the server back from maintenance mode
1597
1598 case 17:
1599 if (!true) {
1600 _context13.next = 30;
1601 break;
1602 }
1603
1604 if (!(currentTries == 0)) {
1605 _context13.next = 20;
1606 break;
1607 }
1608
1609 return _context13.abrupt('return', reject(new Error(f('server %s failed to come back as a secondary after %s milliseconds waiting', node.name, maxRetries * 1000))));
1610
1611 case 20:
1612 _context13.next = 22;
1613 return waitMS(1000);
1614
1615 case 22:
1616 _context13.next = 24;
1617 return node.ismaster();
1618
1619 case 24:
1620 ismaster = _context13.sent;
1621
1622 if (!ismaster.secondary) {
1623 _context13.next = 27;
1624 break;
1625 }
1626
1627 return _context13.abrupt('return', resolve());
1628
1629 case 27:
1630
1631 currentTries = currentTries - 1;
1632 _context13.next = 17;
1633 break;
1634
1635 case 30:
1636
1637 resolve();
1638
1639 case 31:
1640 case 'end':
1641 return _context13.stop();
1642 }
1643 }
1644 }, _callee13, this);
1645 })).catch(reject);
1646 });
1647 }
1648 }, {
1649 key: 'stop',
1650 value: function stop() {
1651 var self = this;
1652
1653 return new Promise(function (resolve, reject) {
1654 co(regeneratorRuntime.mark(function _callee14() {
1655 var i;
1656 return regeneratorRuntime.wrap(function _callee14$(_context14) {
1657 while (1) {
1658 switch (_context14.prev = _context14.next) {
1659 case 0:
1660 i = 0;
1661
1662 case 1:
1663 if (!(i < self.managers.length)) {
1664 _context14.next = 7;
1665 break;
1666 }
1667
1668 _context14.next = 4;
1669 return self.managers[i].stop();
1670
1671 case 4:
1672 i++;
1673 _context14.next = 1;
1674 break;
1675
1676 case 7:
1677
1678 resolve();
1679
1680 case 8:
1681 case 'end':
1682 return _context14.stop();
1683 }
1684 }
1685 }, _callee14, this);
1686 })).catch(reject);
1687 });
1688 }
1689 }, {
1690 key: 'restart',
1691 value: function restart() {
1692 var self = this;
1693
1694 return new Promise(function (resolve, reject) {
1695 co(regeneratorRuntime.mark(function _callee15() {
1696 return regeneratorRuntime.wrap(function _callee15$(_context15) {
1697 while (1) {
1698 switch (_context15.prev = _context15.next) {
1699 case 0:
1700 _context15.next = 2;
1701 return self.stop();
1702
1703 case 2:
1704 _context15.next = 4;
1705 return self.purge();
1706
1707 case 4:
1708 _context15.next = 6;
1709 return self.start();
1710
1711 case 6:
1712 resolve();
1713
1714 case 7:
1715 case 'end':
1716 return _context15.stop();
1717 }
1718 }
1719 }, _callee15, this);
1720 })).catch(reject);
1721 });
1722 }
1723 }, {
1724 key: 'purge',
1725 value: function purge() {
1726 var self = this;
1727
1728 return new Promise(function (resolve, reject) {
1729 co(regeneratorRuntime.mark(function _callee16() {
1730 var i;
1731 return regeneratorRuntime.wrap(function _callee16$(_context16) {
1732 while (1) {
1733 switch (_context16.prev = _context16.next) {
1734 case 0:
1735 i = 0;
1736
1737 case 1:
1738 if (!(i < self.managers.length)) {
1739 _context16.next = 7;
1740 break;
1741 }
1742
1743 _context16.next = 4;
1744 return self.managers[i].purge();
1745
1746 case 4:
1747 i++;
1748 _context16.next = 1;
1749 break;
1750
1751 case 7:
1752
1753 resolve();
1754
1755 case 8:
1756 case 'end':
1757 return _context16.stop();
1758 }
1759 }
1760 }, _callee16, this);
1761 })).catch(reject);
1762 });
1763 }
1764 }]);
1765
1766 return ReplSet;
1767})();
1768
1769/*
1770 * Generate the replicaset configuration file
1771 */
1772
1773var generateConfiguration = function generateConfiguration(_id, version, nodes, settings) {
1774 var members = [];
1775
1776 // Generate members
1777 for (var i = 0; i < nodes.length; i++) {
1778 var node = nodes[i];
1779 var member = {
1780 _id: i + 1,
1781 host: f('%s:%s', node.options.bind_ip, node.options.port)
1782 };
1783
1784 // Did we specify any special options
1785 if (node.arbiter) member.arbiterOnly = true;
1786 if (node.builIndexes) member.buildIndexes = true;
1787 if (node.hidden) member.hidden = true;
1788 if (typeof node.priority == 'number') member.priority = node.priority;
1789 if (node.tags) member.tags = node.tags;
1790 if (node.slaveDelay) member.slaveDelay = node.slaveDelay;
1791 if (node.votes) member.votes = node.votes;
1792
1793 // Add to members list
1794 members.push(member);
1795 }
1796
1797 // Configuration passed back
1798 var configuration = {
1799 _id: _id, version: version, members: members
1800 };
1801
1802 if (settings) {
1803 configuration.settings = settings;
1804 }
1805
1806 return configuration;
1807};
1808
1809module.exports = ReplSet;