// Retrieved via UNPKG raw file view (9.55 kB, JavaScript).
1// Copyright © 2017, 2019 IBM Corp. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14'use strict';
15
16const async = require('async');
17const events = require('events');
18const fs = require('fs');
19const error = require('./error.js');
20const spoolchanges = require('./spoolchanges.js');
21const logfilesummary = require('./logfilesummary.js');
22const logfilegetbatches = require('./logfilegetbatches.js');
23
24/**
25 * Read documents from a database to be backed up.
26 *
27 * @param {string} db - `@cloudant/cloudant` DB object for source database.
28 * @param {number} blocksize - number of documents to download in single request
29 * @param {number} parallelism - number of concurrent downloads
30 * @param {string} log - path to log file to use
31 * @param {boolean} resume - whether to resume from an existing log file
32 * @returns EventEmitter with following events:
33 * - `received` - called with a block of documents to write to backup
34 * - `error` - on error
35 * - `finished` - when backup process is finished (either complete or errored)
36 */
37module.exports = function(db, options) {
38 const ee = new events.EventEmitter();
39 const start = new Date().getTime(); // backup start time
40 const batchesPerDownloadSession = 50; // max batches to read from log file for download at a time (prevent OOM)
41
42 function proceedWithBackup() {
43 if (options.resume) {
44 // pick up from existing log file from previous run
45 downloadRemainingBatches(options.log, db, ee, start, batchesPerDownloadSession, options.parallelism);
46 } else {
47 // create new log file and process
48 spoolchanges(db, options.log, options.bufferSize, ee, function(err) {
49 if (err) {
50 ee.emit('error', err);
51 } else {
52 downloadRemainingBatches(options.log, db, ee, start, batchesPerDownloadSession, options.parallelism);
53 }
54 });
55 }
56 }
57
58 validateBulkGetSupport(db, function(err) {
59 if (err) {
60 return ee.emit('error', err);
61 } else {
62 proceedWithBackup();
63 }
64 });
65
66 return ee;
67};
68
69/**
70 * Validate /_bulk_get support for a specified database.
71 *
72 * @param {string} db - nodejs-cloudant db
73 * @param {function} callback - called on completion with signature (err)
74 */
75function validateBulkGetSupport(db, callback) {
76 db.head('_bulk_get',
77 function(err) {
78 err = error.convertResponseError(err, function(err) {
79 switch (err.statusCode) {
80 case undefined:
81 // There was no status code on the error
82 return err;
83 case 404:
84 return new error.BackupError('BulkGetError', 'Database does not support /_bulk_get endpoint');
85 case 405:
86 // => supports /_bulk_get endpoint
87 return;
88 default:
89 return new error.HTTPError(err);
90 }
91 });
92 callback(err);
93 });
94}
95
96/**
97 * Download remaining batches in a log file, splitting batches into sets
98 * to avoid enqueueing too many in one go.
99 *
100 * @param {string} log - log file name to maintain download state
101 * @param {string} db - nodejs-cloudant db
102 * @param {events.EventEmitter} ee - event emitter to emit received events on
103 * @param {time} startTime - start time for backup process
104 * @param {number} batchesPerDownloadSession - max batches to enqueue for
105 * download at a time. As batches contain many doc IDs, this helps avoid
106 * exhausting memory.
107 * @param {number} parallelism - number of concurrent downloads
108 * @returns function to call do download remaining batches with signature
109 * (err, {batches: batch, docs: doccount}) {@see spoolchanges}.
110 */
111function downloadRemainingBatches(log, db, ee, startTime, batchesPerDownloadSession, parallelism) {
112 var total = 0; // running total of documents downloaded so far
113 var noRemainingBatches = false;
114
115 // Generate a set of batches (up to batchesPerDownloadSession) to download from the
116 // log file and download them. Set noRemainingBatches to `true` for last batch.
117 function downloadSingleBatchSet(done) {
118 // Fetch the doc IDs for the batches in the current set to
119 // download them.
120 function batchSetComplete(err, data) {
121 total = data.total;
122 done();
123 }
124 function processRetrievedBatches(err, batches) {
125 // process them in parallelised queue
126 processBatchSet(db, parallelism, log, batches, ee, startTime, total, batchSetComplete);
127 }
128
129 readBatchSetIdsFromLogFile(log, batchesPerDownloadSession, function(err, batchSetIds) {
130 if (err) {
131 ee.emit('error', err);
132 // Stop processing changes file for fatal errors
133 noRemainingBatches = true;
134 done();
135 } else {
136 if (batchSetIds.length === 0) {
137 noRemainingBatches = true;
138 return done();
139 }
140 logfilegetbatches(log, batchSetIds, processRetrievedBatches);
141 }
142 });
143 }
144
145 // Return true if all batches in log file have been downloaded
146 function isFinished(callback) { callback(null, noRemainingBatches); }
147
148 function onComplete() {
149 ee.emit('finished', { total: total });
150 }
151
152 async.doUntil(downloadSingleBatchSet, isFinished, onComplete);
153}
154
155/**
156 * Return a set of uncompleted download batch IDs from the log file.
157 *
158 * @param {string} log - log file path
159 * @param {number} batchesPerDownloadSession - maximum IDs to return
160 * @param {function} callback - sign (err, batchSetIds array)
161 */
162function readBatchSetIdsFromLogFile(log, batchesPerDownloadSession, callback) {
163 logfilesummary(log, function processSummary(err, summary) {
164 if (!summary.changesComplete) {
165 callback(new error.BackupError('IncompleteChangesInLogFile',
166 'WARNING: Changes did not finish spooling'));
167 return;
168 }
169 if (Object.keys(summary.batches).length === 0) {
170 return callback(null, []);
171 }
172
173 // batch IDs are the property names of summary.batches
174 var batchSetIds = getPropertyNames(summary.batches, batchesPerDownloadSession);
175 callback(null, batchSetIds);
176 });
177}
178
179/**
180 * Download a set of batches retrieved from a log file. When a download is
181 * complete, add a line to the logfile indicating such.
182 *
183 * @param {any} db - nodejs-cloudant database
184 * @param {any} parallelism - number of concurrent requests to make
185 * @param {any} log - log file to drive downloads from
186 * @param {any} batches - batches to download
187 * @param {any} ee - event emitter for progress. This funciton emits
188 * received and error events.
189 * @param {any} start - time backup started, to report deltas
190 * @param {any} grandtotal - count of documents downloaded prior to this set
191 * of batches
192 * @param {any} callback - completion callback, (err, {total: number}).
193 */
194function processBatchSet(db, parallelism, log, batches, ee, start, grandtotal, callback) {
195 var hasErrored = false;
196 var total = grandtotal;
197
198 // queue to process the fetch requests in an orderly fashion using _bulk_get
199 var q = async.queue(function(payload, done) {
200 var output = [];
201 var thisBatch = payload.batch;
202 delete payload.batch;
203 delete payload.command;
204
205 function logCompletedBatch(batch) {
206 if (log) {
207 fs.appendFile(log, ':d batch' + thisBatch + '\n', done);
208 } else {
209 done();
210 }
211 }
212
213 // do the /db/_bulk_get request
214 // Note: this should use built-in _bulk_get, but revs is not accepted as
215 // part of the request body by the server yet. Working around using request
216 // method to POST with a query string.
217 db.server.request(
218 { method: 'POST', db: db.config.db, path: '_bulk_get', qs: { revs: true }, body: payload },
219 function(err, body) {
220 if (err) {
221 if (!hasErrored) {
222 hasErrored = true;
223 err = error.convertResponseError(err);
224 // Kill the queue for fatal errors
225 q.kill();
226 ee.emit('error', err);
227 }
228 done();
229 } else {
230 // create an output array with the docs returned
231 body.results.forEach(function(d) {
232 if (d.docs) {
233 d.docs.forEach(function(doc) {
234 if (doc.ok) {
235 output.push(doc.ok);
236 }
237 });
238 }
239 });
240 total += output.length;
241 var t = (new Date().getTime() - start) / 1000;
242 ee.emit('received', {
243 batch: thisBatch,
244 data: output,
245 length: output.length,
246 time: t,
247 total: total
248 }, q, logCompletedBatch);
249 }
250 });
251 }, parallelism);
252
253 for (var i in batches) {
254 q.push(batches[i]);
255 }
256
257 q.drain(function() {
258 callback(null, { total: total });
259 });
260}
261
262/**
263 * Returns first N properties on an object.
264 *
265 * @param {object} obj - object with properties
266 * @param {number} count - number of properties to return
267 */
268function getPropertyNames(obj, count) {
269 // decide which batch numbers to deal with
270 var batchestofetch = [];
271 var j = 0;
272 for (var i in obj) {
273 batchestofetch.push(parseInt(i));
274 j++;
275 if (j >= count) break;
276 }
277 return batchestofetch;
278}