// Retrieved from UNPKG (2.97 kB, JavaScript, raw view).
1var util = require('util');
2var mongojs = require('mongojs');
3var stream = require('stream-wrapper');
4var semver = require('semver');
5var moment = require('moment');
6
7var streams = require('./streams');
8var log = require('./log');
9
10var packageJson = require('../package.json');
11
// Name of the collection in the application database that is handed to the
// tmp streams when a patch runs with `options.dryRun` set.
var TMP_COLLECTION = '_mongopatch_tmp';
13
/**
 * Re-emits every occurrence of `event` fired by `src` on `dest`, passing all
 * listener arguments through unchanged.
 *
 * @param {string} event - Name of the event to forward.
 * @param {Object} dest - Emitter the event is re-emitted on (needs `emit`).
 * @param {Object} src - Emitter the event is listened for on (needs `on`).
 */
var emit = function(event, dest, src) {
  src.on(event, function() {
    // Put the event name in front so the array matches emit(event, ...args).
    var forwarded = [event].concat(Array.prototype.slice.call(arguments));

    dest.emit.apply(dest, forwarded);
  });
};
22
/**
 * Builds the identifier of a patch run in the form
 * `patch_<timestamp>_<collection>`, where the timestamp is the current time
 * formatted by moment as `YYMMDD.HHmmss.SSS` and `<collection>` is the result
 * of `collection.toString()`.
 *
 * @param {Object} collection - Value whose `toString()` identifies the collection.
 * @returns {string} The generated patch identifier.
 */
var name = function(collection) {
  var timestamp = moment().format('YYMMDD.HHmmss.SSS');

  return 'patch_' + timestamp + '_' + collection.toString();
};
26
/**
 * Creates the stream that drives a single patch run.
 *
 * `patch` is called synchronously with the new stream so it can register its
 * settings through `update`, `after` and `version`. On the next tick
 * (`setImmediate`) those settings are validated and, if they are valid, the
 * processing pipeline is assembled and piped into the returned stream.
 *
 * @param {Function} patch - Called as `patch(that)` to configure the run.
 * @param {Object} options - Run options read by this function:
 *   `db` (passed to mongojs for the application database),
 *   `logDb` (optional, passed to mongojs for the log database),
 *   `parallel` (passed to the streams as `concurrency`),
 *   `dryRun` (when truthy, pipe into a tmp stream on TMP_COLLECTION instead of
 *   an update stream) and
 *   `output` (when truthy, add the `log` stage to the pipeline).
 * @returns {Object} An object-mode pass-through stream exposing `db`, `id`,
 *   `update`, `after` and `version`. It emits `'error'` when the declared
 *   version is missing, invalid or different from the package version, when no
 *   update was registered, when counting documents fails, or when the patch or
 *   tmp/update streams emit an error.
 */
var create = function(patch, options) {
  var applicationDb = mongojs(options.db);
  var logDb = options.logDb && mongojs(options.logDb);

  var that = stream.passThrough({ objectMode: true });

  that.db = applicationDb;
  that.id = null;

  /**
   * Registers the update to run. `query` is optional: when only two arguments
   * are given, the second one is taken as the worker and the query defaults
   * to `{}`.
   */
  that.update = function(collection, query, worker) {
    if(!worker) {
      worker = query;
      query = null;
    }

    collection = applicationDb.collection(collection);

    that.id = name(collection);
    that._update = {
      collection: collection,
      query: query || {},
      worker: worker
    };
  };

  /** Stores the callback passed to the tmp/update streams as `afterCallback`. */
  that.after = function(callback) {
    that._after = callback;
  };

  /** Declares the version the patch was written for; checked before running. */
  that.version = function(version) {
    that._version = version;
  };

  // Assembles patch -> tmp/update -> progress -> (log) -> that and starts it.
  var update = function() {
    var collection = that._update.collection;
    var query = that._update.query;
    var worker = that._update.worker;

    var logCollection = logDb && logDb.collection(that.id);

    var opts = { afterCallback: that._after, concurrency: options.parallel };

    // Named `pipeline` so the module-level `stream` import is not shadowed.
    var pipeline = streams.patch(collection, query, { concurrency: options.parallel }, worker);

    emit('error', that, pipeline);

    if(options.dryRun) {
      var tmpCollection = applicationDb.collection(TMP_COLLECTION);
      var tmpStream = logCollection ?
        streams.logged.tmp(logCollection, tmpCollection, opts) :
        streams.tmp(tmpCollection, opts);

      pipeline = pipeline.pipe(tmpStream);
    } else {
      var updateStream = logCollection ?
        streams.logged.update(logCollection, opts) :
        streams.update(opts);

      pipeline = pipeline.pipe(updateStream);
    }

    // `pipeline` now refers to the tmp/update stream; forward its errors too.
    emit('error', that, pipeline);

    // The document count is passed to the progress and log stages.
    collection.count(query, function(err, count) {
      if(err) {
        return that.emit('error', err);
      }

      pipeline = pipeline.pipe(streams.progress(count));

      if(options.output) {
        pipeline = pipeline.pipe(log({ patch: that.id, total: count }));
      }

      pipeline = pipeline.pipe(that);

      // Close the database handles once the final stage has ended.
      pipeline.on('end', function() {
        applicationDb.close();
        logDb && logDb.close();
      });

      pipeline.resume();
    });
  };

  patch(that);

  // Validation is deferred one tick; errors are emitted from this callback
  // rather than synchronously inside create().
  setImmediate(function() {
    var version = that._version;

    // semver.eq throws on input that is not a valid version, which would
    // surface here as an uncaught exception. semver.valid returns null for
    // such input, so it is reported through the 'error' event instead.
    var matches = Boolean(version) &&
      semver.valid(version) !== null &&
      semver.eq(version, packageJson.version);

    if(!matches) {
      return that.emit('error', new Error(util.format('Specified version (%s) does not match current system version (%s)',
        version, packageJson.version)));
    }
    if(!that._update) {
      return that.emit('error', new Error('Update missing'));
    }

    update();
  });

  return that;
};
121
// The module's single export is the patch stream factory.
module.exports = create;