UNPKG

6.7 kBJavaScriptView Raw
1var async = require('async');
2var streams = require('stream-wrapper');
3var parallel = require('parallel-transform');
4var speedometer = require('speedometer');
5var bson = new (require('bson').pure().BSON)();
6
7var diff = require('./diff');
8
// Fallback for the `concurrency` option: the stream factories in this file
// pass it to parallel-transform when the caller's options do not set one.
var DEFAULT_CONCURRENCY = 1;
10
/**
 * Produce a copy of `obj` by serializing it to BSON and reading it back.
 *
 * @param {Object} obj - value to copy
 * @returns {Object} the result of deserializing the serialized form
 */
var bsonCopy = function(obj) {
  var serialized = bson.serialize(obj);

  return bson.deserialize(serialized);
};
14
/**
 * Copy the own enumerable properties of `src` onto `dest`, in place.
 *
 * A source property whose value is `undefined` never overrides `dest`: the
 * key is re-assigned with whatever `dest` currently yields for it.
 *
 * @param {Object} dest - object to mutate
 * @param {Object} [src] - object to read from; a falsy value is a no-op
 * @returns {Object} `dest`
 */
var extend = function(dest, src) {
  if(src) {
    var names = Object.keys(src);

    for(var i = 0; i < names.length; i++) {
      var name = names[i];
      var incoming = src[name];

      // The assignment happens even for undefined inputs, so the key always
      // ends up as an own property of dest.
      dest[name] = (incoming !== undefined) ? incoming : dest[name];
    }
  }

  return dest;
};
27
/**
 * Default `afterCallback`: ignores the document and completes immediately.
 *
 * @param {Object} doc - unused
 * @param {Function} callback - invoked with no arguments
 */
var noopCallback = function(doc, callback) {
  // Nothing to do with `doc`; report completion without an error or a result.
  return void callback();
};
31
/**
 * Invoke the user-supplied `afterCallback` with a BSON copy of the patch
 * outcome, so the hook receives its own copy instead of the live patch.
 *
 * @param {Function} afterCallback - called as afterCallback(update, callback)
 * @param {Object} patch - provides `before`, `after`, `modified` and `diff`
 * @param {Function} callback - handed to `afterCallback` as its completion callback
 */
var applyAfterCallback = function(afterCallback, patch, callback) {
  var snapshot = {};

  // Same four fields, in the same order, that the hook has always received.
  ['before', 'after', 'modified', 'diff'].forEach(function(field) {
    snapshot[field] = patch[field];
  });

  afterCallback(bsonCopy(snapshot), callback);
};
42
/**
 * Apply `patch.modifier` to the document in `patch.collection`.
 *
 * The document is matched by a copy of `patch.query` with `_id` set to
 * `patch.before._id`, and the updated document is requested back
 * (`new: true`).
 *
 * @param {Object} patch - provides `query`, `before`, `modifier` and `collection`
 * @param {Function} callback - called as callback(err, after)
 */
var applyUpdate = function(patch, callback) {
  // Work on a copy so the shared patch.query is never mutated.
  var selector = extend(bsonCopy(patch.query), { _id: patch.before._id });

  var command = {
    query: selector,
    'new': true,
    update: patch.modifier
  };

  patch.collection.findAndModify(command, function(err, after) {
    // Forward exactly two arguments, whatever else the driver passes along.
    callback(err, after);
  });
};
56
/**
 * Dry-run a patch: save `patch.before` into a scratch collection, apply
 * `patch.modifier` to it there, read back the modified document and remove
 * the scratch copy again.
 *
 * Once the save has succeeded the scratch document is always removed, even
 * when applying the modifier fails, so failed patches do not leave documents
 * behind in the temporary collection.
 *
 * @param {Object} tmpCollection - must expose `save`, `findAndModify` and
 *   `remove`, each taking a Node-style callback
 * @param {Object} patch - provides `before` (with `_id`) and `modifier`
 * @param {Function} callback - called once, as callback(err) on failure or
 *   callback(null, after) on success
 */
var applyTmp = function(tmpCollection, patch, callback) {
  var id = patch.before._id;

  tmpCollection.save(patch.before, function(saveErr) {
    if(saveErr) {
      // Nothing was stored, so there is nothing to clean up.
      return callback(saveErr);
    }

    tmpCollection.findAndModify({
      query: { _id: id },
      'new': true,
      update: patch.modifier
    }, function(modifyErr, after) {
      // Clean up regardless of the outcome; only the first two callback
      // arguments are read, so extra driver arguments do not matter.
      tmpCollection.remove({ _id: id }, function(removeErr) {
        // The modifier failure is the more relevant error to report.
        var err = modifyErr || removeErr;

        if(err) {
          return callback(err);
        }

        callback(null, after);
      });
    });
  });
};
81
/**
 * Build a parallel transform stream that applies `fn` to every incoming patch
 * and records each step in `logCollection`.
 *
 * Per patch: a log document is inserted (keyed by the `_id` of the document
 * being patched), `fn(patch, cb)` is run, and the log document is updated
 * with either `skipped: true` (when `fn` yields no document) or the resulting
 * `after`, `modified` and `diff` values. For non-skipped patches
 * `options.afterCallback` is then invoked. The patch itself is annotated with
 * `skipped`, `modified`, `after` and `diff` and pushed downstream.
 *
 * On any failure the error gets a `patch` property and, if the log document
 * was already created, the error message and stack are stored on it.
 *
 * @param {Object} logCollection - must expose `insert` and `update` with
 *   Node-style callbacks
 * @param {Object} [options] - `afterCallback` (default: no-op) and
 *   `concurrency` (default: DEFAULT_CONCURRENCY); may be omitted, in which
 *   case the second argument is taken as `fn`
 * @param {Function} fn - called as fn(patch, callback(err, after))
 * @returns {Stream} the parallel-transform stream
 */
var loggedTransformStream = function(logCollection, options, fn) {
  if(!fn) {
    fn = options;
    options = null;
  }

  options = extend({
    afterCallback: noopCallback,
    concurrency: DEFAULT_CONCURRENCY
  }, options);

  return parallel(options.concurrency, function(patch, callback) {
    var id = patch.before._id;
    var logDocument;

    async.waterfall([
      function(next) {
        logCollection.insert({
          _id: id,
          before: patch.before,
          collection: patch.collection.toString(),
          query: JSON.stringify(patch.query),
          modifier: JSON.stringify(patch.modifier),
          modified: false,
          createdAt: new Date()
        }, next);
      },
      function(result, next) {
        logDocument = result[0];
        fn(patch, next);
      },
      function(after, next) {
        patch.skipped = !after;
        patch.modified = false;

        if(patch.skipped) {
          return logCollection.update({ _id: logDocument._id }, { $set: { skipped: true } }, function(err) {
            if(err) {
              // Route the failure through the shared handler so it is
              // annotated with the patch and logged like any other error.
              return next(err);
            }

            // Skipped patches leave the pipeline here: no afterCallback.
            callback(null, patch);
          });
        }

        patch.after = after;
        patch.diff = diff.deep(patch.before, patch.after);
        patch.modified = !!Object.keys(patch.diff).length;

        logCollection.update(
          { _id: logDocument._id },
          { $set: { after: after, modified: patch.modified, diff: patch.diff } },
          function(err) {
            // Drop any extra driver arguments so the next step sees none.
            next(err);
          }
        );
      },
      function(next) {
        applyAfterCallback(options.afterCallback, patch, next);
      },
      function() {
        // Success: finish here instead of calling the waterfall's final handler.
        callback(null, patch);
      }
    ], function(err) {
      if(err) {
        err.patch = patch;
      }
      if(err && logDocument) {
        var documentError = {
          message: err.message,
          stack: err.stack
        };

        // Best effort: a failure to record the error must not mask the
        // original error, so the result of this update is ignored.
        logCollection.update(
          { _id: logDocument._id },
          { $set: { error: documentError } },
          function() {
            callback(err);
          }
        );

        return;
      }

      callback(err);
    });
  });
};
166
/**
 * Build a parallel transform stream that applies `fn` to every incoming patch
 * without logging.
 *
 * Each patch is annotated with `skipped` and `modified`; when `fn` yields a
 * document the patch also gets `after` and `diff`, and `afterCallback` is
 * invoked before the patch is pushed downstream. Errors are tagged with a
 * `patch` property.
 *
 * @param {Object} [options] - `afterCallback` (default: no-op) and
 *   `concurrency` (default: DEFAULT_CONCURRENCY); may be omitted, in which
 *   case the first argument is taken as `fn`
 * @param {Function} fn - called as fn(patch, callback(err, after))
 * @returns {Stream} the parallel-transform stream
 */
var transformStream = function(options, fn) {
  if(!fn) {
    fn = options;
    options = null;
  }

  var settings = extend({
    afterCallback: noopCallback,
    concurrency: DEFAULT_CONCURRENCY
  }, options);

  var processPatch = function(patch, callback) {
    var runTransform = function(next) {
      fn(patch, next);
    };

    var recordOutcome = function(after, next) {
      patch.skipped = !after;
      patch.modified = false;

      if(patch.skipped) {
        // Skipped patches bypass diffing and the afterCallback hook.
        callback(null, patch);
        return;
      }

      patch.after = after;
      patch.diff = diff.deep(patch.before, patch.after);
      patch.modified = Object.keys(patch.diff).length > 0;

      applyAfterCallback(settings.afterCallback, patch, next);
    };

    var finish = function() {
      callback(null, patch);
    };

    var fail = function(err) {
      if(err) {
        err.patch = patch;
      }

      callback(err);
    };

    async.waterfall([runTransform, recordOutcome, finish], fail);
  };

  return parallel(settings.concurrency, processPatch);
};
209
/**
 * Create the source stream of patches: every document matching `query` in
 * `collection` (read in ascending `_id` order) is handed to `worker`, and each
 * modifier the worker returns is emitted as a patch object
 * `{ modifier, before, query, collection }`, where `before` is a BSON copy of
 * the document taken before the worker ran.
 *
 * Documents for which the worker yields no modifier produce no patch. Worker
 * errors are tagged with `err.patch = { before, modifier }`. Errors emitted
 * by the cursor are re-emitted on the returned stream.
 *
 * @param {Object} collection - must expose `find(query, projection, options)`
 *   returning a sortable, pipeable cursor
 * @param {Object} [query] - selection query; defaults to `{}`
 * @param {Object} [options] - `concurrency` (default: DEFAULT_CONCURRENCY);
 *   may be omitted, in which case the third argument is taken as `worker`
 * @param {Function} worker - called as worker(document, callback(err, modifier))
 * @returns {Stream} the parallel-transform stream emitting patches
 */
var patchStream = function(collection, query, options, worker) {
  if(!worker) {
    worker = options;
    options = null;
  }

  query = query || {};
  options = extend({ concurrency: DEFAULT_CONCURRENCY }, options);

  var patch = parallel(options.concurrency, function(document, callback) {
    // Snapshot first: the worker is free to mutate the document it receives.
    var clone = bsonCopy(document);

    worker(document, function(err, modifier) {
      if (err) {
        err.patch = {
          before: clone,
          modifier: modifier
        };

        return callback(err);
      }
      if(!modifier) {
        return callback();
      }

      callback(null, { modifier:modifier, before:clone, query:query, collection:collection });
    });
  });

  var cursor = collection
    .find(query, {}, { timeout: false })
    .sort({ _id: 1 });

  // pipe() does not forward errors from the source, so without this a
  // failing cursor would never be reported on the stream callers listen to.
  cursor.on('error', function(err) {
    patch.emit('error', err);
  });

  cursor.pipe(patch);

  return patch;
};
246
/**
 * Create an object-mode transform stream that attaches a `progress` summary
 * to every patch passing through: running counters, the current speed, the
 * remaining count and ETA derived from `total`, the elapsed seconds, the
 * percentage done and a BSON copy of the accumulated grouped diff.
 *
 * @param {number} total - number of patches expected overall
 * @returns {Stream} the transform stream; patches are passed on unchanged
 *   apart from the added `progress` property
 */
var progressStream = function(total) {
  var accumulated = {};
  var seen = 0;
  var changed = 0;
  var ignored = 0;
  var startedAt = Date.now();
  var measure = speedometer(); // documents per second

  var annotate = function(patch, encoding, callback) {
    seen += 1;

    if(patch.modified) {
      changed += 1;
    }
    if(patch.skipped) {
      ignored += 1;
    }

    var currentSpeed = measure(1);
    var remaining = Math.max(total - seen, 0);
    var eta = Math.round(remaining / currentSpeed);
    var elapsed = Math.round((Date.now() - startedAt) / 1000);
    var percentage = (100 * seen / total);
    // `accumulated` is shared across calls so the diff aggregates over all patches.
    var groupedDiff = diff(patch.before, patch.after, { accumulate: accumulated, group: true });

    patch.progress = {
      total: total,
      count: seen,
      modified: changed,
      skipped: ignored,
      speed: currentSpeed,
      remaining: remaining,
      eta: eta,
      time: elapsed,
      percentage: percentage,
      diff: bsonCopy(groupedDiff)
    };

    callback(null, patch);
  };

  return streams.transform({ objectMode: true }, annotate);
};
279
/**
 * Transform stream that applies each patch to its real collection.
 *
 * @param {Object} [options] - forwarded to transformStream
 * @returns {Stream} the stream created by transformStream
 */
var updateStream = function(options) {
  var stream = transformStream(options, applyUpdate);

  return stream;
};
283
/**
 * Transform stream that applies each patch inside `tmpCollection` instead of
 * the patch's own collection.
 *
 * @param {Object} tmpCollection - scratch collection handed to applyTmp
 * @param {Object} [options] - forwarded to transformStream
 * @returns {Stream} the stream created by transformStream
 */
var tmpStream = function(tmpCollection, options) {
  var runInTmp = function(patch, callback) {
    applyTmp(tmpCollection, patch, callback);
  };

  return transformStream(options, runInTmp);
};
289
/**
 * Transform stream that applies each patch to its real collection and
 * records the outcome in `logCollection`.
 *
 * @param {Object} logCollection - forwarded to loggedTransformStream
 * @param {Object} [options] - forwarded to loggedTransformStream
 * @returns {Stream} the stream created by loggedTransformStream
 */
var loggedUpdateStream = function(logCollection, options) {
  var stream = loggedTransformStream(logCollection, options, applyUpdate);

  return stream;
};
293
/**
 * Transform stream that applies each patch inside `tmpCollection` and
 * records the outcome in `logCollection`.
 *
 * @param {Object} logCollection - forwarded to loggedTransformStream
 * @param {Object} tmpCollection - scratch collection handed to applyTmp
 * @param {Object} [options] - forwarded to loggedTransformStream
 * @returns {Stream} the stream created by loggedTransformStream
 */
var loggedTmpStream = function(logCollection, tmpCollection, options) {
  var runInTmp = function(patch, callback) {
    applyTmp(tmpCollection, patch, callback);
  };

  return loggedTransformStream(logCollection, options, runInTmp);
};
299
300exports.logged = {};
301
302exports.patch = patchStream;
303exports.progress = progressStream;
304exports.update = updateStream;
305exports.tmp = tmpStream;
306
307exports.logged.update = loggedUpdateStream;
308exports.logged.tmp = loggedTmpStream;