// Copyright © 2017, 2019 IBM Corp. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
'use strict';

const async = require('async');
const stream = require('stream');
const zlib = require('zlib');
const error = require('./error.js');
const debug = require('debug')('couchbackup:writer');

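/**
 * Returns an object-mode Transform stream that writes backed-up documents
 * to the target database via _bulk_docs.
 *
 * @param {object} db - database handle exposing `server.request` and `config.db`
 * @param {number} bufferSize - maximum number of documents per _bulk_docs request
 * @param {number} parallelism - maximum number of concurrent HTTP requests
 * @param {events.EventEmitter} ee - emitter used to report backup file errors
 * @returns {stream.Transform} writer emitting 'restored', 'finished' and 'error' events
 */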
module.exports = function(db, bufferSize, parallelism, ee) {
  const writer = new stream.Transform({ objectMode: true });
  var buffer = [];
  var written = 0;
  var linenumber = 0;

  // this is the queue of chunks that are written to the database
  // the queue's payload will be an array of documents to be written,
  // the size of the array will be bufferSize. The variable parallelism
  // determines how many HTTP requests will occur at any one time.
  var q = async.queue(function(payload, cb) {
    // if we are restoring known revisions, we need to supply new_edits=false
    if (payload.docs && payload.docs[0] && payload.docs[0]._rev) {
      payload.new_edits = false;
    }

    // Stream the payload through a gzip stream to the server
    const payloadStream = new stream.PassThrough();
    payloadStream.end(Buffer.from(JSON.stringify(payload), 'utf8'));
    const zipstream = zlib.createGzip();

    // Class for streaming _bulk_docs responses into
    // In general the response is [] or a small error/reason JSON object
    // so it is OK to have this in memory.
    class ResponseWriteable extends stream.Writable {
      constructor(options) {
        super(options);
        this.data = [];
      }

      _write(chunk, encoding, callback) {
        this.data.push(chunk);
        callback();
      }

      asJson() {
        return JSON.parse(Buffer.concat(this.data).toString());
      }
    }

    if (!didError) {
      var response;
      const responseBody = new ResponseWriteable();
      const req = db.server.request({
        db: db.config.db,
        path: '_bulk_docs',
        method: 'POST',
        headers: { 'content-encoding': 'gzip' },
        stream: true
      })
        .on('response', function(resp) {
          response = resp;
        })
        .on('end', function() {
          if (response.statusCode >= 400) {
            const err = error.convertResponseError(Object.assign({}, response, responseBody.asJson()));
            debug(`Error writing docs ${err.name} ${err.message}`);
            cb(err, payload);
          } else {
            written += payload.docs.length;
            writer.emit('restored', { documents: payload.docs.length, total: written });
            cb();
          }
        });
      // Pipe the gzipped payload into the request object to POST to _bulk_docs
      payloadStream.pipe(zipstream).pipe(req);
      // Pipe the request object's response into our responseBody writable
      req.pipe(responseBody);
    }
  }, parallelism);

  // Set once any bulk write fails so that no further requests are started
  var didError = false;

  // write the contents of the buffer to CouchDB in blocks of bufferSize
  function processBuffer(flush, callback) {
    function taskCallback(err, payload) {
      if (err && !didError) {
        debug(`Queue task failed with error ${err.name}`);
        didError = true;
        q.kill();
        writer.emit('error', err);
      }
    }

    if (flush || buffer.length >= bufferSize) {
      // work through the buffer to break off bufferSize chunks
      // and feed the chunks to the queue
      do {
        // split the buffer into bufferSize chunks
        var toSend = buffer.splice(0, bufferSize);

        // and add the chunk to the queue
        debug(`Adding ${toSend.length} to the write queue.`);
        q.push({ docs: toSend }, taskCallback);
      } while (buffer.length >= bufferSize);

      // send any leftover documents to the queue
      if (flush && buffer.length > 0) {
        debug(`Adding remaining ${buffer.length} to the write queue.`);
        q.push({ docs: buffer }, taskCallback);
      }

      // wait until the queue size falls to a reasonable level
      async.until(
        // wait until the queue length drops to twice the parallelism
        // or until empty on the last write
        function(callback) {
          // if we encountered an error, stop this until loop
          if (didError) {
            return callback(null, true);
          }
          if (flush) {
            callback(null, q.idle() && q.length() === 0);
          } else {
            callback(null, q.length() <= parallelism * 2);
          }
        },
        function(cb) {
          setTimeout(cb, 20);
        },

        function() {
          if (flush && !didError) {
            writer.emit('finished', { total: written });
          }
          // callback when we're happy with the queue size
          callback();
        });
    } else {
      callback();
    }
  }

  // take an object
  writer._transform = function(obj, encoding, done) {
    // each obj that arrives here is a line from the backup file
    // it should contain an array of objects. The length of the array
    // depends on the bufferSize at backup time.
    linenumber++;
    if (!didError && obj !== '') {
      // see if it parses as JSON
      try {
        var arr = JSON.parse(obj);

        // if it's an array with a length
        if (typeof arr === 'object' && arr.length > 0) {
          // push each document into a buffer
          buffer = buffer.concat(arr);

          // pause the stream
          // it's likely that the speed with which data can be read from disk
          // may exceed the rate it can be written to CouchDB. To prevent
          // the whole file being buffered in memory, we pause the stream here.
          // it is resumed when processBuffer calls back and we call done()
          this.pause();

          // break the buffer into bufferSize chunks to be written to the database
          processBuffer(false, done);
        } else {
          ee.emit('error', new error.BackupError('BackupFileJsonError', `Error on line ${linenumber} of backup file - not an array`));
          done();
        }
      } catch (e) {
        ee.emit('error', new error.BackupError('BackupFileJsonError', `Error on line ${linenumber} of backup file - cannot parse as JSON`));
        // Could be an incomplete write that was subsequently resumed
        done();
      }
    } else {
      done();
    }
  };

  // called when we need to flush everything
  writer._flush = function(done) {
    processBuffer(true, done);
  };
  return writer;
};
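
// Example usage (illustrative sketch only, not part of this module): wiring the
// writer into a restore pipeline. The `db` handle, the 500/5 buffer and
// parallelism values, and `lineSplitter` (any transform that emits one backup
// file line per chunk) are assumptions made for the purposes of the example.
//
//   const events = require('events');
//   const fs = require('fs');
//
//   const ee = new events.EventEmitter();
//   ee.on('error', (err) => console.error(err.message));
//
//   const writer = require('./writer.js')(db, 500, 5, ee);
//   writer.on('restored', (d) => console.log(`restored ${d.documents} (total ${d.total})`));
//   writer.on('finished', (d) => console.log(`finished restoring ${d.total} documents`));
//
//   fs.createReadStream('backup.txt').pipe(lineSplitter).pipe(writer);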