1 | ;
|
2 | /*!
|
3 | * Copyright 2018 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.ModAckQueue = exports.AckQueue = exports.MessageQueue = exports.BatchError = void 0;
|
19 | const defer = require("p-defer");
|
20 | /**
|
21 | * Error class used to signal a batch failure.
|
22 | *
|
23 | * @class
|
24 | *
|
25 | * @param {string} message The error message.
|
26 | * @param {ServiceError} err The grpc service error.
|
27 | */
|
28 | class BatchError extends Error {
|
29 | constructor(err, ackIds, rpc) {
|
30 | super(`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${process.env.DEBUG_GRPC ? err.stack : err.message}`);
|
31 | this.ackIds = ackIds;
|
32 | this.code = err.code;
|
33 | this.details = err.details;
|
34 | this.metadata = err.metadata;
|
35 | }
|
36 | }
|
37 | exports.BatchError = BatchError;
|
38 | /**
|
39 | * @typedef {object} BatchOptions
|
40 | * @property {object} [callOptions] Request configuration option, outlined
|
41 | * here: {@link https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html}.
|
42 | * @property {number} [maxMessages=3000] Maximum number of messages allowed in
|
43 | * each batch sent.
|
44 | * @property {number} [maxMilliseconds=100] Maximum duration to wait before
|
45 | * sending a batch. Batches can be sent earlier if the maxMessages option
|
46 | * is met before the configured duration has passed.
|
47 | */
|
48 | /**
|
49 | * Class for buffering ack/modAck requests.
|
50 | *
|
51 | * @private
|
52 | * @class
|
53 | *
|
54 | * @param {Subscriber} sub The subscriber we're queueing requests for.
|
55 | * @param {BatchOptions} options Batching options.
|
56 | */
|
57 | class MessageQueue {
|
58 | constructor(sub, options = {}) {
|
59 | this.numPendingRequests = 0;
|
60 | this.numInFlightRequests = 0;
|
61 | this._requests = [];
|
62 | this._subscriber = sub;
|
63 | this.setOptions(options);
|
64 | }
|
65 | /**
|
66 | * Gets the default buffer time in ms.
|
67 | *
|
68 | * @returns {number}
|
69 | * @private
|
70 | */
|
71 | get maxMilliseconds() {
|
72 | return this._options.maxMilliseconds;
|
73 | }
|
74 | /**
|
75 | * Adds a message to the queue.
|
76 | *
|
77 | * @param {Message} message The message to add.
|
78 | * @param {number} [deadline] The deadline.
|
79 | * @private
|
80 | */
|
81 | add({ ackId }, deadline) {
|
82 | const { maxMessages, maxMilliseconds } = this._options;
|
83 | this._requests.push([ackId, deadline]);
|
84 | this.numPendingRequests += 1;
|
85 | this.numInFlightRequests += 1;
|
86 | if (this._requests.length >= maxMessages) {
|
87 | this.flush();
|
88 | }
|
89 | else if (!this._timer) {
|
90 | this._timer = setTimeout(() => this.flush(), maxMilliseconds);
|
91 | }
|
92 | }
|
93 | /**
|
94 | * Sends a batch of messages.
|
95 | * @private
|
96 | */
|
97 | async flush() {
|
98 | if (this._timer) {
|
99 | clearTimeout(this._timer);
|
100 | delete this._timer;
|
101 | }
|
102 | const batch = this._requests;
|
103 | const batchSize = batch.length;
|
104 | const deferred = this._onFlush;
|
105 | this._requests = [];
|
106 | this.numPendingRequests -= batchSize;
|
107 | delete this._onFlush;
|
108 | try {
|
109 | await this._sendBatch(batch);
|
110 | }
|
111 | catch (e) {
|
112 | // These queues are used for ack and modAck messages, which should
|
113 | // never surface an error to the user level. However, we'll emit
|
114 | // them onto this debug channel in case debug info is needed.
|
115 | this._subscriber.emit('debug', e);
|
116 | }
|
117 | this.numInFlightRequests -= batchSize;
|
118 | if (deferred) {
|
119 | deferred.resolve();
|
120 | }
|
121 | if (this.numInFlightRequests <= 0 && this._onDrain) {
|
122 | this._onDrain.resolve();
|
123 | delete this._onDrain;
|
124 | }
|
125 | }
|
126 | /**
|
127 | * Returns a promise that resolves after the next flush occurs.
|
128 | *
|
129 | * @returns {Promise}
|
130 | * @private
|
131 | */
|
132 | onFlush() {
|
133 | if (!this._onFlush) {
|
134 | this._onFlush = defer();
|
135 | }
|
136 | return this._onFlush.promise;
|
137 | }
|
138 | /**
|
139 | * Returns a promise that resolves when all in-flight messages have settled.
|
140 | */
|
141 | onDrain() {
|
142 | if (!this._onDrain) {
|
143 | this._onDrain = defer();
|
144 | }
|
145 | return this._onDrain.promise;
|
146 | }
|
147 | /**
|
148 | * Set the batching options.
|
149 | *
|
150 | * @param {BatchOptions} options Batching options.
|
151 | * @private
|
152 | */
|
153 | setOptions(options) {
|
154 | const defaults = { maxMessages: 3000, maxMilliseconds: 100 };
|
155 | this._options = Object.assign(defaults, options);
|
156 | }
|
157 | }
|
158 | exports.MessageQueue = MessageQueue;
|
159 | /**
|
160 | * Queues up Acknowledge (ack) requests.
|
161 | *
|
162 | * @private
|
163 | * @class
|
164 | */
|
165 | class AckQueue extends MessageQueue {
|
166 | /**
|
167 | * Sends a batch of ack requests.
|
168 | *
|
169 | * @private
|
170 | *
|
171 | * @param {Array.<Array.<string|number>>} batch Array of ackIds and deadlines.
|
172 | * @return {Promise}
|
173 | */
|
174 | async _sendBatch(batch) {
|
175 | const client = await this._subscriber.getClient();
|
176 | const ackIds = batch.map(([ackId]) => ackId);
|
177 | const reqOpts = { subscription: this._subscriber.name, ackIds };
|
178 | try {
|
179 | await client.acknowledge(reqOpts, this._options.callOptions);
|
180 | }
|
181 | catch (e) {
|
182 | throw new BatchError(e, ackIds, 'acknowledge');
|
183 | }
|
184 | }
|
185 | }
|
186 | exports.AckQueue = AckQueue;
|
187 | /**
|
188 | * Queues up ModifyAckDeadline requests and sends them out in batches.
|
189 | *
|
190 | * @private
|
191 | * @class
|
192 | */
|
193 | class ModAckQueue extends MessageQueue {
|
194 | /**
|
195 | * Sends a batch of modAck requests. Each deadline requires its own request,
|
196 | * so we have to group all the ackIds by deadline and send multiple requests.
|
197 | *
|
198 | * @private
|
199 | *
|
200 | * @param {Array.<Array.<string|number>>} batch Array of ackIds and deadlines.
|
201 | * @return {Promise}
|
202 | */
|
203 | async _sendBatch(batch) {
|
204 | const client = await this._subscriber.getClient();
|
205 | const subscription = this._subscriber.name;
|
206 | const modAckTable = batch.reduce((table, [ackId, deadline]) => {
|
207 | if (!table[deadline]) {
|
208 | table[deadline] = [];
|
209 | }
|
210 | table[deadline].push(ackId);
|
211 | return table;
|
212 | }, {});
|
213 | const modAckRequests = Object.keys(modAckTable).map(async (deadline) => {
|
214 | const ackIds = modAckTable[deadline];
|
215 | const ackDeadlineSeconds = Number(deadline);
|
216 | const reqOpts = { subscription, ackIds, ackDeadlineSeconds };
|
217 | try {
|
218 | await client.modifyAckDeadline(reqOpts, this._options.callOptions);
|
219 | }
|
220 | catch (e) {
|
221 | throw new BatchError(e, ackIds, 'modifyAckDeadline');
|
222 | }
|
223 | });
|
224 | await Promise.all(modAckRequests);
|
225 | }
|
226 | }
|
227 | exports.ModAckQueue = ModAckQueue;
|
228 | //# sourceMappingURL=message-queues.js.map |
\ | No newline at end of file |