UNPKG

4.84 kBJavaScriptView Raw
1var util = require('util');
2
3var mongojs = require('mongojs');
4var semver = require('semver');
5var moment = require('moment');
6var async = require('async');
7var transform = require('stream-wrapper').transform;
8
9var streams = require('./streams');
10
11var packageJson = require('../package.json');
12
13var TMP_COLLECTION = '_mongopatch_tmp';
14
15var propagateError = function(src, dest) {
16 src.on('error', function(err) {
17 dest.emit('error', err);
18 });
19};
20
21var name = function(collection) {
22 return 'patch_' + moment().format('YYYYMMDD.HHmmss.SSS') + '_' + collection.toString();
23};
24
25var create = function(patch, options) {
26 var applicationDb = mongojs(options.db);
27 var logDb = options.logDb && mongojs(options.logDb);
28
29 var progress = {
30 total: 0,
31 count: 0,
32 modified: 0,
33 speed: 0,
34 remaining: 0,
35 eta: 0,
36 time: 0,
37 percentage: 100,
38 diff: {}
39 };
40
41 var that = transform({ objectMode: true },
42 function(data, encoding, callback) {
43 progress = data.progress;
44 callback(null, data);
45 },
46 function(callback) {
47 teardown(callback);
48 });
49
50 that.options = options;
51 that.db = applicationDb;
52 that.id = null;
53
54 that.version = function(version) {
55 that._version = version;
56 };
57 that.setup = function(callback) {
58 that._setup = callback;
59 };
60 that.update = function(collection, query, worker) {
61 if(!worker) {
62 worker = query;
63 query = null;
64 }
65
66 collection = applicationDb.collection(collection);
67
68 that.id = name(collection);
69 that._update = {
70 collection: collection,
71 query: query || {},
72 worker: worker
73 };
74 };
75 that.after = function(callback) {
76 that._after = callback;
77 };
78 that.teardown = function(callback) {
79 that._teardown = callback;
80 };
81
82 var setup = function(callback) {
83 if(!that._version || !semver.eq(that._version, packageJson.version)) {
84 var err = new Error(util.format('Specified version (%s) does not match current system version (%s)', that._version, packageJson.version));
85 return callback(err);
86 }
87 if(!that._update) {
88 return callback(new Error('Update missing'));
89 }
90
91 var setupCallback = that._setup || function(fn) {
92 fn();
93 };
94
95 async.waterfall([
96 function(next) {
97 applicationDb.getCollectionNames(next);
98 },
99 function(collections, next) {
100 var updateCollectionName = that._update.collection.toString();
101 var exists = collections.some(function(name) {
102 return (applicationDb.toString() + '.' + name) === updateCollectionName;
103 });
104
105 if(!exists) {
106 return callback(new Error(util.format('The collection "%s" does not seem to exist', updateCollectionName)));
107 }
108
109 setupCallback(next);
110 },
111 function() {
112 callback();
113 }
114 ], callback);
115 };
116 var update = function() {
117 var collection = that._update.collection;
118 var query = that._update.query;
119 var worker = that._update.worker;
120
121 var logCollection = logDb && logDb.collection(that.id);
122 var updateStream;
123
124 var updateOptions = { afterCallback: that._after, concurrency: options.parallel };
125 var stream = streams.patch(collection, query, { concurrency: options.parallel }, worker);
126
127 propagateError(stream, that);
128
129 if(options.update === 'dummy') {
130 var tmpCollection = applicationDb.collection(TMP_COLLECTION);
131
132 updateStream = logCollection ?
133 streams.logged.updateDummy(logCollection, tmpCollection, updateOptions) :
134 streams.updateDummy(tmpCollection, updateOptions);
135 } else if(options.update === 'query') {
136 updateStream = logCollection ?
137 streams.logged.updateUsingQuery(logCollection, updateOptions) :
138 streams.updateUsingQuery(updateOptions);
139 } else if(options.update === 'document') {
140 updateStream = logCollection ?
141 streams.logged.updateUsingDocument(logCollection, worker, updateOptions) :
142 streams.updateUsingDocument(worker, updateOptions);
143 }
144
145 stream = stream.pipe(updateStream);
146 propagateError(stream, that);
147
148 collection.count(query, function(err, count) {
149 if(err) {
150 return that.emit('error', err);
151 }
152
153 stream = stream.pipe(streams.progress(count));
154 propagateError(stream, that);
155
156 stream.pipe(that);
157 });
158 };
159 var teardown = function(callback) {
160 var teardownCallback = that._teardown || function(_, fn) {
161 fn();
162 };
163
164 var stats = {
165 total: progress.count,
166 modified: progress.modified,
167 time: progress.time,
168 speed: progress.time ? (progress.count / progress.time) : 0,
169 diff: progress.diff
170 };
171
172 teardownCallback(stats, function(err) {
173 if(err) {
174 return callback(err);
175 }
176
177 async.parallel([
178 function(next) {
179 applicationDb.close(next);
180 },
181 function(next) {
182 if(!logDb) {
183 return next();
184 }
185
186 logDb.close(next);
187 }
188 ], callback);
189 });
190 };
191
192 patch(that);
193
194 setImmediate(function() {
195 setup(function(err) {
196 if(err) {
197 return that.emit('error', err);
198 }
199
200 update();
201 });
202 });
203
204 return that;
205};
206
207module.exports = create;