1 | var util = require('util');
|
2 |
|
3 | var mongojs = require('mongojs');
|
4 | var semver = require('semver');
|
5 | var moment = require('moment');
|
6 | var async = require('async');
|
7 | var transform = require('stream-wrapper').transform;
|
8 |
|
9 | var streams = require('./streams');
|
10 |
|
11 | var packageJson = require('../package.json');
|
12 |
|
13 | var TMP_COLLECTION = '_mongopatch_tmp';
|
14 |
|
15 | var getCollection = function(db, collection) {
|
16 | if (typeof collection === 'string') {
|
17 | return db && db.collection(collection);
|
18 | }
|
19 |
|
20 | return collection;
|
21 | };
|
22 |
|
23 | var getDatabase = function(db) {
|
24 | return (typeof db === 'string') ? mongojs(db) : db;
|
25 | };
|
26 |
|
27 | var closeDatabase = function(db, original) {
|
28 | var close = function(callback) {
|
29 | db.close(callback);
|
30 | };
|
31 | var noop = function(callback) {
|
32 | callback();
|
33 | };
|
34 |
|
35 | return (typeof original === 'string') && db ? close : noop;
|
36 | };
|
37 |
|
38 | var propagateError = function(src, dest) {
|
39 | src.on('error', function(err) {
|
40 | dest.emit('error', err);
|
41 | });
|
42 | };
|
43 |
|
44 | var name = function(collection) {
|
45 | return 'patch_' + moment().format('YYYYMMDD.HHmmss.SSS') + '_' + collection.toString();
|
46 | };
|
47 |
|
48 | var create = function(patch, options) {
|
49 | var applicationDb = getDatabase(options.db);
|
50 | var logDb = options.logDb && getDatabase(options.logDb);
|
51 |
|
52 | var closeApplicationDb = closeDatabase(applicationDb, options.db);
|
53 | var closeLogDb = closeDatabase(logDb, options.logDb);
|
54 |
|
55 | var progress = {
|
56 | total: 0,
|
57 | count: 0,
|
58 | modified: 0,
|
59 | speed: 0,
|
60 | remaining: 0,
|
61 | eta: 0,
|
62 | time: 0,
|
63 | percentage: 100,
|
64 | diff: {}
|
65 | };
|
66 |
|
67 | var that = transform({ objectMode: true },
|
68 | function(data, encoding, callback) {
|
69 | progress = data.progress;
|
70 | callback(null, data);
|
71 | },
|
72 | function(callback) {
|
73 | teardown(callback);
|
74 | });
|
75 |
|
76 | that.options = options;
|
77 | that.db = applicationDb;
|
78 | that.logDb = logDb;
|
79 | that.id = null;
|
80 |
|
81 | that.version = function(version) {
|
82 | that._version = version;
|
83 | };
|
84 | that.setup = function(callback) {
|
85 | that._setup = callback;
|
86 | };
|
87 | that.update = function(collection, query, worker) {
|
88 | if (!worker) {
|
89 | worker = query;
|
90 | query = null;
|
91 | }
|
92 |
|
93 | collection = applicationDb.collection(collection);
|
94 |
|
95 | that.id = name(collection);
|
96 | that._update = {
|
97 | collection: collection,
|
98 | query: query || {},
|
99 | worker: worker
|
100 | };
|
101 | };
|
102 | that.after = function(callback) {
|
103 | that._after = callback;
|
104 | };
|
105 | that.teardown = function(callback) {
|
106 | that._teardown = callback;
|
107 | };
|
108 |
|
109 | var setup = function(callback) {
|
110 | if (!that._version || !semver.eq(that._version, packageJson.version)) {
|
111 | var err = new Error(util.format('Specified version (%s) does not match current system version (%s)', that._version, packageJson.version));
|
112 | return callback(err);
|
113 | }
|
114 | if (!that._update) {
|
115 | return callback(new Error('Update missing'));
|
116 | }
|
117 |
|
118 | var setupCallback = that._setup || function(fn) {
|
119 | fn();
|
120 | };
|
121 |
|
122 | async.waterfall([
|
123 | function(next) {
|
124 | applicationDb.getCollectionNames(next);
|
125 | },
|
126 | function(collections, next) {
|
127 | var updateCollectionName = that._update.collection.toString();
|
128 |
|
129 | if (collections.indexOf(updateCollectionName) === -1) {
|
130 | return callback(new Error(util.format('The collection "%s" does not seem to exist', updateCollectionName)));
|
131 | }
|
132 |
|
133 | setupCallback(next);
|
134 | },
|
135 | function() {
|
136 | callback();
|
137 | }
|
138 | ], callback);
|
139 | };
|
140 | var update = function() {
|
141 | var collection = that._update.collection;
|
142 | var query = that._update.query;
|
143 | var worker = that._update.worker;
|
144 |
|
145 | var logCollection = getCollection(logDb, options.logCollection || that.id);
|
146 | var updateStream;
|
147 |
|
148 | var updateOptions = { afterCallback: that._after, concurrency: options.parallel, diffObject: options.diffObject };
|
149 | var stream = streams.patch(collection, query, { concurrency: options.parallel }, worker);
|
150 |
|
151 | propagateError(stream, that);
|
152 |
|
153 | if (options.update === 'dummy') {
|
154 | var tmpCollection = applicationDb.collection(TMP_COLLECTION);
|
155 |
|
156 | updateStream = logCollection ?
|
157 | streams.logged.updateDummy(logCollection, tmpCollection, updateOptions) :
|
158 | streams.updateDummy(tmpCollection, updateOptions);
|
159 | } else if (options.update === 'query') {
|
160 | updateStream = logCollection ?
|
161 | streams.logged.updateUsingQuery(logCollection, updateOptions) :
|
162 | streams.updateUsingQuery(updateOptions);
|
163 | } else if (options.update === 'document') {
|
164 | updateStream = logCollection ?
|
165 | streams.logged.updateUsingDocument(logCollection, worker, updateOptions) :
|
166 | streams.updateUsingDocument(worker, updateOptions);
|
167 | }
|
168 |
|
169 | stream = stream.pipe(updateStream);
|
170 | propagateError(stream, that);
|
171 |
|
172 | collection.count(query, function(err, count) {
|
173 | if (err) {
|
174 | return that.emit('error', err);
|
175 | }
|
176 |
|
177 | stream = stream.pipe(streams.progress(count));
|
178 | propagateError(stream, that);
|
179 |
|
180 | stream.pipe(that);
|
181 | });
|
182 | };
|
183 | var teardown = function(callback) {
|
184 | var teardownCallback = that._teardown || function(_, fn) {
|
185 | fn();
|
186 | };
|
187 |
|
188 | var stats = {
|
189 | total: progress.count,
|
190 | modified: progress.modified,
|
191 | time: progress.time,
|
192 | speed: progress.time ? (progress.count / progress.time) : 0,
|
193 | diff: progress.diff
|
194 | };
|
195 |
|
196 | teardownCallback(stats, function(err) {
|
197 | if (err) {
|
198 | return callback(err);
|
199 | }
|
200 |
|
201 | async.parallel([
|
202 | closeApplicationDb,
|
203 | closeLogDb
|
204 | ], err => callback(err));
|
205 | });
|
206 | };
|
207 |
|
208 | patch(that);
|
209 |
|
210 | setImmediate(function() {
|
211 | setup(function(err) {
|
212 | if (err) {
|
213 | return that.emit('error', err);
|
214 | }
|
215 |
|
216 | update();
|
217 | });
|
218 | });
|
219 |
|
220 | return that;
|
221 | };
|
222 |
|
223 | module.exports = create;
|