UNPKG

11 kBJavaScriptView Raw
1"use strict"
2
3var co = require('co'),
4 f = require('util').format,
5 mkdirp = require('mkdirp'),
6 rimraf = require('rimraf'),
7 Server = require('./server'),
8 Logger = require('./logger'),
9 ReplSet = require('./replset'),
10 ConfigServers = require('./config_servers'),
11 Mongos = require('./mongos'),
12 CoreServer = require('mongodb-core').Server,
13 spawn = require('child_process').spawn;
14
15var clone = function(o) {
16 var obj = {}; for(var name in o) obj[name] = o[name]; return obj;
17}
18
19var waitMS = function(ms) {
20 return new Promise(function(resolve, reject) {
21 setTimeout(function() {
22 resolve();
23 }, ms);
24 });
25}
26
27var reportError = function(self, reject) {
28 return function(err) {
29 self.logger.error(f('%s at %s', err.message, err.stack));
30 reject(err);
31 }
32}
33
34class Sharded {
35 constructor(options) {
36 options = options || {};
37 // Unpack default runtime information
38 this.mongod = options.mongod || 'mongod';
39 this.mongos = options.mongos || 'mongos';
40
41 // Create logger instance
42 this.logger = Logger('Sharded', options);
43
44 // All pieces of the topology
45 this.shards = [];
46 this.configurationServers = null;
47 this.proxies = [];
48 }
49
50 discover() {
51 var self = this;
52
53 return new Promise(function(resolve, reject) {
54 co(function*() {
55 var proc = spawn(self.mongod, ['--version']);
56 // Variables receiving data
57 var stdout = '';
58 var stderr = '';
59 // Get the stdout
60 proc.stdout.on('data', function(data) { stdout += data; });
61 // Get the stderr
62 proc.stderr.on('data', function(data) { stderr += data; });
63 // Got an error
64 proc.on('error', function(err) { reject(err); });
65 // Process terminated
66 proc.on('close', function(code) {
67 // Perform version match
68 var versionMatch = stdout.match(/[0-9]+\.[0-9]+\.[0-9]+/)
69
70 // Check if we have ssl
71 var sslMatch = stdout.match(/ssl/i)
72
73 // Resolve the server version
74 resolve({
75 version: versionMatch.toString().split('.').map(function(x) {
76 return parseInt(x, 10);
77 }),
78 ssl: sslMatch != null
79 });
80 });
81 }).catch(reportError(self, reject));
82 });
83 }
84
85 addShard(nodes, options) {
86 var self = this;
87
88 return new Promise(function(resolve, reject) {
89 co(function*() {
90 options = options || {};
91 // Create a shard
92 var shard = new ReplSet(self.mongod, nodes, options);
93 // Add shard to list of shards
94 self.shards.push(shard);
95 resolve();
96 }).catch(reportError(self, reject));
97 });
98 }
99
100 addConfigurationServers(nodes, options) {
101 var self = this;
102
103 return new Promise(function(resolve, reject) {
104 co(function*() {
105 options = options || {};
106 // Establish the version of the mongod process
107 var result = yield self.discover();
108 var version = result.version;
109
110 // If configuration server has not been set up
111 options = clone(options);
112 // Clone the nodes
113 nodes = JSON.parse(JSON.stringify(nodes));
114 // Add config server to each of the nodes
115 nodes = nodes.map(function(x) {
116 if(x.arbiter) {
117 delete x['arbiter'];
118 }
119
120 if(!x.arbiter) {
121 x.options.configsvr = null;
122 }
123
124 return x;
125 });
126
127 // Check if we have 3.2.0 or higher where we need to boot up a replicaset
128 // not a set of configuration server
129 if(version[0] >= 4 || (version[0] == 3 && version[1] >= 2)) {
130 self.configurationServers = new ReplSet(self.mongod, nodes, options);
131 } else {
132 self.configurationServers = new ConfigServers(self.mongod, nodes.map(function(x) {
133 return x.options;
134 }), options)
135 }
136
137 resolve();
138 }).catch(reportError(self, reject));
139 });
140 }
141
142 addProxies(nodes, options) {
143 var self = this;
144
145 return new Promise(function(resolve, reject) {
146 co(function*() {
147 options = options || {};
148
149 // Clone the options
150 options = clone(options);
151
152 // For each node create a proxy
153 for(var i = 0; i < nodes.length; i++) {
154 var proxy = new Mongos(self.mongos, nodes[i], options);
155 self.proxies.push(proxy);
156 }
157
158 resolve();
159 }).catch(reportError(self, reject));
160 });
161 }
162
163 enableSharding(db, credentials) {
164 var self = this;
165
166 return new Promise(function(resolve, reject) {
167 co(function*() {
168 // Get a proxy
169 var proxy = self.proxies[0];
170
171 if(self.logger.isInfo()) {
172 self.logger.info(f('enable sharding for db %s', db));
173 }
174
175 // Execute the enable sharding command
176 var result = yield proxy.executeCommand('admin.$cmd', {
177 enableSharding: db
178 }, credentials);
179
180 if(self.logger.isInfo()) {
181 self.logger.info(f('successfully enabled sharding for db %s with result [%s]', db, JSON.stringify(result)));
182 }
183
184 // Resolve
185 resolve();
186 }).catch(reportError(self, reject));
187 });
188 }
189
190 shardCollection(db, collection, shardKey, options, credentials) {
191 var self = this;
192
193 return new Promise(function(resolve, reject) {
194 co(function*() {
195 options = options || {};
196 options = clone(options);
197 // Get a proxy
198 var proxy = self.proxies[0];
199
200 // Create shard collection command
201 var command = {
202 shardCollection: f('%s.%s', db, collection), key: shardKey
203 }
204
205 // Unique shard key
206 if(options.unique) {
207 command.unique = true;
208 }
209
210 if(self.logger.isInfo()) {
211 self.logger.info(f('shard collection for %s.%s with command [%s]', db, collection, JSON.stringify(command)));
212 }
213
214 // Execute the enable sharding command
215 var result = yield proxy.executeCommand('admin.$cmd', command, credentials);
216
217 if(self.logger.isInfo()) {
218 self.logger.info(f('successfully sharded collection for %s.%s with command [%s] and result [%s]', db, collection, JSON.stringify(command), JSON.stringify(result)));
219 }
220
221 // Resolve
222 resolve();
223 }).catch(reportError(self, reject));
224 });
225 }
226
227 start() {
228 var self = this;
229
230 return new Promise(function(resolve, reject) {
231 co(function*() {
232 // Boot up the shards first
233 for(var i = 0; i < self.shards.length; i++) {
234 if(self.logger.isInfo()) {
235 self.logger.info(f('start shard %s', self.shards[i].shardUrl()));
236 }
237
238 // Purge directories
239 yield self.shards[i].purge();
240 // Start shard
241 yield self.shards[i].start();
242 }
243
244 if(self.logger.isInfo()) {
245 self.logger.info(f('start configuration server %s', self.configurationServers.url()));
246 }
247
248 // Purge directories
249 yield self.configurationServers.purge();
250 // Boot up the configuration servers
251 yield self.configurationServers.start();
252
253 // Boot up the proxies
254 for(var i = 0; i < self.proxies.length; i++) {
255 if(self.logger.isInfo()) {
256 self.logger.info(f('start proxy at %s', self.proxies[i].name));
257 }
258
259 // Purge directories
260 yield self.proxies[i].purge();
261 // Start proxy
262 yield self.proxies[i].start();
263 }
264
265 // Connect and add the shards
266 var proxy = self.proxies[0];
267 if(!proxy) return reject('no mongos process found');
268
269 // Add all the shards
270 for(var i = 0; i < self.shards.length; i++) {
271 if(self.logger.isInfo()) {
272 self.logger.info(f('add shard at %s', self.shards[i].shardUrl()));
273 }
274
275 // Add the shard
276 var result = yield proxy.executeCommand('admin.$cmd', {
277 addShard: self.shards[i].shardUrl()
278 }, null, {
279 reExecuteOnError: true
280 });
281
282 if(self.logger.isInfo()) {
283 self.logger.info(f('add shard at %s with result [%s]', self.shards[i].shardUrl(), JSON.stringify(result)));
284 }
285 }
286
287 if(self.logger.isInfo()) {
288 self.logger.info(f('sharded topology is up'));
289 }
290
291 resolve();
292 }).catch(reportError(self, reject));
293 });
294 }
295
296 purge() {
297 var self = this;
298
299 return new Promise(function(resolve, reject) {
300 co(function*() {
301 // We are already running, just return
302 if(self.state == 'running') return resolve();
303
304 if(self.logger.isInfo()) {
305 self.logger.info(f('purging mongo proxy directories'));
306 }
307
308 // Shutdown all the proxies
309 for(var i = 0; i < self.proxies.length; i++) {
310 yield self.proxies[i].purge();
311 }
312
313 if(self.logger.isInfo()) {
314 self.logger.info(f('purging configuration server directories'));
315 }
316
317 // Shutdown configuration server
318 if(self.configurationServers) {
319 yield self.configurationServers.purge();
320 }
321
322 if(self.logger.isInfo()) {
323 self.logger.info(f('puring shard directories'));
324 }
325
326 // Shutdown all the shards
327 for(var i = 0; i < self.shards.length; i++) {
328 yield self.shards[i].purge();
329 }
330
331 if(self.logger.isInfo()) {
332 self.logger.info(f('done purging directories for topology'));
333 }
334
335 // Set the state to running
336 self.state == 'running';
337
338 // Resolve
339 resolve();
340 }).catch(reportError(self, reject));
341 });
342 }
343
344 stop() {
345 var self = this;
346
347 return new Promise(function(resolve, reject) {
348 co(function*() {
349 // We are already running, just return
350 if(self.state == 'running') return resolve();
351
352 if(self.logger.isInfo()) {
353 self.logger.info(f('Shutting down mongos proxies'));
354 }
355
356 // Shutdown all the proxies
357 for(var i = 0; i < self.proxies.length; i++) {
358 yield self.proxies[i].stop();
359 }
360
361 if(self.logger.isInfo()) {
362 self.logger.info(f('Shutting down configuration servers'));
363 }
364
365 // Shutdown configuration server
366 yield self.configurationServers.stop();
367
368
369 if(self.logger.isInfo()) {
370 self.logger.info(f('Shutting down shards'));
371 }
372
373 // Shutdown all the shards
374 for(var i = 0; i < self.shards.length; i++) {
375 yield self.shards[i].stop();
376 }
377
378 if(self.logger.isInfo()) {
379 self.logger.info(f('done shutting down sharding topology'));
380 }
381
382 // Set the state to running
383 self.state == 'running';
384
385 // Resolve
386 resolve();
387 }).catch(reportError(self, reject));
388 });
389 }
390
391 restart() {
392 var self = this;
393
394 return new Promise(function(resolve, reject) {
395 co(function*() {
396 }).catch(reportError(self, reject));
397 });
398 }
399}
400
401module.exports = Sharded;