UNPKG

9.32 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2019 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.OrderedQueue = exports.Queue = exports.MessageQueue = void 0;
19const events_1 = require("events");
20const message_batch_1 = require("./message-batch");
21const publish_error_1 = require("./publish-error");
22/**
23 * Queues are used to manage publishing batches of messages.
24 *
25 * @private
26 *
27 * @param {Publisher} publisher The parent publisher.
28 */
29class MessageQueue extends events_1.EventEmitter {
30 constructor(publisher) {
31 super();
32 this.publisher = publisher;
33 this.batchOptions = publisher.settings.batching;
34 }
35 /**
36 * Forces the queue to update its options from the publisher.
37 * The specific queue will need to do a bit more to pass the new
38 * values down into any MessageBatch.
39 *
40 * This is only for use by {@link Publisher}.
41 *
42 * @private
43 */
44 updateOptions() {
45 this.batchOptions = this.publisher.settings.batching;
46 }
47 /**
48 * Accepts a batch of messages and publishes them to the API.
49 *
50 * @param {object[]} messages The messages to publish.
51 * @param {PublishCallback[]} callbacks The corresponding callback functions.
52 * @param {function} [callback] Callback to be fired when publish is done.
53 */
54 _publish(messages, callbacks, callback) {
55 const { topic, settings } = this.publisher;
56 const reqOpts = {
57 topic: topic.name,
58 messages,
59 };
60 if (messages.length === 0) {
61 if (typeof callback === 'function') {
62 callback(null);
63 }
64 return;
65 }
66 topic.request({
67 client: 'PublisherClient',
68 method: 'publish',
69 reqOpts,
70 gaxOpts: settings.gaxOpts,
71 }, (err, resp) => {
72 const messageIds = (resp && resp.messageIds) || [];
73 callbacks.forEach((callback, i) => callback(err, messageIds[i]));
74 if (typeof callback === 'function') {
75 callback(err);
76 }
77 });
78 }
79}
80exports.MessageQueue = MessageQueue;
81/**
82 * Standard message queue used for publishing messages.
83 *
84 * @private
85 * @extends MessageQueue
86 *
87 * @param {Publisher} publisher The publisher.
88 */
89class Queue extends MessageQueue {
90 constructor(publisher) {
91 super(publisher);
92 this.batch = new message_batch_1.MessageBatch(this.batchOptions);
93 }
94 // This needs to update our existing message batch.
95 updateOptions() {
96 super.updateOptions();
97 this.batch.setOptions(this.batchOptions);
98 }
99 /**
100 * Adds a message to the queue.
101 *
102 * @param {PubsubMessage} message The message to publish.
103 * @param {PublishCallback} callback The publish callback.
104 */
105 add(message, callback) {
106 if (!this.batch.canFit(message)) {
107 this.publish();
108 }
109 this.batch.add(message, callback);
110 if (this.batch.isFull()) {
111 this.publish();
112 }
113 else if (!this.pending) {
114 const { maxMilliseconds } = this.batchOptions;
115 this.pending = setTimeout(() => this.publish(), maxMilliseconds);
116 }
117 }
118 /**
119 * Cancels any pending publishes and calls _publish immediately.
120 */
121 publish(callback) {
122 const { messages, callbacks } = this.batch;
123 this.batch = new message_batch_1.MessageBatch(this.batchOptions);
124 if (this.pending) {
125 clearTimeout(this.pending);
126 delete this.pending;
127 }
128 this._publish(messages, callbacks, callback);
129 }
130}
131exports.Queue = Queue;
132/**
133 * Queue for handling ordered messages. Unlike the standard queue, this
134 * ensures that batches are published one at a time and throws an exception in
135 * the event that any batch fails to publish.
136 *
137 * @private
138 * @extends MessageQueue
139 *
140 * @param {Publisher} publisher The publisher.
141 * @param {string} key The key used to order the messages.
142 */
143class OrderedQueue extends MessageQueue {
144 constructor(publisher, key) {
145 super(publisher);
146 this.batches = [];
147 this.inFlight = false;
148 this.key = key;
149 }
150 // This needs to update our existing message batches.
151 updateOptions() {
152 super.updateOptions();
153 this.batches.forEach(b => b.setOptions(this.batchOptions));
154 }
155 /**
156 * Reference to the batch we're currently filling.
157 * @returns {MessageBatch}
158 */
159 get currentBatch() {
160 if (!this.batches.length) {
161 this.batches.push(this.createBatch());
162 }
163 return this.batches[0];
164 }
165 /**
166 * Adds a message to a batch, creating a new batch if need be.
167 *
168 * @param {object} message The message to publish.
169 * @param {PublishCallback} callback The publish callback.
170 */
171 add(message, callback) {
172 if (this.error) {
173 callback(this.error);
174 return;
175 }
176 if (this.inFlight) {
177 // in the event that a batch is currently in flight, we can overfill
178 // the next batch as long as it hasn't hit the API limit
179 if (this.currentBatch.isAtMax()) {
180 this.batches.unshift(this.createBatch());
181 }
182 this.currentBatch.add(message, callback);
183 return;
184 }
185 if (!this.currentBatch.canFit(message)) {
186 this.publish();
187 }
188 this.currentBatch.add(message, callback);
189 // it is possible that we triggered a publish earlier, so we'll need to
190 // check again here
191 if (!this.inFlight) {
192 if (this.currentBatch.isFull()) {
193 this.publish();
194 }
195 else if (!this.pending) {
196 this.beginNextPublish();
197 }
198 }
199 }
200 /**
201 * Starts a timeout to publish any pending messages.
202 */
203 beginNextPublish() {
204 const maxMilliseconds = this.batchOptions.maxMilliseconds;
205 const timeWaiting = Date.now() - this.currentBatch.created;
206 const delay = Math.max(0, maxMilliseconds - timeWaiting);
207 this.pending = setTimeout(() => this.publish(), delay);
208 }
209 /**
210 * Creates a new {@link MessageBatch} instance.
211 *
212 * @returns {MessageBatch}
213 */
214 createBatch() {
215 return new message_batch_1.MessageBatch(this.batchOptions);
216 }
217 /**
218 * In the event of a publish failure, we need to cache the error in question
219 * and reject all pending publish calls, prompting the user to call
220 * {@link OrderedQueue#resumePublishing}.
221 *
222 * @param {Error} err The publishing error.
223 */
224 handlePublishFailure(err) {
225 this.error = new publish_error_1.PublishError(this.key, err);
226 // reject all pending publishes
227 while (this.batches.length) {
228 const { callbacks } = this.batches.pop();
229 callbacks.forEach(callback => callback(err));
230 }
231 }
232 /**
233 * Publishes the messages. If successful it will prepare the next batch to be
234 * published immediately after. If an error occurs, it will reject all
235 * pending messages. In the event that no pending messages/batches are left,
236 * a "drain" event will be fired, indicating to the publisher that it is
237 * safe to delete this queue.
238 *
239 * @fires OrderedQueue#drain
240 */
241 publish(callback) {
242 const definedCallback = callback || (() => { });
243 this.inFlight = true;
244 if (this.pending) {
245 clearTimeout(this.pending);
246 delete this.pending;
247 }
248 const { messages, callbacks } = this.batches.pop();
249 this._publish(messages, callbacks, (err) => {
250 this.inFlight = false;
251 if (err) {
252 this.handlePublishFailure(err);
253 definedCallback(err);
254 }
255 else if (this.batches.length) {
256 this.beginNextPublish();
257 }
258 else {
259 this.emit('drain');
260 definedCallback(null);
261 }
262 });
263 }
264 /**
265 * Tells the queue it is ok to continue publishing messages.
266 */
267 resumePublishing() {
268 delete this.error;
269 // once this is called, we'll make this object eligible for garbage
270 // collection. by wrapping in nextTick() we'll give users an opportunity
271 // to use it again instead of deleting instantly and then creating a new
272 // instance.
273 process.nextTick(() => {
274 if (!this.batches.length) {
275 this.emit('drain');
276 }
277 });
278 }
279}
280exports.OrderedQueue = OrderedQueue;
281//# sourceMappingURL=message-queues.js.map
\No newline at end of file