// UNPKG · 4.27 kB · JavaScript · View Raw
// Node core
var util = require('util');

// Third-party packages
var mongojs = require('mongojs');
var stream = require('stream-wrapper');
var semver = require('semver');
var moment = require('moment');
var async = require('async');

// Project modules
var streams = require('./streams');
var log = require('./log');

var packageJson = require('../package.json');

// Collection handed to the tmp streams when a patch runs with `options.dryRun`.
var TMP_COLLECTION = '_mongopatch_tmp';
15
/**
 * Forwards an event from one emitter to another.
 *
 * Every time `src` fires `event`, `dest` fires the same event with the same
 * arguments.
 *
 * @param {string} event - name of the event to forward.
 * @param {Object} dest - emitter that re-emits the event (needs `emit`).
 * @param {Object} src - emitter being listened to (needs `on`).
 */
var emit = function(event, dest, src) {
  src.on(event, function() {
    // Rebuild the original call as (event, arg1, arg2, ...).
    var args = Array.prototype.slice.call(arguments);
    args.unshift(event);

    dest.emit.apply(dest, args);
  });
};
24
/**
 * Builds the identifier for a patch run: `patch_<timestamp>_<collection>`.
 *
 * The timestamp is the current time formatted by moment as
 * `YYYYMMDD.HHmmss.SSS`.
 *
 * @param {Object} collection - value whose `toString()` supplies the suffix
 *   (presumably the collection name for a mongojs collection — confirm).
 * @returns {string} the generated identifier.
 */
var name = function(collection) {
  return 'patch_' + moment().format('YYYYMMDD.HHmmss.SSS') + '_' + collection.toString();
};
28
/**
 * Creates the stream for one patch and starts running it.
 *
 * `patch` is called synchronously with the new object-mode transform stream so
 * it can register its configuration through `version`, `setup`, `update`,
 * `after` and `teardown`. The run itself (version check, setup callback,
 * update pipeline) starts on the next turn of the event loop, so the caller
 * can attach `'error'` / `'data'` listeners to the returned stream first.
 *
 * @param {Function} patch - receives the patch stream and configures it.
 * @param {Object} options - reads `db`, `logDb`, `parallel`, `dryRun` and
 *   `output`; also exposed as `that.options`.
 * @returns {Object} the patch stream; failures are reported as `'error'`
 *   events on it.
 */
var create = function(patch, options) {
  var applicationDb = mongojs(options.db);
  var logDb = options.logDb && mongojs(options.logDb);

  // Pass-through transform; its flush step runs the teardown logic below.
  var that = stream.transform({ objectMode: true },
    function(data, encoding, callback) {
      callback(null, data);
    },
    function(callback) {
      teardown(callback);
    });

  // Latest progress snapshot seen on the stream; these defaults are what
  // teardown reports when no item ever passes through.
  var progress = {
    total: 0,
    count: 0,
    modified: 0,
    speed: 0,
    remaining: 0,
    eta: 0,
    time: 0,
    percentage: 100,
    diff: {}
  };

  that.options = options;
  that.db = applicationDb;
  that.id = null;

  // --- Configuration API used by the patch function ---

  that.version = function(version) {
    that._version = version;
  };
  that.setup = function(callback) {
    that._setup = callback;
  };
  that.update = function(collection, query, worker) {
    // The query is optional: update(collection, worker).
    if(!worker) {
      worker = query;
      query = null;
    }

    collection = applicationDb.collection(collection);

    that.id = name(collection);
    that._update = {
      collection: collection,
      query: query || {},
      worker: worker
    };
  };
  that.after = function(callback) {
    that._after = callback;
  };
  that.teardown = function(callback) {
    that._teardown = callback;
  };

  // Validates the configuration and runs the optional setup callback.
  var setup = function(callback) {
    var version = that._version;

    // semver.eq throws on input that is not a valid version, so validate
    // first and report such input through the same mismatch error.
    var versionMatches = !!version &&
      !!semver.valid(version) &&
      semver.eq(version, packageJson.version);

    if(!versionMatches) {
      var err = new Error(util.format('Specified version (%s) does not match current system version (%s)', version, packageJson.version));
      return callback(err);
    }
    if(!that._update) {
      return callback(new Error('Update missing'));
    }

    var setupCallback = that._setup || function(fn) {
      fn();
    };

    setupCallback(function(err) {
      if(err) {
        return callback(err);
      }

      that.on('data', function(data) {
        // Keep the last known snapshot if an item carries no progress,
        // so teardown always has counters to read.
        if(data && data.progress) {
          progress = data.progress;
        }
      });

      callback();
    });
  };

  // Builds the pipeline patch -> (tmp | update) -> progress -> [log] -> that.
  var update = function() {
    var collection = that._update.collection;
    var query = that._update.query;
    var worker = that._update.worker;

    var logCollection = logDb && logDb.collection(that.id);

    var opts = { afterCallback: that._after, concurrency: options.parallel };
    var pipeline = streams.patch(collection, query, { concurrency: options.parallel }, worker);

    emit('error', that, pipeline);

    if(options.dryRun) {
      var tmpCollection = applicationDb.collection(TMP_COLLECTION);
      var tmpStream = logCollection ? streams.logged.tmp(logCollection, tmpCollection, opts) : streams.tmp(tmpCollection, opts);

      pipeline = pipeline.pipe(tmpStream);
    } else {
      var updateStream = logCollection ? streams.logged.update(logCollection, opts) : streams.update(opts);

      pipeline = pipeline.pipe(updateStream);
    }

    emit('error', that, pipeline);

    collection.count(query, function(err, count) {
      if(err) {
        return that.emit('error', err);
      }

      pipeline = pipeline.pipe(streams.progress(count));
      // Forward errors from the later stages too, as for the stages above.
      emit('error', that, pipeline);

      if(options.output) {
        pipeline = pipeline.pipe(log({ patch: that.id, total: count }));
        emit('error', that, pipeline);
        that.on('error', log.error);
      }

      pipeline
        .pipe(that)
        .resume();
    });
  };

  // Closes the application database and, when configured, the log database.
  var closeDatabases = function(callback) {
    async.parallel([
      function(next) {
        applicationDb.close(next);
      },
      function(next) {
        if(!logDb) {
          return next();
        }

        logDb.close(next);
      }
    ], function(err) {
      callback(err);
    });
  };

  // Flush step: reports final stats to the teardown callback, then closes
  // the database connections.
  var teardown = function(callback) {
    var teardownCallback = that._teardown || function(_, fn) {
      fn();
    };

    var stats = {
      total: progress.count,
      modified: progress.modified,
      time: progress.time,
      speed: progress.time ? (progress.count / progress.time) : 0,
      diff: progress.diff
    };

    teardownCallback(stats, function(err) {
      // Close the connections even when the teardown callback failed, so
      // they are not left open; the teardown error takes precedence.
      // NOTE(review): only the error is passed on. async.parallel would
      // otherwise hand its results array to the flush callback, which a
      // stream implementation may treat as output — confirm against
      // stream-wrapper.
      closeDatabases(function(closeErr) {
        callback(err || closeErr);
      });
    });
  };

  patch(that);

  // Deferred: the caller only receives `that` when create() returns, so a
  // synchronous failure here would emit 'error' before any listener could
  // be attached and would throw out of create() instead.
  setImmediate(function() {
    setup(function(err) {
      if(err) {
        return that.emit('error', err);
      }

      update();
    });
  });

  return that;
};
197
// The module's public API is the `create` factory.
module.exports = create;