1 | var util = require('util');
|
2 |
|
3 | var mongojs = require('mongojs');
|
4 | var semver = require('semver');
|
5 | var moment = require('moment');
|
6 | var async = require('async');
|
7 | var transform = require('stream-wrapper').transform;
|
8 |
|
9 | var streams = require('./streams');
|
10 |
|
11 | var packageJson = require('../package.json');
|
12 |
|
13 | var TMP_COLLECTION = '_mongopatch_tmp';
|
14 |
|
15 | var propagateError = function(src, dest) {
|
16 | src.on('error', function(err) {
|
17 | dest.emit('error', err);
|
18 | });
|
19 | };
|
20 |
|
21 | var name = function(collection) {
|
22 | return 'patch_' + moment().format('YYYYMMDD.HHmmss.SSS') + '_' + collection.toString();
|
23 | };
|
24 |
|
25 | var create = function(patch, options) {
|
26 | var applicationDb = mongojs(options.db);
|
27 | var logDb = options.logDb && mongojs(options.logDb);
|
28 |
|
29 | var progress = {
|
30 | total: 0,
|
31 | count: 0,
|
32 | modified: 0,
|
33 | speed: 0,
|
34 | remaining: 0,
|
35 | eta: 0,
|
36 | time: 0,
|
37 | percentage: 100,
|
38 | diff: {}
|
39 | };
|
40 |
|
41 | var that = transform({ objectMode: true },
|
42 | function(data, encoding, callback) {
|
43 | progress = data.progress;
|
44 | callback(null, data);
|
45 | },
|
46 | function(callback) {
|
47 | teardown(callback);
|
48 | });
|
49 |
|
50 | that.options = options;
|
51 | that.db = applicationDb;
|
52 | that.id = null;
|
53 |
|
54 | that.version = function(version) {
|
55 | that._version = version;
|
56 | };
|
57 | that.setup = function(callback) {
|
58 | that._setup = callback;
|
59 | };
|
60 | that.update = function(collection, query, worker) {
|
61 | if(!worker) {
|
62 | worker = query;
|
63 | query = null;
|
64 | }
|
65 |
|
66 | collection = applicationDb.collection(collection);
|
67 |
|
68 | that.id = name(collection);
|
69 | that._update = {
|
70 | collection: collection,
|
71 | query: query || {},
|
72 | worker: worker
|
73 | };
|
74 | };
|
75 | that.after = function(callback) {
|
76 | that._after = callback;
|
77 | };
|
78 | that.teardown = function(callback) {
|
79 | that._teardown = callback;
|
80 | };
|
81 |
|
82 | var setup = function(callback) {
|
83 | if(!that._version || !semver.eq(that._version, packageJson.version)) {
|
84 | var err = new Error(util.format('Specified version (%s) does not match current system version (%s)', that._version, packageJson.version));
|
85 | return callback(err);
|
86 | }
|
87 | if(!that._update) {
|
88 | return callback(new Error('Update missing'));
|
89 | }
|
90 |
|
91 | var setupCallback = that._setup || function(fn) {
|
92 | fn();
|
93 | };
|
94 |
|
95 | async.waterfall([
|
96 | function(next) {
|
97 | applicationDb.getCollectionNames(next);
|
98 | },
|
99 | function(collections, next) {
|
100 | var updateCollectionName = that._update.collection.toString();
|
101 | var exists = collections.some(function(name) {
|
102 | return (applicationDb.toString() + '.' + name) === updateCollectionName;
|
103 | });
|
104 |
|
105 | if(!exists) {
|
106 | return callback(new Error(util.format('The collection "%s" does not seem to exist', updateCollectionName)));
|
107 | }
|
108 |
|
109 | setupCallback(next);
|
110 | },
|
111 | function() {
|
112 | callback();
|
113 | }
|
114 | ], callback);
|
115 | };
|
116 | var update = function() {
|
117 | var collection = that._update.collection;
|
118 | var query = that._update.query;
|
119 | var worker = that._update.worker;
|
120 |
|
121 | var logCollection = logDb && logDb.collection(that.id);
|
122 | var updateStream;
|
123 |
|
124 | var updateOptions = { afterCallback: that._after, concurrency: options.parallel };
|
125 | var stream = streams.patch(collection, query, { concurrency: options.parallel }, worker);
|
126 |
|
127 | propagateError(stream, that);
|
128 |
|
129 | if(options.update === 'dummy') {
|
130 | var tmpCollection = applicationDb.collection(TMP_COLLECTION);
|
131 |
|
132 | updateStream = logCollection ?
|
133 | streams.logged.updateDummy(logCollection, tmpCollection, updateOptions) :
|
134 | streams.updateDummy(tmpCollection, updateOptions);
|
135 | } else if(options.update === 'query') {
|
136 | updateStream = logCollection ?
|
137 | streams.logged.updateUsingQuery(logCollection, updateOptions) :
|
138 | streams.updateUsingQuery(updateOptions);
|
139 | } else if(options.update === 'document') {
|
140 | updateStream = logCollection ?
|
141 | streams.logged.updateUsingDocument(logCollection, worker, updateOptions) :
|
142 | streams.updateUsingDocument(worker, updateOptions);
|
143 | }
|
144 |
|
145 | stream = stream.pipe(updateStream);
|
146 | propagateError(stream, that);
|
147 |
|
148 | collection.count(query, function(err, count) {
|
149 | if(err) {
|
150 | return that.emit('error', err);
|
151 | }
|
152 |
|
153 | stream = stream.pipe(streams.progress(count));
|
154 | propagateError(stream, that);
|
155 |
|
156 | stream.pipe(that);
|
157 | });
|
158 | };
|
159 | var teardown = function(callback) {
|
160 | var teardownCallback = that._teardown || function(_, fn) {
|
161 | fn();
|
162 | };
|
163 |
|
164 | var stats = {
|
165 | total: progress.count,
|
166 | modified: progress.modified,
|
167 | time: progress.time,
|
168 | speed: progress.time ? (progress.count / progress.time) : 0,
|
169 | diff: progress.diff
|
170 | };
|
171 |
|
172 | teardownCallback(stats, function(err) {
|
173 | if(err) {
|
174 | return callback(err);
|
175 | }
|
176 |
|
177 | async.parallel([
|
178 | function(next) {
|
179 | applicationDb.close(next);
|
180 | },
|
181 | function(next) {
|
182 | if(!logDb) {
|
183 | return next();
|
184 | }
|
185 |
|
186 | logDb.close(next);
|
187 | }
|
188 | ], callback);
|
189 | });
|
190 | };
|
191 |
|
192 | patch(that);
|
193 |
|
194 | setImmediate(function() {
|
195 | setup(function(err) {
|
196 | if(err) {
|
197 | return that.emit('error', err);
|
198 | }
|
199 |
|
200 | update();
|
201 | });
|
202 | });
|
203 |
|
204 | return that;
|
205 | };
|
206 |
|
207 | module.exports = create;
|