UNPKG

4.03 kBJavaScriptView Raw
1// Copyright © 2017, 2021 IBM Corp. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14'use strict';
15
16const fs = require('fs');
17const liner = require('./liner.js');
18const change = require('./change.js');
19const error = require('./error.js');
20const debug = require('debug')('couchbackup:spoolchanges');
21
22/**
23 * Write log file for all changes from a database, ready for downloading
24 * in batches.
25 *
26 * @param {string} dbUrl - URL of database
27 * @param {string} log - path to log file to use
28 * @param {number} bufferSize - the number of changes per batch/log line
29 * @param {function(err)} callback - a callback to run on completion
30 */
31module.exports = function(db, log, bufferSize, ee, callback) {
32 // list of document ids to process
33 const buffer = [];
34 let batch = 0;
35 let lastSeq = null;
36 const logStream = fs.createWriteStream(log);
37 let pending = 0;
38 // The number of changes to fetch per request
39 const limit = 100000;
40
41 // send documents ids to the queue in batches of bufferSize + the last batch
42 const processBuffer = function(lastOne) {
43 if (buffer.length >= bufferSize || (lastOne && buffer.length > 0)) {
44 debug('writing', buffer.length, 'changes to the backup file');
45 const b = { docs: buffer.splice(0, bufferSize), batch: batch };
46 logStream.write(':t batch' + batch + ' ' + JSON.stringify(b.docs) + '\n');
47 ee.emit('changes', batch);
48 batch++;
49 }
50 };
51
52 // called once per received change
53 const onChange = function(c) {
54 if (c) {
55 if (c.error) {
56 ee.emit('error', new error.BackupError('InvalidChange', `Received invalid change: ${c}`));
57 } else if (c.changes) {
58 const obj = { id: c.id };
59 buffer.push(obj);
60 processBuffer(false);
61 } else if (c.last_seq) {
62 lastSeq = c.last_seq;
63 pending = c.pending;
64 }
65 }
66 };
67
68 function getChanges(since = 0) {
69 debug('making changes request since ' + since);
70 return db.service.postChangesAsStream({ db: db.db, since: since, limit: limit, seq_interval: limit })
71 .then(response => {
72 response.result.pipe(liner())
73 .on('error', function(err) {
74 logStream.end();
75 callback(err);
76 })
77 .pipe(change(onChange))
78 .on('error', function(err) {
79 logStream.end();
80 callback(err);
81 })
82 .on('finish', function() {
83 processBuffer(true);
84 if (!lastSeq) {
85 logStream.end();
86 debug('changes request terminated before last_seq was sent');
87 callback(new error.BackupError('SpoolChangesError', 'Changes request terminated before last_seq was sent'));
88 } else {
89 debug(`changes request completed with last_seq: ${lastSeq} and ${pending} changes pending.`);
90 if (pending > 0) {
91 // Return the next promise
92 return getChanges(lastSeq);
93 } else {
94 debug('finished streaming database changes');
95 logStream.end(':changes_complete ' + lastSeq + '\n', 'utf8', callback);
96 }
97 }
98 });
99 })
100 .catch(err => {
101 logStream.end();
102 if (err.status && err.status >= 400) {
103 callback(error.convertResponseError(err));
104 } else if (err.name !== 'SpoolChangesError') {
105 callback(new error.BackupError('SpoolChangesError', `Failed changes request - ${err.message}`));
106 }
107 });
108 }
109
110 getChanges();
111};