1 | ;
|
2 | /*!
|
3 | * Copyright 2019 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.OrderedQueue = exports.Queue = exports.MessageQueue = void 0;
|
19 | const events_1 = require("events");
|
20 | const message_batch_1 = require("./message-batch");
|
21 | const publish_error_1 = require("./publish-error");
|
/**
 * Base class for the queues that hand batches of messages to the publish
 * API. It keeps a reference to the owning publisher and a snapshot of that
 * publisher's batching settings.
 *
 * @private
 *
 * @param {Publisher} publisher The parent publisher.
 */
class MessageQueue extends events_1.EventEmitter {
    constructor(publisher) {
        super();
        const { batching } = publisher.settings;
        this.publisher = publisher;
        this.batchOptions = batching;
    }
    /**
     * Re-reads the batching settings from the publisher. Subclasses are
     * expected to forward the refreshed values to any MessageBatch they own.
     *
     * This is only for use by {@link Publisher}.
     *
     * @private
     */
    updateOptions() {
        const { batching } = this.publisher.settings;
        this.batchOptions = batching;
    }
    /**
     * Sends one batch of messages through the topic's `publish` request and
     * reports the outcome to every per-message callback.
     *
     * @param {object[]} messages The messages to publish.
     * @param {PublishCallback[]} callbacks The corresponding callback functions.
     * @param {function} [callback] Callback to be fired when publish is done.
     */
    _publish(messages, callbacks, callback) {
        const { topic, settings } = this.publisher;
        const reqOpts = { topic: topic.name, messages };
        // The completion callback is optional, so every notification goes
        // through this guard.
        const notifyDone = (err) => {
            if (typeof callback === 'function') {
                callback(err);
            }
        };
        // An empty batch never reaches the API; it is reported as a success.
        if (messages.length === 0) {
            notifyDone(null);
            return;
        }
        const requestConfig = {
            client: 'PublisherClient',
            method: 'publish',
            reqOpts,
            gaxOpts: settings.gaxOpts,
        };
        topic.request(requestConfig, (err, resp) => {
            // When there is no response (or no ids) each message callback
            // receives `undefined` as its id.
            const messageIds = (resp && resp.messageIds) || [];
            callbacks.forEach((onPublished, index) => {
                onPublished(err, messageIds[index]);
            });
            notifyDone(err);
        });
    }
}
|
80 | exports.MessageQueue = MessageQueue;
|
/**
 * Standard (unordered) message queue. It fills a single MessageBatch and
 * sends it when the batch is full, when the next message does not fit, or
 * when the batching timer fires.
 *
 * @private
 * @extends MessageQueue
 *
 * @param {Publisher} publisher The publisher.
 */
class Queue extends MessageQueue {
    constructor(publisher) {
        super(publisher);
        this.batch = new message_batch_1.MessageBatch(this.batchOptions);
    }
    /**
     * Refreshes the options from the publisher and pushes them into the
     * batch that is currently being filled.
     */
    updateOptions() {
        super.updateOptions();
        this.batch.setOptions(this.batchOptions);
    }
    /**
     * Adds a message to the queue.
     *
     * @param {PubsubMessage} message The message to publish.
     * @param {PublishCallback} callback The publish callback.
     */
    add(message, callback) {
        // Flush first when the incoming message would not fit; publish()
        // swaps in a fresh batch, so the message lands in the new one.
        const hasRoom = this.batch.canFit(message);
        if (!hasRoom) {
            this.publish();
        }
        this.batch.add(message, callback);
        if (this.batch.isFull()) {
            this.publish();
            return;
        }
        if (this.pending) {
            // A timer is already counting down for this batch.
            return;
        }
        this.pending = setTimeout(() => this.publish(), this.batchOptions.maxMilliseconds);
    }
    /**
     * Cancels any pending publishes and calls _publish immediately.
     *
     * @param {function} [callback] Forwarded to _publish as its completion
     *     callback.
     */
    publish(callback) {
        const outgoing = this.batch;
        const { messages, callbacks } = outgoing;
        // Replace the batch before sending so later add() calls never touch
        // the messages that are on their way out.
        this.batch = new message_batch_1.MessageBatch(this.batchOptions);
        if (this.pending) {
            clearTimeout(this.pending);
            delete this.pending;
        }
        this._publish(messages, callbacks, callback);
    }
}
|
131 | exports.Queue = Queue;
|
/**
 * Queue for handling ordered messages. Unlike the standard queue, this
 * publishes batches one at a time and, when a batch fails, caches the error
 * and rejects every queued message until
 * {@link OrderedQueue#resumePublishing} is called.
 *
 * Batches are kept newest-first: new batches are unshifted onto the front of
 * `batches` and the batch to send is popped from the end, so messages leave
 * in the order they were added.
 *
 * @private
 * @extends MessageQueue
 *
 * @param {Publisher} publisher The publisher.
 * @param {string} key The key used to order the messages.
 */
class OrderedQueue extends MessageQueue {
    constructor(publisher, key) {
        super(publisher);
        this.batches = [];
        this.inFlight = false;
        this.key = key;
    }
    /**
     * Refreshes the options from the publisher and pushes them into every
     * batch that is still waiting to be sent.
     */
    updateOptions() {
        super.updateOptions();
        this.batches.forEach(b => b.setOptions(this.batchOptions));
    }
    /**
     * Reference to the batch we're currently filling. Creates a batch on
     * demand when none is queued.
     * @returns {MessageBatch}
     */
    get currentBatch() {
        if (!this.batches.length) {
            this.batches.push(this.createBatch());
        }
        return this.batches[0];
    }
    /**
     * Adds a message to a batch, creating a new batch if need be.
     *
     * @param {object} message The message to publish.
     * @param {PublishCallback} callback The publish callback.
     */
    add(message, callback) {
        // While a failure is cached, every new message is rejected with it.
        if (this.error) {
            callback(this.error);
            return;
        }
        if (this.inFlight) {
            // in the event that a batch is currently in flight, we can overfill
            // the next batch as long as it hasn't hit the API limit
            if (this.currentBatch.isAtMax()) {
                this.batches.unshift(this.createBatch());
            }
            this.currentBatch.add(message, callback);
            return;
        }
        if (!this.currentBatch.canFit(message)) {
            this.publish();
        }
        this.currentBatch.add(message, callback);
        // it is possible that we triggered a publish earlier, so we'll need to
        // check again here
        if (!this.inFlight) {
            if (this.currentBatch.isFull()) {
                this.publish();
            }
            else if (!this.pending) {
                this.beginNextPublish();
            }
        }
    }
    /**
     * Starts a timeout to publish any pending messages. The delay is what
     * remains of `maxMilliseconds` since the current batch was created,
     * never less than zero.
     */
    beginNextPublish() {
        const maxMilliseconds = this.batchOptions.maxMilliseconds;
        const timeWaiting = Date.now() - this.currentBatch.created;
        const delay = Math.max(0, maxMilliseconds - timeWaiting);
        this.pending = setTimeout(() => this.publish(), delay);
    }
    /**
     * Creates a new {@link MessageBatch} instance.
     *
     * @returns {MessageBatch}
     */
    createBatch() {
        return new message_batch_1.MessageBatch(this.batchOptions);
    }
    /**
     * In the event of a publish failure, we need to cache the error in question
     * and reject all pending publish calls, prompting the user to call
     * {@link OrderedQueue#resumePublishing}.
     *
     * @param {Error} err The publishing error.
     */
    handlePublishFailure(err) {
        this.error = new publish_error_1.PublishError(this.key, err);
        // reject all pending publishes, oldest batch first
        while (this.batches.length) {
            const { callbacks } = this.batches.pop();
            callbacks.forEach(callback => callback(err));
        }
    }
    /**
     * Publishes the oldest queued batch. If successful it will prepare the
     * next batch to be published after it. If an error occurs, it will reject
     * all pending messages. In the event that no pending messages/batches are
     * left, a "drain" event will be fired, indicating to the publisher that it
     * is safe to delete this queue.
     *
     * Calling this with nothing queued is a no-op that reports success.
     *
     * @param {function} [callback] Invoked exactly once with the error (or
     *     `null`) for the batch taken by this call.
     *
     * @fires OrderedQueue#drain
     */
    publish(callback) {
        const definedCallback = callback || (() => { });
        if (this.pending) {
            clearTimeout(this.pending);
            delete this.pending;
        }
        // Nothing to send: popping from an empty list would throw and would
        // leave `inFlight` stuck at true. "drain" is only announced when the
        // queue is really idle; a batch still in flight emits it on
        // completion, and a cached error must survive until
        // resumePublishing() is called.
        if (!this.batches.length) {
            if (!this.inFlight && !this.error) {
                this.emit('drain');
            }
            definedCallback(null);
            return;
        }
        this.inFlight = true;
        const { messages, callbacks } = this.batches.pop();
        this._publish(messages, callbacks, (err) => {
            this.inFlight = false;
            if (err) {
                this.handlePublishFailure(err);
                definedCallback(err);
            }
            else if (this.batches.length) {
                this.beginNextPublish();
                // This call's batch went out successfully; report that even
                // though later batches are still waiting, otherwise the
                // caller would never hear back.
                definedCallback(null);
            }
            else {
                this.emit('drain');
                definedCallback(null);
            }
        });
    }
    /**
     * Tells the queue it is ok to continue publishing messages.
     */
    resumePublishing() {
        delete this.error;
        // once this is called, we'll make this object eligible for garbage
        // collection. by wrapping in nextTick() we'll give users an opportunity
        // to use it again instead of deleting instantly and then creating a new
        // instance.
        process.nextTick(() => {
            if (!this.batches.length) {
                this.emit('drain');
            }
        });
    }
}
|
280 | exports.OrderedQueue = OrderedQueue;
|
281 | //# sourceMappingURL=message-queues.js.map |
\ | No newline at end of file |