1 | ;
|
2 | /*!
|
3 | * Copyright 2021 Google LLC
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.FlowControlledPublisher = void 0;
|
19 | const pubsub_message_1 = require("./pubsub-message");
|
20 | /**
|
21 | * Encapsulates a series of message publishes from a rapid loop (or similar
|
22 | * circumstance).
|
23 | *
|
24 | * This class is not meant to be instantiated outside of the `@google-cloud/pubsub`
|
25 | * package. It is returned from {@link Topic#flowControlled}. Messages sent
|
26 | * through an instance of this class will obey publisher flow control
|
27 | * settings set through {@link PublisherOptions} on {@link Topic}, across
|
28 | * all instances returned by {@link Topic#flowControlled} on that {@link Topic}.
|
29 | */
|
class FlowControlledPublisher {
    /**
     * @param {Publisher} publisher The publisher used to send messages. Its
     *     `flowControl` object is shared by this instance for all accounting.
     */
    constructor(publisher) {
        this.publisher = publisher;
        this.flowControl = this.publisher.flowControl;
        // Promises for the IDs of messages published since the last `all()`.
        this.idPromises = [];
    }
    /**
     * Returns true if sending the specified message would result in exceeding
     * the limits of the flow control settings.
     *
     * @param {PubsubMessage} message The message that would be sent.
     * @returns {boolean} True if the message would exceed flow control limits.
     */
    wouldExceed(message) {
        return this.flowControl.wouldExceed(pubsub_message_1.calculateMessageSize(message), 1);
    }
    /**
     * Publishes a message, subject to flow control restrictions.
     *
     * If the message can be sent immediately, this will return `null`.
     * Otherwise, it will return a Promise<void> that resolves after it's okay
     * to resume calling the method.
     *
     * @param {PubsubMessage} message The message to be sent.
     * @returns {Promise<void>|null} null, or a Promise that resolves when
     *     sending may resume.
     *
     * @example
     * ```
     * const wait = flowControlled.publish({data});
     * if (wait) {
     *   await wait;
     * }
     * ```
     * @example
     * ```
     * // It's okay to await unconditionally; awaiting `null` just yields.
     * await flowControlled.publish({data});
     * ```
     */
    publish(message) {
        // Measure once: the same figure is reserved here and released when the
        // publish settles, so the counters cannot drift if the message object
        // is modified in between.
        const size = pubsub_message_1.calculateMessageSize(message);
        const doPublish = () => {
            this.doPublish(message, size);
        };
        if (this.flowControl.wouldExceed(size, 1)) {
            // Over the limit: publish only once flow control lets us through.
            return this.flowControl.willSend(size, 1).then(doPublish);
        }
        // Within limits: willSend() is still called so the flow controller is
        // told about the message, but its promise is deliberately not awaited.
        this.flowControl.willSend(size, 1).then(() => { });
        doPublish();
        return null;
    }
    /**
     * Publishes a message unconditionally, updating flow control counters.
     *
     * You'll generally only want to use this if you want to deal with timing
     * the flow control yourself, but you'd like the library to do the bean
     * counting.
     *
     * @param {PubsubMessage} message The message to be sent.
     *
     * @example
     * ```
     * if (!flowControlled.wouldExceed({data})) {
     *   flowControlled.publishNow({data});
     * }
     * ```
     */
    publishNow(message) {
        const size = pubsub_message_1.calculateMessageSize(message);
        this.flowControl.addToCount(size, 1);
        this.doPublish(message, size);
    }
    /**
     * Hands the message to the underlying publisher and records the resulting
     * ID promise so that `all()` can report it. Whether the publish succeeds
     * or fails, `flowControl.sent()` is called exactly once with `size`.
     *
     * @private
     * @param {PubsubMessage} message The message to be sent.
     * @param {number} [size] The size that was counted against flow control
     *     for this message. Defaults to the message's size at call time.
     */
    doPublish(message, size = pubsub_message_1.calculateMessageSize(message)) {
        const release = () => {
            this.flowControl.sent(size, 1);
        };
        // Release on both outcomes, but never swallow a failure: the error is
        // re-thrown so it surfaces to whoever awaits `all()`.
        const idPromise = this.publisher.publishMessage(message).then((id) => {
            release();
            return id;
        }, (err) => {
            release();
            throw err;
        });
        this.idPromises.push(idPromise);
    }
    /**
     * Returns a Promise that will resolve to all of the currently sent
     * message IDs (or reject if there is an error). This also clears
     * out any currently sent messages, so the next call to `all()` will
     * be a clean slate.
     *
     * @returns {Promise<string[]>} A Promise that resolves when all current
     *     messages are sent.
     */
    all() {
        const pending = this.idPromises;
        this.idPromises = [];
        return Promise.all(pending);
    }
}
|
132 | exports.FlowControlledPublisher = FlowControlledPublisher;
|
133 | //# sourceMappingURL=flow-publisher.js.map |
\ | No newline at end of file |