1 | ;
|
2 | /*!
|
3 | * Copyright 2018 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.Subscriber = exports.Message = void 0;
|
19 | const precise_date_1 = require("@google-cloud/precise-date");
|
20 | const projectify_1 = require("@google-cloud/projectify");
|
21 | const promisify_1 = require("@google-cloud/promisify");
|
22 | const events_1 = require("events");
|
23 | const api_1 = require("@opentelemetry/api");
|
24 | const semantic_conventions_1 = require("@opentelemetry/semantic-conventions");
|
25 | const histogram_1 = require("./histogram");
|
26 | const lease_manager_1 = require("./lease-manager");
|
27 | const message_queues_1 = require("./message-queues");
|
28 | const message_stream_1 = require("./message-stream");
|
29 | const default_options_1 = require("./default-options");
|
30 | const opentelemetry_tracing_1 = require("./opentelemetry-tracing");
|
31 | const util_1 = require("./util");
|
32 | const temporal_1 = require("./temporal");
|
33 | /**
|
34 | * Date object with nanosecond precision. Supports all standard Date arguments
|
35 | * in addition to several custom types.
|
36 | *
|
37 | * @external PreciseDate
|
38 | * @see {@link https://github.com/googleapis/nodejs-precise-date|PreciseDate}
|
39 | */
|
/**
 * Message objects provide a simple interface for users to get message data and
 * acknowledge the message.
 *
 * @example
 * ```
 * subscription.on('message', message => {
 *   // {
 *   //   ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA',
 *   //   attributes: {key: 'value'},
 *   //   data: Buffer.from('Hello, world!'),
 *   //   id: '1551297743043',
 *   //   orderingKey: 'ordering-key',
 *   //   publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
 *   //   received: 1551297743043,
 *   //   length: 13
 *   // }
 * });
 * ```
 */
class Message {
    /**
     * @hideconstructor
     *
     * @param {Subscriber} sub The parent subscriber.
     * @param {object} message The raw message response, i.e. an object holding
     *     `ackId`, the `message` payload and an optional `deliveryAttempt`.
     */
    constructor(sub, { ackId, message, deliveryAttempt }) {
        /**
         * This ID is used to acknowledge the message.
         *
         * @name Message#ackId
         * @type {string}
         */
        this.ackId = ackId;
        /**
         * Optional attributes for this message. Defaults to an empty object.
         *
         * @name Message#attributes
         * @type {object}
         */
        this.attributes = message.attributes || {};
        /**
         * The message data as a Buffer.
         *
         * A message may carry attributes only, in which case the payload has no
         * `data` at all. An empty Buffer is substituted so that `data` and
         * `length` are always usable instead of the constructor throwing.
         *
         * @name Message#data
         * @type {Buffer}
         */
        this.data =
            message.data === undefined || message.data === null
                ? Buffer.alloc(0)
                : message.data;
        /**
         * Delivery attempt counter is 1 + (the sum of number of NACKs and number of
         * ack_deadline exceeds) for this message. Coerced to a number; 0 when the
         * response did not carry a delivery attempt.
         *
         * @name Message#deliveryAttempt
         * @type {number}
         */
        this.deliveryAttempt = Number(deliveryAttempt || 0);
        /**
         * ID of the message, assigned by the server when the message is published.
         * Guaranteed to be unique within the topic.
         *
         * @name Message#id
         * @type {string}
         */
        this.id = message.messageId;
        /**
         * Identifies related messages for which publish order should be respected.
         * If a `Subscription` has `enableMessageOrdering` set to `true`, messages
         * published with the same `orderingKey` value will be delivered to
         * subscribers in the order in which they are received by the Pub/Sub
         * system.
         *
         * **EXPERIMENTAL:** This feature is part of a closed alpha release. This
         * API might be changed in backward-incompatible ways and is not recommended
         * for production use. It is not subject to any SLA or deprecation policy.
         *
         * @name Message#orderingKey
         * @type {string}
         */
        this.orderingKey = message.orderingKey;
        /**
         * The time at which the message was published.
         *
         * @name Message#publishTime
         * @type {external:PreciseDate}
         */
        this.publishTime = new precise_date_1.PreciseDate(message.publishTime);
        /**
         * The time at which the message was received by the subscription
         * (`Date.now()` at construction).
         *
         * @name Message#received
         * @type {number}
         */
        this.received = Date.now();
        // Set once the message has been acked or nacked; further ack/nack/modAck
        // calls then become no-ops.
        this._handled = false;
        this._length = this.data.length;
        this._subscriber = sub;
    }
    /**
     * The length of the message data.
     *
     * @type {number}
     */
    get length() {
        return this._length;
    }
    /**
     * Acknowledges the message. Only the first ack/nack on a message has any
     * effect.
     *
     * @example
     * ```
     * subscription.on('message', message => {
     *   message.ack();
     * });
     * ```
     */
    ack() {
        if (!this._handled) {
            this._handled = true;
            this._subscriber.ack(this);
        }
    }
    /**
     * Modifies the ack deadline. Ignored once the message has been acked or
     * nacked.
     *
     * @param {number} deadline The number of seconds to extend the deadline.
     * @private
     */
    modAck(deadline) {
        if (!this._handled) {
            this._subscriber.modAck(this, deadline);
        }
    }
    /**
     * Removes the message from our inventory and schedules it to be redelivered.
     * Only the first ack/nack on a message has any effect.
     *
     * @example
     * ```
     * subscription.on('message', message => {
     *   message.nack();
     * });
     * ```
     */
    nack() {
        if (!this._handled) {
            this._handled = true;
            this._subscriber.nack(this);
        }
    }
}
|
190 | exports.Message = Message;
|
// Default minimum ack deadline that Subscriber#getMinMaxDeadlines falls back to
// when exactly-once delivery is detected and the user set no `minAckDeadline`.
191 | const minAckDeadlineForExactlyOnce = temporal_1.Duration.from({ seconds: 60 });
|
/**
 * Subscriber class is used to manage all message related functionality.
 *
 * @private
 * @class
 *
 * @param {Subscription} subscription The corresponding subscription.
 * @param {SubscriberOptions} options The subscriber options.
 */
class Subscriber extends events_1.EventEmitter {
    constructor(subscription, options = {}) {
        super();
        const defaults = default_options_1.defaultOptions.subscription;
        this.ackDeadline = defaults.ackDeadline;
        this.maxMessages = defaults.maxOutstandingMessages;
        this.maxBytes = defaults.maxOutstandingBytes;
        this.useLegacyFlowControl = false;
        this.isOpen = false;
        this._useOpentelemetry = false;
        // Histogram of observed ack times; updateAckDeadline() feeds it values
        // in seconds and reads its 99th percentile.
        this._histogram = new histogram_1.Histogram({ min: 10, max: 600 });
        // Histogram of modAck round-trip latencies (seconds), see modAck().
        this._latencies = new histogram_1.Histogram();
        this._subscription = subscription;
        // Rate-limits the exactly-once warning; NOTE(review): the argument is
        // presumably a 60 second window expressed in milliseconds - confirm.
        this._errorLog = new util_1.Throttler(60 * 1000);
        this.setOptions(options);
    }
    /**
     * Update our ack extension time that will be used by the lease manager
     * for sending modAcks.
     *
     * Should not be called from outside this class, except for unit tests.
     *
     * @param {number} [ackTimeSeconds] The number of seconds that the last
     *     ack took after the message was received. If this is undefined (or not
     *     a finite number), then we won't update the histogram, but we will
     *     still recalculate the ackDeadline based on the situation. A reading
     *     of exactly 0 is a valid measurement and is recorded.
     *
     * @private
     */
    updateAckDeadline(ackTimeSeconds) {
        // Start with the value we already have.
        let ackDeadline = this.ackDeadline;
        // If we got an ack time reading, update the histogram (and ackDeadline).
        // Number.isFinite rejects undefined/NaN but, unlike a truthiness test,
        // keeps an instant (0 second) ack.
        if (Number.isFinite(ackTimeSeconds)) {
            this._histogram.add(ackTimeSeconds);
            ackDeadline = this._histogram.percentile(99);
        }
        // Grab our current min/max deadline values, based on whether exactly-once
        // is enabled, and the defaults.
        const [minDeadline, maxDeadline] = this.getMinMaxDeadlines();
        if (minDeadline) {
            ackDeadline = Math.max(ackDeadline, minDeadline.totalOf('second'));
        }
        if (maxDeadline) {
            ackDeadline = Math.min(ackDeadline, maxDeadline.totalOf('second'));
        }
        // Set the bounded result back.
        this.ackDeadline = ackDeadline;
    }
    /**
     * Resolves the effective minimum and maximum ack deadlines: user-set
     * options win, otherwise the library defaults (with a dedicated minimum
     * when exactly-once delivery has been detected).
     *
     * @returns {Array} A `[minDeadline, maxDeadline]` pair; either entry may be
     *     undefined when neither the user nor the defaults provide one.
     * @private
     */
    getMinMaxDeadlines() {
        // If this is an exactly-once subscription, and the user didn't set their
        // own minimum ack periods, set it to the default for exactly-once.
        const defaultMinDeadline = this.isExactlyOnce
            ? minAckDeadlineForExactlyOnce
            : default_options_1.defaultOptions.subscription.minAckDeadline;
        const defaultMaxDeadline = default_options_1.defaultOptions.subscription.maxAckDeadline;
        // Pull in any user-set min/max; only null/undefined fall back.
        const userMin = this._options.minAckDeadline;
        const userMax = this._options.maxAckDeadline;
        const minDeadline = userMin !== null && userMin !== undefined ? userMin : defaultMinDeadline;
        const maxDeadline = userMax !== null && userMax !== undefined ? userMax : defaultMaxDeadline;
        return [minDeadline, maxDeadline];
    }
    /**
     * Returns true if an exactly once subscription has been detected.
     *
     * @private
     */
    get isExactlyOnce() {
        if (!this.subscriptionProperties) {
            return false;
        }
        return !!this.subscriptionProperties.exactlyOnceDeliveryEnabled;
    }
    /**
     * Sets our subscription properties from incoming messages.
     *
     * @param {SubscriptionProperties} subscriptionProperties The new properties.
     * @private
     */
    setSubscriptionProperties(subscriptionProperties) {
        const previouslyEnabled = this.isExactlyOnce;
        this.subscriptionProperties = subscriptionProperties;
        // If this is an exactly-once subscription...
        if (this.subscriptionProperties.exactlyOnceDeliveryEnabled) {
            // Warn the user that they may have difficulty.
            this._errorLog.doMaybe(() => console.error('WARNING: Exactly-once subscriptions are not yet supported ' +
                'by the Node client library. This feature will be added ' +
                'in a future release.'));
        }
        // Update ackDeadline in case the flag switched.
        if (previouslyEnabled !== this.isExactlyOnce) {
            this.updateAckDeadline();
        }
    }
    /**
     * The 99th percentile of request latencies, in milliseconds, plus the
     * modAck queue's `maxMilliseconds` buffer when the queue exists.
     *
     * @type {number}
     * @private
     */
    get modAckLatency() {
        const latency = this._latencies.percentile(99);
        let bufferTime = 0;
        if (this._modAcks) {
            bufferTime = this._modAcks.maxMilliseconds;
        }
        return latency * 1000 + bufferTime;
    }
    /**
     * The full name of the Subscription. Computed on first access by
     * substituting the project ID into the subscription name, then cached.
     *
     * @type {string}
     * @private
     */
    get name() {
        if (!this._name) {
            const { name, projectId } = this._subscription;
            this._name = (0, projectify_1.replaceProjectIdToken)(name, projectId);
        }
        return this._name;
    }
    /**
     * Acknowledges the supplied message.
     *
     * @param {Message} message The message to acknowledge.
     * @returns {Promise}
     * @private
     */
    async ack(message) {
        // How long the user took to ack, used to tune the ack deadline.
        const ackTimeSeconds = (Date.now() - message.received) / 1000;
        this.updateAckDeadline(ackTimeSeconds);
        this._acks.add(message);
        await this._acks.onFlush();
        this._inventory.remove(message);
    }
    /**
     * Closes the subscriber. The returned promise will resolve once any pending
     * acks/modAcks are finished. Does nothing when the subscriber is not open.
     *
     * @returns {Promise}
     * @private
     */
    async close() {
        if (!this.isOpen) {
            return;
        }
        this.isOpen = false;
        this._stream.destroy();
        this._inventory.clear();
        await this._waitForFlush();
        this.emit('close');
    }
    /**
     * Gets the subscriber client instance.
     *
     * @returns {Promise<object>}
     * @private
     */
    async getClient() {
        const pubsub = this._subscription.pubsub;
        const [client] = await (0, promisify_1.promisify)(pubsub.getClient_).call(pubsub, {
            client: 'SubscriberClient',
        });
        return client;
    }
    /**
     * Modifies the acknowledge deadline for the provided message and records
     * how long the request took in the latency histogram.
     *
     * @param {Message} message The message to modify.
     * @param {number} deadline The deadline.
     * @returns {Promise}
     * @private
     */
    async modAck(message, deadline) {
        const startTime = Date.now();
        this._modAcks.add(message, deadline);
        await this._modAcks.onFlush();
        const latency = (Date.now() - startTime) / 1000;
        this._latencies.add(latency);
    }
    /**
     * Modifies the acknowledge deadline for the provided message (to 0) and
     * then removes it from our inventory.
     *
     * @param {Message} message The message.
     * @return {Promise}
     * @private
     */
    async nack(message) {
        await this.modAck(message, 0);
        this._inventory.remove(message);
    }
    /**
     * Starts pulling messages. Creates the ack/modAck queues, the lease
     * manager and the message stream, and wires their events together.
     *
     * @private
     */
    open() {
        const { batching, flowControl, streamingOptions } = this._options;
        this._acks = new message_queues_1.AckQueue(this, batching);
        this._modAcks = new message_queues_1.ModAckQueue(this, batching);
        this._inventory = new lease_manager_1.LeaseManager(this, flowControl);
        this._stream = new message_stream_1.MessageStream(this, streamingOptions);
        this._stream
            .on('error', err => this.emit('error', err))
            .on('debug', err => this.emit('debug', err))
            .on('data', (data) => this._onData(data))
            .once('close', () => this.close());
        // Apply back-pressure: stop reading while the inventory is full.
        this._inventory
            .on('full', () => this._stream.pause())
            .on('free', () => this._stream.resume());
        this.isOpen = true;
    }
    /**
     * Sets subscriber options.
     *
     * Note that the passed object is stored by reference and is updated in
     * place (`minAckDeadline`, `maxAckDeadline`, `streamingOptions`).
     *
     * @param {SubscriberOptions} options The options.
     * @private
     */
    setOptions(options) {
        this._options = options;
        this._useOpentelemetry = options.enableOpenTelemetryTracing || false;
        // The user-set ackDeadline value basically pegs the extension time.
        // We'll emulate it by overwriting min/max.
        const passedAckDeadline = options.ackDeadline;
        if (passedAckDeadline !== undefined) {
            this.ackDeadline = passedAckDeadline;
            options.minAckDeadline = temporal_1.Duration.from({ seconds: passedAckDeadline });
            options.maxAckDeadline = temporal_1.Duration.from({ seconds: passedAckDeadline });
        }
        this.useLegacyFlowControl = options.useLegacyFlowControl || false;
        if (options.flowControl) {
            this.maxMessages =
                options.flowControl.maxMessages ||
                    default_options_1.defaultOptions.subscription.maxOutstandingMessages;
            this.maxBytes =
                options.flowControl.maxBytes ||
                    default_options_1.defaultOptions.subscription.maxOutstandingBytes;
            // In the event that the user has specified the maxMessages option, we
            // want to make sure that the maxStreams option isn't higher.
            // It doesn't really make sense to open 5 streams if the user only wants
            // 1 message at a time.
            if (!options.streamingOptions) {
                options.streamingOptions = {};
            }
            const { maxStreams = default_options_1.defaultOptions.subscription.maxStreams } = options.streamingOptions;
            options.streamingOptions.maxStreams = Math.min(maxStreams, this.maxMessages);
        }
    }
    /**
     * Constructs an OpenTelemetry span from the incoming message.
     *
     * @param {Message} message One of the received messages
     * @returns {object|undefined} The consumer span, or undefined when tracing
     *     is disabled, the message carries no span context, or the span context
     *     attribute cannot be parsed.
     * @private
     */
    _constructSpan(message) {
        const contextKey = 'googclient_OpenTelemetrySpanContext';
        // Handle cases where OpenTelemetry is disabled or no span context was sent through message
        if (!this._useOpentelemetry ||
            !message.attributes ||
            !message.attributes[contextKey]) {
            return undefined;
        }
        // The attribute is set by whoever published the message, so it cannot be
        // trusted to be valid JSON. A malformed value must not throw out of the
        // stream's data handler; surface it as a debug event and skip tracing.
        let parentSpanContext;
        try {
            parentSpanContext = JSON.parse(message.attributes[contextKey]);
        }
        catch (err) {
            this.emit('debug', err);
            return undefined;
        }
        // Subscriber spans should always have a publisher span as a parent.
        // Return undefined if no parent is provided (e.g. the JSON was `null`).
        if (!parentSpanContext) {
            return undefined;
        }
        const spanAttributes = {
            // Original span attributes
            ackId: message.ackId,
            deliveryAttempt: message.deliveryAttempt,
            //
            // based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers
            [semantic_conventions_1.SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_OPERATION]: 'process',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION]: this.name,
            [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_ID]: message.id,
            [semantic_conventions_1.SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: message.data.length,
            // Not in Opentelemetry semantic convention but mimics naming
            'messaging.pubsub.received_at': message.received,
            // The misspelled key is kept as-is: renaming an emitted attribute
            // would break anything already consuming it.
            'messaging.pubsub.acknowlege_id': message.ackId,
            'messaging.pubsub.delivery_attempt': message.deliveryAttempt,
        };
        const spanName = `${this.name} process`;
        return (0, opentelemetry_tracing_1.createSpan)(spanName.trim(), api_1.SpanKind.CONSUMER, spanAttributes, parentSpanContext);
    }
    /**
     * Callback to be invoked when a new message is available.
     *
     * New messages will be added to the subscribers inventory, which in turn will
     * automatically extend the messages ack deadline until either:
     *   a. the user acks/nacks it
     *   b. the maxExtension option is hit
     *
     * If the message puts us at/over capacity, then we'll pause our message
     * stream until we've freed up some inventory space.
     *
     * New messages must immediately issue a ModifyAckDeadline request
     * (aka receipt) to confirm with the backend that we did in fact receive the
     * message and it's ok to start ticking down on the deadline.
     *
     * Messages arriving while the subscriber is closed are nacked right away.
     *
     * @param {object} response The streaming pull response.
     * @private
     */
    _onData(response) {
        // Grab the subscription properties for exactly once and ordering flags.
        if (response.subscriptionProperties) {
            this.setSubscriptionProperties(response.subscriptionProperties);
        }
        // A response may carry no messages at all; treat that as an empty batch
        // rather than throwing while iterating.
        const receivedMessages = response.receivedMessages || [];
        for (const data of receivedMessages) {
            const message = new Message(this, data);
            const span = this._constructSpan(message);
            if (this.isOpen) {
                message.modAck(this.ackDeadline);
                this._inventory.add(message);
            }
            else {
                message.nack();
            }
            if (span) {
                span.end();
            }
        }
    }
    /**
     * Returns a promise that will resolve once all pending requests have settled.
     *
     * Queued (not yet sent) acks/modAcks are flushed immediately; requests that
     * are already in flight are awaited via the queues' drain promises.
     *
     * @private
     *
     * @returns {Promise}
     */
    async _waitForFlush() {
        const promises = [];
        if (this._acks.numPendingRequests) {
            promises.push(this._acks.onFlush());
            this._acks.flush();
        }
        if (this._modAcks.numPendingRequests) {
            promises.push(this._modAcks.onFlush());
            this._modAcks.flush();
        }
        if (this._acks.numInFlightRequests) {
            promises.push(this._acks.onDrain());
        }
        if (this._modAcks.numInFlightRequests) {
            promises.push(this._modAcks.onDrain());
        }
        await Promise.all(promises);
    }
}
|
555 | exports.Subscriber = Subscriber;
|
556 | //# sourceMappingURL=subscriber.js.map |
\ | No newline at end of file |