// UNPKG | 6.99 kB | JavaScript | View Raw
"use strict";
/*!
 * Copyright 2018 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// CommonJS interop marker plus placeholder export bindings; the real values
// are assigned after each class definition further down the file.
Object.defineProperty(exports, "__esModule", { value: true });
exports.ModAckQueue = exports.AckQueue = exports.MessageQueue = exports.BatchError = void 0;
// p-defer supplies the externally-resolvable promises used by
// MessageQueue#onFlush and MessageQueue#onDrain.
const defer = require("p-defer");
/**
 * Error class used to signal a batch failure.
 *
 * Copies `code`, `details` and `metadata` from the underlying service error
 * and records the ack IDs that were part of the failed request.
 *
 * @class
 *
 * @param {ServiceError} err The grpc service error.
 * @param {string[]} ackIds The ack IDs that were sent in the failed batch.
 * @param {string} rpc The name of the RPC that failed (e.g. "acknowledge").
 */
class BatchError extends Error {
  constructor(err, ackIds, rpc) {
    // When the DEBUG_GRPC environment variable is set, surface the full stack
    // of the underlying error instead of just its message.
    const reason = process.env.DEBUG_GRPC ? err.stack : err.message;
    super(`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${reason}`);
    this.ackIds = ackIds;
    this.code = err.code;
    this.details = err.details;
    this.metadata = err.metadata;
  }
}
exports.BatchError = BatchError;
/**
 * @typedef {object} BatchOptions
 * @property {object} [callOptions] Request configuration option, outlined
 *     here: {@link https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html}.
 * @property {number} [maxMessages=3000] Maximum number of messages allowed in
 *     each batch sent.
 * @property {number} [maxMilliseconds=100] Maximum duration to wait before
 *     sending a batch. Batches can be sent earlier if the maxMessages option
 *     is met before the configured duration has passed.
 */
/**
 * Class for buffering ack/modAck requests.
 *
 * Requests are accumulated and handed to `_sendBatch(batch)` either when
 * `maxMessages` requests are queued or when `maxMilliseconds` have elapsed
 * since the first request of the batch was added. This class does not define
 * `_sendBatch`; subclasses must provide it.
 *
 * @private
 * @class
 *
 * @param {Subscriber} sub The subscriber we're queueing requests for.
 * @param {BatchOptions} options Batching options.
 */
class MessageQueue {
  constructor(sub, options = {}) {
    // Requests queued but not yet handed to _sendBatch.
    this.numPendingRequests = 0;
    // Requests added whose batch has not yet settled (includes pending ones).
    this.numInFlightRequests = 0;
    this._requests = [];
    this._subscriber = sub;
    this.setOptions(options);
  }
  /**
   * Gets the default buffer time in ms.
   *
   * @returns {number}
   * @private
   */
  get maxMilliseconds() {
    return this._options.maxMilliseconds;
  }
  /**
   * Adds a message to the queue.
   *
   * Flushes immediately once `maxMessages` requests are queued; otherwise
   * makes sure a flush timer is running.
   *
   * @param {Message} message The message to add.
   * @param {number} [deadline] The deadline.
   * @private
   */
  add({ ackId }, deadline) {
    const { maxMessages, maxMilliseconds } = this._options;
    this._requests.push([ackId, deadline]);
    this.numPendingRequests += 1;
    this.numInFlightRequests += 1;
    if (this._requests.length >= maxMessages) {
      // flush() handles its own errors, so the returned promise is not awaited.
      this.flush();
    }
    else if (!this._timer) {
      this._timer = setTimeout(() => this.flush(), maxMilliseconds);
    }
  }
  /**
   * Sends a batch of messages.
   *
   * Errors thrown by `_sendBatch` are not rethrown; they are emitted as an
   * 'error' event on the subscriber.
   *
   * @returns {Promise<void>}
   * @private
   */
  async flush() {
    if (this._timer) {
      clearTimeout(this._timer);
      delete this._timer;
    }
    // Detach the current batch and flush promise before awaiting so that
    // requests added while the send is in progress start a fresh batch.
    const batch = this._requests;
    const batchSize = batch.length;
    const deferred = this._onFlush;
    this._requests = [];
    this.numPendingRequests -= batchSize;
    delete this._onFlush;
    try {
      await this._sendBatch(batch);
    }
    catch (e) {
      this._subscriber.emit('error', e);
    }
    this.numInFlightRequests -= batchSize;
    if (deferred) {
      deferred.resolve();
    }
    if (this.numInFlightRequests <= 0 && this._onDrain) {
      this._onDrain.resolve();
      delete this._onDrain;
    }
  }
  /**
   * Returns a promise that resolves after the next flush occurs.
   *
   * @returns {Promise}
   * @private
   */
  onFlush() {
    if (!this._onFlush) {
      this._onFlush = defer();
    }
    return this._onFlush.promise;
  }
  /**
   * Returns a promise that resolves when all in-flight messages have settled.
   *
   * The promise is only resolved from within `flush()`, so it stays pending
   * until a flush completes with no requests left in flight.
   *
   * @returns {Promise}
   */
  onDrain() {
    if (!this._onDrain) {
      this._onDrain = defer();
    }
    return this._onDrain.promise;
  }
  /**
   * Set the batching options.
   *
   * Options that are missing or explicitly `undefined` fall back to the
   * defaults (`maxMessages: 3000`, `maxMilliseconds: 100`).
   *
   * @param {BatchOptions} options Batching options.
   * @private
   */
  setOptions(options) {
    const defaults = { maxMessages: 3000, maxMilliseconds: 100 };
    const merged = Object.assign({}, defaults, options);
    // Object.assign copies explicitly-undefined values over the defaults. An
    // undefined maxMessages would disable size-based flushing altogether
    // (`length >= undefined` is always false), so restore the default instead.
    for (const key of Object.keys(defaults)) {
      if (merged[key] === undefined) {
        merged[key] = defaults[key];
      }
    }
    this._options = merged;
  }
}
exports.MessageQueue = MessageQueue;
/**
 * Queues up Acknowledge (ack) requests.
 *
 * @private
 * @class
 */
class AckQueue extends MessageQueue {
  /**
   * Sends a batch of ack requests.
   *
   * Only the ackId of each entry is used; deadlines are ignored for acks.
   *
   * @private
   *
   * @param {Array.<Array.<string|number>>} batch Array of ackIds and deadlines.
   * @return {Promise}
   * @throws {BatchError} If the acknowledge call fails.
   */
  async _sendBatch(batch) {
    const client = await this._subscriber.getClient();
    const ackIds = batch.map(([ackId]) => ackId);
    const reqOpts = { subscription: this._subscriber.name, ackIds };
    try {
      await client.acknowledge(reqOpts, this._options.callOptions);
    }
    catch (e) {
      // Wrap the failure so the affected ackIds travel with the error.
      throw new BatchError(e, ackIds, 'acknowledge');
    }
  }
}
exports.AckQueue = AckQueue;
/**
 * Queues up ModifyAckDeadline requests and sends them out in batches.
 *
 * @private
 * @class
 */
class ModAckQueue extends MessageQueue {
  /**
   * Sends a batch of modAck requests. Each deadline requires its own request,
   * so we have to group all the ackIds by deadline and send multiple requests.
   *
   * The per-deadline requests are issued concurrently; the returned promise
   * rejects with the first failure.
   *
   * @private
   *
   * @param {Array.<Array.<string|number>>} batch Array of ackIds and deadlines.
   * @return {Promise}
   * @throws {BatchError} If a modifyAckDeadline call fails.
   */
  async _sendBatch(batch) {
    const client = await this._subscriber.getClient();
    const subscription = this._subscriber.name;
    // Group ackIds by their deadline.
    const ackIdsByDeadline = {};
    for (const [ackId, deadline] of batch) {
      if (!ackIdsByDeadline[deadline]) {
        ackIdsByDeadline[deadline] = [];
      }
      ackIdsByDeadline[deadline].push(ackId);
    }
    const modAckRequests = Object.keys(ackIdsByDeadline).map(async (deadline) => {
      const ackIds = ackIdsByDeadline[deadline];
      // Object keys are strings, so convert the deadline back to a number.
      const ackDeadlineSeconds = Number(deadline);
      const reqOpts = { subscription, ackIds, ackDeadlineSeconds };
      try {
        await client.modifyAckDeadline(reqOpts, this._options.callOptions);
      }
      catch (e) {
        // Wrap the failure so the affected ackIds travel with the error.
        throw new BatchError(e, ackIds, 'modifyAckDeadline');
      }
    });
    await Promise.all(modAckRequests);
  }
}
exports.ModAckQueue = ModAckQueue;
//# sourceMappingURL=message-queues.js.map