UNPKG

17 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2018 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Subscriber = exports.Message = void 0;
19const precise_date_1 = require("@google-cloud/precise-date");
20const projectify_1 = require("@google-cloud/projectify");
21const promisify_1 = require("@google-cloud/promisify");
22const events_1 = require("events");
23const api_1 = require("@opentelemetry/api");
24const semantic_conventions_1 = require("@opentelemetry/semantic-conventions");
25const histogram_1 = require("./histogram");
26const lease_manager_1 = require("./lease-manager");
27const message_queues_1 = require("./message-queues");
28const message_stream_1 = require("./message-stream");
29const default_options_1 = require("./default-options");
30const opentelemetry_tracing_1 = require("./opentelemetry-tracing");
31/**
32 * Date object with nanosecond precision. Supports all standard Date arguments
33 * in addition to several custom types.
34 *
35 * @external PreciseDate
36 * @see {@link https://github.com/googleapis/nodejs-precise-date|PreciseDate}
37 */
38/**
39 * Message objects provide a simple interface for users to get message data and
40 * acknowledge the message.
41 *
42 * @example
43 * subscription.on('message', message => {
44 * // {
45 * // ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA',
46 * // attributes: {key: 'value'},
47 * // data: Buffer.from('Hello, world!'),
48 * // id: '1551297743043',
49 * // orderingKey: 'ordering-key',
50 * // publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
51 * // received: 1551297743043,
52 * // length: 13
53 * // }
54 * });
55 */
class Message {
    /**
     * Builds a message from one entry of a pull response and records the
     * local receive time.
     *
     * @hideconstructor
     *
     * @param {Subscriber} sub The parent subscriber.
     * @param {object} message The raw message response, in the shape
     *     `{ackId, message, deliveryAttempt}`.
     */
    constructor(sub, { ackId, message, deliveryAttempt }) {
        /**
         * This ID is used to acknowledge the message.
         *
         * @name Message#ackId
         * @type {string}
         */
        this.ackId = ackId;
        /**
         * Optional attributes for this message. Defaults to an empty object
         * when the raw message carries none.
         *
         * @name Message#attributes
         * @type {object}
         */
        this.attributes = message.attributes || {};
        /**
         * The message data as a Buffer.
         *
         * @name Message#data
         * @type {Buffer}
         */
        this.data = message.data;
        /**
         * Delivery attempt counter is 1 + (the sum of number of NACKs and number of
         * ack_deadline exceeds) for this message. Reported as 0 when the raw
         * response does not include a delivery attempt.
         *
         * @name Message#deliveryAttempt
         * @type {number}
         */
        this.deliveryAttempt = Number(deliveryAttempt || 0);
        /**
         * ID of the message, assigned by the server when the message is published.
         * Guaranteed to be unique within the topic.
         *
         * @name Message#id
         * @type {string}
         */
        this.id = message.messageId;
        /**
         * Identifies related messages for which publish order should be respected.
         * If a `Subscription` has `enableMessageOrdering` set to `true`, messages
         * published with the same `orderingKey` value will be delivered to
         * subscribers in the order in which they are received by the Pub/Sub
         * system.
         *
         * **EXPERIMENTAL:** This feature is part of a closed alpha release. This
         * API might be changed in backward-incompatible ways and is not recommended
         * for production use. It is not subject to any SLA or deprecation policy.
         *
         * @name Message#orderingKey
         * @type {string}
         */
        this.orderingKey = message.orderingKey;
        /**
         * The time at which the message was published.
         *
         * @name Message#publishTime
         * @type {external:PreciseDate}
         */
        this.publishTime = new precise_date_1.PreciseDate(message.publishTime);
        /**
         * The time at which the message was received by the subscription, as
         * returned by `Date.now()`.
         *
         * @name Message#received
         * @type {number}
         */
        this.received = Date.now();
        // Set once the message has been acked or nacked; every later
        // ack/nack/modAck call becomes a no-op.
        this._handled = false;
        // A message may arrive without a data payload (attributes only); report
        // a length of 0 instead of throwing on `undefined.length`.
        this._length = this.data ? this.data.length : 0;
        this._subscriber = sub;
    }
    /**
     * The length of the message data.
     *
     * @type {number}
     */
    get length() {
        return this._length;
    }
    /**
     * Acknowledges the message. Only the first ack/nack call has any effect.
     *
     * @example
     * subscription.on('message', message => {
     *   message.ack();
     * });
     */
    ack() {
        if (!this._handled) {
            this._handled = true;
            this._subscriber.ack(this);
        }
    }
    /**
     * Modifies the ack deadline. Ignored once the message has been acked or
     * nacked.
     *
     * @param {number} deadline The number of seconds to extend the deadline.
     * @private
     */
    modAck(deadline) {
        if (!this._handled) {
            this._subscriber.modAck(this, deadline);
        }
    }
    /**
     * Removes the message from our inventory and schedules it to be redelivered.
     * Only the first ack/nack call has any effect.
     *
     * @example
     * subscription.on('message', message => {
     *   message.nack();
     * });
     */
    nack() {
        if (!this._handled) {
            this._handled = true;
            this._subscriber.nack(this);
        }
    }
}
182exports.Message = Message;
183/**
184 * @typedef {object} SubscriberOptions
185 * @property {number} [ackDeadline=10] Acknowledge deadline in seconds. If left
186 * unset the initial value will be 10 seconds, but it will evolve into the
187 * 99th percentile time it takes to acknowledge a message.
188 * @property {BatchOptions} [batching] Request batching options.
189 * @property {FlowControlOptions} [flowControl] Flow control options.
190 * @property {boolean} [useLegacyFlowControl] Disables enforcing flow control
191 * settings at the Cloud PubSub server and uses the less accurate method
192 * of only enforcing flow control at the client side.
193 * @property {MessageStreamOptions} [streamingOptions] Streaming options.
194 */
195/**
196 * Subscriber class is used to manage all message related functionality.
197 *
198 * @private
199 * @class
200 *
201 * @param {Subscription} subscription The corresponding subscription.
202 * @param {SubscriberOptions} options The subscriber options.
203 */
class Subscriber extends events_1.EventEmitter {
    /**
     * Creates a closed subscriber with default flow-control limits and then
     * applies the supplied options.
     *
     * @param {Subscription} subscription The corresponding subscription.
     * @param {SubscriberOptions} [options] The subscriber options.
     */
    constructor(subscription, options = {}) {
        super();
        this.ackDeadline = 10;
        this.maxMessages = default_options_1.defaultOptions.subscription.maxOutstandingMessages;
        this.maxBytes = default_options_1.defaultOptions.subscription.maxOutstandingBytes;
        this.useLegacyFlowControl = false;
        this.isOpen = false;
        this._isUserSetDeadline = false;
        this._useOpentelemetry = false;
        // Distribution of ack times (seconds); used to derive `ackDeadline`.
        this._histogram = new histogram_1.Histogram({ min: 10, max: 600 });
        // Distribution of modAck request latencies (seconds).
        this._latencies = new histogram_1.Histogram();
        this._subscription = subscription;
        this.setOptions(options);
    }
    /**
     * The 99th percentile of modAck request latencies, converted from seconds
     * to milliseconds, plus the modAck queue's `maxMilliseconds` when that
     * queue exists.
     *
     * @type {number}
     * @private
     */
    get modAckLatency() {
        const latency = this._latencies.percentile(99);
        let bufferTime = 0;
        if (this._modAcks) {
            bufferTime = this._modAcks.maxMilliseconds;
        }
        return latency * 1000 + bufferTime;
    }
    /**
     * The full name of the Subscription. Computed on first access and cached.
     *
     * @type {string}
     * @private
     */
    get name() {
        if (!this._name) {
            const { name, projectId } = this._subscription;
            this._name = projectify_1.replaceProjectIdToken(name, projectId);
        }
        return this._name;
    }
    /**
     * Acknowledges the supplied message.
     *
     * Unless the user fixed the deadline, the time taken to ack is recorded and
     * `ackDeadline` is updated to the 99th percentile of recorded ack times.
     * The message leaves the inventory once the ack queue has flushed.
     *
     * @param {Message} message The message to acknowledge.
     * @returns {Promise}
     * @private
     */
    async ack(message) {
        if (!this._isUserSetDeadline) {
            const ackTimeSeconds = (Date.now() - message.received) / 1000;
            this._histogram.add(ackTimeSeconds);
            this.ackDeadline = this._histogram.percentile(99);
        }
        this._acks.add(message);
        await this._acks.onFlush();
        this._inventory.remove(message);
    }
    /**
     * Closes the subscriber. The returned promise will resolve once any pending
     * acks/modAcks are finished. Does nothing when the subscriber is not open.
     *
     * @returns {Promise}
     * @private
     */
    async close() {
        if (!this.isOpen) {
            return;
        }
        // Flip the flag first: `open()` wires the stream's 'close' event back
        // to this method, and that re-entrant call must be a no-op.
        this.isOpen = false;
        this._stream.destroy();
        this._inventory.clear();
        await this._waitForFlush();
        this.emit('close');
    }
    /**
     * Gets the subscriber client instance.
     *
     * @returns {Promise<object>}
     * @private
     */
    async getClient() {
        const pubsub = this._subscription.pubsub;
        const [client] = await promisify_1.promisify(pubsub.getClient_).call(pubsub, {
            client: 'SubscriberClient',
        });
        return client;
    }
    /**
     * Modifies the acknowledge deadline for the provided message and records
     * how long the request took to flush (in seconds).
     *
     * @param {Message} message The message to modify.
     * @param {number} deadline The deadline.
     * @returns {Promise}
     * @private
     */
    async modAck(message, deadline) {
        const startTime = Date.now();
        this._modAcks.add(message, deadline);
        await this._modAcks.onFlush();
        const latency = (Date.now() - startTime) / 1000;
        this._latencies.add(latency);
    }
    /**
     * Modifies the acknowledge deadline for the provided message to 0 and then
     * removes it from our inventory.
     *
     * @param {Message} message The message.
     * @return {Promise}
     * @private
     */
    async nack(message) {
        await this.modAck(message, 0);
        this._inventory.remove(message);
    }
    /**
     * Starts pulling messages: creates the ack/modAck queues, the lease
     * manager and the message stream, and wires their events together.
     *
     * @private
     */
    open() {
        const { batching, flowControl, streamingOptions } = this._options;
        this._acks = new message_queues_1.AckQueue(this, batching);
        this._modAcks = new message_queues_1.ModAckQueue(this, batching);
        this._inventory = new lease_manager_1.LeaseManager(this, flowControl);
        this._stream = new message_stream_1.MessageStream(this, streamingOptions);
        this._stream
            .on('error', err => this.emit('error', err))
            .on('data', (data) => this._onData(data))
            .once('close', () => this.close());
        // Pause the stream while the inventory is full; resume once it frees up.
        this._inventory
            .on('full', () => this._stream.pause())
            .on('free', () => this._stream.resume());
        this.isOpen = true;
    }
    /**
     * Sets subscriber options.
     *
     * The supplied object is never modified: the subscriber stores a shallow
     * copy, and any adjusted `streamingOptions` live only on that copy.
     *
     * @param {SubscriberOptions} options The options.
     * @private
     */
    setOptions(options) {
        // Shallow copy so the maxStreams clamp below is not written back into
        // the caller's object (where it would stick across later calls).
        const resolved = Object.assign({}, options);
        this._useOpentelemetry = resolved.enableOpenTelemetryTracing || false;
        if (resolved.ackDeadline) {
            this.ackDeadline = resolved.ackDeadline;
            this._isUserSetDeadline = true;
        }
        this.useLegacyFlowControl = resolved.useLegacyFlowControl || false;
        if (resolved.flowControl) {
            this.maxMessages =
                resolved.flowControl.maxMessages ||
                    default_options_1.defaultOptions.subscription.maxOutstandingMessages;
            this.maxBytes =
                resolved.flowControl.maxBytes ||
                    default_options_1.defaultOptions.subscription.maxOutstandingBytes;
            // In the event that the user has specified the maxMessages option, we
            // want to make sure that the maxStreams option isn't higher.
            // It doesn't really make sense to open 5 streams if the user only wants
            // 1 message at a time.
            const streamingOptions = Object.assign({}, resolved.streamingOptions);
            const { maxStreams = default_options_1.defaultOptions.subscription.maxStreams } = streamingOptions;
            streamingOptions.maxStreams = Math.min(maxStreams, this.maxMessages);
            resolved.streamingOptions = streamingOptions;
        }
        this._options = resolved;
    }
    /**
     * Constructs an OpenTelemetry span from the incoming message.
     *
     * Returns `undefined` when tracing is disabled, when the message carries no
     * `googclient_OpenTelemetrySpanContext` attribute, or when that attribute
     * does not contain a usable (valid JSON, truthy) span context.
     *
     * @param {Message} message One of the received messages
     * @returns {object|undefined} The value returned by `createSpan`, if any.
     * @private
     */
    _constructSpan(message) {
        // Handle cases where OpenTelemetry is disabled or no span context was sent through message
        if (!this._useOpentelemetry ||
            !message.attributes ||
            !message.attributes['googclient_OpenTelemetrySpanContext']) {
            return undefined;
        }
        const spanValue = message.attributes['googclient_OpenTelemetrySpanContext'];
        let parentSpanContext;
        try {
            parentSpanContext = JSON.parse(spanValue);
        }
        catch (err) {
            // The attribute value is set by the publisher; a malformed value must
            // not throw out of the stream's 'data' handler. Skip tracing instead.
            return undefined;
        }
        // Subscriber spans should always have a publisher span as a parent.
        // Return undefined if no parent is provided
        if (!parentSpanContext) {
            return undefined;
        }
        const spanAttributes = {
            // Original span attributes
            ackId: message.ackId,
            deliveryAttempt: message.deliveryAttempt,
            //
            // based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers
            [semantic_conventions_1.SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_OPERATION]: 'process',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION]: this.name,
            [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_ID]: message.id,
            [semantic_conventions_1.SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: message.data.length,
            // Not in Opentelemetry semantic convention but mimics naming
            'messaging.pubsub.received_at': message.received,
            'messaging.pubsub.acknowlege_id': message.ackId,
            'messaging.pubsub.delivery_attempt': message.deliveryAttempt,
        };
        const spanName = `${this.name} process`;
        return opentelemetry_tracing_1.createSpan(spanName.trim(), api_1.SpanKind.CONSUMER, spanAttributes, parentSpanContext);
    }
    /**
     * Callback to be invoked when a new message is available.
     *
     * New messages will be added to the subscriber's inventory, which in turn will
     * automatically extend the message's ack deadline until either:
     *   a. the user acks/nacks it
     *   b. the maxExtension option is hit
     *
     * If the message puts us at/over capacity, then we'll pause our message
     * stream until we've freed up some inventory space.
     *
     * New messages must immediately issue a ModifyAckDeadline request
     * (aka receipt) to confirm with the backend that we did in fact receive the
     * message and it's ok to start ticking down on the deadline.
     *
     * Messages that arrive while the subscriber is closed are nacked.
     *
     * @private
     */
    _onData({ receivedMessages }) {
        for (const data of receivedMessages) {
            const message = new Message(this, data);
            const span = this._constructSpan(message);
            if (this.isOpen) {
                message.modAck(this.ackDeadline);
                this._inventory.add(message);
            }
            else {
                message.nack();
            }
            if (span) {
                span.end();
            }
        }
    }
    /**
     * Returns a promise that will resolve once all pending requests have settled.
     *
     * Queues with pending requests are flushed; queues with in-flight requests
     * are awaited until drained.
     *
     * @private
     *
     * @returns {Promise}
     */
    async _waitForFlush() {
        const promises = [];
        if (this._acks.numPendingRequests) {
            // Register the flush listener before triggering the flush.
            promises.push(this._acks.onFlush());
            this._acks.flush();
        }
        if (this._modAcks.numPendingRequests) {
            promises.push(this._modAcks.onFlush());
            this._modAcks.flush();
        }
        if (this._acks.numInFlightRequests) {
            promises.push(this._acks.onDrain());
        }
        if (this._modAcks.numInFlightRequests) {
            promises.push(this._modAcks.onDrain());
        }
        await Promise.all(promises);
    }
}
473exports.Subscriber = Subscriber;
474//# sourceMappingURL=subscriber.js.map
\No newline at end of file