UNPKG

4.95 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2021 Google LLC
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.FlowControl = void 0;
19const defer = require("p-defer");
/**
 * Manages flow control handling for max bytes and messages.
 *
 * Do not use this class externally, it may change without warning.
 * @private
 *
 */
class FlowControl {
    /**
     * Creates a flow controller with zeroed counters and an empty queue of
     * deferred requests.
     *
     * @param {object} [options] Limits to enforce. Recognized keys are
     *     `maxOutstandingBytes` and `maxOutstandingMessages`; a key left
     *     undefined means that dimension is not limited. A missing options
     *     object is treated as "no limits".
     * @throws {Error} If either limit is exactly zero.
     */
    constructor(options) {
        this.options = {};
        this.setOptions(options);
        this.bytes = this.messages = 0;
        this.requests = [];
    }
    /**
     * Update our options after the fact.
     *
     * The new options are validated before they are stored, so a rejected
     * update leaves the previously active options in place.
     *
     * Do not use externally, it may change without warning.
     * @private
     *
     * @param {object} [options] Replacement limits; a missing object is
     *     treated as `{}` (no limits).
     * @throws {Error} If either limit is exactly zero.
     */
    setOptions(options) {
        const nextOptions = options === undefined || options === null ? {} : options;
        if (nextOptions.maxOutstandingBytes === 0 ||
            nextOptions.maxOutstandingMessages === 0) {
            // Undefined is okay, but if either is zero, no publishes ever happen.
            throw new Error('When using publisher flow control, maxOutstandingBytes and maxOutstandingMessages must not be zero');
        }
        // Only commit once validation has passed.
        this.options = nextOptions;
    }
    /**
     * @returns {number} The number of bytes currently counted.
     */
    get currentByteCount() {
        return this.bytes;
    }
    /**
     * @returns {number} The number of messages currently counted.
     */
    get currentMessageCount() {
        return this.messages;
    }
    /**
     * Adds the specified number of bytes and messages to our counts
     * directly, without checking limits or queueing anything.
     *
     * @param {number} bytes The number of bytes to add to the count.
     * @param {number} messages The number of messages to add to the count.
     */
    addToCount(bytes, messages) {
        this.bytes += bytes;
        this.messages += messages;
    }
    /**
     * Attempts to queue the specified number of bytes and messages. The
     * counts are increased immediately; if that puts us over a configured
     * limit, the call is deferred and only resolves once a later sent()
     * releases it.
     *
     * Do not use externally, it may change without warning.
     * @private
     *
     * @param {number} bytes The number of bytes about to be sent.
     * @param {number} messages The number of messages about to be sent.
     * @returns {Promise<void>} Resolves when the caller may proceed.
     */
    async willSend(bytes, messages) {
        // Add this to our queue size.
        this.bytes += bytes;
        this.messages += messages;
        // If this request won't fit, we have to put it in the queue.
        if (this.exceeded()) {
            const promise = defer();
            this.requests.push({
                promise: promise.promise,
                resolve: promise.resolve,
                reject: promise.reject,
                bytes,
                messageCount: messages,
            });
            // This will pass through when someone else's this.sent() completes.
            await promise.promise;
        }
    }
    /**
     * Removes the specified number of bytes and messages from our counts.
     * If that leaves us within the configured limits and there are deferred
     * willSend() calls waiting, the oldest one is released.
     *
     * Do not use externally, it may change without warning.
     * @private
     *
     * @param {number} bytes The number of bytes that were sent.
     * @param {number} messages The number of messages that were sent.
     */
    sent(bytes, messages) {
        this.bytes -= bytes;
        this.messages -= messages;
        // This shouldn't happen, but just be sure.
        if (this.bytes < 0) {
            this.bytes = 0;
        }
        if (this.messages < 0) {
            this.messages = 0;
        }
        // Let things waiting on willSend() have a go, if there's space.
        // At most one waiter is released per call.
        if (this.requests.length > 0 && !this.exceeded()) {
            const next = this.requests.shift();
            next.resolve();
        }
    }
    /**
     * Just uses wouldExceed() to see if we've already exceeded the limits.
     *
     * @returns {boolean} True if the current counts are over a limit.
     */
    exceeded() {
        return this.wouldExceed(0, 0);
    }
    /**
     * Returns true if adding the specified number of bytes or messages
     * would exceed limits imposed by configuration. A limit that is
     * undefined is never exceeded; landing exactly on a limit does not
     * count as exceeding it.
     *
     * Do not use externally, it may change without warning.
     * @private
     *
     * @param {number} bytes The number of bytes to hypothetically add.
     * @param {number} messages The number of messages to hypothetically add.
     * @returns {boolean} True if either resulting total is over its limit.
     */
    wouldExceed(bytes, messages) {
        const totalBytes = this.bytes + bytes;
        const totalMessages = this.messages + messages;
        if (this.options.maxOutstandingBytes !== undefined &&
            totalBytes > this.options.maxOutstandingBytes) {
            return true;
        }
        if (this.options.maxOutstandingMessages !== undefined &&
            totalMessages > this.options.maxOutstandingMessages) {
            return true;
        }
        return false;
    }
}
144exports.FlowControl = FlowControl;
145//# sourceMappingURL=flow-control.js.map
\No newline at end of file