// UNPKG file viewer · 21.5 kB · JavaScript · View Raw
1"use strict";
2
/**
 * Babel class helper: installs method/accessor descriptors on a constructor.
 *
 * Each descriptor is an object carrying a `key` (the property name) plus
 * either `value` or `get`/`set`. Descriptors are mutated in place: they are
 * made configurable, non-enumerable unless they already say otherwise, and
 * writable when they carry a `value`.
 *
 * @param {Function} Constructor - constructor to decorate
 * @param {Array<Object>} [protoProps] - descriptors for `Constructor.prototype`
 * @param {Array<Object>} [staticProps] - descriptors for `Constructor` itself
 * @returns {Function} the same `Constructor`
 */
var _createClass = (function () {
  function defineProperties(target, props) {
    for (var i = 0; i < props.length; i++) {
      var descriptor = props[i];
      descriptor.enumerable = descriptor.enumerable || false;
      descriptor.configurable = true;
      // Accessor descriptors must not carry `writable`.
      if ("value" in descriptor) {
        descriptor.writable = true;
      }
      Object.defineProperty(target, descriptor.key, descriptor);
    }
  }

  return function (Constructor, protoProps, staticProps) {
    if (protoProps) {
      defineProperties(Constructor.prototype, protoProps);
    }
    if (staticProps) {
      defineProperties(Constructor, staticProps);
    }
    return Constructor;
  };
})();
4
/**
 * Babel class helper: guards a constructor against being invoked without `new`.
 *
 * @param {*} instance - the `this` value the constructor received
 * @param {Function} Constructor - the constructor being invoked
 * @throws {TypeError} when `instance` is not an instance of `Constructor`
 */
function _classCallCheck(instance, Constructor) {
  if (!(instance instanceof Constructor)) {
    throw new TypeError("Cannot call a class as a function");
  }
}
6
7var co = require('co'),
8 f = require('util').format,
9 mkdirp = require('mkdirp'),
10 rimraf = require('rimraf'),
11 Server = require('./server'),
12 Logger = require('./logger'),
13 ReplSet = require('./replset'),
14 ConfigServers = require('./config_servers'),
15 Mongos = require('./mongos'),
16 CoreServer = require('mongodb-core').Server,
17 spawn = require('child_process').spawn;
18
/**
 * Creates a shallow copy of an object.
 *
 * Every enumerable key reachable through `for...in` (own and inherited) is
 * copied by reference onto a fresh plain object. `null` and `undefined`
 * yield an empty object.
 *
 * @param {Object} o - object to copy
 * @returns {Object} new plain object with the same enumerable keys
 */
var clone = function clone(o) {
  var copy = {};

  for (var key in o) {
    copy[key] = o[key];
  }

  return copy;
};
24
/**
 * Returns a promise that resolves (with no value) after a delay.
 *
 * @param {number} ms - delay in milliseconds, handed to `setTimeout`
 * @returns {Promise<undefined>} promise fulfilled once the timer fires
 */
var waitMS = function waitMS(ms) {
  return new Promise(function (resolve) {
    // The promise can never reject, so the timer resolves it directly.
    setTimeout(resolve, ms);
  });
};
32
/**
 * Builds a rejection handler that logs a failure before rejecting.
 *
 * Values exposing a `message` are logged as "<message> at <stack>". Any
 * other failure value (a string, `null`, `undefined`, ...) is logged via
 * `String(value)` so that reading `.message` cannot throw and leave the
 * caller's promise unsettled.
 *
 * @param {Object} self - object exposing `logger.error(message)`
 * @param {Function} reject - reject callback of the promise being settled
 * @returns {Function} handler taking the failure value; logs, then rejects
 */
var reportError = function reportError(self, reject) {
  return function (err) {
    var hasMessage = err !== null && err !== undefined && err.message !== undefined;
    var description = hasMessage ? f('%s at %s', err.message, err.stack) : String(err);

    self.logger.error(description);
    reject(err);
  };
};
39
/**
 * Manages a sharded MongoDB topology: a list of shards (each a `ReplSet`),
 * one set of configuration servers and a list of `Mongos` proxies.
 *
 * Every public method returns a promise. Failures raised while a method runs
 * are logged through `reportError` and then used to reject that promise.
 *
 * Lifecycle state is kept in `this.state`:
 *   - 'stopped' after construction and after a completed `stop()`
 *   - 'running' after a completed `start()`
 * `purge()` is skipped while the topology is 'running'.
 *
 * @param {Object} [options]
 * @param {string} [options.mongod='mongod'] - mongod binary to run
 * @param {string} [options.mongos='mongos'] - mongos binary to run
 */
var Sharded = (function () {
  var MISSING_PROXY = 'no mongos process found';
  var MISSING_CONFIG_SERVERS = 'no configuration servers found';

  // Runs `task` right away and settles the returned promise with no value
  // once the task (or the promise it returns) finishes; failures are logged
  // through reportError before the promise is rejected.
  function runTask(self, task) {
    return new Promise(function (resolve, reject) {
      var fail = reportError(self, reject);
      var outcome;

      try {
        outcome = task();
      } catch (err) {
        fail(err);
        return;
      }

      Promise.resolve(outcome).then(function () {
        resolve();
      }, fail);
    });
  }

  // Calls `visit(item)` for each item, waiting for one call to finish
  // before starting the next.
  function eachInOrder(items, visit) {
    return items.reduce(function (previous, item) {
      return previous.then(function () {
        return visit(item);
      });
    }, Promise.resolve());
  }

  // Returns the first registered proxy or throws when none was added.
  function firstProxy(self) {
    var proxy = self.proxies[0];

    if (!proxy) {
      throw new Error(MISSING_PROXY);
    }

    return proxy;
  }

  function Sharded(options) {
    if (!(this instanceof Sharded)) {
      throw new TypeError('Cannot call a class as a function');
    }

    options = options || {};
    // Unpack default runtime information
    this.mongod = options.mongod || 'mongod';
    this.mongos = options.mongos || 'mongos';

    // Create logger instance
    this.logger = Logger('Sharded', options);

    // All pieces of the topology
    this.shards = [];
    this.configurationServers = null;
    this.proxies = [];

    // Lifecycle marker read by purge()
    this.state = 'stopped';
  }

  var methods = {
    /**
     * Runs `<mongod> --version` and parses its standard output.
     *
     * @returns {Promise<{version: number[], ssl: boolean}>} `version` holds
     *   the numeric parts of the first `x.y.z` found in the output; `ssl` is
     *   true when the output mentions "ssl" (case-insensitive). Rejects when
     *   the process cannot be spawned or prints no version number.
     */
    discover: function discover() {
      var self = this;

      return new Promise(function (resolve, reject) {
        var proc;

        try {
          proc = spawn(self.mongod, ['--version']);
        } catch (err) {
          reportError(self, reject)(err);
          return;
        }

        // Variables receiving data
        var stdout = '';
        var stderr = '';

        proc.stdout.on('data', function (data) {
          stdout += data;
        });
        proc.stderr.on('data', function (data) {
          stderr += data;
        });
        // Got an error
        proc.on('error', function (err) {
          reject(err);
        });
        // Process terminated
        proc.on('close', function () {
          var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/);

          // 'close' also fires after a failed spawn, with nothing on stdout;
          // reject instead of dereferencing a null match inside an event
          // handler, where the exception would escape the promise.
          if (!versionMatch) {
            reject(new Error(f('unable to determine the version of %s from its --version output [%s]', self.mongod, stderr.trim())));
            return;
          }

          resolve({
            version: versionMatch[0].split('.').map(function (part) {
              return parseInt(part, 10);
            }),
            ssl: /ssl/i.test(stdout)
          });
        });
      });
    },

    /**
     * Registers a new shard backed by a `ReplSet`.
     *
     * @param {Array} nodes - node definitions handed to `ReplSet`
     * @param {Object} [options] - options handed to `ReplSet`
     * @returns {Promise<undefined>}
     */
    addShard: function addShard(nodes, options) {
      var self = this;

      return runTask(self, function () {
        self.shards.push(new ReplSet(self.mongod, nodes, options || {}));
      });
    },

    /**
     * Registers the configuration servers of the topology.
     *
     * The mongod version is discovered first: for 3.2 and later a `ReplSet`
     * is created, otherwise a `ConfigServers` instance built from each
     * node's `options`. `nodes` is deep-copied (via JSON) and never mutated;
     * every copied node gets `options.configsvr = null` and loses a truthy
     * `arbiter` flag.
     *
     * @param {Array<Object>} nodes - node definitions
     * @param {Object} [options] - options handed to the created manager
     * @returns {Promise<undefined>}
     */
    addConfigurationServers: function addConfigurationServers(nodes, options) {
      var self = this;

      return runTask(self, function () {
        // Establish the version of the mongod process
        return self.discover().then(function (result) {
          var version = result.version;
          var settings = clone(options || {});

          // Work on a copy of the nodes so the caller's objects stay intact
          var members = JSON.parse(JSON.stringify(nodes)).map(function (node) {
            if (node.arbiter) {
              delete node.arbiter;
            }

            // A node may be declared without options; create them on demand
            node.options = node.options || {};
            node.options.configsvr = null;
            return node;
          });

          // 3.2.0 or higher boots the configuration servers as a replica set
          var usesReplicaSet = version[0] >= 4 || (version[0] === 3 && version[1] >= 2);

          if (usesReplicaSet) {
            self.configurationServers = new ReplSet(self.mongod, members, settings);
          } else {
            self.configurationServers = new ConfigServers(self.mongod, members.map(function (node) {
              return node.options;
            }), settings);
          }
        });
      });
    },

    /**
     * Registers one `Mongos` proxy per entry of `nodes`.
     *
     * @param {Array} nodes - one definition per proxy
     * @param {Object} [options] - shallow-copied and shared by all proxies
     * @returns {Promise<undefined>}
     */
    addProxies: function addProxies(nodes, options) {
      var self = this;

      return runTask(self, function () {
        var settings = clone(options || {});

        for (var i = 0; i < nodes.length; i++) {
          self.proxies.push(new Mongos(self.mongos, nodes[i], settings));
        }
      });
    },

    /**
     * Sends `enableSharding` for a database through the first proxy.
     *
     * @param {string} db - database name
     * @param {*} [credentials] - passed through to `executeCommand`
     * @returns {Promise<undefined>} rejects when no proxy was added
     */
    enableSharding: function enableSharding(db, credentials) {
      var self = this;

      return runTask(self, function () {
        var proxy = firstProxy(self);

        if (self.logger.isInfo()) {
          self.logger.info(f('enable sharding for db %s', db));
        }

        return Promise.resolve(proxy.executeCommand('admin.$cmd', {
          enableSharding: db
        }, credentials)).then(function (result) {
          if (self.logger.isInfo()) {
            self.logger.info(f('successfully enabled sharding for db %s with result [%s]', db, JSON.stringify(result)));
          }
        });
      });
    },

    /**
     * Sends `shardCollection` for `db.collection` through the first proxy.
     *
     * @param {string} db - database name
     * @param {string} collection - collection name
     * @param {Object} shardKey - value of the command's `key` field
     * @param {Object} [options] - `unique: true` adds `unique` to the command
     * @param {*} [credentials] - passed through to `executeCommand`
     * @returns {Promise<undefined>} rejects when no proxy was added
     */
    shardCollection: function shardCollection(db, collection, shardKey, options, credentials) {
      var self = this;

      return runTask(self, function () {
        var settings = clone(options || {});
        var proxy = firstProxy(self);

        var command = {
          shardCollection: f('%s.%s', db, collection), key: shardKey
        };

        if (settings.unique) {
          command.unique = true;
        }

        if (self.logger.isInfo()) {
          self.logger.info(f('shard collection for %s.%s with command [%s]', db, collection, JSON.stringify(command)));
        }

        return Promise.resolve(proxy.executeCommand('admin.$cmd', command, credentials)).then(function (result) {
          if (self.logger.isInfo()) {
            self.logger.info(f('successfully sharded collection for %s.%s with command [%s] and result [%s]', db, collection, JSON.stringify(command), JSON.stringify(result)));
          }
        });
      });
    },

    /**
     * Purges and starts every shard, then the configuration servers, then
     * every proxy, and finally registers each shard through the first proxy
     * with `addShard`. Sets `state` to 'running' on success.
     *
     * @returns {Promise<undefined>} rejects, before anything is started,
     *   when no configuration servers or no proxy were added
     */
    start: function start() {
      var self = this;

      return runTask(self, function () {
        // Validate up front so a misconfigured topology does not leave
        // already-started processes behind.
        if (!self.configurationServers) {
          throw new Error(MISSING_CONFIG_SERVERS);
        }
        var proxy = firstProxy(self);

        return eachInOrder(self.shards, function (shard) {
          if (self.logger.isInfo()) {
            self.logger.info(f('start shard %s', shard.shardUrl()));
          }

          return Promise.resolve(shard.purge()).then(function () {
            return shard.start();
          });
        }).then(function () {
          if (self.logger.isInfo()) {
            self.logger.info(f('start configuration server %s', self.configurationServers.url()));
          }

          return self.configurationServers.purge();
        }).then(function () {
          return self.configurationServers.start();
        }).then(function () {
          return eachInOrder(self.proxies, function (member) {
            if (self.logger.isInfo()) {
              self.logger.info(f('start proxy at %s', member.name));
            }

            return Promise.resolve(member.purge()).then(function () {
              return member.start();
            });
          });
        }).then(function () {
          // Connect and add the shards
          return eachInOrder(self.shards, function (shard) {
            if (self.logger.isInfo()) {
              self.logger.info(f('add shard at %s', shard.shardUrl()));
            }

            return Promise.resolve(proxy.executeCommand('admin.$cmd', {
              addShard: shard.shardUrl()
            }, null, {
              reExecuteOnError: true
            })).then(function (result) {
              if (self.logger.isInfo()) {
                self.logger.info(f('add shard at %s with result [%s]', shard.shardUrl(), JSON.stringify(result)));
              }
            });
          });
        }).then(function () {
          self.state = 'running';

          if (self.logger.isInfo()) {
            self.logger.info(f('sharded topology is up'));
          }
        });
      });
    },

    /**
     * Purges the proxies, the configuration servers (when present) and the
     * shards, in that order. Does nothing while `state` is 'running'.
     *
     * @returns {Promise<undefined>}
     */
    purge: function purge() {
      var self = this;

      return runTask(self, function () {
        // We are already running, just return
        if (self.state === 'running') {
          return undefined;
        }

        if (self.logger.isInfo()) {
          self.logger.info(f('purging mongo proxy directories'));
        }

        return eachInOrder(self.proxies, function (member) {
          return member.purge();
        }).then(function () {
          if (self.logger.isInfo()) {
            self.logger.info(f('purging configuration server directories'));
          }

          if (self.configurationServers) {
            return self.configurationServers.purge();
          }
          return undefined;
        }).then(function () {
          if (self.logger.isInfo()) {
            self.logger.info(f('puring shard directories'));
          }

          return eachInOrder(self.shards, function (shard) {
            return shard.purge();
          });
        }).then(function () {
          if (self.logger.isInfo()) {
            self.logger.info(f('done purging directories for topology'));
          }
        });
      });
    },

    /**
     * Stops the proxies, the configuration servers (when present) and the
     * shards, in that order, then sets `state` to 'stopped'.
     *
     * The members are always asked to stop, whatever `state` says, so that a
     * partially started topology can still be torn down.
     *
     * @returns {Promise<undefined>}
     */
    stop: function stop() {
      var self = this;

      return runTask(self, function () {
        if (self.logger.isInfo()) {
          self.logger.info(f('Shutting down mongos proxies'));
        }

        return eachInOrder(self.proxies, function (member) {
          return member.stop();
        }).then(function () {
          if (self.logger.isInfo()) {
            self.logger.info(f('Shutting down configuration servers'));
          }

          if (self.configurationServers) {
            return self.configurationServers.stop();
          }
          return undefined;
        }).then(function () {
          if (self.logger.isInfo()) {
            self.logger.info(f('Shutting down shards'));
          }

          return eachInOrder(self.shards, function (shard) {
            return shard.stop();
          });
        }).then(function () {
          if (self.logger.isInfo()) {
            self.logger.info(f('done shutting down sharding topology'));
          }

          self.state = 'stopped';
        });
      });
    },

    /**
     * Placeholder: performs no work and resolves immediately.
     *
     * @returns {Promise<undefined>}
     */
    restart: function restart() {
      var self = this;

      return runTask(self, function () {
        // Intentionally empty: no restart behaviour is defined yet.
      });
    }
  };

  // Install the methods as non-enumerable prototype properties, the way
  // class methods are defined.
  Object.keys(methods).forEach(function (name) {
    Object.defineProperty(Sharded.prototype, name, {
      value: methods[name],
      writable: true,
      configurable: true,
      enumerable: false
    });
  });

  return Sharded;
})();
698
699module.exports = Sharded;