UNPKG

7.79 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2018 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.LeaseManager = void 0;
19const events_1 = require("events");
20const default_options_1 = require("./default-options");
21/**
22 * @typedef {object} FlowControlOptions
23 * @property {boolean} [allowExcessMessages=true] PubSub delivers messages in
24 * batches with no way to configure the batch size. Sometimes this can be
25 * overwhelming if you only want to process a few messages at a time.
26 * Setting this option to false will make the client manage any excess
27 * messages until you're ready for them. This will prevent them from being
28 * redelivered and make the maxMessages option behave more predictably.
29 * @property {number} [maxBytes=104857600] The desired amount of memory to
30 * allow message data to consume. (Default: 100MB) It's possible that this
31 * value will be exceeded, since messages are received in batches.
32 * @property {number} [maxExtension=60] The maximum duration (in seconds)
33 * to extend the message deadline before redelivering.
34 * @property {number} [maxMessages=1000] The desired number of messages to allow
35 * in memory before pausing the message stream. Unless allowExcessMessages
36 * is set to false, it is very likely that this value will be exceeded since
37 * any given message batch could contain a greater number of messages than
38 * the desired amount of messages.
39 */
40/**
41 * Manages a Subscriber's inventory while auto-magically extending the message
42 * deadlines.
43 *
44 * @private
45 * @class
46 *
47 * @param {Subscriber} sub The subscriber to manage leases for.
48 * @param {FlowControlOptions} options Flow control options.
49 */
class LeaseManager extends events_1.EventEmitter {
    /**
     * @param {Subscriber} sub The subscriber to manage leases for.
     * @param {FlowControlOptions} [options] Flow control options.
     */
    constructor(sub, options = {}) {
        super();
        // Running total of `message.length` for every message in inventory.
        this.bytes = 0;
        // True while a deadline-extension timer is (or should be) scheduled.
        this._isLeasing = false;
        // Every message currently held, whether dispensed or still pending.
        this._messages = new Set();
        // Messages held back (FIFO) because the manager was full when they
        // arrived and `allowExcessMessages` is false.
        this._pending = [];
        this._subscriber = sub;
        this.setOptions(options);
    }
    /**
     * Number of messages held back and not yet emitted to the subscriber.
     *
     * @type {number}
     * @private
     */
    get pending() {
        return this._pending.length;
    }
    /**
     * Number of messages currently in inventory (dispensed and pending).
     *
     * @type {number}
     * @private
     */
    get size() {
        return this._messages.size;
    }
    /**
     * Adds a message to the inventory, kicking off the deadline extender if it
     * isn't already running.
     *
     * The message is emitted right away unless `allowExcessMessages` is false
     * and the manager was already full before this message arrived, in which
     * case it is queued in the pending list instead.
     *
     * @fires LeaseManager#full
     *
     * @param {Message} message The message.
     * @private
     */
    add(message) {
        const { allowExcessMessages } = this._options;
        // Capture fullness BEFORE the insert so the message that fills the
        // manager is still dispensed and 'full' fires only on the transition.
        const wasFull = this.isFull();
        this._messages.add(message);
        this.bytes += message.length;
        if (allowExcessMessages || !wasFull) {
            this._dispense(message);
        }
        else {
            this._pending.push(message);
        }
        if (!this._isLeasing) {
            this._isLeasing = true;
            this._scheduleExtension();
        }
        if (!wasFull && this.isFull()) {
            this.emit('full');
        }
    }
    /**
     * Removes ALL messages from inventory and stops the deadline extender.
     *
     * @fires LeaseManager#free
     *
     * @private
     */
    clear() {
        const wasFull = this.isFull();
        this._pending = [];
        this._messages.clear();
        this.bytes = 0;
        if (wasFull) {
            process.nextTick(() => this.emit('free'));
        }
        this._cancelExtension();
    }
    /**
     * Indicates if we're at or over capacity, by message count or by bytes.
     *
     * @returns {boolean}
     * @private
     */
    isFull() {
        const { maxBytes, maxMessages } = this._options;
        return this.size >= maxMessages || this.bytes >= maxBytes;
    }
    /**
     * Removes a message from the inventory. Stopping the deadline extender if no
     * messages are left over.
     *
     * A message that is not in the inventory is ignored.
     *
     * @fires LeaseManager#free
     *
     * @param {Message} message The message to remove.
     * @private
     */
    remove(message) {
        if (!this._messages.has(message)) {
            return;
        }
        const wasFull = this.isFull();
        this._messages.delete(message);
        this.bytes -= message.length;
        // Always drop the message from the pending queue, even when this
        // removal also frees up capacity. Otherwise a message that has left
        // the inventory could linger in `_pending` and be dispensed later
        // without being leased or counted in `bytes`.
        const pendingIndex = this._pending.indexOf(message);
        const wasPending = pendingIndex !== -1;
        if (wasPending) {
            this._pending.splice(pendingIndex, 1);
        }
        if (wasFull && !this.isFull()) {
            process.nextTick(() => this.emit('free'));
        }
        else if (!wasPending && this.pending > 0) {
            // A dispensed message was released while we are still at capacity
            // (or were never full): hand out the oldest pending message.
            this._dispense(this._pending.shift());
        }
        if (this.size === 0 && this._isLeasing) {
            this._cancelExtension();
        }
    }
    /**
     * Sets options for the LeaseManager. Any option not supplied falls back to
     * the library default.
     *
     * @param {FlowControlOptions} [options] The options.
     * @private
     */
    setOptions(options) {
        // NOTE(review): `maxExtension` is documented and compared (see
        // `_extendDeadlines`) in seconds, yet its default is read from a field
        // named `maxExtensionMinutes`. Confirm the intended unit against
        // ./default-options before relying on the default.
        const defaults = {
            allowExcessMessages: true,
            maxBytes: default_options_1.defaultOptions.subscription.maxOutstandingBytes,
            maxExtension: default_options_1.defaultOptions.subscription.maxExtensionMinutes,
            maxMessages: default_options_1.defaultOptions.subscription.maxOutstandingMessages,
        };
        this._options = Object.assign(defaults, options);
    }
    /**
     * Stops extending message deadlines and clears any scheduled timer.
     *
     * @private
     */
    _cancelExtension() {
        this._isLeasing = false;
        if (this._timer) {
            clearTimeout(this._timer);
            delete this._timer;
        }
    }
    /**
     * Emits the message. Emitting messages is very slow, so to avoid it acting
     * as a bottleneck, we're wrapping it in nextTick.
     *
     * Nothing is emitted when the subscriber is not open.
     *
     * @private
     *
     * @fires Subscriber#message
     *
     * @param {Message} message The message to emit.
     */
    _dispense(message) {
        if (this._subscriber.isOpen) {
            process.nextTick(() => this._subscriber.emit('message', message));
        }
    }
    /**
     * Loops through inventory and extends the deadlines for any messages that
     * have not hit the max extension option. Messages that have exceeded it
     * are removed from inventory instead.
     *
     * @private
     */
    _extendDeadlines() {
        const deadline = this._subscriber.ackDeadline;
        for (const message of this._messages) {
            // Age of the message in seconds since it was received.
            const lifespan = (Date.now() - message.received) / 1000;
            if (lifespan < this._options.maxExtension) {
                message.modAck(deadline);
            }
            else {
                this.remove(message);
            }
        }
        // `remove` may have emptied the inventory and cancelled leasing; only
        // keep the loop going while there is still something to extend.
        if (this._isLeasing) {
            this._scheduleExtension();
        }
    }
    /**
     * Creates a timeout(ms) that should allow us to extend any message deadlines
     * before they would be redelivered.
     *
     * The result is a random fraction of (90% of the ack deadline minus the
     * observed modAck latency).
     *
     * @private
     *
     * @returns {number}
     */
    _getNextExtensionTimeoutMs() {
        const jitter = Math.random();
        const deadline = this._subscriber.ackDeadline * 1000;
        const latency = this._subscriber.modAckLatency;
        return (deadline * 0.9 - latency) * jitter;
    }
    /**
     * Schedules a deadline extension for all messages.
     *
     * @private
     */
    _scheduleExtension() {
        const timeout = this._getNextExtensionTimeoutMs();
        this._timer = setTimeout(() => this._extendDeadlines(), timeout);
    }
}
241exports.LeaseManager = LeaseManager;
242//# sourceMappingURL=lease-manager.js.map
\No newline at end of file