1 | ;
|
2 | /*!
|
3 | * Copyright 2018 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.Subscriber = exports.Message = void 0;
|
19 | const precise_date_1 = require("@google-cloud/precise-date");
|
20 | const projectify_1 = require("@google-cloud/projectify");
|
21 | const promisify_1 = require("@google-cloud/promisify");
|
22 | const events_1 = require("events");
|
23 | const api_1 = require("@opentelemetry/api");
|
24 | const semantic_conventions_1 = require("@opentelemetry/semantic-conventions");
|
25 | const histogram_1 = require("./histogram");
|
26 | const lease_manager_1 = require("./lease-manager");
|
27 | const message_queues_1 = require("./message-queues");
|
28 | const message_stream_1 = require("./message-stream");
|
29 | const default_options_1 = require("./default-options");
|
30 | const opentelemetry_tracing_1 = require("./opentelemetry-tracing");
|
31 | /**
|
32 | * Date object with nanosecond precision. Supports all standard Date arguments
|
33 | * in addition to several custom types.
|
34 | *
|
35 | * @external PreciseDate
|
36 | * @see {@link https://github.com/googleapis/nodejs-precise-date|PreciseDate}
|
37 | */
|
38 | /**
|
39 | * Message objects provide a simple interface for users to get message data and
|
40 | * acknowledge the message.
|
41 | *
|
42 | * @example
|
43 | * ```
|
44 | * subscription.on('message', message => {
|
45 | * // {
|
46 | * // ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA',
|
47 | * // attributes: {key: 'value'},
|
48 | * // data: Buffer.from('Hello, world!'),
|
49 | * // id: '1551297743043',
|
50 | * // orderingKey: 'ordering-key',
|
51 | * // publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
|
52 | * // received: 1551297743043,
|
53 | * // length: 13
|
54 | * // }
|
55 | * });
|
56 | * ```
|
57 | */
|
class Message {
    /**
     * @hideconstructor
     *
     * @param {Subscriber} sub The parent subscriber.
     * @param {object} message The raw message response, of the shape
     *     `{ackId, message, deliveryAttempt}`.
     */
    constructor(sub, { ackId, message, deliveryAttempt }) {
        /**
         * This ID is used to acknowledge the message.
         *
         * @name Message#ackId
         * @type {string}
         */
        this.ackId = ackId;
        /**
         * Optional attributes for this message. Defaults to an empty object
         * when the raw message carries none.
         *
         * @name Message#attributes
         * @type {object}
         */
        this.attributes = message.attributes || {};
        /**
         * The message data as a Buffer. Defaults to an empty Buffer when the
         * raw message has no `data` field (mirroring how `attributes` is
         * defaulted), so that `length` can always be computed.
         *
         * @name Message#data
         * @type {Buffer}
         */
        this.data = message.data == null ? Buffer.alloc(0) : message.data;
        /**
         * Delivery attempt counter is 1 + (the sum of number of NACKs and number of
         * ack_deadline exceeds) for this message. Reported as 0 when the raw
         * response carries no delivery attempt.
         *
         * @name Message#deliveryAttempt
         * @type {number}
         */
        this.deliveryAttempt = Number(deliveryAttempt || 0);
        /**
         * ID of the message, assigned by the server when the message is published.
         * Guaranteed to be unique within the topic.
         *
         * @name Message#id
         * @type {string}
         */
        this.id = message.messageId;
        /**
         * Identifies related messages for which publish order should be respected.
         * If a `Subscription` has `enableMessageOrdering` set to `true`, messages
         * published with the same `orderingKey` value will be delivered to
         * subscribers in the order in which they are received by the Pub/Sub
         * system.
         *
         * **EXPERIMENTAL:** This feature is part of a closed alpha release. This
         * API might be changed in backward-incompatible ways and is not recommended
         * for production use. It is not subject to any SLA or deprecation policy.
         *
         * @name Message#orderingKey
         * @type {string}
         */
        this.orderingKey = message.orderingKey;
        /**
         * The time at which the message was published.
         *
         * @name Message#publishTime
         * @type {external:PreciseDate}
         */
        this.publishTime = new precise_date_1.PreciseDate(message.publishTime);
        /**
         * The time at which the message was received by the subscription
         * (`Date.now()` at construction, i.e. milliseconds since the epoch).
         *
         * @name Message#received
         * @type {number}
         */
        this.received = Date.now();
        // Set once the message has been acked or nacked; later ack/nack/modAck
        // calls then become no-ops.
        this._handled = false;
        // Captured once at construction and exposed through the `length` getter.
        this._length = this.data.length;
        this._subscriber = sub;
    }
    /**
     * The length of the message data.
     *
     * @type {number}
     */
    get length() {
        return this._length;
    }
    /**
     * Acknowledges the message. Only the first ack/nack on a message has any
     * effect.
     *
     * @example
     * ```
     * subscription.on('message', message => {
     *   message.ack();
     * });
     * ```
     */
    ack() {
        if (!this._handled) {
            this._handled = true;
            this._subscriber.ack(this);
        }
    }
    /**
     * Modifies the ack deadline. Ignored once the message has been acked or
     * nacked.
     *
     * @param {number} deadline The number of seconds to extend the deadline.
     * @private
     */
    modAck(deadline) {
        if (!this._handled) {
            this._subscriber.modAck(this, deadline);
        }
    }
    /**
     * Removes the message from our inventory and schedules it to be redelivered.
     * Only the first ack/nack on a message has any effect.
     *
     * @example
     * ```
     * subscription.on('message', message => {
     *   message.nack();
     * });
     * ```
     */
    nack() {
        if (!this._handled) {
            this._handled = true;
            this._subscriber.nack(this);
        }
    }
}
|
188 | exports.Message = Message;
|
/**
 * @typedef {object} SubscriberOptions
 * @property {number} [ackDeadline=10] Acknowledge deadline in seconds. If left
 *     unset the initial value will be 10 seconds, but it will evolve into the
 *     99th percentile time it takes to acknowledge a message.
 * @property {BatchOptions} [batching] Request batching options.
 * @property {FlowControlOptions} [flowControl] Flow control options.
 * @property {boolean} [useLegacyFlowControl] Disables enforcing flow control
 *     settings at the Cloud PubSub server and uses the less accurate method
 *     of only enforcing flow control at the client side.
 * @property {MessageStreamOptions} [streamingOptions] Streaming options.
 * @property {boolean} [enableOpenTelemetryTracing=false] Enables creation of
 *     OpenTelemetry spans for received messages that carry a span context.
 */
|
201 | /**
|
202 | * Subscriber class is used to manage all message related functionality.
|
203 | *
|
204 | * @private
|
205 | * @class
|
206 | *
|
207 | * @param {Subscription} subscription The corresponding subscription.
|
208 | * @param {SubscriberOptions} options The subscriber options.
|
209 | */
|
class Subscriber extends events_1.EventEmitter {
    /**
     * @param {Subscription} subscription The corresponding subscription.
     * @param {SubscriberOptions} [options] The subscriber options.
     */
    constructor(subscription, options = {}) {
        super();
        // Defaults; setOptions() below overrides whichever the caller supplied.
        this.ackDeadline = 10;
        this.maxMessages = default_options_1.defaultOptions.subscription.maxOutstandingMessages;
        this.maxBytes = default_options_1.defaultOptions.subscription.maxOutstandingBytes;
        this.useLegacyFlowControl = false;
        this.isOpen = false;
        this._isUserSetDeadline = false;
        this._useOpentelemetry = false;
        // Ack times (seconds); feeds the adaptive ackDeadline in ack().
        this._histogram = new histogram_1.Histogram({ min: 10, max: 600 });
        // modAck round-trip times (seconds); feeds modAckLatency.
        this._latencies = new histogram_1.Histogram();
        this._subscription = subscription;
        this.setOptions(options);
    }
    /**
     * The 99th percentile of recorded modAck latencies, converted from seconds
     * to milliseconds, plus the modAck queue's `maxMilliseconds` once that
     * queue exists (i.e. after open()).
     *
     * @type {number}
     * @private
     */
    get modAckLatency() {
        const latency = this._latencies.percentile(99);
        let bufferTime = 0;
        if (this._modAcks) {
            bufferTime = this._modAcks.maxMilliseconds;
        }
        return latency * 1000 + bufferTime;
    }
    /**
     * The full name of the Subscription. Computed on first access by
     * substituting the project id into the subscription name, then cached.
     *
     * @type {string}
     * @private
     */
    get name() {
        if (!this._name) {
            const { name, projectId } = this._subscription;
            this._name = projectify_1.replaceProjectIdToken(name, projectId);
        }
        return this._name;
    }
    /**
     * Acknowledges the supplied message.
     *
     * Unless the user pinned an ack deadline, the time the message took to be
     * acked is recorded and the deadline is re-derived as the 99th percentile
     * of recorded ack times.
     *
     * @param {Message} message The message to acknowledge.
     * @returns {Promise} Resolves after the ack queue has flushed and the
     *     message has been removed from the inventory.
     * @private
     */
    async ack(message) {
        if (!this._isUserSetDeadline) {
            const ackTimeSeconds = (Date.now() - message.received) / 1000;
            this._histogram.add(ackTimeSeconds);
            this.ackDeadline = this._histogram.percentile(99);
        }
        this._acks.add(message);
        await this._acks.onFlush();
        this._inventory.remove(message);
    }
    /**
     * Closes the subscriber. The returned promise will resolve once any pending
     * acks/modAcks are finished. Calling it while already closed is a no-op.
     *
     * @returns {Promise}
     * @private
     */
    async close() {
        if (!this.isOpen) {
            return;
        }
        // Flip the flag first: the stream's own 'close' event calls close()
        // again (see open()), and that call must return early.
        this.isOpen = false;
        this._stream.destroy();
        this._inventory.clear();
        await this._waitForFlush();
        this.emit('close');
    }
    /**
     * Gets the subscriber client instance.
     *
     * @returns {Promise<object>}
     * @private
     */
    async getClient() {
        const pubsub = this._subscription.pubsub;
        const [client] = await promisify_1.promisify(pubsub.getClient_).call(pubsub, {
            client: 'SubscriberClient',
        });
        return client;
    }
    /**
     * Modifies the acknowledge deadline for the provided message and records
     * how long the request took (in seconds) for modAckLatency.
     *
     * @param {Message} message The message to modify.
     * @param {number} deadline The deadline.
     * @returns {Promise} Resolves once the modAck queue has flushed.
     * @private
     */
    async modAck(message, deadline) {
        const startTime = Date.now();
        this._modAcks.add(message, deadline);
        await this._modAcks.onFlush();
        const latency = (Date.now() - startTime) / 1000;
        this._latencies.add(latency);
    }
    /**
     * Modifies the acknowledge deadline for the provided message to 0 and then
     * removes it from our inventory.
     *
     * @param {Message} message The message.
     * @return {Promise}
     * @private
     */
    async nack(message) {
        await this.modAck(message, 0);
        this._inventory.remove(message);
    }
    /**
     * Starts pulling messages: creates the ack/modAck queues, the lease
     * manager and the message stream, wires their events together and marks
     * the subscriber as open.
     *
     * @private
     */
    open() {
        const { batching, flowControl, streamingOptions } = this._options;
        this._acks = new message_queues_1.AckQueue(this, batching);
        this._modAcks = new message_queues_1.ModAckQueue(this, batching);
        this._inventory = new lease_manager_1.LeaseManager(this, flowControl);
        this._stream = new message_stream_1.MessageStream(this, streamingOptions);
        // Re-emit stream errors/debug events and close when the stream closes.
        this._stream
            .on('error', err => this.emit('error', err))
            .on('debug', err => this.emit('debug', err))
            .on('data', (data) => this._onData(data))
            .once('close', () => this.close());
        // Back-pressure: stop reading while the inventory is full.
        this._inventory
            .on('full', () => this._stream.pause())
            .on('free', () => this._stream.resume());
        this.isOpen = true;
    }
    /**
     * Sets subscriber options.
     *
     * Note that when `flowControl` is given, `options.streamingOptions` is
     * created if missing and its `maxStreams` is capped in place.
     *
     * @param {SubscriberOptions} options The options.
     * @private
     */
    setOptions(options) {
        this._options = options;
        this._useOpentelemetry = options.enableOpenTelemetryTracing || false;
        if (options.ackDeadline) {
            this.ackDeadline = options.ackDeadline;
            // A user-provided deadline disables the adaptive deadline in ack().
            this._isUserSetDeadline = true;
        }
        this.useLegacyFlowControl = options.useLegacyFlowControl || false;
        if (options.flowControl) {
            this.maxMessages =
                options.flowControl.maxMessages ||
                    default_options_1.defaultOptions.subscription.maxOutstandingMessages;
            this.maxBytes =
                options.flowControl.maxBytes ||
                    default_options_1.defaultOptions.subscription.maxOutstandingBytes;
            // In the event that the user has specified the maxMessages option, we
            // want to make sure that the maxStreams option isn't higher.
            // It doesn't really make sense to open 5 streams if the user only wants
            // 1 message at a time.
            if (!options.streamingOptions) {
                options.streamingOptions = {};
            }
            const { maxStreams = default_options_1.defaultOptions.subscription.maxStreams } = options.streamingOptions;
            options.streamingOptions.maxStreams = Math.min(maxStreams, this.maxMessages);
        }
    }
    /**
     * Constructs an OpenTelemetry span from the incoming message.
     *
     * Subscriber spans always need a publisher span as their parent, so no
     * span is created when tracing is disabled, when the message carries no
     * `googclient_OpenTelemetrySpanContext` attribute, or when that attribute
     * does not contain a usable JSON span context. A malformed attribute is
     * reported through a 'debug' event rather than thrown, because the value
     * comes from the publisher and must not break message delivery.
     *
     * @param {Message} message One of the received messages
     * @returns {object|undefined} The created span, or undefined when no span
     *     applies.
     * @private
     */
    _constructSpan(message) {
        // Handle cases where OpenTelemetry is disabled or no span context was sent through message
        if (!this._useOpentelemetry || !message.attributes) {
            return undefined;
        }
        const spanValue = message.attributes['googclient_OpenTelemetrySpanContext'];
        if (!spanValue) {
            return undefined;
        }
        let parentSpanContext;
        try {
            parentSpanContext = JSON.parse(spanValue);
        }
        catch (err) {
            // Publisher-controlled attribute was not valid JSON: surface it for
            // diagnostics and carry on without tracing this message.
            this.emit('debug', err);
            return undefined;
        }
        // Valid JSON that yields no context (e.g. "null") means there is no
        // parent to attach to.
        if (!parentSpanContext) {
            return undefined;
        }
        const spanAttributes = {
            // Original span attributes
            ackId: message.ackId,
            deliveryAttempt: message.deliveryAttempt,
            //
            // based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers
            [semantic_conventions_1.SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_OPERATION]: 'process',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION]: this.name,
            [semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_ID]: message.id,
            [semantic_conventions_1.SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub',
            [semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: message.data.length,
            // Not in Opentelemetry semantic convention but mimics naming.
            // (The attribute key spellings are emitted as-is; renaming them
            // would change the telemetry consumers see.)
            'messaging.pubsub.received_at': message.received,
            'messaging.pubsub.acknowlege_id': message.ackId,
            'messaging.pubsub.delivery_attempt': message.deliveryAttempt,
        };
        const spanName = `${this.name} process`;
        return opentelemetry_tracing_1.createSpan(spanName.trim(), api_1.SpanKind.CONSUMER, spanAttributes, parentSpanContext);
    }
    /**
     * Callback to be invoked when a new message is available.
     *
     * New messages will be added to the subscribers inventory, which in turn will
     * automatically extend the messages ack deadline until either:
     *   a. the user acks/nacks it
     *   b. the maxExtension option is hit
     *
     * If the message puts us at/over capacity, then we'll pause our message
     * stream until we've freed up some inventory space.
     *
     * New messages must immediately issue a ModifyAckDeadline request
     * (aka receipt) to confirm with the backend that we did in fact receive the
     * message and its ok to start ticking down on the deadline.
     *
     * Messages that arrive while the subscriber is not open are nacked.
     *
     * @private
     */
    _onData({ receivedMessages }) {
        for (const data of receivedMessages) {
            const message = new Message(this, data);
            const span = this._constructSpan(message);
            if (this.isOpen) {
                message.modAck(this.ackDeadline);
                this._inventory.add(message);
            }
            else {
                message.nack();
            }
            if (span) {
                span.end();
            }
        }
    }
    /**
     * Returns a promise that will resolve once all pending requests have settled.
     * Queues with pending requests are flushed; queues with in-flight requests
     * are awaited until drained.
     *
     * @private
     *
     * @returns {Promise}
     */
    async _waitForFlush() {
        const promises = [];
        if (this._acks.numPendingRequests) {
            // Register the flush listener before triggering the flush.
            promises.push(this._acks.onFlush());
            this._acks.flush();
        }
        if (this._modAcks.numPendingRequests) {
            promises.push(this._modAcks.onFlush());
            this._modAcks.flush();
        }
        if (this._acks.numInFlightRequests) {
            promises.push(this._acks.onDrain());
        }
        if (this._modAcks.numInFlightRequests) {
            promises.push(this._modAcks.onDrain());
        }
        await Promise.all(promises);
    }
}
|
480 | exports.Subscriber = Subscriber;
|
481 | //# sourceMappingURL=subscriber.js.map |
\ | No newline at end of file |