UNPKG

17.1 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2018 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Subscriber = exports.Message = void 0;
19const precise_date_1 = require("@google-cloud/precise-date");
20const projectify_1 = require("@google-cloud/projectify");
21const promisify_1 = require("@google-cloud/promisify");
22const events_1 = require("events");
23const api_1 = require("@opentelemetry/api");
24const semantic_conventions_1 = require("@opentelemetry/semantic-conventions");
25const histogram_1 = require("./histogram");
26const lease_manager_1 = require("./lease-manager");
27const message_queues_1 = require("./message-queues");
28const message_stream_1 = require("./message-stream");
29const default_options_1 = require("./default-options");
30const opentelemetry_tracing_1 = require("./opentelemetry-tracing");
31/**
32 * Date object with nanosecond precision. Supports all standard Date arguments
33 * in addition to several custom types.
34 *
35 * @external PreciseDate
36 * @see {@link https://github.com/googleapis/nodejs-precise-date|PreciseDate}
37 */
38/**
39 * Message objects provide a simple interface for users to get message data and
40 * acknowledge the message.
41 *
42 * @example
43 * ```
44 * subscription.on('message', message => {
45 * // {
46 * // ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA',
47 * // attributes: {key: 'value'},
48 * // data: Buffer.from('Hello, world!'),
49 * // id: '1551297743043',
50 * // orderingKey: 'ordering-key',
51 * // publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
52 * // received: 1551297743043,
53 * // length: 13
54 * // }
55 * });
56 * ```
57 */
58class Message {
59 /**
60 * @hideconstructor
61 *
62 * @param {Subscriber} sub The parent subscriber.
63 * @param {object} message The raw message response.
64 */
65 constructor(sub, { ackId, message, deliveryAttempt }) {
66 /**
67 * This ID is used to acknowledge the message.
68 *
69 * @name Message#ackId
70 * @type {string}
71 */
72 this.ackId = ackId;
73 /**
74 * Optional attributes for this message.
75 *
76 * @name Message#attributes
77 * @type {object}
78 */
79 this.attributes = message.attributes || {};
80 /**
81 * The message data as a Buffer.
82 *
83 * @name Message#data
84 * @type {Buffer}
85 */
86 this.data = message.data;
87 /**
88 * Delivery attempt counter is 1 + (the sum of number of NACKs and number of
89 * ack_deadline exceeds) for this message.
90 *
91 * @name Message#deliveryAttempt
92 * @type {number}
93 */
94 this.deliveryAttempt = Number(deliveryAttempt || 0);
95 /**
96 * ID of the message, assigned by the server when the message is published.
97 * Guaranteed to be unique within the topic.
98 *
99 * @name Message#id
100 * @type {string}
101 */
102 this.id = message.messageId;
103 /**
104 * Identifies related messages for which publish order should be respected.
105 * If a `Subscription` has `enableMessageOrdering` set to `true`, messages
106 * published with the same `orderingKey` value will be delivered to
107 * subscribers in the order in which they are received by the Pub/Sub
108 * system.
109 *
110 * **EXPERIMENTAL:** This feature is part of a closed alpha release. This
111 * API might be changed in backward-incompatible ways and is not recommended
112 * for production use. It is not subject to any SLA or deprecation policy.
113 *
114 * @name Message#orderingKey
115 * @type {string}
116 */
117 this.orderingKey = message.orderingKey;
118 /**
119 * The time at which the message was published.
120 *
121 * @name Message#publishTime
122 * @type {external:PreciseDate}
123 */
124 this.publishTime = new precise_date_1.PreciseDate(message.publishTime);
125 /**
126 * The time at which the message was recieved by the subscription.
127 *
128 * @name Message#received
129 * @type {number}
130 */
131 this.received = Date.now();
132 this._handled = false;
133 this._length = this.data.length;
134 this._subscriber = sub;
135 }
136 /**
137 * The length of the message data.
138 *
139 * @type {number}
140 */
141 get length() {
142 return this._length;
143 }
144 /**
145 * Acknowledges the message.
146 *
147 * @example
148 * ```
149 * subscription.on('message', message => {
150 * message.ack();
151 * });
152 * ```
153 */
154 ack() {
155 if (!this._handled) {
156 this._handled = true;
157 this._subscriber.ack(this);
158 }
159 }
160 /**
161 * Modifies the ack deadline.
162 *
163 * @param {number} deadline The number of seconds to extend the deadline.
164 * @private
165 */
166 modAck(deadline) {
167 if (!this._handled) {
168 this._subscriber.modAck(this, deadline);
169 }
170 }
171 /**
172 * Removes the message from our inventory and schedules it to be redelivered.
173 *
174 * @example
175 * ```
176 * subscription.on('message', message => {
177 * message.nack();
178 * });
179 * ```
180 */
181 nack() {
182 if (!this._handled) {
183 this._handled = true;
184 this._subscriber.nack(this);
185 }
186 }
187}
188exports.Message = Message;
189/**
190 * @typedef {object} SubscriberOptions
191 * @property {number} [ackDeadline=10] Acknowledge deadline in seconds. If left
192 * unset the initial value will be 10 seconds, but it will evolve into the
193 * 99th percentile time it takes to acknowledge a message.
194 * @property {BatchOptions} [batching] Request batching options.
195 * @property {FlowControlOptions} [flowControl] Flow control options.
196 * @property {boolean} [useLegacyFlowControl] Disables enforcing flow control
197 * settings at the Cloud PubSub server and uses the less accurate method
198 * of only enforcing flow control at the client side.
199 * @property {MessageStreamOptions} [streamingOptions] Streaming options.
200 */
201/**
202 * Subscriber class is used to manage all message related functionality.
203 *
204 * @private
205 * @class
206 *
207 * @param {Subscription} subscription The corresponding subscription.
208 * @param {SubscriberOptions} options The subscriber options.
209 */
210class Subscriber extends events_1.EventEmitter {
211 constructor(subscription, options = {}) {
212 super();
213 this.ackDeadline = 10;
214 this.maxMessages = default_options_1.defaultOptions.subscription.maxOutstandingMessages;
215 this.maxBytes = default_options_1.defaultOptions.subscription.maxOutstandingBytes;
216 this.useLegacyFlowControl = false;
217 this.isOpen = false;
218 this._isUserSetDeadline = false;
219 this._useOpentelemetry = false;
220 this._histogram = new histogram_1.Histogram({ min: 10, max: 600 });
221 this._latencies = new histogram_1.Histogram();
222 this._subscription = subscription;
223 this.setOptions(options);
224 }
225 /**
226 * The 99th percentile of request latencies.
227 *
228 * @type {number}
229 * @private
230 */
231 get modAckLatency() {
232 const latency = this._latencies.percentile(99);
233 let bufferTime = 0;
234 if (this._modAcks) {
235 bufferTime = this._modAcks.maxMilliseconds;
236 }
237 return latency * 1000 + bufferTime;
238 }
239 /**
240 * The full name of the Subscription.
241 *
242 * @type {string}
243 * @private
244 */
245 get name() {
246 if (!this._name) {
247 const { name, projectId } = this._subscription;
248 this._name = projectify_1.replaceProjectIdToken(name, projectId);
249 }
250 return this._name;
251 }
252 /**
253 * Acknowledges the supplied message.
254 *
255 * @param {Message} message The message to acknowledge.
256 * @returns {Promise}
257 * @private
258 */
259 async ack(message) {
260 if (!this._isUserSetDeadline) {
261 const ackTimeSeconds = (Date.now() - message.received) / 1000;
262 this._histogram.add(ackTimeSeconds);
263 this.ackDeadline = this._histogram.percentile(99);
264 }
265 this._acks.add(message);
266 await this._acks.onFlush();
267 this._inventory.remove(message);
268 }
269 /**
270 * Closes the subscriber. The returned promise will resolve once any pending
271 * acks/modAcks are finished.
272 *
273 * @returns {Promise}
274 * @private
275 */
276 async close() {
277 if (!this.isOpen) {
278 return;
279 }
280 this.isOpen = false;
281 this._stream.destroy();
282 this._inventory.clear();
283 await this._waitForFlush();
284 this.emit('close');
285 }
286 /**
287 * Gets the subscriber client instance.
288 *
289 * @returns {Promise<object>}
290 * @private
291 */
292 async getClient() {
293 const pubsub = this._subscription.pubsub;
294 const [client] = await promisify_1.promisify(pubsub.getClient_).call(pubsub, {
295 client: 'SubscriberClient',
296 });
297 return client;
298 }
299 /**
300 * Modifies the acknowledge deadline for the provided message.
301 *
302 * @param {Message} message The message to modify.
303 * @param {number} deadline The deadline.
304 * @returns {Promise}
305 * @private
306 */
307 async modAck(message, deadline) {
308 const startTime = Date.now();
309 this._modAcks.add(message, deadline);
310 await this._modAcks.onFlush();
311 const latency = (Date.now() - startTime) / 1000;
312 this._latencies.add(latency);
313 }
314 /**
315 * Modfies the acknowledge deadline for the provided message and then removes
316 * it from our inventory.
317 *
318 * @param {Message} message The message.
319 * @return {Promise}
320 * @private
321 */
322 async nack(message) {
323 await this.modAck(message, 0);
324 this._inventory.remove(message);
325 }
326 /**
327 * Starts pulling messages.
328 * @private
329 */
330 open() {
331 const { batching, flowControl, streamingOptions } = this._options;
332 this._acks = new message_queues_1.AckQueue(this, batching);
333 this._modAcks = new message_queues_1.ModAckQueue(this, batching);
334 this._inventory = new lease_manager_1.LeaseManager(this, flowControl);
335 this._stream = new message_stream_1.MessageStream(this, streamingOptions);
336 this._stream
337 .on('error', err => this.emit('error', err))
338 .on('debug', err => this.emit('debug', err))
339 .on('data', (data) => this._onData(data))
340 .once('close', () => this.close());
341 this._inventory
342 .on('full', () => this._stream.pause())
343 .on('free', () => this._stream.resume());
344 this.isOpen = true;
345 }
346 /**
347 * Sets subscriber options.
348 *
349 * @param {SubscriberOptions} options The options.
350 * @private
351 */
352 setOptions(options) {
353 this._options = options;
354 this._useOpentelemetry = options.enableOpenTelemetryTracing || false;
355 if (options.ackDeadline) {
356 this.ackDeadline = options.ackDeadline;
357 this._isUserSetDeadline = true;
358 }
359 this.useLegacyFlowControl = options.useLegacyFlowControl || false;
360 if (options.flowControl) {
361 this.maxMessages =
362 options.flowControl.maxMessages ||
363 default_options_1.defaultOptions.subscription.maxOutstandingMessages;
364 this.maxBytes =
365 options.flowControl.maxBytes ||
366 default_options_1.defaultOptions.subscription.maxOutstandingBytes;
367 // In the event that the user has specified the maxMessages option, we
368 // want to make sure that the maxStreams option isn't higher.
369 // It doesn't really make sense to open 5 streams if the user only wants
370 // 1 message at a time.
371 if (!options.streamingOptions) {
372 options.streamingOptions = {};
373 }
374 const { maxStreams = default_options_1.defaultOptions.subscription.maxStreams } = options.streamingOptions;
375 options.streamingOptions.maxStreams = Math.min(maxStreams, this.maxMessages);
376 }
377 }
378 /**
379 * Constructs an OpenTelemetry span from the incoming message.
380 *
381 * @param {Message} message One of the received messages
382 * @private
383 */
384 _constructSpan(message) {
385 // Handle cases where OpenTelemetry is disabled or no span context was sent through message
386 if (!this._useOpentelemetry ||
387 !message.attributes ||
388 !message.attributes['googclient_OpenTelemetrySpanContext']) {
389 return undefined;
390 }
391 const spanValue = message.attributes['googclient_OpenTelemetrySpanContext'];
392 const parentSpanContext = spanValue
393 ? JSON.parse(spanValue)
394 : undefined;
395 const spanAttributes = {
396 // Original span attributes
397 ackId: message.ackId,
398 deliveryAttempt: message.deliveryAttempt,
399 //
400 // based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers
401 [semantic_conventions_1.SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub',
402 [semantic_conventions_1.SemanticAttributes.MESSAGING_OPERATION]: 'process',
403 [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION]: this.name,
404 [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic',
405 [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_ID]: message.id,
406 [semantic_conventions_1.SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub',
407 [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: message.data.length,
408 // Not in Opentelemetry semantic convention but mimics naming
409 'messaging.pubsub.received_at': message.received,
410 'messaging.pubsub.acknowlege_id': message.ackId,
411 'messaging.pubsub.delivery_attempt': message.deliveryAttempt,
412 };
413 // Subscriber spans should always have a publisher span as a parent.
414 // Return undefined if no parent is provided
415 const spanName = `${this.name} process`;
416 const span = parentSpanContext
417 ? opentelemetry_tracing_1.createSpan(spanName.trim(), api_1.SpanKind.CONSUMER, spanAttributes, parentSpanContext)
418 : undefined;
419 return span;
420 }
421 /**
422 * Callback to be invoked when a new message is available.
423 *
424 * New messages will be added to the subscribers inventory, which in turn will
425 * automatically extend the messages ack deadline until either:
426 * a. the user acks/nacks it
427 * b. the maxExtension option is hit
428 *
429 * If the message puts us at/over capacity, then we'll pause our message
430 * stream until we've freed up some inventory space.
431 *
432 * New messages must immediately issue a ModifyAckDeadline request
433 * (aka receipt) to confirm with the backend that we did infact receive the
434 * message and its ok to start ticking down on the deadline.
435 *
436 * @private
437 */
438 _onData({ receivedMessages }) {
439 for (const data of receivedMessages) {
440 const message = new Message(this, data);
441 const span = this._constructSpan(message);
442 if (this.isOpen) {
443 message.modAck(this.ackDeadline);
444 this._inventory.add(message);
445 }
446 else {
447 message.nack();
448 }
449 if (span) {
450 span.end();
451 }
452 }
453 }
454 /**
455 * Returns a promise that will resolve once all pending requests have settled.
456 *
457 * @private
458 *
459 * @returns {Promise}
460 */
461 async _waitForFlush() {
462 const promises = [];
463 if (this._acks.numPendingRequests) {
464 promises.push(this._acks.onFlush());
465 this._acks.flush();
466 }
467 if (this._modAcks.numPendingRequests) {
468 promises.push(this._modAcks.onFlush());
469 this._modAcks.flush();
470 }
471 if (this._acks.numInFlightRequests) {
472 promises.push(this._acks.onDrain());
473 }
474 if (this._modAcks.numInFlightRequests) {
475 promises.push(this._modAcks.onDrain());
476 }
477 await Promise.all(promises);
478 }
479}
480exports.Subscriber = Subscriber;
481//# sourceMappingURL=subscriber.js.map
\No newline at end of file