1 | // Copyright © 2017, 2019 IBM Corp. All rights reserved.
|
2 | //
|
3 | // Licensed under the Apache License, Version 2.0 (the "License");
|
4 | // you may not use this file except in compliance with the License.
|
5 | // You may obtain a copy of the License at
|
6 | //
|
7 | // http://www.apache.org/licenses/LICENSE-2.0
|
8 | //
|
9 | // Unless required by applicable law or agreed to in writing, software
|
10 | // distributed under the License is distributed on an "AS IS" BASIS,
|
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
12 | // See the License for the specific language governing permissions and
|
13 | // limitations under the License.
|
14 | ;
|
15 |
|
16 | const async = require('async');
|
17 | const events = require('events');
|
18 | const fs = require('fs');
|
19 | const error = require('./error.js');
|
20 | const spoolchanges = require('./spoolchanges.js');
|
21 | const logfilesummary = require('./logfilesummary.js');
|
22 | const logfilegetbatches = require('./logfilegetbatches.js');
|
23 |
|
/**
 * Read documents from a database to be backed up.
 *
 * @param {object} db - `@cloudant/cloudant` DB object for source database.
 * @param {object} options - backup options:
 *  - `bufferSize` {number} - number of documents per batch when spooling changes
 *  - `parallelism` {number} - number of concurrent downloads
 *  - `log` {string} - path to log file to use
 *  - `resume` {boolean} - whether to resume from an existing log file
 * @returns EventEmitter with following events:
 *  - `received` - called with a block of documents to write to backup
 *  - `error` - on error
 *  - `finished` - when backup process is finished (either complete or errored)
 */
|
37 | module.exports = function(db, options) {
|
38 | const ee = new events.EventEmitter();
|
39 | const start = new Date().getTime(); // backup start time
|
40 | const batchesPerDownloadSession = 50; // max batches to read from log file for download at a time (prevent OOM)
|
41 |
|
42 | function proceedWithBackup() {
|
43 | if (options.resume) {
|
44 | // pick up from existing log file from previous run
|
45 | downloadRemainingBatches(options.log, db, ee, start, batchesPerDownloadSession, options.parallelism);
|
46 | } else {
|
47 | // create new log file and process
|
48 | spoolchanges(db, options.log, options.bufferSize, ee, function(err) {
|
49 | if (err) {
|
50 | ee.emit('error', err);
|
51 | } else {
|
52 | downloadRemainingBatches(options.log, db, ee, start, batchesPerDownloadSession, options.parallelism);
|
53 | }
|
54 | });
|
55 | }
|
56 | }
|
57 |
|
58 | validateBulkGetSupport(db, function(err) {
|
59 | if (err) {
|
60 | return ee.emit('error', err);
|
61 | } else {
|
62 | proceedWithBackup();
|
63 | }
|
64 | });
|
65 |
|
66 | return ee;
|
67 | };
|
68 |
|
69 | /**
|
70 | * Validate /_bulk_get support for a specified database.
|
71 | *
|
72 | * @param {string} db - nodejs-cloudant db
|
73 | * @param {function} callback - called on completion with signature (err)
|
74 | */
|
function validateBulkGetSupport(db, callback) {
  // HEAD the /_bulk_get endpoint: a server that implements it rejects HEAD
  // with 405, so that status code actually signals support.
  db.head('_bulk_get', function(err) {
    const mapped = error.convertResponseError(err, function(responseError) {
      if (responseError.statusCode === undefined) {
        // There was no status code on the error
        return responseError;
      }
      if (responseError.statusCode === 404) {
        return new error.BackupError('BulkGetError', 'Database does not support /_bulk_get endpoint');
      }
      if (responseError.statusCode === 405) {
        // => supports /_bulk_get endpoint
        return undefined;
      }
      return new error.HTTPError(responseError);
    });
    callback(mapped);
  });
}
|
95 |
|
96 | /**
|
97 | * Download remaining batches in a log file, splitting batches into sets
|
98 | * to avoid enqueueing too many in one go.
|
99 | *
|
 * @param {string} log - log file name to maintain download state
 * @param {object} db - nodejs-cloudant db
 * @param {events.EventEmitter} ee - event emitter to emit received events on
 * @param {number} startTime - start time for backup process (ms since epoch)
 * @param {number} batchesPerDownloadSession - max batches to enqueue for
 *  download at a time. As batches contain many doc IDs, this helps avoid
 *  exhausting memory.
 * @param {number} parallelism - number of concurrent downloads
 * @returns {undefined} progress is reported via `received`, `error` and
 *  `finished` events on `ee` rather than through a return value.
 */
|
function downloadRemainingBatches(log, db, ee, startTime, batchesPerDownloadSession, parallelism) {
  var total = 0; // running total of documents downloaded so far
  var noRemainingBatches = false;

  // Generate a set of batches (up to batchesPerDownloadSession) to download from the
  // log file and download them. Set noRemainingBatches to `true` for last batch.
  function downloadSingleBatchSet(done) {
    // Completion callback for one batch set: record the new running total,
    // or surface a fatal error and stop reading further batch sets.
    function batchSetComplete(err, data) {
      if (err) {
        ee.emit('error', err);
        noRemainingBatches = true;
      } else {
        total = data.total;
      }
      done();
    }
    // Fetch the doc IDs for the batches in the current set to
    // download them.
    function processRetrievedBatches(err, batches) {
      if (err) {
        // Could not read batch detail back from the log file; emit the
        // error and stop rather than crashing on undefined `batches`.
        ee.emit('error', err);
        noRemainingBatches = true;
        return done();
      }
      // process them in parallelised queue
      processBatchSet(db, parallelism, log, batches, ee, startTime, total, batchSetComplete);
    }

    readBatchSetIdsFromLogFile(log, batchesPerDownloadSession, function(err, batchSetIds) {
      if (err) {
        ee.emit('error', err);
        // Stop processing changes file for fatal errors
        noRemainingBatches = true;
        done();
      } else {
        if (batchSetIds.length === 0) {
          // Nothing left to download: terminate the doUntil loop
          noRemainingBatches = true;
          return done();
        }
        logfilegetbatches(log, batchSetIds, processRetrievedBatches);
      }
    });
  }

  // Return true if all batches in log file have been downloaded
  function isFinished(callback) { callback(null, noRemainingBatches); }

  function onComplete() {
    ee.emit('finished', { total: total });
  }

  // Repeatedly download batch sets until the log file reports none remaining
  async.doUntil(downloadSingleBatchSet, isFinished, onComplete);
}
|
154 |
|
155 | /**
|
156 | * Return a set of uncompleted download batch IDs from the log file.
|
157 | *
|
158 | * @param {string} log - log file path
|
159 | * @param {number} batchesPerDownloadSession - maximum IDs to return
|
160 | * @param {function} callback - sign (err, batchSetIds array)
|
161 | */
|
function readBatchSetIdsFromLogFile(log, batchesPerDownloadSession, callback) {
  logfilesummary(log, function processSummary(err, summary) {
    if (err) {
      // The log file could not be read/summarised; propagate instead of
      // dereferencing an undefined `summary` below.
      return callback(err);
    }
    if (!summary.changesComplete) {
      callback(new error.BackupError('IncompleteChangesInLogFile',
        'WARNING: Changes did not finish spooling'));
      return;
    }
    if (Object.keys(summary.batches).length === 0) {
      // No batches left to download
      return callback(null, []);
    }

    // batch IDs are the property names of summary.batches
    var batchSetIds = getPropertyNames(summary.batches, batchesPerDownloadSession);
    callback(null, batchSetIds);
  });
}
|
178 |
|
179 | /**
|
180 | * Download a set of batches retrieved from a log file. When a download is
|
181 | * complete, add a line to the logfile indicating such.
|
182 | *
|
183 | * @param {any} db - nodejs-cloudant database
|
184 | * @param {any} parallelism - number of concurrent requests to make
|
185 | * @param {any} log - log file to drive downloads from
|
186 | * @param {any} batches - batches to download
|
 * @param {any} ee - event emitter for progress. This function emits
|
188 | * received and error events.
|
189 | * @param {any} start - time backup started, to report deltas
|
190 | * @param {any} grandtotal - count of documents downloaded prior to this set
|
191 | * of batches
|
192 | * @param {any} callback - completion callback, (err, {total: number}).
|
193 | */
|
function processBatchSet(db, parallelism, log, batches, ee, start, grandtotal, callback) {
  var hasErrored = false; // ensures only the first fatal error kills the queue and is emitted
  var total = grandtotal; // running doc count, seeded with downloads from earlier batch sets

  // queue to process the fetch requests in an orderly fashion using _bulk_get
  var q = async.queue(function(payload, done) {
    var output = [];
    var thisBatch = payload.batch;
    // Strip the log-file bookkeeping fields so `payload` is a valid
    // /_bulk_get request body (just the docs list).
    delete payload.batch;
    delete payload.command;

    // Append a ':d batch<n>' line marking this batch as downloaded, then
    // release the queue worker. Handed to the 'received' listener so the
    // batch is only marked done after the listener has dealt with the docs.
    // NOTE(review): the `batch` parameter is unused; `thisBatch` from the
    // enclosing closure is written instead.
    function logCompletedBatch(batch) {
      if (log) {
        fs.appendFile(log, ':d batch' + thisBatch + '\n', done);
      } else {
        done();
      }
    }

    // do the /db/_bulk_get request
    // Note: this should use built-in _bulk_get, but revs is not accepted as
    // part of the request body by the server yet. Working around using request
    // method to POST with a query string.
    db.server.request(
      { method: 'POST', db: db.config.db, path: '_bulk_get', qs: { revs: true }, body: payload },
      function(err, body) {
        if (err) {
          if (!hasErrored) {
            hasErrored = true;
            err = error.convertResponseError(err);
            // Kill the queue for fatal errors
            q.kill();
            ee.emit('error', err);
          }
          done();
        } else {
          // create an output array with the docs returned
          body.results.forEach(function(d) {
            if (d.docs) {
              d.docs.forEach(function(doc) {
                // only successfully fetched revisions carry `ok`; error
                // entries in the _bulk_get response are skipped
                if (doc.ok) {
                  output.push(doc.ok);
                }
              });
            }
          });
          total += output.length;
          var t = (new Date().getTime() - start) / 1000; // seconds elapsed since backup start
          // Hand the docs, the queue and the completion callback to the
          // listener; the listener calls logCompletedBatch when written.
          ee.emit('received', {
            batch: thisBatch,
            data: output,
            length: output.length,
            time: t,
            total: total
          }, q, logCompletedBatch);
        }
      });
  }, parallelism);

  for (var i in batches) {
    q.push(batches[i]);
  }

  // Fires once every queued batch completes (never fires after q.kill())
  q.drain(function() {
    callback(null, { total: total });
  });
}
|
261 |
|
262 | /**
|
263 | * Returns first N properties on an object.
|
264 | *
|
265 | * @param {object} obj - object with properties
|
266 | * @param {number} count - number of properties to return
|
267 | */
|
function getPropertyNames(obj, count) {
  // decide which batch numbers to deal with; property names are stringified
  // batch numbers, returned as integers.
  // Guard against non-positive counts (the old loop returned one element for
  // count === 0 because it pushed before checking the bound).
  var limit = count > 0 ? count : 0;
  return Object.keys(obj).slice(0, limit).map(function(name) {
    // always supply the radix: batch IDs are base-10
    return parseInt(name, 10);
  });
}
|