// Copyright © 2017, 2021 IBM Corp. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
'use strict';

const async = require('async');
const events = require('events');
const fs = require('fs');
const error = require('./error.js');
const spoolchanges = require('./spoolchanges.js');
const logfilesummary = require('./logfilesummary.js');
const logfilegetbatches = require('./logfilegetbatches.js');

/**
 * Read documents from a database to be backed up.
 *
 * @param {object} db - `@cloudant/cloudant` DB object for the source database.
 * @param {object} options - backup options
 * @param {number} options.bufferSize - number of documents to download in a single request
 * @param {number} options.parallelism - number of concurrent downloads
 * @param {string} options.log - path to log file to use
 * @param {boolean} options.resume - whether to resume from an existing log file
 * @returns EventEmitter with the following events:
 *  - `received` - called with a block of documents to write to backup
 *  - `error` - on error
 *  - `finished` - when backup process is finished (either complete or errored)
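 *
 * @example
 * // Illustrative sketch only: the option names and the 'received' callback
 * // arguments mirror how this module uses them; `db` is assumed to be the
 * // client object the caller already constructs ({ service, db }), and the
 * // writer side is left as a placeholder.
 * const backup = require('./backup.js');
 * const ee = backup(db, { log: 'backup.log', resume: false, bufferSize: 500, parallelism: 2 });
 * ee.on('received', function(block, q, logCompletedBatch) {
 *   // persist block.data somewhere, then mark the batch as done in the log file
 *   logCompletedBatch(block.batch);
 * });
 * ee.on('error', function(err) { console.error(err); });
 * ee.on('finished', function(summary) { console.log('backed up ' + summary.total + ' docs'); });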
|
 */
module.exports = function(db, options) {
  const ee = new events.EventEmitter();
  const start = new Date().getTime(); // backup start time
  const batchesPerDownloadSession = 50; // max batches to read from log file for download at a time (prevent OOM)
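
  // The backup runs in two phases: spoolchanges writes batches of document
  // IDs to the log file (skipped when resuming from an existing log), then
  // downloadRemainingBatches fetches each batch of documents via /_bulk_get.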
|
  function proceedWithBackup() {
    if (options.resume) {
      // pick up from existing log file from previous run
      downloadRemainingBatches(options.log, db, ee, start, batchesPerDownloadSession, options.parallelism);
    } else {
      // create new log file and process
      spoolchanges(db, options.log, options.bufferSize, ee, function(err) {
        if (err) {
          ee.emit('error', err);
        } else {
          downloadRemainingBatches(options.log, db, ee, start, batchesPerDownloadSession, options.parallelism);
        }
      });
    }
  }

  validateBulkGetSupport(db, function(err) {
    if (err) {
      return ee.emit('error', err);
    } else {
      proceedWithBackup();
    }
  });

  return ee;
};

/**
 * Validate /_bulk_get support for a specified database.
 *
 * @param {object} db - nodejs-cloudant db
 * @param {function} callback - called on completion with signature (err)
 */
function validateBulkGetSupport(db, callback) {
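  // Probe the endpoint with an empty docs list: the request succeeds where
  // /_bulk_get is implemented, while a 404 response means the database does
  // not support it.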
|
  db.service.postBulkGet({ db: db.db, docs: [] }).then(() => { callback(); }).catch(err => {
    err = error.convertResponseError(err, function(err) {
      switch (err.status) {
        case undefined:
          // There was no status code on the error
          return err;
        case 404:
          return new error.BackupError('BulkGetError', 'Database does not support /_bulk_get endpoint');
        default:
          return new error.HTTPError(err);
      }
    });
    callback(err);
  });
}

/**
 * Download remaining batches in a log file, splitting batches into sets
 * to avoid enqueueing too many in one go.
 *
 * @param {string} log - log file name to maintain download state
 * @param {object} db - nodejs-cloudant db
 * @param {events.EventEmitter} ee - event emitter to emit received events on
 * @param {number} startTime - start time for backup process
 * @param {number} batchesPerDownloadSession - max batches to enqueue for
 *  download at a time. As batches contain many doc IDs, this helps avoid
 *  exhausting memory.
 * @param {number} parallelism - number of concurrent downloads
 */
function downloadRemainingBatches(log, db, ee, startTime, batchesPerDownloadSession, parallelism) {
  let total = 0; // running total of documents downloaded so far
  let noRemainingBatches = false;

  // Generate a set of batches (up to batchesPerDownloadSession) to download from the
  // log file and download them. Set noRemainingBatches to `true` for last batch.
  function downloadSingleBatchSet(done) {
    // Fetch the doc IDs for the batches in the current set to
    // download them.
    function batchSetComplete(err, data) {
      if (!err) {
        total = data.total;
      }
      done(err);
    }
    function processRetrievedBatches(err, batches) {
      if (!err) {
        // process them in parallelised queue
        processBatchSet(db, parallelism, log, batches, ee, startTime, total, batchSetComplete);
      } else {
        batchSetComplete(err);
      }
    }

    readBatchSetIdsFromLogFile(log, batchesPerDownloadSession, function(err, batchSetIds) {
      if (err) {
        ee.emit('error', err);
        // Stop processing changes file for fatal errors
        noRemainingBatches = true;
        done();
      } else {
        if (batchSetIds.length === 0) {
          noRemainingBatches = true;
          return done();
        }
        logfilegetbatches(log, batchSetIds, processRetrievedBatches);
      }
    });
  }

  // Return true if all batches in log file have been downloaded
  function isFinished(callback) { callback(null, noRemainingBatches); }

  function onComplete() {
    ee.emit('finished', { total: total });
  }
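
  // Process batch sets in series: repeat downloadSingleBatchSet until
  // isFinished reports that no batches remain, then emit 'finished' via
  // onComplete.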
|
  async.doUntil(downloadSingleBatchSet, isFinished, onComplete);
}

/**
 * Return a set of uncompleted download batch IDs from the log file.
 *
 * @param {string} log - log file path
 * @param {number} batchesPerDownloadSession - maximum IDs to return
 * @param {function} callback - called with signature (err, batchSetIds array)
 */
function readBatchSetIdsFromLogFile(log, batchesPerDownloadSession, callback) {
  logfilesummary(log, function processSummary(err, summary) {
    if (!err) {
      if (!summary.changesComplete) {
        callback(new error.BackupError('IncompleteChangesInLogFile',
          'WARNING: Changes did not finish spooling'));
        return;
      }
      if (Object.keys(summary.batches).length === 0) {
        return callback(null, []);
      }

      // batch IDs are the property names of summary.batches
      const batchSetIds = getPropertyNames(summary.batches, batchesPerDownloadSession);
      callback(null, batchSetIds);
    } else {
      callback(err);
    }
  });
}

/**
 * Download a set of batches retrieved from a log file. When a download is
 * complete, add a line to the logfile indicating such.
 *
 * @param {any} db - nodejs-cloudant database
 * @param {any} parallelism - number of concurrent requests to make
 * @param {any} log - log file to drive downloads from
 * @param {any} batches - batches to download
 * @param {any} ee - event emitter for progress. This function emits
 *  received and error events.
 * @param {any} start - time backup started, to report deltas
 * @param {any} grandtotal - count of documents downloaded prior to this set
 *  of batches
 * @param {any} callback - completion callback, (err, {total: number}).
 */
function processBatchSet(db, parallelism, log, batches, ee, start, grandtotal, callback) {
  let hasErrored = false;
  let total = grandtotal;

  // queue to process the fetch requests in an orderly fashion using _bulk_get
  const q = async.queue(function(payload, done) {
    const output = [];
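    // Note the batch number, then strip the log-file metadata so that only
    // the list of document IDs/revs is sent to /_bulk_get.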
|
    const thisBatch = payload.batch;
    delete payload.batch;
    delete payload.command;
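
    // Passed to 'received' listeners; calling it once the batch has been
    // handled records the batch as done (':d') in the log file and releases
    // the queue worker.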
|
    function logCompletedBatch(batch) {
      if (log) {
        fs.appendFile(log, ':d batch' + thisBatch + '\n', done);
      } else {
        done();
      }
    }

    // do the /db/_bulk_get request
    db.service.postBulkGet({
      db: db.db,
      revs: true,
      docs: payload.docs
    }).then(response => {
      // create an output array with the docs returned
      response.result.results.forEach(function(d) {
        if (d.docs) {
          d.docs.forEach(function(doc) {
            if (doc.ok) {
              output.push(doc.ok);
            }
          });
        }
      });
      total += output.length;
      const t = (new Date().getTime() - start) / 1000;
      ee.emit('received', {
        batch: thisBatch,
        data: output,
        length: output.length,
        time: t,
        total: total
      }, q, logCompletedBatch);
    }).catch(err => {
      if (!hasErrored) {
        hasErrored = true;
        err = error.convertResponseError(err);
        // Kill the queue for fatal errors
        q.kill();
        ee.emit('error', err);
      }
      done();
    });
  }, parallelism);
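
  // Enqueue every batch in this set; once the queue drains, report the
  // updated running total back to the caller.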
|
  for (const i in batches) {
    q.push(batches[i]);
  }

  q.drain(function() {
    callback(null, { total: total });
  });
}

/**
 * Returns the names of the first N properties of an object, parsed as
 * integers (the property names are batch numbers).
 *
 * @param {object} obj - object with properties
 * @param {number} count - number of properties to return
 * @returns {number[]} - the first `count` property names, as numbers
 */
function getPropertyNames(obj, count) {
  // decide which batch numbers to deal with
  const batchestofetch = [];
  let j = 0;
  for (const i in obj) {
    batchestofetch.push(parseInt(i));
    j++;
    if (j >= count) break;
  }
  return batchestofetch;
}