UNPKG

5.32 kBJavaScriptView Raw
1var util = require('util');
2
3var mongojs = require('mongojs');
4var semver = require('semver');
5var moment = require('moment');
6var async = require('async');
7var transform = require('stream-wrapper').transform;
8
9var streams = require('./streams');
10
11var packageJson = require('../package.json');
12
// Name of the collection that is handed to the dummy update streams when
// options.update === 'dummy'.
var TMP_COLLECTION = '_mongopatch_tmp';
14
/**
 * Resolves a collection reference.
 *
 * @param {Object} [db] - Database used to look the collection up when a name is given.
 * @param {string|Object} collection - Collection name, or an already resolved collection.
 * @returns {*} For a name: `db.collection(name)`, or the falsy `db` value itself
 *   when no database is available. Anything that is not a string is returned as is.
 */
function getCollection(db, collection) {
  if (typeof collection !== 'string') {
    return collection;
  }

  return db && db.collection(collection);
}
22
/**
 * Turns a database reference into a database object.
 *
 * @param {string|Object} db - Connection string, or an existing database object.
 * @returns {Object} A new mongojs connection for a string; otherwise `db` unchanged.
 */
function getDatabase(db) {
  if (typeof db === 'string') {
    return mongojs(db);
  }

  return db;
}
26
/**
 * Builds the function used to release a database at the end of a run.
 *
 * Only a database that was opened here from a connection string is closed;
 * a database object supplied by the caller is left open.
 *
 * @param {Object} [db] - The resolved database object.
 * @param {string|Object} [original] - The value the database was resolved from.
 * @returns {Function} `function(callback)`: closes `db` when `original` is a
 *   string and `db` is truthy, otherwise just invokes `callback()`.
 */
function closeDatabase(db, original) {
  var openedFromString = typeof original === 'string' && Boolean(db);

  if (!openedFromString) {
    return function(callback) {
      callback();
    };
  }

  return function(callback) {
    db.close(callback);
  };
}
37
/**
 * Re-emits every 'error' event of `src` on `dest`.
 *
 * @param {Object} src - Emitter whose 'error' events are listened to.
 * @param {Object} dest - Emitter that receives the forwarded error.
 */
function propagateError(src, dest) {
  src.on('error', (err) => {
    dest.emit('error', err);
  });
}
43
/**
 * Generates an identifier for a patch run: `patch_<timestamp>_<collection>`,
 * where the timestamp is the current time formatted as YYYYMMDD.HHmmss.SSS.
 *
 * @param {Object|string} collection - Its `toString()` value becomes the suffix.
 * @returns {string} The generated identifier.
 */
function name(collection) {
  var timestamp = moment().format('YYYYMMDD.HHmmss.SSS');

  return `patch_${timestamp}_${collection.toString()}`;
}
47
/**
 * Creates the stream that drives a single patch run.
 *
 * `patch` is invoked synchronously with the new stream so that it can register
 * its version, the collection/worker to update and optional hooks. Validation
 * and the actual pipeline start are deferred with setImmediate. Every failure
 * (invalid configuration, count errors, errors of the inner streams) is
 * reported as an 'error' event on the returned stream.
 *
 * @param {Function} patch - Called as `patch(stream)`. It has to call
 *   `stream.version(v)` and `stream.update(collection, [query], worker)`, and
 *   may call `stream.setup(fn)`, `stream.after(fn)` and `stream.teardown(fn)`.
 * @param {Object} options
 * @param {string|Object} options.db - Connection string or database object of
 *   the database being patched. Only a connection opened from a string is
 *   closed at the end of the run.
 * @param {string|Object} [options.logDb] - Connection string or database object
 *   used for logging; without it no log collection is resolved from a name.
 * @param {string|Object} [options.logCollection] - Log collection (name or
 *   object). Defaults to the generated patch id.
 * @param {string} options.update - Update mode: 'dummy', 'query' or 'document'.
 * @param {*} [options.parallel] - Forwarded to the streams as `concurrency`.
 * @param {*} [options.diffObject] - Forwarded to the update streams.
 * @returns {Object} The transform stream, extended with `options`, `db`,
 *   `logDb`, `id` and the registration methods listed above.
 */
var create = function(patch, options) {
  var updateModes = ['dummy', 'query', 'document'];

  var applicationDb = getDatabase(options.db);
  var logDb = options.logDb && getDatabase(options.logDb);

  var closeApplicationDb = closeDatabase(applicationDb, options.db);
  var closeLogDb = closeDatabase(logDb, options.logDb);

  // Last progress snapshot seen; these defaults are reported when no document
  // passes through the stream.
  var progress = {
    total: 0,
    count: 0,
    modified: 0,
    speed: 0,
    remaining: 0,
    eta: 0,
    time: 0,
    percentage: 100,
    diff: {}
  };

  var that = transform({ objectMode: true },
    function(data, encoding, callback) {
      // NOTE(review): assumes every item carries a `progress` object attached
      // upstream (streams.progress) - confirm against ./streams.
      progress = data.progress;
      callback(null, data);
    },
    function(callback) {
      teardown(callback);
    });

  that.options = options;
  that.db = applicationDb;
  that.logDb = logDb;
  that.id = null;

  // Registration API used by the patch function.
  that.version = function(version) {
    that._version = version;
  };
  that.setup = function(callback) {
    that._setup = callback;
  };
  that.update = function(collection, query, worker) {
    // The query is optional: update(collection, worker).
    if (!worker) {
      worker = query;
      query = null;
    }

    var target = applicationDb.collection(collection);

    that.id = name(target);
    that._update = {
      collection: target,
      query: query || {},
      worker: worker
    };
  };
  that.after = function(callback) {
    that._after = callback;
  };
  that.teardown = function(callback) {
    that._teardown = callback;
  };

  // Validates the registered configuration, checks that the target collection
  // exists and runs the user supplied setup hook.
  var setup = function(callback) {
    if (!that._version || !semver.eq(that._version, packageJson.version)) {
      var err = new Error(util.format('Specified version (%s) does not match current system version (%s)', that._version, packageJson.version));
      return callback(err);
    }
    if (!that._update) {
      return callback(new Error('Update missing'));
    }
    // Reject unknown modes here, before any stream is created; otherwise
    // update() would end up piping into an undefined stream and throw.
    if (updateModes.indexOf(options.update) === -1) {
      return callback(new Error(util.format('Unknown update mode "%s", expected one of: %s', options.update, updateModes.join(', '))));
    }

    var setupCallback = that._setup || function(fn) {
      fn();
    };

    async.waterfall([
      function(next) {
        applicationDb.getCollectionNames(next);
      },
      function(collections, next) {
        var updateCollectionName = that._update.collection.toString();

        if (collections.indexOf(updateCollectionName) === -1) {
          return next(new Error(util.format('The collection "%s" does not seem to exist', updateCollectionName)));
        }

        setupCallback(next);
      }
    ], function(err) {
      // Only the error is relevant; values passed on by the setup hook are dropped.
      callback(err);
    });
  };

  // Builds patch -> update -> progress -> that and starts the run.
  var update = function() {
    var collection = that._update.collection;
    var query = that._update.query;
    var worker = that._update.worker;

    var logCollection = getCollection(logDb, options.logCollection || that.id);
    var updateStream;

    var updateOptions = { afterCallback: that._after, concurrency: options.parallel, diffObject: options.diffObject };
    var stream = streams.patch(collection, query, { concurrency: options.parallel }, worker);

    propagateError(stream, that);

    // options.update has been validated in setup(), so exactly one branch matches.
    if (options.update === 'dummy') {
      var tmpCollection = applicationDb.collection(TMP_COLLECTION);

      updateStream = logCollection ?
        streams.logged.updateDummy(logCollection, tmpCollection, updateOptions) :
        streams.updateDummy(tmpCollection, updateOptions);
    } else if (options.update === 'query') {
      updateStream = logCollection ?
        streams.logged.updateUsingQuery(logCollection, updateOptions) :
        streams.updateUsingQuery(updateOptions);
    } else if (options.update === 'document') {
      updateStream = logCollection ?
        streams.logged.updateUsingDocument(logCollection, worker, updateOptions) :
        streams.updateUsingDocument(worker, updateOptions);
    }

    stream = stream.pipe(updateStream);
    propagateError(stream, that);

    // The progress stream needs the number of matching documents up front.
    collection.count(query, function(err, count) {
      if (err) {
        return that.emit('error', err);
      }

      stream = stream.pipe(streams.progress(count));
      propagateError(stream, that);

      stream.pipe(that);
    });
  };

  // Runs when the stream is flushed: reports the final stats to the teardown
  // hook and then closes the databases opened by this module.
  var teardown = function(callback) {
    var teardownCallback = that._teardown || function(_, fn) {
      fn();
    };

    var stats = {
      total: progress.count,
      modified: progress.modified,
      time: progress.time,
      speed: progress.time ? (progress.count / progress.time) : 0,
      diff: progress.diff
    };

    teardownCallback(stats, function(err) {
      if (err) {
        return callback(err);
      }

      // Forward only the error: async.parallel also passes a results array,
      // which presumably must not reach the stream's flush callback.
      async.parallel([
        closeApplicationDb,
        closeLogDb
      ], err => callback(err));
    });
  };

  patch(that);

  // Deferred so that the caller can attach listeners to the returned stream
  // before the first event is emitted.
  setImmediate(function() {
    setup(function(err) {
      if (err) {
        return that.emit('error', err);
      }

      update();
    });
  });

  return that;
};
222
223module.exports = create;