UNPKG

3.43 kBJavaScriptView Raw
1// Copyright © 2017, 2018 IBM Corp. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14'use strict';
15
16const fs = require('fs');
17const liner = require('./liner.js');
18const change = require('./change.js');
19const error = require('./error.js');
20const debug = require('debug')('couchbackup:spoolchanges');
21
22/**
23 * Write log file for all changes from a database, ready for downloading
24 * in batches.
25 *
26 * @param {string} dbUrl - URL of database
27 * @param {string} log - path to log file to use
28 * @param {number} bufferSize - the number of changes per batch/log line
29 * @param {function(err)} callback - a callback to run on completion
30 */
31module.exports = function(db, log, bufferSize, ee, callback) {
32 // list of document ids to process
33 var buffer = [];
34 var batch = 0;
35 var lastSeq = null;
36 var logStream = fs.createWriteStream(log);
37
38 // send documents ids to the queue in batches of bufferSize + the last batch
39 var processBuffer = function(lastOne) {
40 if (buffer.length >= bufferSize || (lastOne && buffer.length > 0)) {
41 debug('writing', buffer.length, 'changes to the backup file');
42 var b = { docs: buffer.splice(0, bufferSize), batch: batch };
43 logStream.write(':t batch' + batch + ' ' + JSON.stringify(b.docs) + '\n');
44 ee.emit('changes', batch);
45 batch++;
46 }
47 };
48
49 // called once per received change
50 var onChange = function(c) {
51 if (c) {
52 if (c.error) {
53 ee.emit('error', new error.BackupError('InvalidChange', `Received invalid change: ${c}`));
54 } else if (c.changes) {
55 var obj = { id: c.id };
56 buffer.push(obj);
57 processBuffer(false);
58 } else if (c.last_seq) {
59 lastSeq = c.last_seq;
60 }
61 }
62 };
63
64 // stream the changes feed to disk
65 var changesRequest = db.changesAsStream({ seq_interval: 10000 })
66 .on('error', function(err) {
67 callback(new error.BackupError('SpoolChangesError', `Failed changes request - ${err.message}`));
68 })
69 .on('response', function(resp) {
70 if (resp.statusCode >= 400) {
71 changesRequest.abort();
72 callback(error.convertResponseError(resp));
73 } else {
74 changesRequest.pipe(liner())
75 .on('error', function(err) {
76 callback(err);
77 })
78 .pipe(change(onChange))
79 .on('error', function(err) {
80 callback(err);
81 })
82 .on('finish', function() {
83 processBuffer(true);
84 if (!lastSeq) {
85 logStream.end();
86 debug('changes request terminated before last_seq was sent');
87 callback(new error.BackupError('SpoolChangesError', 'Changes request terminated before last_seq was sent'));
88 } else {
89 debug('finished streaming database changes');
90 logStream.end(':changes_complete ' + lastSeq + '\n', 'utf8', callback);
91 }
92 });
93 }
94 });
95};