1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | 'use strict';
|
15 |
|
16 | const async = require('async');
|
17 | const stream = require('stream');
|
18 | const zlib = require('zlib');
|
19 | const error = require('./error.js');
|
20 | const debug = require('debug')('couchbackup:writer');
|
21 |
|
22 | module.exports = function(db, bufferSize, parallelism, ee) {
|
23 | const writer = new stream.Transform({ objectMode: true });
|
24 | var buffer = [];
|
25 | var written = 0;
|
26 | var linenumber = 0;
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 | var q = async.queue(function(payload, cb) {
|
33 |
|
34 | if (payload.docs && payload.docs[0] && payload.docs[0]._rev) {
|
35 | payload.new_edits = false;
|
36 | }
|
37 |
|
38 |
|
39 | const payloadStream = new stream.PassThrough();
|
40 | payloadStream.end(Buffer.from(JSON.stringify(payload), 'utf8'));
|
41 | const zipstream = zlib.createGzip();
|
42 |
|
43 |
|
44 |
|
45 |
|
46 | class ResponseWriteable extends stream.Writable {
|
47 | constructor(options) {
|
48 | super(options);
|
49 | this.data = [];
|
50 | }
|
51 |
|
52 | _write(chunk, encoding, callback) {
|
53 | this.data.push(chunk);
|
54 | callback();
|
55 | }
|
56 |
|
57 | asJson() {
|
58 | return JSON.parse(Buffer.concat(this.data).toString());
|
59 | }
|
60 | }
|
61 |
|
62 | if (!didError) {
|
63 | var response;
|
64 | const responseBody = new ResponseWriteable();
|
65 | const req = db.server.request({
|
66 | db: db.config.db,
|
67 | path: '_bulk_docs',
|
68 | method: 'POST',
|
69 | headers: { 'content-encoding': 'gzip' },
|
70 | stream: true
|
71 | })
|
72 | .on('response', function(resp) {
|
73 | response = resp;
|
74 | })
|
75 | .on('end', function() {
|
76 | if (response.statusCode >= 400) {
|
77 | const err = error.convertResponseError(Object.assign({}, response, responseBody.asJson()));
|
78 | debug(`Error writing docs ${err.name} ${err.message}`);
|
79 | cb(err, payload);
|
80 | } else {
|
81 | written += payload.docs.length;
|
82 | writer.emit('restored', { documents: payload.docs.length, total: written });
|
83 | cb();
|
84 | }
|
85 | });
|
86 |
|
87 | payloadStream.pipe(zipstream).pipe(req);
|
88 |
|
89 | req.pipe(responseBody);
|
90 | }
|
91 | }, parallelism);
|
92 |
|
93 | var didError = false;
|
94 |
|
95 |
|
96 | function processBuffer(flush, callback) {
|
97 | function taskCallback(err, payload) {
|
98 | if (err && !didError) {
|
99 | debug(`Queue task failed with error ${err.name}`);
|
100 | didError = true;
|
101 | q.kill();
|
102 | writer.emit('error', err);
|
103 | }
|
104 | }
|
105 |
|
106 | if (flush || buffer.length >= bufferSize) {
|
107 |
|
108 |
|
109 | do {
|
110 |
|
111 | var toSend = buffer.splice(0, bufferSize);
|
112 |
|
113 |
|
114 | debug(`Adding ${toSend.length} to the write queue.`);
|
115 | q.push({ docs: toSend }, taskCallback);
|
116 | } while (buffer.length >= bufferSize);
|
117 |
|
118 |
|
119 | if (flush && buffer.length > 0) {
|
120 | debug(`Adding remaining ${buffer.length} to the write queue.`);
|
121 | q.push({ docs: buffer }, taskCallback);
|
122 | }
|
123 |
|
124 |
|
125 | async.until(
|
126 |
|
127 |
|
128 | function(callback) {
|
129 |
|
130 | if (didError) {
|
131 | return callback(null, true);
|
132 | }
|
133 | if (flush) {
|
134 | callback(null, q.idle() && q.length() === 0);
|
135 | } else {
|
136 | callback(null, q.length() <= parallelism * 2);
|
137 | }
|
138 | },
|
139 | function(cb) {
|
140 | setTimeout(cb, 20);
|
141 | },
|
142 |
|
143 | function() {
|
144 | if (flush && !didError) {
|
145 | writer.emit('finished', { total: written });
|
146 | }
|
147 |
|
148 | callback();
|
149 | });
|
150 | } else {
|
151 | callback();
|
152 | }
|
153 | }
|
154 |
|
155 |
|
156 | writer._transform = function(obj, encoding, done) {
|
157 |
|
158 |
|
159 |
|
160 | linenumber++;
|
161 | if (!didError && obj !== '') {
|
162 |
|
163 | try {
|
164 | var arr = JSON.parse(obj);
|
165 |
|
166 |
|
167 | if (typeof arr === 'object' && arr.length > 0) {
|
168 |
|
169 | buffer = buffer.concat(arr);
|
170 |
|
171 |
|
172 |
|
173 |
|
174 |
|
175 |
|
176 | this.pause();
|
177 |
|
178 |
|
179 | processBuffer(false, done);
|
180 | } else {
|
181 | ee.emit('error', new error.BackupError('BackupFileJsonError', `Error on line ${linenumber} of backup file - not an array`));
|
182 | done();
|
183 | }
|
184 | } catch (e) {
|
185 | ee.emit('error', new error.BackupError('BackupFileJsonError', `Error on line ${linenumber} of backup file - cannot parse as JSON`));
|
186 |
|
187 | done();
|
188 | }
|
189 | } else {
|
190 | done();
|
191 | }
|
192 | };
|
193 |
|
194 |
|
195 | writer._flush = function(done) {
|
196 | processBuffer(true, done);
|
197 | };
|
198 | return writer;
|
199 | };
|