1 | ;
|
2 | /*!
|
3 | * Copyright 2018 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.LeaseManager = void 0;
|
19 | const events_1 = require("events");
|
20 | const default_options_1 = require("./default-options");
|
21 | /**
|
22 | * @typedef {object} FlowControlOptions
|
23 | * @property {boolean} [allowExcessMessages=true] PubSub delivers messages in
|
24 | * batches with no way to configure the batch size. Sometimes this can be
|
25 | * overwhelming if you only want to process a few messages at a time.
|
26 | * Setting this option to false will make the client manage any excess
|
27 | * messages until you're ready for them. This will prevent them from being
|
28 | * redelivered and make the maxMessages option behave more predictably.
|
29 | * @property {number} [maxBytes=104857600] The desired amount of memory to
|
30 | * allow message data to consume. (Default: 100MB) It's possible that this
|
31 | * value will be exceeded, since messages are received in batches.
|
32 | * @property {number} [maxExtensionMinutes=60] The maximum duration (in minutes)
|
33 | * to extend the message deadline before redelivering.
|
34 | * @property {number} [maxMessages=1000] The desired number of messages to allow
|
35 | * in memory before pausing the message stream. Unless allowExcessMessages
|
36 | * is set to false, it is very likely that this value will be exceeded since
|
37 | * any given message batch could contain a greater number of messages than
|
38 | * the desired amount of messages.
|
39 | */
|
40 | /**
|
41 | * Manages a Subscriber's inventory while auto-magically extending the message
|
42 | * deadlines.
|
43 | *
|
44 | * @private
|
45 | * @class
|
46 | *
|
47 | * @param {Subscriber} sub The subscriber to manage leases for.
|
48 | * @param {FlowControlOptions} options Flow control options.
|
49 | */
|
class LeaseManager extends events_1.EventEmitter {
    /**
     * @param {Subscriber} sub The subscriber to manage leases for.
     * @param {FlowControlOptions} [options] Flow control options.
     */
    constructor(sub, options = {}) {
        super();
        // Running total of `message.length` for every message in inventory.
        this.bytes = 0;
        // True while a deadline-extension timer is (re)scheduling itself.
        this._isLeasing = false;
        // Every message currently held, whether dispensed or still pending.
        this._messages = new Set();
        // Messages held back because the inventory was full when they arrived
        // (only used when allowExcessMessages is false).
        this._pending = [];
        this._subscriber = sub;
        this.setOptions(options);
    }
    /**
     * Number of messages received but not yet handed to the subscriber.
     *
     * @type {number}
     * @private
     */
    get pending() {
        return this._pending.length;
    }
    /**
     * Number of messages currently in inventory (dispensed and pending).
     *
     * @type {number}
     * @private
     */
    get size() {
        return this._messages.size;
    }
    /**
     * Adds a message to the inventory, kicking off the deadline extender if it
     * isn't already running. The message is dispensed right away unless
     * allowExcessMessages is false and the inventory was already full, in
     * which case it is queued as pending.
     *
     * @fires LeaseManager#full
     *
     * @param {Message} message The message.
     * @private
     */
    add(message) {
        const { allowExcessMessages } = this._options;
        // Capacity is sampled BEFORE the message is counted so that the message
        // that fills the inventory is itself still dispensed.
        const wasFull = this.isFull();
        this._messages.add(message);
        this.bytes += message.length;
        if (allowExcessMessages || !wasFull) {
            this._dispense(message);
        }
        else {
            this._pending.push(message);
        }
        if (!this._isLeasing) {
            this._isLeasing = true;
            this._scheduleExtension();
        }
        // Only signal the transition into the full state, not every add.
        if (!wasFull && this.isFull()) {
            this.emit('full');
        }
    }
    /**
     * Removes ALL messages from inventory and stops the deadline extender.
     *
     * @fires LeaseManager#free
     *
     * @private
     */
    clear() {
        const wasFull = this.isFull();
        this._pending = [];
        this._messages.clear();
        this.bytes = 0;
        if (wasFull) {
            process.nextTick(() => this.emit('free'));
        }
        this._cancelExtension();
    }
    /**
     * Indicates if we're at or over capacity, by message count or by bytes.
     *
     * @returns {boolean}
     * @private
     */
    isFull() {
        const { maxBytes, maxMessages } = this._options;
        return this.size >= maxMessages || this.bytes >= maxBytes;
    }
    /**
     * Removes a message from the inventory. Stopping the deadline extender if no
     * messages are left over. Messages that are not in inventory are ignored.
     *
     * @fires LeaseManager#free
     *
     * @param {Message} message The message to remove.
     * @private
     */
    remove(message) {
        if (!this._messages.has(message)) {
            return;
        }
        const wasFull = this.isFull();
        this._messages.delete(message);
        this.bytes -= message.length;
        // A removed message must never linger in the pending queue, otherwise a
        // later removal could dispense a message that is no longer leased.
        const pendingIndex = this._pending.indexOf(message);
        const wasPending = pendingIndex !== -1;
        if (wasPending) {
            this._pending.splice(pendingIndex, 1);
        }
        if (wasFull && !this.isFull()) {
            process.nextTick(() => this.emit('free'));
        }
        else if (!wasPending && this.pending > 0) {
            // A dispensed message was released while still at capacity: hand its
            // slot to the oldest pending message.
            this._dispense(this._pending.shift());
        }
        if (this.size === 0 && this._isLeasing) {
            this._cancelExtension();
        }
    }
    /**
     * Sets options for the LeaseManager. Unspecified options fall back to the
     * library defaults. The supplied object is not modified.
     *
     * @param {FlowControlOptions} [options] The options.
     *
     * @throws {RangeError} If both maxExtension and maxExtensionMinutes are set.
     *
     * @private
     */
    setOptions(options = {}) {
        // Convert the old deprecated maxExtension to avoid breaking clients,
        // but allow only one.
        if (options.maxExtension !== undefined &&
            options.maxExtensionMinutes !== undefined) {
            throw new RangeError('Only one of "maxExtension" or "maxExtensionMinutes" may be set for subscriber lease management options');
        }
        // Work on a shallow copy so the caller's object is left untouched.
        const overrides = Object.assign({}, options);
        if (overrides.maxExtension !== undefined) {
            // The divisor of 60 implies maxExtension is expressed in seconds.
            overrides.maxExtensionMinutes = overrides.maxExtension / 60;
            delete overrides.maxExtension;
        }
        const defaults = {
            allowExcessMessages: true,
            maxBytes: default_options_1.defaultOptions.subscription.maxOutstandingBytes,
            maxExtensionMinutes: default_options_1.defaultOptions.subscription.maxExtensionMinutes,
            maxMessages: default_options_1.defaultOptions.subscription.maxOutstandingMessages,
        };
        this._options = Object.assign(defaults, overrides);
    }
    /**
     * Stops extending message deadlines.
     *
     * @private
     */
    _cancelExtension() {
        this._isLeasing = false;
        if (this._timer) {
            clearTimeout(this._timer);
            delete this._timer;
        }
    }
    /**
     * Emits the message. Emitting messages is very slow, so to avoid it acting
     * as a bottleneck, we're wrapping it in nextTick. Nothing is emitted when
     * the subscriber is not open.
     *
     * @private
     *
     * @fires Subscriber#message
     *
     * @param {Message} message The message to emit.
     */
    _dispense(message) {
        if (this._subscriber.isOpen) {
            process.nextTick(() => this._subscriber.emit('message', message));
        }
    }
    /**
     * Loops through inventory and extends the deadlines for any messages that
     * have not hit the max extension option. Messages that have are removed
     * from inventory. Reschedules itself while leasing is still active.
     *
     * @private
     */
    _extendDeadlines() {
        const deadline = this._subscriber.ackDeadline;
        for (const message of this._messages) {
            // Lifespan here is in minutes.
            // NOTE(review): assumes message.received is an epoch-millisecond
            // timestamp comparable with Date.now() - confirm against Message.
            const lifespan = (Date.now() - message.received) / (60 * 1000);
            if (lifespan < this._options.maxExtensionMinutes) {
                message.modAck(deadline);
            }
            else {
                this.remove(message);
            }
        }
        // remove() may have emptied the inventory and cancelled leasing.
        if (this._isLeasing) {
            this._scheduleExtension();
        }
    }
    /**
     * Creates a timeout(ms) that should allow us to extend any message deadlines
     * before they would be redelivered. The result is randomised (jitter) and
     * is never negative.
     *
     * @private
     *
     * @returns {number}
     */
    _getNextExtensionTimeoutMs() {
        const jitter = Math.random();
        const deadline = this._subscriber.ackDeadline * 1000;
        // NOTE(review): modAckLatency is presumably in milliseconds, since it is
        // subtracted from the deadline after conversion to ms - confirm.
        const latency = this._subscriber.modAckLatency;
        // Clamp so a latency larger than the usable deadline cannot produce a
        // negative timer delay.
        return Math.max(0, deadline * 0.9 - latency) * jitter;
    }
    /**
     * Schedules a deadline extension for all messages.
     *
     * @private
     */
    _scheduleExtension() {
        const timeout = this._getNextExtensionTimeoutMs();
        this._timer = setTimeout(() => this._extendDeadlines(), timeout);
    }
}
|
256 | exports.LeaseManager = LeaseManager;
|
257 | //# sourceMappingURL=lease-manager.js.map |
\ | No newline at end of file |