1 | var util = require('util');
|
2 | var mongojs = require('mongojs');
|
3 | var stream = require('stream-wrapper');
|
4 | var semver = require('semver');
|
5 | var moment = require('moment');
|
6 |
|
7 | var streams = require('./streams');
|
8 | var log = require('./log');
|
9 |
|
10 | var packageJson = require('../package.json');
|
11 |
|
12 | var TMP_COLLECTION = '_mongopatch_tmp';
|
13 |
|
14 | var emit = function(event, dest, src) {
|
15 | src.on(event, function() {
|
16 | var args = Array.prototype.slice.call(arguments);
|
17 | args.unshift(event);
|
18 |
|
19 | dest.emit.apply(dest, args);
|
20 | });
|
21 | };
|
22 |
|
23 | var name = function(collection) {
|
24 | return 'patch_' + moment().format('YYMMDD.HHmmss.SSS') + '_' + collection.toString();
|
25 | };
|
26 |
|
27 | var create = function(patch, options) {
|
28 | var applicationDb = mongojs(options.db);
|
29 | var logDb = options.logDb && mongojs(options.logDb);
|
30 |
|
31 | var that = stream.passThrough({ objectMode: true });
|
32 |
|
33 | that.db = applicationDb;
|
34 | that.id = null;
|
35 |
|
36 | that.update = function(collection, query, worker) {
|
37 | if(!worker) {
|
38 | worker = query;
|
39 | query = null;
|
40 | }
|
41 |
|
42 | collection = applicationDb.collection(collection);
|
43 |
|
44 | that.id = name(collection);
|
45 | that._update = {
|
46 | collection: collection,
|
47 | query: query || {},
|
48 | worker: worker
|
49 | };
|
50 | };
|
51 | that.after = function(callback) {
|
52 | that._after = callback;
|
53 | };
|
54 | that.version = function(version) {
|
55 | that._version = version;
|
56 | };
|
57 |
|
58 | var update = function() {
|
59 | var collection = that._update.collection;
|
60 | var query = that._update.query;
|
61 | var worker = that._update.worker;
|
62 |
|
63 | var logCollection = logDb && logDb.collection(that.id);
|
64 |
|
65 | var opts = { afterCallback: that._after, concurrency: options.parallel };
|
66 | var stream = streams.patch(collection, query, { concurrency: options.parallel }, worker);
|
67 |
|
68 | emit('error', that, stream);
|
69 |
|
70 | if(options.dryRun) {
|
71 | var tmpCollection = applicationDb.collection(TMP_COLLECTION);
|
72 | var tmpStream = logCollection ? streams.logged.tmp(logCollection, tmpCollection, opts) : streams.tmp(tmpCollection, opts);
|
73 |
|
74 | stream = stream.pipe(tmpStream);
|
75 | } else {
|
76 | var updateStream = logCollection ? streams.logged.update(logCollection, opts) : streams.update(opts);
|
77 |
|
78 | stream = stream.pipe(updateStream);
|
79 | }
|
80 |
|
81 | emit('error', that, stream);
|
82 |
|
83 | collection.count(query, function(err, count) {
|
84 | if(err) {
|
85 | return that.emit('error', err);
|
86 | }
|
87 |
|
88 | stream = stream.pipe(streams.progress(count));
|
89 |
|
90 | if(options.output) {
|
91 | stream = stream.pipe(log({ patch: that.id, total: count }));
|
92 | }
|
93 |
|
94 | stream = stream.pipe(that);
|
95 |
|
96 | stream.on('end', function() {
|
97 | applicationDb.close();
|
98 | logDb && logDb.close();
|
99 | });
|
100 |
|
101 | stream.resume();
|
102 | });
|
103 | };
|
104 |
|
105 | patch(that);
|
106 |
|
107 | setImmediate(function() {
|
108 | if(!that._version || !semver.eq(that._version, packageJson.version)) {
|
109 | return that.emit('error', new Error(util.format('Specified version (%s) does not match current system version (%s)',
|
110 | that._version, packageJson.version)));
|
111 | }
|
112 | if(!that._update) {
|
113 | return that.emit('error', new Error('Update missing'));
|
114 | }
|
115 |
|
116 | update();
|
117 | });
|
118 |
|
119 | return that;
|
120 | };
|
121 |
|
122 | module.exports = create;
|