UNPKG

4.91 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2021 Google LLC
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.FlowControlledPublisher = void 0;
19const pubsub_message_1 = require("./pubsub-message");
20/**
21 * Encapsulates a series of message publishes from a rapid loop (or similar
22 * circumstance).
23 *
24 * This class is not meant to be instantiated outside of the `@google-cloud/pubsub`
25 * package. It is returned from {@link Topic#flowControlled}. Messages sent
26 * through an instance of this class will obey publisher flow control
27 * settings set through {@link PublisherOptions} on {@link Topic}, across
28 * all instances returned by {@link Topic#flowControlled} on that {@link Topic}.
29 */
30class FlowControlledPublisher {
31 constructor(publisher) {
32 this.publisher = publisher;
33 this.flowControl = this.publisher.flowControl;
34 this.idPromises = [];
35 }
36 /**
37 * Returns true if sending the specified Buffer would result in exceeding the
38 * limits of the flow control settings.
39 *
40 * @param {PubsubMessage} message The data buffer with the message's contents.
41 * @returns {boolean} True if the message would exceed flow control limits.
42 */
43 wouldExceed(message) {
44 return this.flowControl.wouldExceed(pubsub_message_1.calculateMessageSize(message), 1);
45 }
46 /**
47 * Publishes a message, subject to flow control restrictions.
48 *
49 * If the message can be sent immediately, this will return `null`. Otherwise,
50 * it will return a Promise<void> that resolves after it's okay to resume
51 * calling the method.
52 *
53 * @param {Buffer} [data] The message contents to be sent.
54 * @param {Attributes} [attributes] Optional attributes.
55 * @returns null, or a Promise that resolves when sending may resume.
56 *
57 * @example
58 * ```
59 * const wait = flowControlled.publish({data});
60 * if (wait) {
61 * await wait;
62 * }
63 *
64 * ```
65 * @example
66 * ```
67 * // It's okay to await unconditionally, it's equivalent to nextTick().
68 * await flowControlled.publish(data);
69 * ```
70 */
71 publish(message) {
72 const doPublish = () => {
73 this.doPublish(message);
74 };
75 const size = pubsub_message_1.calculateMessageSize(message);
76 if (this.flowControl.wouldExceed(size, 1)) {
77 const waitPromise = this.flowControl.willSend(size, 1);
78 return waitPromise.then(doPublish);
79 }
80 else {
81 this.flowControl.willSend(size, 1).then(() => { });
82 doPublish();
83 return null;
84 }
85 }
86 /**
87 * Publishes a message unconditionally, updating flow control counters.
88 *
89 * You'll generally only want to use this if you want to deal with timing the
90 * flow control yourself, but you'd like the library to do the bean counting.
91 *
92 * @param {Buffer} [data] The message contents to be sent.
93 * @param {Attributes} [attributes] Optional attributes.
94 *
95 * @example
96 * ```
97 * if (!flowControlled.wouldExceed(data)) {
98 * flowControlled.publishNow(data);
99 * }
100 * ```
101 */
102 publishNow(message) {
103 this.flowControl.addToCount(pubsub_message_1.calculateMessageSize(message), 1);
104 this.doPublish(message);
105 }
106 doPublish(message) {
107 let idPromise = this.publisher.publishMessage(message);
108 // This will defer but not eat any errors.
109 const publishDone = (id) => {
110 this.flowControl.sent(pubsub_message_1.calculateMessageSize(message), 1);
111 return id;
112 };
113 idPromise.catch(publishDone);
114 idPromise = idPromise.then(publishDone);
115 this.idPromises.push(idPromise);
116 }
117 /**
118 * Returns a Promise that will resolve to all of the currently sent
119 * message IDs (or reject if there is an error). This also clears
120 * out any currently sent messages, so the next call to `all()` will
121 * be a clean slate.
122 *
123 * @returns {Promise<string[]>} A Promise that resolves when all current
124 * messages are sent.
125 */
126 all() {
127 const allPromise = Promise.all(this.idPromises);
128 this.idPromises = [];
129 return allPromise;
130 }
131}
132exports.FlowControlledPublisher = FlowControlledPublisher;
133//# sourceMappingURL=flow-publisher.js.map
\No newline at end of file