UNPKG

8.81 kBJavaScriptView Raw
1var util = require('util');
2
3var async = require('async');
4var streams = require('stream-wrapper');
5var parallel = require('parallel-transform');
6var speedometer = require('speedometer');
7var bson = new (require('bson').pure().BSON)();
8
9var diff = require('./diff');
10
11var DEFAULT_CONCURRENCY = 1;
12
13var bsonCopy = function(obj) {
14 return bson.deserialize(bson.serialize(obj));
15};
16
17var extend = function(dest, src) {
18 if(!src) {
19 return dest;
20 }
21
22 Object.keys(src).forEach(function(key) {
23 var v = src[key];
24 dest[key] = v === undefined ? dest[key] : v;
25 });
26
27 return dest;
28};
29
30var noopCallback = function(doc, callback) {
31 callback();
32};
33
34var serializeWhereClause = function(document) {
35 var fields = Object.keys(document).length;
36 return util.format('Object.keys(this).length === %s', fields);
37};
38
39var applyAfterCallback = function(afterCallback, patch, callback) {
40 var update = bsonCopy({
41 before: patch.before,
42 after: patch.after,
43 modified: patch.modified,
44 diff: patch.diff
45 });
46
47 afterCallback(update, callback);
48};
49
50var applyUpdateUsingQuery = function(patch, callback) {
51 var query = bsonCopy(patch.query);
52 query._id = patch.before._id;
53
54 patch.collection.findAndModify({
55 query: query,
56 'new': true,
57 update: patch.modifier
58 }, function(err, after) {
59 // Ensure arity
60 callback(err, after);
61 });
62};
63
64var applyUpdateUsingDocument = function(worker, patch, callback) {
65 async.waterfall([
66 function(next) {
67 // Make sure no additional properties have been added to the document
68 // using the $where clause.
69 // Subdocuments and arrays are already exactly matched.
70 var query = bsonCopy(patch.before);
71 query.$where = serializeWhereClause(patch.before);
72
73 patch.collection.findAndModify({
74 query: query,
75 'new': true,
76 update: patch.modifier
77 }, next);
78 },
79 function(after, _, next) {
80 if(after) {
81 return callback(null, after);
82 }
83
84 var query = bsonCopy(patch.query);
85 query._id = patch.before._id;
86
87 patch.collection.findOne(query, next);
88 },
89 function(document, next) {
90 if(!document) {
91 // The document doesn't meet the criteria anymore
92 return callback(null, null);
93 }
94
95 patch.before = document;
96 patch.modifier = null;
97
98 worker(document, function(err, modifier) {
99 if(err) {
100 err.patch = patch;
101 }
102
103 next(err, modifier);
104 });
105 },
106 function(modifier) {
107 if(!modifier) {
108 return callback(null, null);
109 }
110
111 patch.modifier = modifier;
112 applyUpdateUsingDocument(worker, patch, callback);
113 }
114 ], callback);
115};
116
117var applyUpdateDummy = function(tmpCollection, patch, callback) {
118 var id = patch.before._id;
119 var after;
120
121 async.waterfall([
122 function(next) {
123 tmpCollection.save(patch.before, next);
124 },
125 function(savedDocument, _, next) {
126 tmpCollection.findAndModify({
127 query: { _id: id },
128 'new': true,
129 update: patch.modifier
130 }, next);
131 },
132 function(result, _, next) {
133 after = result;
134 tmpCollection.remove({ _id: id }, next);
135 },
136 function() {
137 callback(null, after);
138 }
139 ], callback);
140};
141
142var loggedTransformStream = function(logCollection, options, fn) {
143 if(!fn) {
144 fn = options;
145 options = null;
146 }
147
148 options = extend({
149 afterCallback: noopCallback,
150 concurrency: DEFAULT_CONCURRENCY
151 }, options);
152
153 return parallel(options.concurrency, function(patch, callback) {
154 var id = patch.before._id;
155 var logDocument;
156
157 async.waterfall([
158 function(next) {
159 logCollection.insert({
160 _id: id,
161 before: patch.before,
162 collection: patch.collection.toString(),
163 query: JSON.stringify(patch.query),
164 modifier: JSON.stringify(patch.modifier),
165 modified: false,
166 createdAt: new Date()
167 }, next);
168 },
169 function(result, next) {
170 logDocument = result[0];
171 fn(patch, next);
172 },
173 function(after, next) {
174 patch.skipped = !after;
175 patch.modified = false;
176
177 if(patch.skipped) {
178 return logCollection.update({ _id: logDocument._id }, { $set: { skipped: true } }, function(err) {
179 callback(err, patch);
180 });
181 }
182
183 patch.after = after;
184 patch.diff = diff.deep(patch.before, patch.after);
185 patch.modified = !!Object.keys(patch.diff).length;
186
187 logCollection.update(
188 { _id: logDocument._id },
189 { $set: { after: after, modified: patch.modified, diff: patch.diff } },
190 function(err) {
191 next(err);
192 }
193 );
194 },
195 function(next) {
196 applyAfterCallback(options.afterCallback, patch, next);
197 },
198 function() {
199 callback(null, patch);
200 }
201 ], function(err) {
202 if(err) {
203 err.patch = patch;
204 }
205 if(err && logDocument) {
206 var documentError = {
207 message: err.message,
208 stack: err.stack
209 };
210
211 logCollection.update(
212 { _id: logDocument._id },
213 { $set: { error: documentError } },
214 function() {
215 callback(err);
216 }
217 );
218
219 return;
220 }
221
222 callback(err);
223 });
224 });
225};
226
227var transformStream = function(options, fn) {
228 if(!fn) {
229 fn = options;
230 options = null;
231 }
232
233 options = extend({
234 afterCallback: noopCallback,
235 concurrency: DEFAULT_CONCURRENCY
236 }, options);
237
238 return parallel(options.concurrency, function(patch, callback) {
239 async.waterfall([
240 function(next) {
241 fn(patch, next);
242 },
243 function(after, next) {
244 patch.skipped = !after;
245 patch.modified = false;
246
247 if(patch.skipped) {
248 return callback(null, patch);
249 }
250
251 patch.after = after;
252 patch.diff = diff.deep(patch.before, patch.after);
253 patch.modified = !!Object.keys(patch.diff).length;
254
255 applyAfterCallback(options.afterCallback, patch, next);
256 },
257 function() {
258 callback(null, patch);
259 }
260 ], function(err) {
261 if(err) {
262 err.patch = patch;
263 }
264
265 callback(err);
266 });
267 });
268};
269
270var patchStream = function(collection, query, options, worker) {
271 if(!worker) {
272 worker = options;
273 options = null;
274 }
275
276 query = query || {};
277 options = extend({ concurrency: DEFAULT_CONCURRENCY }, options);
278
279 var patch = parallel(options.concurrency, function(document, callback) {
280 var clone = bsonCopy(document);
281
282 worker(document, function(err, modifier) {
283 if (err) {
284 err.patch = {
285 before: clone,
286 modifier: modifier
287 };
288
289 return callback(err);
290 }
291 if(!modifier) {
292 return callback();
293 }
294
295 callback(null, { modifier:modifier, before:clone, query:query, collection:collection });
296 });
297 });
298
299 collection
300 .find(query, {}, { timeout: false })
301 .sort({ _id: 1 })
302 .pipe(patch);
303
304 return patch;
305};
306
307var progressStream = function(total) {
308 var delta = {};
309 var count = 0;
310 var modified = 0;
311 var skipped = 0;
312 var started = Date.now();
313 var speed = speedometer(); // documents per second
314
315 return streams.transform({ objectMode: true }, function(patch, encoding, callback) {
316 count++;
317 modified += (patch.modified ? 1 : 0);
318 skipped += (patch.skipped ? 1 : 0);
319
320 var currentSpeed = speed(1);
321 var remaining = Math.max(total - count, 0);
322
323 var diffed = patch.skipped ? delta : diff(patch.before, patch.after, { accumulate: delta, group: true });
324
325 patch.progress = {
326 total: total,
327 count: count,
328 modified: modified,
329 skipped: skipped,
330 speed: currentSpeed,
331 remaining: remaining,
332 eta: Math.round(remaining / currentSpeed),
333 time: Math.round((Date.now() - started) / 1000),
334 percentage: (100 * count / total),
335 diff: bsonCopy(diffed)
336 };
337
338 callback(null, patch);
339 });
340};
341
342var updateUsingQueryStream = function(options) {
343 return transformStream(options, applyUpdateUsingQuery);
344};
345
346var updateUsingDocumentStream = function(worker, options) {
347 return transformStream(options, function(patch, callback) {
348 applyUpdateUsingDocument(worker, patch, callback);
349 });
350};
351
352var updateDummyStream = function(tmpCollection, options) {
353 return transformStream(options, function(patch, callback) {
354 applyUpdateDummy(tmpCollection, patch, callback);
355 });
356};
357
358var loggedUpdateUsingQueryStream = function(logCollection, options) {
359 return loggedTransformStream(logCollection, options, applyUpdateUsingQuery);
360};
361
362var loggedUpdateUsingDocumentStream = function(logCollection, worker, options) {
363 return loggedTransformStream(logCollection, options, function(patch, callback) {
364 applyUpdateUsingDocument(worker, patch, callback);
365 });
366};
367
368var loggedUpdateDummyStream = function(logCollection, tmpCollection, options) {
369 return loggedTransformStream(logCollection, options, function(patch, callback) {
370 applyUpdateDummy(tmpCollection, patch, callback);
371 });
372};
373
374exports.logged = {};
375
376exports.patch = patchStream;
377exports.progress = progressStream;
378
379exports.updateUsingQuery = updateUsingQueryStream;
380exports.updateUsingDocument = updateUsingDocumentStream;
381exports.updateDummy = updateDummyStream;
382
383exports.logged.updateUsingQuery = loggedUpdateUsingQueryStream;
384exports.logged.updateUsingDocument = loggedUpdateUsingDocumentStream;
385exports.logged.updateDummy = loggedUpdateDummyStream;