UNPKG

20.6 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2018 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Subscriber = exports.Message = void 0;
19const precise_date_1 = require("@google-cloud/precise-date");
20const projectify_1 = require("@google-cloud/projectify");
21const promisify_1 = require("@google-cloud/promisify");
22const events_1 = require("events");
23const api_1 = require("@opentelemetry/api");
24const semantic_conventions_1 = require("@opentelemetry/semantic-conventions");
25const histogram_1 = require("./histogram");
26const lease_manager_1 = require("./lease-manager");
27const message_queues_1 = require("./message-queues");
28const message_stream_1 = require("./message-stream");
29const default_options_1 = require("./default-options");
30const opentelemetry_tracing_1 = require("./opentelemetry-tracing");
31const util_1 = require("./util");
32const temporal_1 = require("./temporal");
33/**
34 * Date object with nanosecond precision. Supports all standard Date arguments
35 * in addition to several custom types.
36 *
37 * @external PreciseDate
38 * @see {@link https://github.com/googleapis/nodejs-precise-date|PreciseDate}
39 */
40/**
41 * Message objects provide a simple interface for users to get message data and
42 * acknowledge the message.
43 *
44 * @example
45 * ```
46 * subscription.on('message', message => {
47 * // {
48 * // ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA',
49 * // attributes: {key: 'value'},
50 * // data: Buffer.from('Hello, world!'),
51 * // id: '1551297743043',
52 * // orderingKey: 'ordering-key',
53 * // publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
54 * // received: 1551297743043,
55 * // length: 13
56 * // }
57 * });
58 * ```
59 */
class Message {
    /**
     * @hideconstructor
     *
     * @param {Subscriber} sub The parent subscriber.
     * @param {object} message The raw message response, destructured into
     *     `ackId`, `message` (the published payload) and `deliveryAttempt`.
     */
    constructor(sub, { ackId, message, deliveryAttempt }) {
        /**
         * This ID is used to acknowledge the message.
         *
         * @name Message#ackId
         * @type {string}
         */
        this.ackId = ackId;
        /**
         * Optional attributes for this message. Defaults to an empty object
         * when the raw message carries none.
         *
         * @name Message#attributes
         * @type {object}
         */
        this.attributes = message.attributes || {};
        /**
         * The message data as a Buffer.
         *
         * @name Message#data
         * @type {Buffer}
         */
        this.data = message.data;
        /**
         * Delivery attempt counter is 1 + (the sum of number of NACKs and number of
         * ack_deadline exceeds) for this message. Reported as 0 when the raw
         * response does not include a delivery attempt.
         *
         * @name Message#deliveryAttempt
         * @type {number}
         */
        this.deliveryAttempt = Number(deliveryAttempt || 0);
        /**
         * ID of the message, assigned by the server when the message is published.
         * Guaranteed to be unique within the topic.
         *
         * @name Message#id
         * @type {string}
         */
        this.id = message.messageId;
        /**
         * Identifies related messages for which publish order should be respected.
         * If a `Subscription` has `enableMessageOrdering` set to `true`, messages
         * published with the same `orderingKey` value will be delivered to
         * subscribers in the order in which they are received by the Pub/Sub
         * system.
         *
         * **EXPERIMENTAL:** This feature is part of a closed alpha release. This
         * API might be changed in backward-incompatible ways and is not recommended
         * for production use. It is not subject to any SLA or deprecation policy.
         *
         * @name Message#orderingKey
         * @type {string}
         */
        this.orderingKey = message.orderingKey;
        /**
         * The time at which the message was published.
         *
         * @name Message#publishTime
         * @type {external:PreciseDate}
         */
        this.publishTime = new precise_date_1.PreciseDate(message.publishTime);
        /**
         * The time at which the message was received by the subscription,
         * in milliseconds since the epoch (`Date.now()`).
         *
         * @name Message#received
         * @type {number}
         */
        this.received = Date.now();
        // Set once the message has been acked or nacked; later ack/nack/modAck
        // calls become no-ops.
        this._handled = false;
        // A raw message without a data field must not break construction;
        // report a zero length instead of throwing on `undefined.length`.
        this._length = this.data == null ? 0 : this.data.length;
        this._subscriber = sub;
    }
    /**
     * The length of the message data, captured at construction time
     * (0 when the message has no data).
     *
     * @type {number}
     */
    get length() {
        return this._length;
    }
    /**
     * Acknowledges the message. Only the first ack/nack has any effect.
     *
     * @example
     * ```
     * subscription.on('message', message => {
     *   message.ack();
     * });
     * ```
     */
    ack() {
        if (!this._handled) {
            this._handled = true;
            this._subscriber.ack(this);
        }
    }
    /**
     * Modifies the ack deadline. Ignored once the message has been acked or
     * nacked. Unlike ack/nack this does not mark the message as handled.
     *
     * @param {number} deadline The number of seconds to extend the deadline.
     * @private
     */
    modAck(deadline) {
        if (!this._handled) {
            this._subscriber.modAck(this, deadline);
        }
    }
    /**
     * Removes the message from our inventory and schedules it to be redelivered.
     * Only the first ack/nack has any effect.
     *
     * @example
     * ```
     * subscription.on('message', message => {
     *   message.nack();
     * });
     * ```
     */
    nack() {
        if (!this._handled) {
            this._handled = true;
            this._subscriber.nack(this);
        }
    }
}
exports.Message = Message;
// Default minimum ack deadline (60 seconds) applied by
// Subscriber#getMinMaxDeadlines when the subscription is detected to be
// exactly-once and the user has not supplied their own `minAckDeadline`.
const minAckDeadlineForExactlyOnce = temporal_1.Duration.from({ seconds: 60 });
192/**
193 * Subscriber class is used to manage all message related functionality.
194 *
195 * @private
196 * @class
197 *
198 * @param {Subscription} subscription The corresponding subscription.
199 * @param {SubscriberOptions} options The subscriber options.
200 */
class Subscriber extends events_1.EventEmitter {
    /**
     * @param {Subscription} subscription The corresponding subscription.
     * @param {SubscriberOptions} [options] The subscriber options.
     */
    constructor(subscription, options = {}) {
        super();
        // Start from the library defaults; setOptions() below may override them.
        this.ackDeadline = default_options_1.defaultOptions.subscription.ackDeadline;
        this.maxMessages = default_options_1.defaultOptions.subscription.maxOutstandingMessages;
        this.maxBytes = default_options_1.defaultOptions.subscription.maxOutstandingBytes;
        this.useLegacyFlowControl = false;
        this.isOpen = false;
        this._useOpentelemetry = false;
        // Ack-time samples (seconds) used to derive the ack deadline.
        this._histogram = new histogram_1.Histogram({ min: 10, max: 600 });
        // ModAck request latency samples (seconds).
        this._latencies = new histogram_1.Histogram();
        this._subscription = subscription;
        // Throttles the exactly-once warning (constructed with 60 * 1000).
        this._errorLog = new util_1.Throttler(60 * 1000);
        this.setOptions(options);
    }
    /**
     * Update our ack extension time that will be used by the lease manager
     * for sending modAcks.
     *
     * Should not be called from outside this class, except for unit tests.
     *
     * @param {number} [ackTimeSeconds] The number of seconds that the last
     *     ack took after the message was received. If this is undefined, then
     *     we won't update the histogram, but we will still recalculate the
     *     ackDeadline based on the situation.
     *
     * @private
     */
    updateAckDeadline(ackTimeSeconds) {
        // Start with the value we already have.
        let ackDeadline = this.ackDeadline;
        // If we got an ack time reading, update the histogram (and ackDeadline).
        // A reading of exactly 0 (acked within the same millisecond) is a valid
        // sample, so test for "is a real number" rather than truthiness.
        const hasReading = typeof ackTimeSeconds === 'number' && !Number.isNaN(ackTimeSeconds);
        if (hasReading) {
            this._histogram.add(ackTimeSeconds);
            ackDeadline = this._histogram.percentile(99);
        }
        // Grab our current min/max deadline values, based on whether exactly-once
        // is enabled, and the defaults.
        const [minDeadline, maxDeadline] = this.getMinMaxDeadlines();
        if (minDeadline) {
            ackDeadline = Math.max(ackDeadline, minDeadline.totalOf('second'));
        }
        if (maxDeadline) {
            ackDeadline = Math.min(ackDeadline, maxDeadline.totalOf('second'));
        }
        // Set the bounded result back.
        this.ackDeadline = ackDeadline;
    }
    /**
     * Resolves the effective minimum and maximum ack deadlines.
     *
     * User-set values in the options win; otherwise the library defaults are
     * used, with the exactly-once minimum substituted when the subscription is
     * exactly-once.
     *
     * @returns {Array} A `[minDeadline, maxDeadline]` pair; either entry may be
     *     undefined when neither the user nor the defaults provide one.
     * @private
     */
    getMinMaxDeadlines() {
        // If this is an exactly-once subscription, and the user didn't set their
        // own minimum ack periods, set it to the default for exactly-once.
        const defaultMinDeadline = this.isExactlyOnce
            ? minAckDeadlineForExactlyOnce
            : default_options_1.defaultOptions.subscription.minAckDeadline;
        const defaultMaxDeadline = default_options_1.defaultOptions.subscription.maxAckDeadline;
        // Pull in any user-set min/max (null and undefined both mean "not set").
        const userMin = this._options.minAckDeadline;
        const userMax = this._options.maxAckDeadline;
        const minDeadline = userMin !== null && userMin !== undefined ? userMin : defaultMinDeadline;
        const maxDeadline = userMax !== null && userMax !== undefined ? userMax : defaultMaxDeadline;
        return [minDeadline, maxDeadline];
    }
    /**
     * Returns true if an exactly once subscription has been detected.
     *
     * @type {boolean}
     * @private
     */
    get isExactlyOnce() {
        if (!this.subscriptionProperties) {
            return false;
        }
        return !!this.subscriptionProperties.exactlyOnceDeliveryEnabled;
    }
    /**
     * Sets our subscription properties from incoming messages.
     *
     * @param {SubscriptionProperties} subscriptionProperties The new properties.
     * @private
     */
    setSubscriptionProperties(subscriptionProperties) {
        const previouslyEnabled = this.isExactlyOnce;
        this.subscriptionProperties = subscriptionProperties;
        // If this is an exactly-once subscription...
        if (this.subscriptionProperties.exactlyOnceDeliveryEnabled) {
            // Warn the user that they may have difficulty.
            this._errorLog.doMaybe(() => console.error('WARNING: Exactly-once subscriptions are not yet supported ' +
                'by the Node client library. This feature will be added ' +
                'in a future release.'));
        }
        // Update ackDeadline in case the flag switched.
        if (previouslyEnabled !== this.isExactlyOnce) {
            this.updateAckDeadline();
        }
    }
    /**
     * The 99th percentile of request latencies, converted to milliseconds,
     * plus the modAck queue's `maxMilliseconds` once the queue exists.
     *
     * @type {number}
     * @private
     */
    get modAckLatency() {
        const latency = this._latencies.percentile(99);
        let bufferTime = 0;
        if (this._modAcks) {
            bufferTime = this._modAcks.maxMilliseconds;
        }
        return latency * 1000 + bufferTime;
    }
    /**
     * The full name of the Subscription. Computed on first access (with the
     * project ID token replaced) and cached afterwards.
     *
     * @type {string}
     * @private
     */
    get name() {
        if (!this._name) {
            const { name, projectId } = this._subscription;
            this._name = (0, projectify_1.replaceProjectIdToken)(name, projectId);
        }
        return this._name;
    }
    /**
     * Acknowledges the supplied message.
     *
     * Records how long the message was held before being acked, queues the
     * ack, and removes the message from the inventory once the ack queue has
     * flushed.
     *
     * @param {Message} message The message to acknowledge.
     * @returns {Promise}
     * @private
     */
    async ack(message) {
        const ackTimeSeconds = (Date.now() - message.received) / 1000;
        this.updateAckDeadline(ackTimeSeconds);
        this._acks.add(message);
        await this._acks.onFlush();
        this._inventory.remove(message);
    }
    /**
     * Closes the subscriber. The returned promise will resolve once any pending
     * acks/modAcks are finished. Does nothing if the subscriber is not open.
     *
     * @returns {Promise}
     * @private
     */
    async close() {
        if (!this.isOpen) {
            return;
        }
        // Flip the flag first so a re-entrant close() (e.g. triggered by the
        // stream's own 'close' event) returns immediately.
        this.isOpen = false;
        this._stream.destroy();
        this._inventory.clear();
        await this._waitForFlush();
        this.emit('close');
    }
    /**
     * Gets the subscriber client instance.
     *
     * @returns {Promise<object>}
     * @private
     */
    async getClient() {
        const pubsub = this._subscription.pubsub;
        const [client] = await (0, promisify_1.promisify)(pubsub.getClient_).call(pubsub, {
            client: 'SubscriberClient',
        });
        return client;
    }
    /**
     * Modifies the acknowledge deadline for the provided message and records
     * how long the request took to flush in the latency histogram.
     *
     * @param {Message} message The message to modify.
     * @param {number} deadline The deadline.
     * @returns {Promise}
     * @private
     */
    async modAck(message, deadline) {
        const startTime = Date.now();
        this._modAcks.add(message, deadline);
        await this._modAcks.onFlush();
        const latency = (Date.now() - startTime) / 1000;
        this._latencies.add(latency);
    }
    /**
     * Modifies the acknowledge deadline for the provided message (to 0) and
     * then removes it from our inventory.
     *
     * @param {Message} message The message.
     * @return {Promise}
     * @private
     */
    async nack(message) {
        await this.modAck(message, 0);
        this._inventory.remove(message);
    }
    /**
     * Starts pulling messages.
     *
     * Creates the ack/modAck queues, the lease manager and the message stream,
     * wires their events together and marks the subscriber as open.
     *
     * @private
     */
    open() {
        const { batching, flowControl, streamingOptions } = this._options;
        this._acks = new message_queues_1.AckQueue(this, batching);
        this._modAcks = new message_queues_1.ModAckQueue(this, batching);
        this._inventory = new lease_manager_1.LeaseManager(this, flowControl);
        this._stream = new message_stream_1.MessageStream(this, streamingOptions);
        this._stream
            .on('error', err => this.emit('error', err))
            .on('debug', err => this.emit('debug', err))
            .on('data', (data) => this._onData(data))
            .once('close', () => this.close());
        // Pause/resume the stream as the inventory reports full/free.
        this._inventory
            .on('full', () => this._stream.pause())
            .on('free', () => this._stream.resume());
        this.isOpen = true;
    }
    /**
     * Sets subscriber options.
     *
     * Note that the supplied object is stored and updated in place: a passed
     * `ackDeadline` is mirrored into `minAckDeadline`/`maxAckDeadline`, and
     * `streamingOptions.maxStreams` is capped when flow control is set.
     *
     * @param {SubscriberOptions} options The options.
     * @private
     */
    setOptions(options) {
        this._options = options;
        this._useOpentelemetry = options.enableOpenTelemetryTracing || false;
        // The user-set ackDeadline value basically pegs the extension time.
        // We'll emulate it by overwriting min/max.
        const passedAckDeadline = options.ackDeadline;
        if (passedAckDeadline !== undefined) {
            this.ackDeadline = passedAckDeadline;
            options.minAckDeadline = temporal_1.Duration.from({ seconds: passedAckDeadline });
            options.maxAckDeadline = temporal_1.Duration.from({ seconds: passedAckDeadline });
        }
        this.useLegacyFlowControl = options.useLegacyFlowControl || false;
        if (options.flowControl) {
            this.maxMessages =
                options.flowControl.maxMessages ||
                    default_options_1.defaultOptions.subscription.maxOutstandingMessages;
            this.maxBytes =
                options.flowControl.maxBytes ||
                    default_options_1.defaultOptions.subscription.maxOutstandingBytes;
            // In the event that the user has specified the maxMessages option, we
            // want to make sure that the maxStreams option isn't higher.
            // It doesn't really make sense to open 5 streams if the user only wants
            // 1 message at a time.
            if (!options.streamingOptions) {
                options.streamingOptions = {};
            }
            const { maxStreams = default_options_1.defaultOptions.subscription.maxStreams } = options.streamingOptions;
            options.streamingOptions.maxStreams = Math.min(maxStreams, this.maxMessages);
        }
    }
    /**
     * Constructs an OpenTelemetry span from the incoming message.
     *
     * @param {Message} message One of the received messages
     * @returns {object|undefined} The consumer span, or undefined when tracing
     *     is disabled, the message carries no span context, or the span
     *     context attribute cannot be parsed.
     * @private
     */
    _constructSpan(message) {
        // Handle cases where OpenTelemetry is disabled or no span context was sent through message
        if (!this._useOpentelemetry || !message.attributes) {
            return undefined;
        }
        const spanValue = message.attributes['googclient_OpenTelemetrySpanContext'];
        if (!spanValue) {
            return undefined;
        }
        // The attribute comes from the message itself, so it may be malformed.
        // A bad tracing header must not abort delivery of the batch: surface
        // the parse error on the 'debug' channel and skip the span.
        let parentSpanContext;
        try {
            parentSpanContext = JSON.parse(spanValue);
        }
        catch (err) {
            this.emit('debug', err);
            return undefined;
        }
        // Subscriber spans should always have a publisher span as a parent.
        // Return undefined if no parent is provided (e.g. the value was "null").
        if (!parentSpanContext) {
            return undefined;
        }
        const spanAttributes = {
            // Original span attributes
            ackId: message.ackId,
            deliveryAttempt: message.deliveryAttempt,
            //
            // based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers
            [semantic_conventions_1.SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_OPERATION]: 'process',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION]: this.name,
            [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_ID]: message.id,
            [semantic_conventions_1.SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: message.data.length,
            // Not in Opentelemetry semantic convention but mimics naming
            'messaging.pubsub.received_at': message.received,
            'messaging.pubsub.acknowlege_id': message.ackId,
            'messaging.pubsub.delivery_attempt': message.deliveryAttempt,
        };
        const spanName = `${this.name} process`;
        return (0, opentelemetry_tracing_1.createSpan)(spanName.trim(), api_1.SpanKind.CONSUMER, spanAttributes, parentSpanContext);
    }
    /**
     * Callback to be invoked when a new message is available.
     *
     * New messages will be added to the subscribers inventory, which in turn will
     * automatically extend the messages ack deadline until either:
     *   a. the user acks/nacks it
     *   b. the maxExtension option is hit
     *
     * If the message puts us at/over capacity, then we'll pause our message
     * stream until we've freed up some inventory space.
     *
     * New messages must immediately issue a ModifyAckDeadline request
     * (aka receipt) to confirm with the backend that we did in fact receive the
     * message and it's ok to start ticking down on the deadline.
     *
     * Messages that arrive while the subscriber is not open are nacked.
     *
     * @param {object} response The stream response; `subscriptionProperties`
     *     and `receivedMessages` are read from it.
     * @private
     */
    _onData(response) {
        // Grab the subscription properties for exactly once and ordering flags.
        if (response.subscriptionProperties) {
            this.setSubscriptionProperties(response.subscriptionProperties);
        }
        // Tolerate responses that carry no message list at all.
        const receivedMessages = response.receivedMessages || [];
        for (const data of receivedMessages) {
            const message = new Message(this, data);
            const span = this._constructSpan(message);
            if (this.isOpen) {
                message.modAck(this.ackDeadline);
                this._inventory.add(message);
            }
            else {
                message.nack();
            }
            if (span) {
                span.end();
            }
        }
    }
    /**
     * Returns a promise that will resolve once all pending requests have settled.
     *
     * Queues with pending requests are flushed; queues with in-flight requests
     * are awaited until drained.
     *
     * @private
     *
     * @returns {Promise}
     */
    async _waitForFlush() {
        const promises = [];
        if (this._acks.numPendingRequests) {
            // Register the flush listener before triggering the flush.
            promises.push(this._acks.onFlush());
            this._acks.flush();
        }
        if (this._modAcks.numPendingRequests) {
            promises.push(this._modAcks.onFlush());
            this._modAcks.flush();
        }
        if (this._acks.numInFlightRequests) {
            promises.push(this._acks.onDrain());
        }
        if (this._modAcks.numInFlightRequests) {
            promises.push(this._modAcks.onDrain());
        }
        await Promise.all(promises);
    }
}
555exports.Subscriber = Subscriber;
556//# sourceMappingURL=subscriber.js.map
\No newline at end of file