UNPKG

11.6 kBJavaScriptView Raw
1// Copyright © 2017, 2018 IBM Corp. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14'use strict';
15
16/**
17 * CouchBackup module.
18 * @module couchbackup
19 * @see module:couchbackup
20 */
21
22const restoreInternal = require('./includes/restore.js');
23const backupShallow = require('./includes/shallowbackup.js');
24const backupFull = require('./includes/backup.js');
25const defaults = require('./includes/config.js').apiDefaults();
26const events = require('events');
27const debug = require('debug')('couchbackup:app');
28const error = require('./includes/error.js');
29const fs = require('fs');
30const legacyUrl = require('url');
31
32/**
33 * Test for a positive, safe integer.
34 *
35 * @param {object} x - Object under test.
36 */
37function isSafePositiveInteger(x) {
38 // https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
39 const MAX_SAFE_INTEGER = Number.MAX_SAFE_INTEGER || 9007199254740991;
40 // Is it a number?
41 return Object.prototype.toString.call(x) === '[object Number]' &&
42 // Is it an integer?
43 x % 1 === 0 &&
44 // Is it positive?
45 x > 0 &&
46 // Is it less than the maximum safe integer?
47 x <= MAX_SAFE_INTEGER;
48}
49
50/**
51 * Validate arguments.
52 *
53 * @param {object} url - URL of database.
54 * @param {object} opts - Options.
55 * @param {function} cb - Callback to be called on error.
56 */
57function validateArgs(url, opts, cb) {
58 if (typeof url !== 'string') {
59 cb(new error.BackupError('InvalidOption', 'Invalid URL, must be type string'), null);
60 return;
61 }
62 if (opts && typeof opts.bufferSize !== 'undefined' && !isSafePositiveInteger(opts.bufferSize)) {
63 cb(new error.BackupError('InvalidOption', 'Invalid buffer size option, must be a positive integer in the range (0, MAX_SAFE_INTEGER]'), null);
64 return;
65 }
66 if (opts && typeof opts.iamApiKey !== 'undefined' && typeof opts.iamApiKey !== 'string') {
67 cb(new error.BackupError('InvalidOption', 'Invalid iamApiKey option, must be type string'), null);
68 return;
69 }
70 if (opts && typeof opts.log !== 'undefined' && typeof opts.log !== 'string') {
71 cb(new error.BackupError('InvalidOption', 'Invalid log option, must be type string'), null);
72 return;
73 }
74 if (opts && typeof opts.mode !== 'undefined' && ['full', 'shallow'].indexOf(opts.mode) === -1) {
75 cb(new error.BackupError('InvalidOption', 'Invalid mode option, must be either "full" or "shallow"'), null);
76 return;
77 }
78 if (opts && typeof opts.output !== 'undefined' && typeof opts.output !== 'string') {
79 cb(new error.BackupError('InvalidOption', 'Invalid output option, must be type string'), null);
80 return;
81 }
82 if (opts && typeof opts.parallelism !== 'undefined' && !isSafePositiveInteger(opts.parallelism)) {
83 cb(new error.BackupError('InvalidOption', 'Invalid parallelism option, must be a positive integer in the range (0, MAX_SAFE_INTEGER]'), null);
84 return;
85 }
86 if (opts && typeof opts.resume !== 'undefined' && typeof opts.resume !== 'boolean') {
87 cb(new error.BackupError('InvalidOption', 'Invalid resume option, must be type boolean'), null);
88 return;
89 }
90
91 // Validate URL and ensure no auth if using key
92 try {
93 const urlObject = legacyUrl.parse(url);
94 // We require a protocol, host and path (for db), fail if any is missing.
95 if (urlObject.protocol !== 'https:' && urlObject.protocol !== 'http:') {
96 cb(new error.BackupError('InvalidOption', 'Invalid URL protocol.'));
97 return;
98 }
99 if (!urlObject.host) {
100 cb(new error.BackupError('InvalidOption', 'Invalid URL host.'));
101 return;
102 }
103 if (!urlObject.path) {
104 cb(new error.BackupError('InvalidOption', 'Invalid URL, missing path element (no database).'));
105 return;
106 }
107 if (opts && opts.iamApiKey && urlObject.auth) {
108 cb(new error.BackupError('InvalidOption', 'URL user information must not be supplied when using IAM API key.'));
109 return;
110 }
111 } catch (err) {
112 cb(err);
113 return;
114 }
115
116 if (opts && opts.resume) {
117 if (!opts.log) {
118 // This is the second place we check for the presence of the log option in conjunction with resume
119 // It has to be here for the API case
120 cb(new error.BackupError('NoLogFileName', 'To resume a backup, a log file must be specified'), null);
121 return;
122 } else if (!fs.existsSync(opts.log)) {
123 cb(new error.BackupError('LogDoesNotExist', 'To resume a backup, the log file must exist'), null);
124 return;
125 }
126 }
127 return true;
128}
129
130module.exports = {
131
132 /**
133 * Backup a Cloudant database to a stream.
134 *
135 * @param {string} srcUrl - URL of database to backup.
136 * @param {stream.Writable} targetStream - Stream to write content to.
137 * @param {object} opts - Backup options.
138 * @param {number} [opts.parallelism=5] - Number of parallel HTTP requests to use.
139 * @param {number} [opts.bufferSize=500] - Number of documents per batch request.
140 * @param {string} [opts.iamApiKey] - IAM API key to use to access Cloudant database.
141 * @param {string} [opts.log] - Log file name. Default uses a temporary file.
142 * @param {boolean} [opts.resume] - Whether to resume from existing log.
143 * @param {string} [opts.mode=full] - Use `full` or `shallow` mode.
144 * @param {backupRestoreCallback} callback - Called on completion.
145 */
146 backup: function(srcUrl, targetStream, opts, callback) {
147 if (typeof callback === 'undefined' && typeof opts === 'function') {
148 callback = opts;
149 opts = {};
150 }
151 if (!validateArgs(srcUrl, opts, callback)) {
152 // bad args, bail
153 return;
154 }
155 opts = Object.assign({}, defaults, opts);
156
157 var backup = null;
158 if (opts.mode === 'shallow') {
159 backup = backupShallow;
160 } else { // full mode
161 backup = backupFull;
162 }
163
164 const ee = new events.EventEmitter();
165
166 // if there is an error writing to the stream, call the completion
167 // callback with the error set
168 targetStream.on('error', function(err) {
169 debug('Error ' + JSON.stringify(err));
170 if (callback) callback(err);
171 });
172
173 // If resuming write a newline as it's possible one would be missing from
174 // an interruption of the previous backup. If the backup was clean this
175 // will cause an empty line that will be gracefully handled by the restore.
176 if (opts.resume) {
177 targetStream.write('\n');
178 }
179
180 // Get the event emitter from the backup process so we can handle events
181 // before passing them on to the app's event emitter if needed.
182 const internalEE = backup(srcUrl, opts)
183 .on('changes', function(batch) {
184 ee.emit('changes', batch);
185 }).on('received', function(obj, q, logCompletedBatch) {
186 debug(' backed up batch', obj.batch, ' docs: ', obj.total, 'Time', obj.time);
187 // Callback to emit the written event when the content is flushed
188 function writeFlushed() {
189 ee.emit('written', {total: obj.total, time: obj.time, batch: obj.batch});
190 if (logCompletedBatch) {
191 logCompletedBatch(obj.batch);
192 }
193 }
194 // Write the received content to the targetStream
195 const continueWriting = targetStream.write(JSON.stringify(obj.data) + '\n',
196 'utf8',
197 writeFlushed);
198 if (!continueWriting) {
199 // The buffer was full, pause the queue to stop the writes until we
200 // get a drain event
201 if (q && !q.isPaused) {
202 q.pause();
203 targetStream.once('drain', function() {
204 q.resume();
205 });
206 }
207 }
208 })
209 // For errors we expect, may or may not be fatal
210 .on('error', function(err) {
211 debug('Error ' + JSON.stringify(err));
212 // These are fatal errors
213 // We only want to callback once for a fatal error
214 // even though other errors may be received,
215 // so deregister the listeners now
216 internalEE.removeAllListeners();
217 callback(err);
218 })
219 .on('finished', function(obj) {
220 function emitFinished() {
221 debug('Backup complete - written ' + JSON.stringify(obj));
222 const summary = {total: obj.total};
223 ee.emit('finished', summary);
224 if (callback) callback(null, summary);
225 }
226 if (targetStream === process.stdout) {
227 // stdout cannot emit a finish event so use a final write + callback
228 targetStream.write('', 'utf8', emitFinished);
229 } else {
230 // If we're writing to a file, end the writes and register the
231 // emitFinished function for a callback when the file stream's finish
232 // event is emitted.
233 targetStream.end('', 'utf8', emitFinished);
234 }
235 });
236
237 return ee;
238 },
239
240 /**
241 * Restore a backup from a stream.
242 *
243 * @param {stream.Readable} srcStream - Stream containing backed up data.
244 * @param {string} targetUrl - Target database.
245 * @param {object} opts - Restore options.
246 * @param {number} opts.parallelism - Number of parallel HTTP requests to use. Default 5.
247 * @param {number} opts.bufferSize - Number of documents per batch request. Default 500.
248 * @param {string} opts.iamApiKey - IAM API key to use to access Cloudant database.
249 * @param {backupRestoreCallback} callback - Called on completion.
250 */
251 restore: function(srcStream, targetUrl, opts, callback) {
252 if (typeof callback === 'undefined' && typeof opts === 'function') {
253 callback = opts;
254 opts = {};
255 }
256 validateArgs(targetUrl, opts, callback);
257 opts = Object.assign({}, defaults, opts);
258
259 const ee = new events.EventEmitter();
260
261 restoreInternal(
262 targetUrl,
263 opts,
264 srcStream,
265 ee,
266 function(err, writer) {
267 if (err) {
268 callback(err, null);
269 return;
270 }
271 if (writer != null) {
272 writer.on('restored', function(obj) {
273 debug(' restored ', obj.total);
274 ee.emit('restored', {documents: obj.documents, total: obj.total});
275 })
276 // For errors we expect, may or may not be fatal
277 .on('error', function(err) {
278 debug('Error ' + JSON.stringify(err));
279 // These are fatal errors
280 // We only want to callback once for a fatal error
281 // even though other errors may be received,
282 // so deregister listeners now
283 writer.removeAllListeners();
284 // Only call destroy if it is available on the stream
285 if (srcStream.destroy && srcStream.destroy instanceof Function) {
286 srcStream.destroy();
287 }
288 callback(err);
289 })
290 .on('finished', function(obj) {
291 debug('restore complete');
292 ee.emit('finished', {total: obj.total});
293 callback(null, obj);
294 });
295 }
296 }
297 );
298
299 return ee;
300 }
301
302};
303
304/**
305 * Backup/restore callback
306 * @callback backupRestoreCallback
307 * @param {Error} err - Error object if operation failed.
308 * @param {object} data - summary data for backup/restore
309 */