1 | var util = require('util');
|
2 |
|
3 | var mongojs = require('mongojs');
|
4 | var stream = require('stream-wrapper');
|
5 | var semver = require('semver');
|
6 | var moment = require('moment');
|
7 | var async = require('async');
|
8 |
|
9 | var streams = require('./streams');
|
10 | var log = require('./log');
|
11 |
|
12 | var packageJson = require('../package.json');
|
13 |
|
14 | var TMP_COLLECTION = '_mongopatch_tmp';
|
15 |
|
/**
 * Forwards an event from one emitter to another.
 *
 * Each time `src` emits `event`, the same event is emitted on `dest`
 * with exactly the arguments the listener received.
 *
 * @param {string} event - Name of the event to forward.
 * @param {Object} dest - Emitter that re-emits the event (must expose `emit`).
 * @param {Object} src - Emitter being listened to (must expose `on`).
 */
var emit = function(event, dest, src) {
    var forward = function() {
        // Build [event, arg1, arg2, ...] so it can be handed to dest.emit as-is.
        var payload = [event].concat(Array.prototype.slice.call(arguments));

        dest.emit.apply(dest, payload);
    };

    src.on(event, forward);
};
|
24 |
|
/**
 * Builds the identifier for a patch run against the given collection.
 *
 * The result has the form `patch_<timestamp>_<collection>`, where the
 * timestamp is the current time formatted as `YYYYMMDD.HHmmss.SSS` and
 * `<collection>` is whatever `collection.toString()` returns.
 *
 * @param {Object} collection - Collection the patch is applied to.
 * @returns {string} The generated patch identifier.
 */
var name = function(collection) {
    var timestamp = moment().format('YYYYMMDD.HHmmss.SSS');

    return 'patch_' + timestamp + '_' + collection.toString();
};
|
28 |
|
/**
 * Creates a patch stream and schedules it to run.
 *
 * The `patch` function is called synchronously with the returned stream so
 * it can register its version, update and optional setup/after/teardown
 * hooks. Validation and the actual update pipeline are started on the next
 * tick, so the caller has a chance to attach `'error'` (and other) listeners
 * to the returned stream first.
 *
 * @param {Function} patch - Called as `patch(stream)`; expected to call
 *     `stream.version(...)` and `stream.update(...)`.
 * @param {Object} options - Run options. Read here: `db`, `logDb`,
 *     `parallel`, `dryRun` and `output`.
 * @returns {Object} Object-mode transform stream that passes through the
 *     items produced by the update pipeline. It exposes `options`, `db`,
 *     `id` and the registration methods `version`, `setup`, `update`,
 *     `after` and `teardown`.
 */
var create = function(patch, options) {
    var applicationDb = mongojs(options.db);
    var logDb = options.logDb && mongojs(options.logDb);

    // Pass-through transform; the flush step runs the teardown sequence.
    var that = stream.transform({ objectMode: true },
        function(data, encoding, callback) {
            callback(null, data);
        },
        function(callback) {
            teardown(callback);
        });

    // Replaced by the `progress` property of every item that flows through
    // the stream (see the 'data' listener registered in setup).
    var progress = {
        total: 0,
        count: 0,
        modified: 0,
        speed: 0,
        remaining: 0,
        eta: 0,
        time: 0,
        percentage: 100,
        diff: {}
    };

    that.options = options;
    that.db = applicationDb;
    that.id = null;

    // Registration API used by the patch function.
    that.version = function(version) {
        that._version = version;
    };
    that.setup = function(callback) {
        that._setup = callback;
    };
    that.update = function(collection, query, worker) {
        // The query is optional: update(collection, worker).
        if(!worker) {
            worker = query;
            query = null;
        }

        collection = applicationDb.collection(collection);

        that.id = name(collection);
        that._update = {
            collection: collection,
            query: query || {},
            worker: worker
        };
    };
    that.after = function(callback) {
        that._after = callback;
    };
    that.teardown = function(callback) {
        that._teardown = callback;
    };

    // Validates the registered patch, runs the optional setup hook and
    // starts tracking progress.
    var setup = function(callback) {
        if(!that._version || !semver.eq(that._version, packageJson.version)) {
            var err = new Error(util.format('Specified version (%s) does not match current system version (%s)', that._version, packageJson.version));
            return callback(err);
        }
        if(!that._update) {
            return callback(new Error('Update missing'));
        }

        var setupCallback = that._setup || function(fn) {
            fn();
        };

        setupCallback(function(err) {
            if(err) {
                return callback(err);
            }

            that.on('data', function(data) {
                progress = data.progress;
            });

            callback();
        });
    };

    // Assembles the pipeline: patch -> (tmp | update) -> progress -> [log] -> that.
    var update = function() {
        var collection = that._update.collection;
        var query = that._update.query;
        var worker = that._update.worker;

        var logCollection = logDb && logDb.collection(that.id);

        var opts = { afterCallback: that._after, concurrency: options.parallel };
        var pipeline = streams.patch(collection, query, { concurrency: options.parallel }, worker);

        // Surface errors from the patch stream on the returned stream.
        emit('error', that, pipeline);

        if(options.dryRun) {
            var tmpCollection = applicationDb.collection(TMP_COLLECTION);
            var tmpStream = logCollection ? streams.logged.tmp(logCollection, tmpCollection, opts) : streams.tmp(tmpCollection, opts);

            pipeline = pipeline.pipe(tmpStream);
        } else {
            var updateStream = logCollection ? streams.logged.update(logCollection, opts) : streams.update(opts);

            pipeline = pipeline.pipe(updateStream);
        }

        // Surface errors from the tmp/update stream as well.
        emit('error', that, pipeline);

        collection.count(query, function(err, count) {
            if(err) {
                return that.emit('error', err);
            }

            pipeline = pipeline.pipe(streams.progress(count));

            if(options.output) {
                pipeline = pipeline.pipe(log({ patch: that.id, total: count }));
                that.on('error', log.error);
            }

            pipeline
                .pipe(that)
                .resume();
        });
    };

    // Closes the application connection and, when configured, the log connection.
    var closeConnections = function(callback) {
        async.parallel([
            function(next) {
                applicationDb.close(next);
            },
            function(next) {
                if(!logDb) {
                    return next();
                }

                logDb.close(next);
            }
        ], callback);
    };

    // Runs the optional teardown hook with the final stats, then releases
    // the database connections.
    var teardown = function(callback) {
        var teardownCallback = that._teardown || function(_, fn) {
            fn();
        };

        var stats = {
            total: progress.count,
            modified: progress.modified,
            time: progress.time,
            // Guard against dividing by zero when no time was recorded.
            speed: progress.time ? (progress.count / progress.time) : 0,
            diff: progress.diff
        };

        teardownCallback(stats, function(err) {
            // Always close the connections, even when the teardown hook
            // failed; otherwise they would be left open. The hook error
            // takes precedence over a close error.
            closeConnections(function(closeErr) {
                callback(err || closeErr || null);
            });
        });
    };

    patch(that);

    // Start on the next tick: setup can fail synchronously (version mismatch,
    // missing update), and emitting 'error' before the caller has received
    // the stream would throw out of create() with no way to listen for it.
    process.nextTick(function() {
        setup(function(err) {
            if(err) {
                return that.emit('error', err);
            }

            update();
        });
    });

    return that;
};
|
197 |
|
198 | module.exports = create;
|