UNPKG

8.55 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2018 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
// CommonJS module prologue: flag the exports object with `__esModule` and
// pre-declare the `LeaseManager` export, which is assigned for real at the
// bottom of this file.
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.LeaseManager = void 0;
// Provides EventEmitter, the base class LeaseManager extends.
19const events_1 = require("events");
// Provides the default subscription limits that setOptions() falls back to.
20const default_options_1 = require("./default-options");
21/**
22 * @typedef {object} FlowControlOptions
23 * @property {boolean} [allowExcessMessages=true] PubSub delivers messages in
24 * batches with no way to configure the batch size. Sometimes this can be
25 * overwhelming if you only want to process a few messages at a time.
26 * Setting this option to false will make the client manage any excess
27 * messages until you're ready for them. This will prevent them from being
28 * redelivered and make the maxMessages option behave more predictably.
29 * @property {number} [maxBytes=104857600] The desired amount of memory to
30 * allow message data to consume. (Default: 100MB) It's possible that this
31 * value will be exceeded, since messages are received in batches.
32 * @property {number} [maxExtensionMinutes=60] The maximum duration (in minutes)
33 * to extend the message deadline before redelivering.
34 * @property {number} [maxMessages=1000] The desired number of messages to allow
35 * in memory before pausing the message stream. Unless allowExcessMessages
36 * is set to false, it is very likely that this value will be exceeded since
37 * any given message batch could contain a greater number of messages than
38 * the desired amount of messages.
39 */
40/**
41 * Manages a Subscriber's inventory while auto-magically extending the message
42 * deadlines.
43 *
44 * @private
45 * @class
46 *
47 * @param {Subscriber} sub The subscriber to manage leases for.
48 * @param {FlowControlOptions} options Flow control options.
49 */
class LeaseManager extends events_1.EventEmitter {
    /**
     * @param {Subscriber} sub The subscriber to manage leases for.
     * @param {FlowControlOptions} [options] Flow control options.
     */
    constructor(sub, options = {}) {
        super();
        // Running total of `message.length` for every message in inventory.
        this.bytes = 0;
        // True while a deadline-extension timer is (or should be) scheduled.
        this._isLeasing = false;
        // Every message currently held, whether dispensed or still pending.
        this._messages = new Set();
        // Messages held back because the manager was full when they arrived
        // and `allowExcessMessages` is false; dispensed in FIFO order.
        this._pending = [];
        this._subscriber = sub;
        this.setOptions(options);
    }
    /**
     * Number of messages that are in inventory but not yet dispensed.
     *
     * @type {number}
     * @private
     */
    get pending() {
        return this._pending.length;
    }
    /**
     * Number of messages currently in inventory.
     *
     * @type {number}
     * @private
     */
    get size() {
        return this._messages.size;
    }
    /**
     * Adds a message to the inventory, kicking off the deadline extender if it
     * isn't already running. The message is dispensed right away unless the
     * manager was already full and `allowExcessMessages` is false, in which
     * case it is queued as pending.
     *
     * @fires LeaseManager#full
     *
     * @param {Message} message The message.
     * @private
     */
    add(message) {
        const { allowExcessMessages } = this._options;
        // Capture fullness BEFORE inserting so the 'full' event fires only on
        // the not-full -> full transition.
        const wasFull = this.isFull();
        this._messages.add(message);
        this.bytes += message.length;
        if (allowExcessMessages || !wasFull) {
            this._dispense(message);
        }
        else {
            this._pending.push(message);
        }
        if (!this._isLeasing) {
            this._isLeasing = true;
            this._scheduleExtension();
        }
        if (!wasFull && this.isFull()) {
            this.emit('full');
        }
    }
    /**
     * Removes ALL messages from inventory (including pending ones) and stops
     * the deadline extender.
     *
     * @fires LeaseManager#free
     *
     * @private
     */
    clear() {
        const wasFull = this.isFull();
        this._pending = [];
        this._messages.clear();
        this.bytes = 0;
        if (wasFull) {
            process.nextTick(() => this.emit('free'));
        }
        this._cancelExtension();
    }
    /**
     * Indicates if we're at or over capacity, by message count or by bytes.
     *
     * @returns {boolean}
     * @private
     */
    isFull() {
        const { maxBytes, maxMessages } = this._options;
        return this.size >= maxMessages || this.bytes >= maxBytes;
    }
    /**
     * Removes a message from the inventory. Stopping the deadline extender if no
     * messages are left over. Messages that are not in inventory are ignored.
     *
     * @fires LeaseManager#free
     *
     * @param {Message} message The message to remove.
     * @private
     */
    remove(message) {
        if (!this._messages.has(message)) {
            return;
        }
        const wasFull = this.isFull();
        this._messages.delete(message);
        this.bytes -= message.length;
        // A removed message must never linger in the pending queue, otherwise
        // it could be dispensed later even though it has left the inventory.
        // Do this unconditionally, i.e. also when the removal frees capacity.
        const pendingIndex = this._pending.indexOf(message);
        if (pendingIndex !== -1) {
            this._pending.splice(pendingIndex, 1);
        }
        if (wasFull && !this.isFull()) {
            process.nextTick(() => this.emit('free'));
        }
        else if (pendingIndex === -1 && this.pending > 0) {
            // A dispensed message went away without freeing capacity: hand
            // out the oldest pending message in its place.
            this._dispense(this._pending.shift());
        }
        if (this.size === 0 && this._isLeasing) {
            this._cancelExtension();
        }
    }
    /**
     * Sets options for the LeaseManager. Unspecified options fall back to the
     * library defaults. The passed-in object is not modified.
     *
     * @param {FlowControlOptions} [options] The options.
     *
     * @throws {RangeError} If both maxExtension and maxExtensionMinutes are set.
     *
     * @private
     */
    setOptions(options = {}) {
        // Convert the old deprecated maxExtension to avoid breaking clients,
        // but allow only one.
        if (options.maxExtension !== undefined &&
            options.maxExtensionMinutes !== undefined) {
            throw new RangeError('Only one of "maxExtension" or "maxExtensionMinutes" may be set for subscriber lease management options');
        }
        // Work on a shallow copy so the caller's object is left untouched.
        const overrides = Object.assign({}, options);
        if (overrides.maxExtension !== undefined) {
            // The deprecated option is divided by 60 to obtain minutes.
            overrides.maxExtensionMinutes = overrides.maxExtension / 60;
            delete overrides.maxExtension;
        }
        const defaults = {
            allowExcessMessages: true,
            maxBytes: default_options_1.defaultOptions.subscription.maxOutstandingBytes,
            maxExtensionMinutes: default_options_1.defaultOptions.subscription.maxExtensionMinutes,
            maxMessages: default_options_1.defaultOptions.subscription.maxOutstandingMessages,
        };
        this._options = Object.assign(defaults, overrides);
    }
    /**
     * Stops extending message deadlines and clears any scheduled timer.
     *
     * @private
     */
    _cancelExtension() {
        this._isLeasing = false;
        if (this._timer) {
            clearTimeout(this._timer);
            delete this._timer;
        }
    }
    /**
     * Emits the message. Emitting messages is very slow, so to avoid it acting
     * as a bottleneck, we're wrapping it in nextTick. Nothing is emitted when
     * the subscriber is not open.
     *
     * @private
     *
     * @fires Subscriber#message
     *
     * @param {Message} message The message to emit.
     */
    _dispense(message) {
        if (this._subscriber.isOpen) {
            process.nextTick(() => this._subscriber.emit('message', message));
        }
    }
    /**
     * Loops through inventory and extends the deadlines for any messages that
     * have not hit the max extension option. Messages that have are removed
     * from inventory instead. Re-arms the timer while leasing is still active.
     *
     * @private
     */
    _extendDeadlines() {
        const deadline = this._subscriber.ackDeadline;
        // Deleting from a Set while iterating it is safe in JavaScript.
        for (const message of this._messages) {
            // Lifespan here is in minutes.
            const lifespan = (Date.now() - message.received) / (60 * 1000);
            if (lifespan < this._options.maxExtensionMinutes) {
                message.modAck(deadline);
            }
            else {
                this.remove(message);
            }
        }
        // remove() may have emptied the inventory and cancelled leasing.
        if (this._isLeasing) {
            this._scheduleExtension();
        }
    }
    /**
     * Creates a timeout(ms) that should allow us to extend any message deadlines
     * before they would be redelivered: a random fraction of 90% of the ack
     * deadline minus the subscriber's reported modAck latency.
     *
     * @private
     *
     * @returns {number}
     */
    _getNextExtensionTimeoutMs() {
        const jitter = Math.random();
        // ackDeadline is multiplied by 1000 to get milliseconds.
        const deadline = this._subscriber.ackDeadline * 1000;
        const latency = this._subscriber.modAckLatency;
        return (deadline * 0.9 - latency) * jitter;
    }
    /**
     * Schedules a deadline extension for all messages.
     *
     * @private
     */
    _scheduleExtension() {
        const timeout = this._getNextExtensionTimeoutMs();
        this._timer = setTimeout(() => this._extendDeadlines(), timeout);
    }
}
// Expose the class on the CommonJS exports object.
256exports.LeaseManager = LeaseManager;
257//# sourceMappingURL=lease-manager.js.map
\No newline at end of file