UNPKG

10.4 kBJavaScriptView Raw
1var fs = require('fs');
2var util = require('util');
3var path = require('path');
4
5var async = require('async');
6var streams = require('stream-wrapper');
7var parallel = require('parallel-transform');
8var speedometer = require('speedometer');
9var stringify = require('json-stable-stringify');
10var bson = new (require('bson').pure().BSON)();
11var mongojs = require('mongojs');
12var traverse = require('traverse');
13
14var diff = require('./diff');
15
16var DEFAULT_CONCURRENCY = 1;
17var ATTEMPT_LIMIT = 5;
18
19var JSON_STRINGIFY_SOURCE = fs.readFileSync(require.resolve('json-stable-stringify'), 'utf-8');
20var COMPARE_TEMPLATE = fs.readFileSync(path.join(__dirname, 'compare.js.txt'), 'utf-8');
21
22var bsonCopy = function(obj) {
23 return bson.deserialize(bson.serialize(obj));
24};
25
26var extend = function(dest, src) {
27 if (!src) {
28 return dest;
29 }
30
31 Object.keys(src).forEach(function(key) {
32 var v = src[key];
33 dest[key] = v === undefined ? dest[key] : v;
34 });
35
36 return dest;
37};
38
39var noopCallback = function(doc, callback) {
40 callback();
41};
42
43var applyAfterCallback = function(afterCallback, patch, callback) {
44 var update = bsonCopy({
45 before: patch.before,
46 after: patch.after,
47 modified: patch.modified,
48 diff: patch.diff
49 });
50
51 afterCallback(update, callback);
52};
53
54var applyUpdateUsingQuery = function(patch, callback) {
55 var query = bsonCopy(patch.query);
56 query._id = patch.before._id;
57
58 patch.collection.findAndModify({
59 query: query,
60 'new': true,
61 update: patch.modifier
62 }, function(err, after) {
63 // Ensure arity
64 callback(err, after);
65 });
66};
67
68var applyUpdateUsingWhere = function(patch, callback) {
69 var document = traverse(patch.before).map(function(value) {
70 if (value instanceof mongojs.ObjectId) {
71 this.update(value.toJSON(), true);
72 } else if (value instanceof Date) {
73 this.update(value.toJSON(), true);
74 } else if (value instanceof mongojs.NumberLong) {
75 this.update(value.toNumber(), true);
76 } else if (value instanceof mongojs.Timestamp) {
77 this.update(util.format('%s:%s', value.getLowBits(), value.getHighBits()), true);
78 }
79 });
80
81 document = JSON.stringify(stringify(document));
82
83 var where = util.format(COMPARE_TEMPLATE, JSON_STRINGIFY_SOURCE, document);
84 var query = { _id: patch.before._id };
85 query.$where = where;
86
87 patch.collection.findAndModify({
88 query: query,
89 'new': true,
90 update: patch.modifier
91 }, function(err, after) {
92 callback(err, after);
93 });
94};
95
96var applyUpdateUsingDocument = function(worker, patch, callback) {
97 if (patch.attempts === ATTEMPT_LIMIT) {
98 // In some cases a document doesn't match itself when used
99 // as a query (perhaps a bug). Try one last time using the slow
100 // where operator.
101
102 patch.attempts++;
103 return applyUpdateUsingWhere(patch, callback);
104 }
105
106 async.waterfall([
107 function(next) {
108 // Make sure if additional properties have been added to the root
109 // of the document we still satisfy the query.
110 var query = { $and: [bsonCopy(patch.before), bsonCopy(patch.query)] };
111
112 patch.collection.findAndModify({
113 query: query,
114 'new': true,
115 update: patch.modifier
116 }, next);
117 },
118 function(after, _, next) {
119 if (after) {
120 return callback(null, after);
121 }
122
123 var query = bsonCopy(patch.query);
124 query._id = patch.before._id;
125
126 patch.collection.findOne(query, next);
127 },
128 function(document, next) {
129 if (!document) {
130 // The document doesn't meet the criteria anymore
131 return callback(null, null);
132 }
133
134 patch.before = document;
135 patch.modifier = null;
136
137 worker(document, function(err, modifier) {
138 if (err) {
139 err.patch = patch;
140 }
141
142 next(err, modifier);
143 });
144 },
145 function(modifier) {
146 if (!modifier) {
147 return callback(null, null);
148 }
149
150 patch.modifier = modifier;
151 patch.attempts++;
152
153 applyUpdateUsingDocument(worker, patch, callback);
154 }
155 ], callback);
156};
157
158var applyUpdateDummy = function(tmpCollection, patch, callback) {
159 var id = patch.before._id;
160 var after;
161
162 async.waterfall([
163 function(next) {
164 tmpCollection.save(patch.before, next);
165 },
166 function(savedDocument, next) {
167 tmpCollection.findAndModify({
168 query: { _id: id },
169 'new': true,
170 update: patch.modifier
171 }, next);
172 },
173 function(result, _, next) {
174 after = result;
175 tmpCollection.remove({ _id: id }, next);
176 },
177 function() {
178 callback(null, after);
179 }
180 ], callback);
181};
182
183var loggedTransformStream = function(logCollection, options, fn) {
184 if (!fn) {
185 fn = options;
186 options = null;
187 }
188
189 options = extend({
190 afterCallback: noopCallback,
191 diffObject: false,
192 concurrency: DEFAULT_CONCURRENCY
193 }, options);
194
195 return parallel(options.concurrency, function(patch, callback) {
196 var logDocument;
197 async.waterfall([
198 function(next) {
199 logCollection.insert({
200 before: patch.before,
201 collection: patch.collection.toString(),
202 query: JSON.stringify(patch.query),
203 modifier: JSON.stringify(patch.modifier),
204 modified: false,
205 createdAt: new Date()
206 }, next);
207 },
208 function(result, next) {
209 logDocument = result;
210 patch.attempts = patch.attempts || 1;
211
212 fn(patch, next);
213 },
214 function(after, next) {
215 patch.skipped = !after;
216 patch.modified = false;
217
218 if (patch.skipped) {
219 return logCollection.update({ _id: logDocument._id }, { $set: { modified: false, skipped: true, attempts: patch.attempts } },
220 function(err) {
221 callback(err, patch);
222 }
223 );
224 }
225
226 patch.after = after;
227 patch.diff = diff.deep(patch.before, patch.after, { object: options.diffObject });
228 patch.modified = !!Object.keys(patch.diff).length;
229
230 logCollection.update(
231 { _id: logDocument._id },
232 { $set: { after: after, modified: patch.modified, skipped: false, diff: patch.diff, attempts: patch.attempts } },
233 next
234 );
235 },
236 function(_, next) {
237 applyAfterCallback(options.afterCallback, patch, next);
238 },
239 function() {
240 callback(null, patch);
241 }
242 ], function(err) {
243 if (err) {
244 err.patch = patch;
245 }
246 if (err && logDocument) {
247 var documentError = {
248 message: err.message,
249 stack: err.stack
250 };
251
252 logCollection.update(
253 { _id: logDocument._id },
254 { $set: { error: documentError, attempts: patch.attempts } },
255 function() {
256 callback(err);
257 }
258 );
259
260 return;
261 }
262
263 callback(err);
264 });
265 });
266};
267
268var transformStream = function(options, fn) {
269 if (!fn) {
270 fn = options;
271 options = null;
272 }
273
274 options = extend({
275 afterCallback: noopCallback,
276 diffObject: false,
277 concurrency: DEFAULT_CONCURRENCY
278 }, options);
279
280 return parallel(options.concurrency, function(patch, callback) {
281 async.waterfall([
282 function(next) {
283 patch.attempts = patch.attempts || 1;
284 fn(patch, next);
285 },
286 function(after, next) {
287 patch.skipped = !after;
288 patch.modified = false;
289
290 if (patch.skipped) {
291 return callback(null, patch);
292 }
293
294 patch.after = after;
295 patch.diff = diff.deep(patch.before, patch.after, { object: options.diffObject });
296 patch.modified = !!Object.keys(patch.diff).length;
297
298 applyAfterCallback(options.afterCallback, patch, next);
299 },
300 function() {
301 callback(null, patch);
302 }
303 ], function(err) {
304 if (err) {
305 err.patch = patch;
306 }
307
308 callback(err);
309 });
310 });
311};
312
313var patchStream = function(collection, query, options, worker) {
314 if (!worker) {
315 worker = options;
316 options = null;
317 }
318
319 query = query || {};
320 options = extend({ concurrency: DEFAULT_CONCURRENCY }, options);
321
322 var patch = parallel(options.concurrency, function(document, callback) {
323 var clone = bsonCopy(document);
324
325 worker(document, function(err, modifier) {
326 if (err) {
327 err.patch = {
328 before: clone,
329 modifier: modifier
330 };
331
332 return callback(err);
333 }
334 if (!modifier) {
335 return callback();
336 }
337
338 callback(null, { modifier: modifier, before: clone, query: query, collection: collection });
339 });
340 });
341
342 collection
343 .find(query, {}, { timeout: false })
344 .sort({ _id: 1 })
345 .pipe(patch);
346
347 return patch;
348};
349
350var progressStream = function(total) {
351 var delta = {};
352 var count = 0;
353 var modified = 0;
354 var skipped = 0;
355 var started = Date.now();
356 var speed = speedometer(); // documents per second
357
358 return streams.transform({ objectMode: true }, function(patch, encoding, callback) {
359 count++;
360 modified += (patch.modified ? 1 : 0);
361 skipped += (patch.skipped ? 1 : 0);
362
363 var currentSpeed = speed(1);
364 var remaining = Math.max(total - count, 0);
365
366 var diffed = patch.skipped ? delta : diff(patch.before, patch.after, { accumulate: delta, group: true });
367
368 patch.progress = {
369 total: total,
370 count: count,
371 modified: modified,
372 skipped: skipped,
373 speed: currentSpeed,
374 remaining: remaining,
375 eta: Math.round(remaining / currentSpeed),
376 time: Math.round((Date.now() - started) / 1000),
377 percentage: (100 * count / total),
378 diff: bsonCopy(diffed)
379 };
380
381 callback(null, patch);
382 });
383};
384
385var updateUsingQueryStream = function(options) {
386 return transformStream(options, applyUpdateUsingQuery);
387};
388
389var updateUsingDocumentStream = function(worker, options) {
390 return transformStream(options, function(patch, callback) {
391 applyUpdateUsingDocument(worker, patch, callback);
392 });
393};
394
395var updateDummyStream = function(tmpCollection, options) {
396 return transformStream(options, function(patch, callback) {
397 applyUpdateDummy(tmpCollection, patch, callback);
398 });
399};
400
401var loggedUpdateUsingQueryStream = function(logCollection, options) {
402 return loggedTransformStream(logCollection, options, applyUpdateUsingQuery);
403};
404
405var loggedUpdateUsingDocumentStream = function(logCollection, worker, options) {
406 return loggedTransformStream(logCollection, options, function(patch, callback) {
407 applyUpdateUsingDocument(worker, patch, callback);
408 });
409};
410
411var loggedUpdateDummyStream = function(logCollection, tmpCollection, options) {
412 return loggedTransformStream(logCollection, options, function(patch, callback) {
413 applyUpdateDummy(tmpCollection, patch, callback);
414 });
415};
416
417exports.logged = {};
418
419exports.patch = patchStream;
420exports.progress = progressStream;
421
422exports.updateUsingQuery = updateUsingQueryStream;
423exports.updateUsingDocument = updateUsingDocumentStream;
424exports.updateDummy = updateDummyStream;
425
426exports.logged.updateUsingQuery = loggedUpdateUsingQueryStream;
427exports.logged.updateUsingDocument = loggedUpdateUsingDocumentStream;
428exports.logged.updateDummy = loggedUpdateDummyStream;