1 | ;
|
2 | /*!
|
3 | * Copyright 2021 Google LLC
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.FlowControl = void 0;
|
19 | const defer = require("p-defer");
|
20 | /**
|
21 | * Manages flow control handling for max bytes and messages.
|
22 | *
|
23 | * Do not use this class externally, it may change without warning.
|
24 | * @private
|
25 | *
|
26 | */
|
class FlowControl {
    /**
     * Creates a flow controller with zeroed counters and an empty wait queue.
     *
     * @param {object} [options] Flow control limits. Omitting it (or leaving a
     *     limit undefined) means that limit is not enforced.
     * @param {number} [options.maxOutstandingBytes] Maximum outstanding bytes.
     * @param {number} [options.maxOutstandingMessages] Maximum outstanding
     *     messages.
     * @throws {Error} If either limit is exactly zero.
     */
    constructor(options) {
        this.options = {};
        this.setOptions(options);
        this.bytes = this.messages = 0;
        // FIFO queue of deferred willSend() calls waiting for space.
        this.requests = [];
    }
    /**
     * Update our options after the fact.
     *
     * The new options are validated before they are stored, so a rejected
     * configuration leaves the previously active options in place.
     *
     * Do not use externally, it may change without warning.
     * @private
     *
     * @param {object} [options] New limits; null/undefined means "no limits".
     * @throws {Error} If either limit is exactly zero.
     */
    setOptions(options) {
        // Treat a missing options object as "no limits" rather than crashing
        // on the property reads below.
        const next = options == null ? {} : options;
        if (next.maxOutstandingBytes === 0 ||
            next.maxOutstandingMessages === 0) {
            // Undefined is okay, but if either is zero, no publishes ever happen.
            throw new Error('When using publisher flow control, maxOutstandingBytes and maxOutstandingMessages must not be zero');
        }
        this.options = next;
    }
    /**
     * @returns {number} The number of bytes that are queued up.
     */
    get currentByteCount() {
        return this.bytes;
    }
    /**
     * @returns {number} The number of messages that are queued up.
     */
    get currentMessageCount() {
        return this.messages;
    }
    /**
     * Adds the specified number of bytes and messages to our counts directly,
     * bypassing the queueing mechanism (no limit check, nothing is deferred).
     *
     * @param {number} bytes The number of bytes to add to the count.
     * @param {number} messages The number of messages to add to the count.
     */
    addToCount(bytes, messages) {
        this.bytes += bytes;
        this.messages += messages;
    }
    /**
     * Attempts to queue the specified number of bytes and messages. The
     * counts are always added first; if the totals then exceed the configured
     * limits, the returned promise stays pending until a later sent() call
     * brings the totals back within limits.
     *
     * Do not use externally, it may change without warning.
     * @private
     *
     * @param {number} bytes The number of bytes about to be sent.
     * @param {number} messages The number of messages about to be sent.
     * @returns {Promise<void>} Resolves once the request may proceed.
     */
    async willSend(bytes, messages) {
        // Add this to our queue size.
        this.bytes += bytes;
        this.messages += messages;
        // If this request won't fit, we have to put it in the queue.
        if (this.exceeded()) {
            const promise = defer();
            this.requests.push({
                promise: promise.promise,
                resolve: promise.resolve,
                reject: promise.reject,
                bytes,
                messageCount: messages,
            });
            // This will pass through when someone else's this.sent() completes.
            await promise.promise;
        }
    }
    /**
     * Removes the specified number of bytes and messages from our counts,
     * then releases queued willSend() requests if the totals are back within
     * limits.
     *
     * Do not use externally, it may change without warning.
     * @private
     *
     * @param {number} bytes The number of bytes that were sent.
     * @param {number} messages The number of messages that were sent.
     */
    sent(bytes, messages) {
        // Going negative shouldn't happen, but clamp at zero just to be sure.
        this.bytes = Math.max(0, this.bytes - bytes);
        this.messages = Math.max(0, this.messages - messages);
        // Let things waiting on willSend() have a go, if there's space.
        // Queued requests are already included in the counts, so once we are
        // within limits every waiter fits; release them all in FIFO order
        // instead of holding the rest back until some later sent() call.
        while (this.requests.length > 0 && !this.exceeded()) {
            const next = this.requests.shift();
            next.resolve();
        }
    }
    /**
     * Just uses wouldExceed() to see if we've already exceeded the limits.
     *
     * @returns {boolean} True if the current counts are over a limit.
     */
    exceeded() {
        return this.wouldExceed(0, 0);
    }
    /**
     * Returns true if adding the specified number of bytes or messages
     * would exceed limits imposed by configuration. A limit that is
     * undefined is never exceeded. The counts are not modified.
     *
     * Do not use externally, it may change without warning.
     * @private
     *
     * @param {number} bytes The number of bytes that would be added.
     * @param {number} messages The number of messages that would be added.
     * @returns {boolean} True if either configured limit would be exceeded.
     */
    wouldExceed(bytes, messages) {
        const totalBytes = this.bytes + bytes;
        const totalMessages = this.messages + messages;
        if (this.options.maxOutstandingBytes !== undefined &&
            totalBytes > this.options.maxOutstandingBytes) {
            return true;
        }
        if (this.options.maxOutstandingMessages !== undefined &&
            totalMessages > this.options.maxOutstandingMessages) {
            return true;
        }
        return false;
    }
}
|
144 | exports.FlowControl = FlowControl;
|
145 | //# sourceMappingURL=flow-control.js.map |
\ | No newline at end of file |