UNPKG

6.06 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2022 Google LLC. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.RowQueue = exports.defaultOptions = void 0;
19const common = require("@google-cloud/common");
20const extend = require("extend");
21const uuid = require("uuid");
22const _1 = require(".");
23const rowBatch_1 = require("./rowBatch");
24exports.defaultOptions = {
25 // The maximum number of rows we'll batch up for insert().
26 maxOutstandingRows: 300,
27 // The maximum size of the total batched up rows for insert().
28 maxOutstandingBytes: 9 * 1024 * 1024,
29 // The maximum time we'll wait to send batched rows, in milliseconds.
30 maxDelayMillis: 10000,
31};
32/**
33 * Standard row queue used for inserting rows.
34 *
35 *
36 * @param {Table} table The table.
37 * @param {Duplex} dup Row stream.
38 * @param {InsertStreamOptions} options Insert and batch options.
39 */
40class RowQueue {
41 constructor(table, dup, options) {
42 this.insertRowsOptions = {};
43 this.table = table;
44 this.stream = dup;
45 this.inFlight = false;
46 const opts = typeof options === 'object' ? options : {};
47 if (opts.insertRowsOptions) {
48 this.insertRowsOptions = opts.insertRowsOptions;
49 }
50 else {
51 this.insertRowsOptions = {};
52 }
53 if (opts.batchOptions) {
54 this.setOptions(opts.batchOptions);
55 }
56 else {
57 this.setOptions();
58 }
59 this.batch = new rowBatch_1.RowBatch(this.batchOptions);
60 }
61 /**
62 * Adds a row to the queue.
63 *
64 * @param {RowMetadata} row The row to insert.
65 * @param {InsertRowsCallback} callback The insert callback.
66 */
67 add(row, callback) {
68 if (!this.insertRowsOptions.raw) {
69 row = {
70 json: _1.Table.encodeValue_(row),
71 };
72 if (this.insertRowsOptions.createInsertId !== false) {
73 row.insertId = uuid.v4();
74 }
75 }
76 if (!this.batch.canFit(row)) {
77 this.insert();
78 }
79 this.batch.add(row, callback);
80 if (this.batch.isFull()) {
81 this.insert();
82 }
83 else if (!this.pending) {
84 const { maxMilliseconds } = this.batchOptions;
85 this.pending = setTimeout(() => {
86 this.insert();
87 }, maxMilliseconds);
88 }
89 }
90 /**
91 * Cancels any pending inserts and calls _insert immediately.
92 */
93 insert(callback) {
94 const { rows, callbacks } = this.batch;
95 this.batch = new rowBatch_1.RowBatch(this.batchOptions);
96 if (this.pending) {
97 clearTimeout(this.pending);
98 delete this.pending;
99 }
100 if (rows.length > 0) {
101 this._insert(rows, callbacks, callback);
102 }
103 }
104 /**
105 * Accepts a batch of rows and inserts them into table.
106 *
107 * @param {object[]} rows The rows to insert.
108 * @param {InsertCallback[]} callbacks The corresponding callback functions.
109 * @param {function} [callback] Callback to be fired when insert is done.
110 */
111 _insert(rows, callbacks, cb) {
112 const json = extend(true, {}, this.insertRowsOptions, { rows });
113 delete json.createInsertId;
114 delete json.partialRetries;
115 delete json.raw;
116 this.table.request({
117 method: 'POST',
118 uri: '/insertAll',
119 json,
120 }, (err, resp) => {
121 const partialFailures = (resp.insertErrors || []).map((insertError) => {
122 return {
123 errors: insertError.errors.map(error => {
124 return {
125 message: error.message,
126 reason: error.reason,
127 };
128 }),
129 // eslint-disable-next-line @typescript-eslint/no-explicit-any
130 row: rows[insertError.index],
131 };
132 });
133 if (partialFailures.length > 0) {
134 err = new common.util.PartialFailureError({
135 errors: partialFailures,
136 response: resp,
137 });
138 callbacks.forEach(callback => callback(err, resp));
139 this.stream.emit('error', err);
140 }
141 else {
142 callbacks.forEach(callback => callback(err, resp));
143 this.stream.emit('response', resp);
144 cb(err, resp);
145 }
146 cb(err, resp);
147 });
148 }
149 /**
150 * Sets the batching options.
151 *
152 *
153 * @param {RowBatchOptions} [options] The batching options.
154 */
155 setOptions(options = {}) {
156 const defaults = this.getOptionDefaults();
157 const { maxBytes, maxRows, maxMilliseconds } = extend(true, defaults, options);
158 this.batchOptions = {
159 maxBytes: Math.min(maxBytes, rowBatch_1.BATCH_LIMITS.maxBytes),
160 maxRows: Math.min(maxRows, rowBatch_1.BATCH_LIMITS.maxRows),
161 maxMilliseconds: maxMilliseconds,
162 };
163 }
164 getOptionDefaults() {
165 // Return a unique copy to avoid shenanigans.
166 const defaults = {
167 maxBytes: exports.defaultOptions.maxOutstandingBytes,
168 maxRows: exports.defaultOptions.maxOutstandingRows,
169 maxMilliseconds: exports.defaultOptions.maxDelayMillis,
170 };
171 return defaults;
172 }
173}
174exports.RowQueue = RowQueue;
175//# sourceMappingURL=rowQueue.js.map
\No newline at end of file