// UNPKG · 7.22 kB · JavaScript · View Raw
"use strict";
/*!
 * Copyright 2018 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Object.defineProperty(exports, "__esModule", { value: true });
exports.ModAckQueue = exports.AckQueue = exports.MessageQueue = exports.BatchError = void 0;
// Used by MessageQueue#onFlush() and MessageQueue#onDrain() to create
// promises that are resolved later from flush().
const defer = require("p-defer");
/**
 * Error class used to signal a batch failure.
 *
 * The message names the failed RPC, the number of affected messages and the
 * underlying reason. The reason is `err.message`, or `err.stack` when the
 * `DEBUG_GRPC` environment variable is set to a non-empty value.
 *
 * @class
 *
 * @param {ServiceError} err The grpc service error.
 * @param {string[]} ackIds The ackIds that were part of the failed batch.
 * @param {string} rpc Name of the RPC that failed (e.g. 'acknowledge').
 */
class BatchError extends Error {
    constructor(err, ackIds, rpc) {
        // The stack is only included on request because it makes the message
        // considerably longer.
        const reason = process.env.DEBUG_GRPC ? err.stack : err.message;
        super(`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${reason}`);
        this.ackIds = ackIds;
        // Copy the status fields of the original error so that handlers can
        // inspect the failure without needing the original error object.
        this.code = err.code;
        this.details = err.details;
        this.metadata = err.metadata;
    }
}
exports.BatchError = BatchError;
/**
 * @typedef {object} BatchOptions
 * @property {object} [callOptions] Request configuration option, outlined
 *     here: {@link https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html}.
 * @property {number} [maxMessages=3000] Maximum number of messages allowed in
 *     each batch sent.
 * @property {number} [maxMilliseconds=100] Maximum duration to wait before
 *     sending a batch. Batches can be sent earlier if the maxMessages option
 *     is met before the configured duration has passed.
 */
/**
 * Class for buffering ack/modAck requests.
 *
 * Requests are collected with `add()` and handed to `this._sendBatch(batch)`
 * by `flush()`. This class does not define `_sendBatch`; subclasses are
 * expected to provide it.
 *
 * Counters:
 * - `numPendingRequests`: requests buffered but not yet handed to a batch.
 * - `numInFlightRequests`: requests added whose batch has not settled yet.
 *
 * @private
 * @class
 *
 * @param {Subscriber} sub The subscriber we're queueing requests for.
 * @param {BatchOptions} options Batching options.
 */
class MessageQueue {
    constructor(sub, options = {}) {
        this.numPendingRequests = 0;
        this.numInFlightRequests = 0;
        this._requests = [];
        this._subscriber = sub;
        this.setOptions(options);
    }
    /**
     * Gets the default buffer time in ms.
     *
     * @returns {number}
     * @private
     */
    get maxMilliseconds() {
        return this._options.maxMilliseconds;
    }
    /**
     * Adds a message to the queue.
     *
     * Flushes immediately once the buffer holds `maxMessages` requests;
     * otherwise makes sure a flush timer of `maxMilliseconds` is running.
     *
     * @param {Message} message The message to add.
     * @param {number} [deadline] The deadline.
     * @private
     */
    add({ ackId }, deadline) {
        const { maxMessages, maxMilliseconds } = this._options;
        this._requests.push([ackId, deadline]);
        this.numPendingRequests += 1;
        this.numInFlightRequests += 1;
        if (this._requests.length >= maxMessages) {
            // Not awaited on purpose: flush() reports send failures itself.
            this.flush();
        }
        else if (!this._timer) {
            this._timer = setTimeout(() => this.flush(), maxMilliseconds);
        }
    }
    /**
     * Sends a batch of messages.
     *
     * Takes everything buffered so far, sends it through `_sendBatch` and
     * then settles the promises handed out by `onFlush()` and, once nothing
     * is in flight any more, `onDrain()`. Nothing is sent when the buffer is
     * empty, but the promises are still settled.
     *
     * @returns {Promise<void>}
     * @private
     */
    async flush() {
        if (this._timer) {
            clearTimeout(this._timer);
            delete this._timer;
        }
        // Detach the current buffer and flush promise before awaiting, so
        // requests added while the send is in progress go into a new batch.
        const batch = this._requests;
        const batchSize = batch.length;
        const deferred = this._onFlush;
        this._requests = [];
        this.numPendingRequests -= batchSize;
        delete this._onFlush;
        if (batchSize > 0) {
            try {
                await this._sendBatch(batch);
            }
            catch (e) {
                // These queues are used for ack and modAck messages, which should
                // never surface an error to the user level. However, we'll emit
                // them onto this debug channel in case debug info is needed.
                this._subscriber.emit('debug', e);
            }
        }
        this.numInFlightRequests -= batchSize;
        if (deferred) {
            deferred.resolve();
        }
        if (this.numInFlightRequests <= 0 && this._onDrain) {
            this._onDrain.resolve();
            delete this._onDrain;
        }
    }
    /**
     * Returns a promise that resolves after the next flush occurs.
     *
     * @returns {Promise}
     * @private
     */
    onFlush() {
        if (!this._onFlush) {
            this._onFlush = defer();
        }
        return this._onFlush.promise;
    }
    /**
     * Returns a promise that resolves when all in-flight messages have settled.
     *
     * The promise is only resolved from `flush()`, so if nothing is in
     * flight when this is called it stays pending until a later flush
     * completes.
     *
     * @returns {Promise}
     */
    onDrain() {
        if (!this._onDrain) {
            this._onDrain = defer();
        }
        return this._onDrain.promise;
    }
    /**
     * Set the batching options.
     *
     * Missing options, and options explicitly set to `undefined`, fall back
     * to the defaults (`maxMessages: 3000`, `maxMilliseconds: 100`).
     *
     * @param {BatchOptions} options Batching options.
     * @private
     */
    setOptions(options) {
        const defaults = { maxMessages: 3000, maxMilliseconds: 100 };
        const merged = Object.assign({}, defaults, options);
        // Object.assign copies explicit `undefined` values, which would disable
        // the size check in add() and leave the timer without a delay.
        for (const key of Object.keys(defaults)) {
            if (merged[key] === undefined) {
                merged[key] = defaults[key];
            }
        }
        this._options = merged;
    }
}
exports.MessageQueue = MessageQueue;
/**
 * Queues up Acknowledge (ack) requests.
 *
 * @private
 * @class
 */
class AckQueue extends MessageQueue {
    /**
     * Sends a batch of ack requests.
     *
     * Only the ackId of each entry is used; deadlines are ignored. An empty
     * batch is a no-op.
     *
     * @private
     *
     * @param {Array.<Array.<string|number>>} batch Array of ackIds and deadlines.
     * @return {Promise}
     * @throws {BatchError} When the acknowledge call rejects.
     */
    async _sendBatch(batch) {
        if (batch.length === 0) {
            // Nothing to acknowledge: skip the client lookup and the request.
            return;
        }
        const client = await this._subscriber.getClient();
        const ackIds = batch.map(([ackId]) => ackId);
        const reqOpts = { subscription: this._subscriber.name, ackIds };
        try {
            await client.acknowledge(reqOpts, this._options.callOptions);
        }
        catch (e) {
            // Wrap the error so it carries the ackIds that failed.
            throw new BatchError(e, ackIds, 'acknowledge');
        }
    }
}
exports.AckQueue = AckQueue;
/**
 * Queues up ModifyAckDeadline requests and sends them out in batches.
 *
 * @private
 * @class
 */
class ModAckQueue extends MessageQueue {
    /**
     * Sends a batch of modAck requests. Each deadline requires its own request,
     * so we have to group all the ackIds by deadline and send multiple requests.
     *
     * The requests are started together and awaited with `Promise.all`, so
     * this rejects with the first failure; the other requests still run, but
     * any later failures are not reported. An empty batch is a no-op.
     *
     * @private
     *
     * @param {Array.<Array.<string|number>>} batch Array of ackIds and deadlines.
     * @return {Promise}
     * @throws {BatchError} When a modifyAckDeadline call rejects.
     */
    async _sendBatch(batch) {
        if (batch.length === 0) {
            // Nothing to modify: skip the client lookup.
            return;
        }
        const client = await this._subscriber.getClient();
        const subscription = this._subscriber.name;
        // Group ackIds by deadline. Object keys are strings, so each deadline
        // is converted back with Number() when its request is built.
        const modAckTable = {};
        for (const [ackId, deadline] of batch) {
            if (!modAckTable[deadline]) {
                modAckTable[deadline] = [];
            }
            modAckTable[deadline].push(ackId);
        }
        const modAckRequests = Object.keys(modAckTable).map(async (deadline) => {
            const ackIds = modAckTable[deadline];
            const ackDeadlineSeconds = Number(deadline);
            const reqOpts = { subscription, ackIds, ackDeadlineSeconds };
            try {
                await client.modifyAckDeadline(reqOpts, this._options.callOptions);
            }
            catch (e) {
                // Wrap the error so it carries the ackIds of this group.
                throw new BatchError(e, ackIds, 'modifyAckDeadline');
            }
        });
        await Promise.all(modAckRequests);
    }
}
exports.ModAckQueue = ModAckQueue;
//# sourceMappingURL=message-queues.js.map