UNPKG

9.33 kBJavaScriptView Raw
1// Copyright © 2017, 2021 IBM Corp. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14'use strict';
15
16const async = require('async');
17const events = require('events');
18const fs = require('fs');
19const error = require('./error.js');
20const spoolchanges = require('./spoolchanges.js');
21const logfilesummary = require('./logfilesummary.js');
22const logfilegetbatches = require('./logfilegetbatches.js');
23
24/**
25 * Read documents from a database to be backed up.
26 *
27 * @param {string} db - `@cloudant/cloudant` DB object for source database.
28 * @param {number} blocksize - number of documents to download in single request
29 * @param {number} parallelism - number of concurrent downloads
30 * @param {string} log - path to log file to use
31 * @param {boolean} resume - whether to resume from an existing log file
32 * @returns EventEmitter with following events:
33 * - `received` - called with a block of documents to write to backup
34 * - `error` - on error
35 * - `finished` - when backup process is finished (either complete or errored)
36 */
37module.exports = function(db, options) {
38 const ee = new events.EventEmitter();
39 const start = new Date().getTime(); // backup start time
40 const batchesPerDownloadSession = 50; // max batches to read from log file for download at a time (prevent OOM)
41
42 function proceedWithBackup() {
43 if (options.resume) {
44 // pick up from existing log file from previous run
45 downloadRemainingBatches(options.log, db, ee, start, batchesPerDownloadSession, options.parallelism);
46 } else {
47 // create new log file and process
48 spoolchanges(db, options.log, options.bufferSize, ee, function(err) {
49 if (err) {
50 ee.emit('error', err);
51 } else {
52 downloadRemainingBatches(options.log, db, ee, start, batchesPerDownloadSession, options.parallelism);
53 }
54 });
55 }
56 }
57
58 validateBulkGetSupport(db, function(err) {
59 if (err) {
60 return ee.emit('error', err);
61 } else {
62 proceedWithBackup();
63 }
64 });
65
66 return ee;
67};
68
/**
 * Check that a database supports /_bulk_get by posting a request with an
 * empty list of documents.
 *
 * NOTE: the success handler and the failure handler are chained, so an
 * exception thrown by `callback` on the success path is routed to the failure
 * handler, which invokes `callback` again with the converted error.
 *
 * @param {object} db - nodejs-cloudant db
 * @param {function} callback - called on completion with signature (err);
 *  `err` is omitted when the request succeeds.
 */
function validateBulkGetSupport(db, callback) {
  // Decide which error to report for a failed probe request.
  const classifyFailure = function(responseErr) {
    if (responseErr.status === undefined) {
      // There was no status code on the error
      return responseErr;
    }
    if (responseErr.status === 404) {
      return new error.BackupError('BulkGetError', 'Database does not support /_bulk_get endpoint');
    }
    return new error.HTTPError(responseErr);
  };

  const probe = db.service.postBulkGet({ db: db.db, docs: [] });

  probe
    .then(function() {
      callback();
    })
    .catch(function(failure) {
      callback(error.convertResponseError(failure, classifyFailure));
    });
}
91
/**
 * Download remaining batches in a log file, splitting batches into sets
 * to avoid enqueueing too many in one go.
 *
 * Sets are processed one after another until the log file reports no
 * outstanding batches or a fatal error occurs. A failure to read the log
 * file (either its summary or the batches themselves) is reported with an
 * `error` event and stops further processing; `finished` is emitted
 * afterwards with the total reached so far.
 *
 * @param {string} log - log file name to maintain download state
 * @param {object} db - nodejs-cloudant db
 * @param {events.EventEmitter} ee - event emitter to emit received events on
 * @param {number} startTime - start time for backup process
 * @param {number} batchesPerDownloadSession - max batches to enqueue for
 *  download at a time. As batches contain many doc IDs, this helps avoid
 *  exhausting memory.
 * @param {number} parallelism - number of concurrent downloads
 * @returns {undefined} nothing; progress is reported through `ee`.
 */
function downloadRemainingBatches(log, db, ee, startTime, batchesPerDownloadSession, parallelism) {
  let total = 0; // running total of documents downloaded so far
  let noRemainingBatches = false;

  // Generate a set of batches (up to batchesPerDownloadSession) to download from the
  // log file and download them. Set noRemainingBatches to `true` for last batch.
  function downloadSingleBatchSet(done) {
    // Report a fatal error and stop processing the log file. `done` is called
    // without the error so that the loop ends through the normal completion
    // path after the `error` event has been emitted.
    function stopWithError(err) {
      ee.emit('error', err);
      noRemainingBatches = true;
      done();
    }

    // Record the new running total once a set has been downloaded.
    function batchSetComplete(err, data) {
      if (err) {
        stopWithError(err);
        return;
      }
      total = data.total;
      done();
    }

    // Hand the doc IDs of the current set to the parallelised download queue.
    function processRetrievedBatches(err, batches) {
      if (err) {
        // Previously this error only ended the loop and was never emitted.
        stopWithError(err);
        return;
      }
      processBatchSet(db, parallelism, log, batches, ee, startTime, total, batchSetComplete);
    }

    readBatchSetIdsFromLogFile(log, batchesPerDownloadSession, function(err, batchSetIds) {
      if (err) {
        stopWithError(err);
        return;
      }
      if (batchSetIds.length === 0) {
        noRemainingBatches = true;
        done();
        return;
      }
      logfilegetbatches(log, batchSetIds, processRetrievedBatches);
    });
  }

  // Return true if all batches in log file have been downloaded
  function isFinished(callback) {
    callback(null, noRemainingBatches);
  }

  function onComplete() {
    ee.emit('finished', { total: total });
  }

  async.doUntil(downloadSingleBatchSet, isFinished, onComplete);
}
156
/**
 * Look up which download batches the log file still lists as uncompleted and
 * hand back at most `batchesPerDownloadSession` of their IDs.
 *
 * The callback receives an `IncompleteChangesInLogFile` error when the log
 * file summary says the changes did not finish spooling, and an empty array
 * when there are no batches left.
 *
 * @param {string} log - log file path
 * @param {number} batchesPerDownloadSession - maximum IDs to return
 * @param {function} callback - signature (err, batchSetIds array)
 */
function readBatchSetIdsFromLogFile(log, batchesPerDownloadSession, callback) {
  logfilesummary(log, function processSummary(err, summary) {
    if (err) {
      callback(err);
      return;
    }

    if (!summary.changesComplete) {
      const incomplete = new error.BackupError('IncompleteChangesInLogFile',
        'WARNING: Changes did not finish spooling');
      callback(incomplete);
      return;
    }

    // batch IDs are the property names of summary.batches
    const outstanding = Object.keys(summary.batches).length;
    const batchSetIds = outstanding === 0
      ? []
      : getPropertyNames(summary.batches, batchesPerDownloadSession);
    callback(null, batchSetIds);
  });
}
184
/**
 * Download a set of batches retrieved from a log file. When a download is
 * complete, add a line to the logfile indicating such.
 *
 * Each batch is fetched with a /_bulk_get request and announced with a
 * `received` event whose arguments are the batch details, the queue and a
 * `logCompletedBatch` function. The queue only moves on from a batch once a
 * listener has called `logCompletedBatch`, which appends the completion line
 * to the log file (when a log file is given).
 *
 * The first failed request is treated as fatal: the queue is killed and an
 * `error` event is emitted. NOTE(review): per the `async` documentation
 * `kill()` also removes the drain callback, so `callback` is presumably not
 * invoked after a fatal error - confirm against the `async` version in use.
 *
 * @param {any} db - nodejs-cloudant database
 * @param {any} parallelism - number of concurrent requests to make
 * @param {any} log - log file to drive downloads from
 * @param {any} batches - batches to download; each has `batch` and `docs`.
 *  The batch objects are not modified.
 * @param {any} ee - event emitter for progress. This function emits
 *  received and error events.
 * @param {any} start - time backup started, to report deltas
 * @param {any} grandtotal - count of documents downloaded prior to this set
 *  of batches
 * @param {any} callback - completion callback, (err, {total: number}).
 */
function processBatchSet(db, parallelism, log, batches, ee, start, grandtotal, callback) {
  let hasErrored = false;
  let total = grandtotal;

  // Own enumerable values only; a missing batches object counts as empty.
  const pendingBatches = Object.values(batches || {});
  if (pendingBatches.length === 0) {
    // Nothing would be pushed onto the queue, so its drain callback would
    // never fire and the caller would wait forever. Complete asynchronously
    // to keep the callback timing consistent with the non-empty case.
    setImmediate(callback, null, { total: total });
    return;
  }

  // queue to process the fetch requests in an orderly fashion using _bulk_get
  const q = async.queue(function(payload, done) {
    const thisBatch = payload.batch;
    const output = [];

    // Passed to `received` listeners; marks this batch as done in the log
    // file and releases the queue slot.
    function logCompletedBatch() {
      if (log) {
        fs.appendFile(log, ':d batch' + thisBatch + '\n', done);
      } else {
        done();
      }
    }

    // do the /db/_bulk_get request
    db.service.postBulkGet({
      db: db.db,
      revs: true,
      docs: payload.docs
    }).then(response => {
      // create an output array with the docs returned; entries without an
      // `ok` document are skipped
      response.result.results.forEach(function(result) {
        if (result.docs) {
          result.docs.forEach(function(doc) {
            if (doc.ok) {
              output.push(doc.ok);
            }
          });
        }
      });
      total += output.length;
      const t = (new Date().getTime() - start) / 1000;
      ee.emit('received', {
        batch: thisBatch,
        data: output,
        length: output.length,
        time: t,
        total: total
      }, q, logCompletedBatch);
    }).catch(err => {
      // Only the first failure is reported; later ones just release the slot.
      if (!hasErrored) {
        hasErrored = true;
        err = error.convertResponseError(err);
        // Kill the queue for fatal errors
        q.kill();
        ee.emit('error', err);
      }
      done();
    });
  }, parallelism);

  pendingBatches.forEach(function(batch) {
    q.push(batch);
  });

  q.drain(function() {
    callback(null, { total: total });
  });
}
264
/**
 * Returns the first N own property names of an object, parsed as base-10
 * integers (the callers in this file pass objects keyed by batch number).
 *
 * @param {object} obj - object with properties; `null`/`undefined` yields []
 * @param {number} count - maximum number of properties to return. Zero or a
 *  negative value yields []; when omitted, all property names are returned.
 * @returns {number[]} the parsed property names, in enumeration order
 */
function getPropertyNames(obj, count) {
  if (obj === null || obj === undefined) {
    return [];
  }

  // Own enumerable keys only, so properties inherited from a prototype are
  // never mistaken for batch numbers.
  const names = Object.keys(obj);

  const requested = (count === undefined || count === null) ? NaN : Number(count);
  const limit = Number.isNaN(requested)
    ? names.length
    : Math.max(0, Math.ceil(requested));

  // decide which batch numbers to deal with
  return names.slice(0, limit).map(function(name) {
    return Number.parseInt(name, 10);
  });
}