// Copyright © 2017, 2021 IBM Corp. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
'use strict';

const async = require('async');
const stream = require('stream');
const error = require('./error.js');
const debug = require('debug')('couchbackup:writer');

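/**
 * Create a Transform stream that writes lines of a couchbackup file to a
 * database via bulk document requests.
 *
 * @param {object} db - client wrapper exposing a Cloudant `service` client
 *   (used for `postBulkDocs`) and the target database name as `db`
 * @param {number} bufferSize - maximum number of documents per bulk request
 * @param {number} parallelism - maximum number of concurrent HTTP requests
 * @param {EventEmitter} ee - emitter used to report backup file parse errors
 * @returns {stream.Transform} object-mode stream that emits 'restored',
 *   'finished' and 'error' events
 */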
module.exports = function(db, bufferSize, parallelism, ee) {
  const writer = new stream.Transform({ objectMode: true });
  let buffer = []; // documents waiting to be written
  let written = 0; // running total of documents written
  let linenumber = 0; // current line of the backup file being processed

  // flag set when a write fails; the queue worker below checks it so that
  // no further requests are issued once a fatal error has occurred
  let didError = false;

  // this is the queue of chunks that are written to the database
  // the queue's payload will be an array of documents to be written,
  // the size of the array will be bufferSize. The variable parallelism
  // determines how many HTTP requests will occur at any one time.
  const q = async.queue(function(payload, cb) {
    // if we are restoring known revisions, we need to supply new_edits=false
    if (payload.docs && payload.docs[0] && payload.docs[0]._rev) {
      payload.new_edits = false;
      debug('Using new_edits false mode.');
    }

    if (!didError) {
      db.service.postBulkDocs({
        db: db.db,
        bulkDocs: payload
      }).then(response => {
        if (!response.result || (payload.new_edits === false && response.result.length > 0)) {
          throw new Error(`Error writing batch with new_edits:${payload.new_edits !== false}` +
            ` and ${response.result ? response.result.length : 'unavailable'} items`);
        }
        written += payload.docs.length;
        writer.emit('restored', { documents: payload.docs.length, total: written });
        cb();
      }).catch(err => {
        err = error.convertResponseError(err);
        debug(`Error writing docs ${err.name} ${err.message}`);
        cb(err, payload);
      });
    }
  }, parallelism);
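
  // Illustrative payload (values are examples only): restoring documents
  // that carry a _rev produces a request body like
  //   { docs: [{ _id: 'doc1', _rev: '1-abc123', name: 'alice' }], new_edits: false }
  // which postBulkDocs sends to POST /{db}/_bulk_docs; with new_edits=false
  // CouchDB returns an empty array on success, so any items in the result
  // are treated as errors above.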

  // write the contents of the buffer to CouchDB in blocks of bufferSize
  function processBuffer(flush, callback) {
    function taskCallback(err, payload) {
      if (err && !didError) {
        debug(`Queue task failed with error ${err.name}`);
        didError = true;
        q.kill();
        writer.emit('error', err);
      }
    }

    if (flush || buffer.length >= bufferSize) {
      // work through the buffer to break off bufferSize chunks
      // and feed the chunks to the queue
      do {
        // split the buffer into bufferSize chunks
        const toSend = buffer.splice(0, bufferSize);

        // and add the chunk to the queue
        debug(`Adding ${toSend.length} to the write queue.`);
        q.push({ docs: toSend }, taskCallback);
      } while (buffer.length >= bufferSize);

      // send any leftover documents to the queue
      if (flush && buffer.length > 0) {
        debug(`Adding remaining ${buffer.length} to the write queue.`);
        q.push({ docs: buffer }, taskCallback);
      }

      // wait until the queue size falls to a reasonable level
      async.until(
        // wait until the queue length drops to twice the parallelism
        // or until empty on the last write
        function(callback) {
          // if we encountered an error, stop this until loop
          if (didError) {
            return callback(null, true);
          }
          if (flush) {
            callback(null, q.idle() && q.length() === 0);
          } else {
            callback(null, q.length() <= parallelism * 2);
          }
        },
        function(cb) {
          setTimeout(cb, 20);
        },
        function() {
          if (flush && !didError) {
            writer.emit('finished', { total: written });
          }
          // callback when we're happy with the queue size
          callback();
        });
    } else {
      callback();
    }
  }
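
  // Worked example (illustrative numbers): with bufferSize = 500 and
  // parallelism = 2, a 1750-document buffer is split into three chunks of
  // 500; the remaining 250 documents stay buffered until a later call (or
  // the final flush), and the until-loop above holds the callback while
  // more than 4 batches are waiting, throttling how fast the file is read.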

  // take an object
  writer._transform = function(obj, encoding, done) {
    // each obj that arrives here is a line from the backup file
    // it should contain an array of objects. The length of the array
    // depends on the bufferSize at backup time.
    linenumber++;
    if (!didError && obj !== '') {
      // see if it parses as JSON
      try {
        const arr = JSON.parse(obj);

        // if it's an array with a length
        if (typeof arr === 'object' && arr.length > 0) {
          // push each document into a buffer
          buffer = buffer.concat(arr);

          // pause the stream
          // it's likely that the speed with which data can be read from disk
          // may exceed the rate it can be written to CouchDB. To prevent
          // the whole file being buffered in memory, we pause the stream here.
          // it is resumed when processBuffer calls back and we call done()
          this.pause();

          // break the buffer into bufferSize chunks to be written to the database
          processBuffer(false, done);
        } else {
          ee.emit('error', new error.BackupError('BackupFileJsonError', `Error on line ${linenumber} of backup file - not an array`));
          done();
        }
      } catch (e) {
        // could be an incomplete write that was subsequently resumed;
        // the bad line is reported on ee and skipped so the restore continues
        ee.emit('error', new error.BackupError('BackupFileJsonError', `Error on line ${linenumber} of backup file - cannot parse as JSON`));
        done();
      }
    } else {
      done();
    }
  };

  // called when we need to flush everything
  writer._flush = function(done) {
    processBuffer(true, done);
  };
  return writer;
};
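
// Example usage (a sketch, not part of this module: the client construction,
// the `liner` line-splitting stream and the file name are assumptions):
//
//   const fs = require('fs');
//   const { EventEmitter } = require('events');
//   const { CloudantV1 } = require('@ibm-cloud/cloudant');
//
//   const service = CloudantV1.newInstance({});
//   const ee = new EventEmitter().on('error', console.error);
//   const writer = require('./writer.js')({ service, db: 'animaldb' }, 500, 2, ee);
//   writer.on('restored', b => console.log(`restored ${b.total} documents`));
//   writer.on('finished', b => console.log(`finished; total ${b.total}`));
//   writer.on('error', console.error);
//
//   // `liner` must split the backup file into one string per line
//   fs.createReadStream('backup.txt').pipe(liner).pipe(writer);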