UNPKG

6.18 kBJavaScriptView Raw
1var async = require('async');
2var streams = require('stream-wrapper');
3var parallel = require('parallel-transform');
4var speedometer = require('speedometer');
5var bson = new (require('bson').pure().BSON)();
6
7var diff = require('./diff');
8
9var DEFAULT_CONCURRENCY = 1;
10
11var bsonCopy = function(obj) {
12 return bson.deserialize(bson.serialize(obj));
13};
14
15var extend = function(dest, src) {
16 if(!src) {
17 return dest;
18 }
19
20 Object.keys(src).forEach(function(key) {
21 var v = src[key];
22 dest[key] = v === undefined ? dest[key] : v;
23 });
24
25 return dest;
26};
27
28var noopCallback = function(doc, callback) {
29 callback();
30};
31
32var applyAfterCallback = function(afterCallback, patch, callback) {
33 var update = bsonCopy({
34 before: patch.before,
35 after: patch.after,
36 modified: patch.modified,
37 diff: patch.diff
38 });
39
40 afterCallback(update, callback);
41};
42
43var applyUpdate = function(patch, callback) {
44 patch.collection.findAndModify({
45 query: { _id: patch.before._id },
46 'new': true,
47 update: patch.modifier
48 }, function(err, after) {
49 // Ensure arity
50 callback(err, after);
51 });
52};
53
54var applyTmp = function(tmpCollection, patch, callback) {
55 var id = patch.before._id;
56 var after;
57
58 async.waterfall([
59 function(next) {
60 tmpCollection.save(patch.before, next);
61 },
62 function(savedDocument, _, next) {
63 tmpCollection.findAndModify({
64 query: { _id: id },
65 'new': true,
66 update: patch.modifier
67 }, next);
68 },
69 function(result, _, next) {
70 after = result;
71 tmpCollection.remove({ _id: id }, next);
72 },
73 function() {
74 callback(null, after);
75 }
76 ], callback);
77};
78
79var loggedTransformStream = function(logCollection, options, fn) {
80 if(!fn) {
81 fn = options;
82 options = null;
83 }
84
85 options = extend({
86 afterCallback: noopCallback,
87 concurrency: DEFAULT_CONCURRENCY
88 }, options);
89
90 return parallel(options.concurrency, function(patch, callback) {
91 var id = patch.before._id;
92 var logDocument;
93
94 async.waterfall([
95 function(next) {
96 logCollection.insert({
97 _id: id,
98 before: patch.before,
99 collection: patch.collection.toString(),
100 query: JSON.stringify(patch.query),
101 modifier: JSON.stringify(patch.modifier),
102 modified: false,
103 createdAt: new Date()
104 }, next);
105 },
106 function(result, next) {
107 logDocument = result[0];
108 fn(patch, next);
109 },
110 function(after, next) {
111 patch.after = after;
112 patch.diff = diff.deep(patch.before, patch.after);
113 patch.modified = !!Object.keys(patch.diff).length;
114
115 logCollection.update(
116 { _id: logDocument._id },
117 { $set: { after: after, modified: patch.modified, diff: patch.diff } },
118 function(err) {
119 next(err);
120 }
121 );
122 },
123 function(next) {
124 applyAfterCallback(options.afterCallback, patch, next);
125 },
126 function() {
127 callback(null, patch);
128 }
129 ], function(err) {
130 if(err) {
131 err.patch = patch;
132 }
133 if(err && logDocument) {
134 var documentError = {
135 message: err.message,
136 stack: err.stack
137 };
138
139 logCollection.update(
140 { _id: logDocument._id },
141 { $set: { error: documentError } },
142 function() {
143 callback(err);
144 }
145 );
146
147 return;
148 }
149
150 callback(err);
151 });
152 });
153};
154
155var transformStream = function(options, fn) {
156 if(!fn) {
157 fn = options;
158 options = null;
159 }
160
161 options = extend({
162 afterCallback: noopCallback,
163 concurrency: DEFAULT_CONCURRENCY
164 }, options);
165
166 return parallel(options.concurrency, function(patch, callback) {
167 async.waterfall([
168 function(next) {
169 fn(patch, next);
170 },
171 function(after, next) {
172 patch.after = after;
173 patch.diff = diff.deep(patch.before, patch.after);
174 patch.modified = !!Object.keys(patch.diff).length;
175
176 applyAfterCallback(options.afterCallback, patch, next);
177 },
178 function() {
179 callback(null, patch);
180 }
181 ], function(err) {
182 if(err) {
183 err.patch = patch;
184 }
185
186 callback(err);
187 });
188 });
189};
190
191var patchStream = function(collection, query, options, worker) {
192 if(!worker) {
193 worker = options;
194 options = null;
195 }
196
197 query = query || {};
198 options = extend({ concurrency: DEFAULT_CONCURRENCY }, options);
199
200 var patch = parallel(options.concurrency, function(document, callback) {
201 var clone = bsonCopy(document);
202
203 worker(document, function(err, modifier) {
204 if (err) {
205 err.patch = {
206 before: clone,
207 modifier: modifier
208 };
209
210 return callback(err);
211 }
212 if(!modifier) {
213 return callback();
214 }
215
216 callback(null, { modifier:modifier, before:clone, query:query, collection:collection });
217 });
218 });
219
220 collection
221 .find(query)
222 .sort({ _id: 1 })
223 .pipe(patch);
224
225 return patch;
226};
227
228var progressStream = function(total) {
229 var delta = {};
230 var count = 0;
231 var modified = 0;
232 var started = Date.now();
233 var speed = speedometer(); // documents per second
234
235 return streams.transform({ objectMode: true }, function(patch, encoding, callback) {
236 count++;
237 modified += (patch.modified ? 1 : 0);
238
239 var currentSpeed = speed(1);
240 var remaining = Math.max(total - count, 0);
241
242 patch.progress = {
243 total: total,
244 count: count,
245 modified: modified,
246 speed: currentSpeed,
247 remaining: remaining,
248 eta: Math.round(remaining / currentSpeed),
249 time: Math.round((Date.now() - started) / 1000),
250 percentage: (100 * count / total),
251 diff: bsonCopy(diff(patch.before, patch.after, { accumulate: delta, group: true }))
252 };
253
254 callback(null, patch);
255 });
256};
257
258var updateStream = function(options) {
259 return transformStream(options, applyUpdate);
260};
261
262var tmpStream = function(tmpCollection, options) {
263 return transformStream(options, function(patch, callback) {
264 applyTmp(tmpCollection, patch, callback);
265 });
266};
267
268var loggedUpdateStream = function(logCollection, options) {
269 return loggedTransformStream(logCollection, options, applyUpdate);
270};
271
272var loggedTmpStream = function(logCollection, tmpCollection, options) {
273 return loggedTransformStream(logCollection, options, function(patch, callback) {
274 applyTmp(tmpCollection, patch, callback);
275 });
276};
277
278exports.logged = {};
279
280exports.patch = patchStream;
281exports.progress = progressStream;
282exports.update = updateStream;
283exports.tmp = tmpStream;
284
285exports.logged.update = loggedUpdateStream;
286exports.logged.tmp = loggedTmpStream;