UNPKG

33.4 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2014 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Subscription = void 0;
19const promisify_1 = require("@google-cloud/promisify");
20const events_1 = require("events");
21const extend = require("extend");
22const snakeCase = require("lodash.snakecase");
23const iam_1 = require("./iam");
24const snapshot_1 = require("./snapshot");
25const subscriber_1 = require("./subscriber");
26// JSDoc won't see these, so this is just to let you get typings
27// in your editor of choice.
28//
29// NOTE: These are commented out for now because we don't want to
30// break any existing clients that rely on not-entirely-correct
31// typings. We'll re-enable on the next major.
32/* export declare interface Subscription {
33 on(
34 event: 'message',
35 listener: (message: Message) => void
36 ): this;
37 on(
38 event: 'error',
39 listener: (error: StatusError) => void
40 ): this;
41 on(event: 'close', listener: () => void): this;
42
43 // Only used internally.
44 on(event: 'newListener', listener: Function): this;
45 on(event: 'removeListener', listener: Function): this;
46
47 // Catch-all. If you get an error about this line, it means you're
48 // using an unsupported event type or listener type.
49 on(event: string, listener: void): this;
50} */
51/**
52 * @typedef {object} ExpirationPolicy
53 * A policy that specifies the conditions for this subscription's expiration. A
54 * subscription is considered active as long as any connected subscriber is
55 * successfully consuming messages from the subscription or is issuing
56 * operations on the subscription. If expirationPolicy is not set, a default
57 * policy with ttl of 31 days will be used. The minimum allowed value for
58 * expirationPolicy.ttl is 1 day.
59 * @property {google.protobuf.Duration} ttl Specifies the "time-to-live"
60 * duration for an associated resource. The resource expires if it is not
61 * active for a period of `ttl`. The definition of "activity" depends on the
62 * type of the associated resource. The minimum and maximum allowed values
63 * for `ttl` depend on the type of the associated resource, as well. If
64 * `ttl` is not set, the associated resource never expires.
65 */
66/**
67 * A Subscription object will give you access to your Cloud Pub/Sub
68 * subscription.
69 *
70 * Subscriptions are sometimes retrieved when using various methods:
71 *
72 * - {@link PubSub#getSubscriptions}
73 * - {@link Topic#getSubscriptions}
74 *
75 * Subscription objects may be created directly with:
76 *
77 * - {@link PubSub#createSubscription}
78 * - {@link Topic#createSubscription}
79 *
80 * All Subscription objects are instances of an
81 * [EventEmitter](http://nodejs.org/api/events.html). The subscription will pull
82 * for messages automatically as long as there is at least one listener assigned
83 * for the `message` event. Available events:
84 *
85 * Upon receipt of a message:
86 * on(event: 'message', listener: (message: {@link Message}) => void): this;
87 *
88 * Upon receipt of an error:
89 * on(event: 'error', listener: (error: Error) => void): this;
90 *
91 * Upon the closing of the subscriber:
92 * on(event: 'close', listener: Function): this;
93 *
94 * By default Subscription objects allow you to process 100 messages at the same
95 * time. You can fine tune this value by adjusting the
96 * `options.flowControl.maxMessages` option.
97 *
98 * If your subscription is seeing more re-deliveries than preferable, you might
99 * try increasing your `options.ackDeadline` value or decreasing the
100 * `options.streamingOptions.maxStreams` value.
101 *
102 * Subscription objects handle ack management, by automatically extending the
103 * ack deadline while the message is being processed, to then issue the ack or
104 * nack of such message when the processing is done. **Note:** message
105 * redelivery is still possible.
106 *
 * By default each {@link PubSub} instance can handle 100 open streams; with
 * default options this translates to fewer than 20 Subscriptions per PubSub
 * instance. If you wish to create more Subscriptions than that, you can either
110 * create multiple PubSub instances or lower the
111 * `options.streamingOptions.maxStreams` value on each Subscription object.
112 *
113 * @class
114 *
115 * @param {PubSub} pubsub PubSub object.
116 * @param {string} name The name of the subscription.
117 * @param {SubscriberOptions} [options] Options for handling messages.
118 *
119 * @example <caption>From {@link PubSub#getSubscriptions}</caption>
120 * const {PubSub} = require('@google-cloud/pubsub');
121 * const pubsub = new PubSub();
122 *
123 * pubsub.getSubscriptions((err, subscriptions) => {
124 * // `subscriptions` is an array of Subscription objects.
125 * });
126 *
127 * @example <caption>From {@link Topic#getSubscriptions}</caption>
128 * const topic = pubsub.topic('my-topic');
129 * topic.getSubscriptions((err, subscriptions) => {
130 * // `subscriptions` is an array of Subscription objects.
131 * });
132 *
133 * @example <caption>{@link Topic#createSubscription}</caption>
134 * const topic = pubsub.topic('my-topic');
135 * topic.createSubscription('new-subscription', (err, subscription) => {
136 * // `subscription` is a Subscription object.
137 * });
138 *
139 * @example <caption>{@link Topic#subscription}</caption>
140 * const topic = pubsub.topic('my-topic');
141 * const subscription = topic.subscription('my-subscription');
142 * // `subscription` is a Subscription object.
143 *
144 * @example <caption>Once you have obtained a subscription object, you may begin
145 * to register listeners. This will automatically trigger pulling for messages.
146 * </caption>
147 * // Register an error handler.
148 * subscription.on('error', (err) => {});
149 *
150 * // Register a close handler in case the subscriber closes unexpectedly
151 * subscription.on('close', () => {});
152 *
153 * // Register a listener for `message` events.
154 * function onMessage(message) {
155 * // Called every time a message is received.
156 *
157 * // message.id = ID of the message.
 * // message.ackId = ID used to acknowledge receipt of the message.
159 * // message.data = Contents of the message.
160 * // message.attributes = Attributes of the message.
161 * // message.publishTime = Date when Pub/Sub received the message.
162 *
163 * // Ack the message:
164 * // message.ack();
165 *
166 * // This doesn't ack the message, but allows more messages to be retrieved
167 * // if your limit was hit or if you don't want to ack the message.
168 * // message.nack();
169 * }
170 * subscription.on('message', onMessage);
171 *
172 * // Remove the listener from receiving `message` events.
173 * subscription.removeListener('message', onMessage);
174 *
175 * @example <caption>To apply a fine level of flow control, consider the
176 * following configuration</caption>
177 * const subscription = topic.subscription('my-sub', {
178 * flowControl: {
179 * maxMessages: 1,
180 * // this tells the client to manage and lock any excess messages
181 * allowExcessMessages: false
182 * }
183 * });
184 */
185class Subscription extends events_1.EventEmitter {
186 constructor(pubsub, name, options) {
187 super();
188 options = options || {};
189 this.pubsub = pubsub;
190 this.request = pubsub.request.bind(pubsub);
191 this.name = Subscription.formatName_(this.projectId, name);
192 this.topic = options.topic;
193 /**
194 * [IAM (Identity and Access
195 * Management)](https://cloud.google.com/pubsub/access_control) allows you
196 * to set permissions on individual resources and offers a wider range of
197 * roles: editor, owner, publisher, subscriber, and viewer. This gives you
198 * greater flexibility and allows you to set more fine-grained access
199 * control.
200 *
201 * *The IAM access control features described in this document are Beta,
202 * including the API methods to get and set IAM policies, and to test IAM
203 * permissions. Cloud Pub/Sub's use of IAM features is not covered by
204 * any SLA or deprecation policy, and may be subject to
205 * backward-incompatible changes.*
206 *
207 * @name Subscription#iam
208 * @mixes IAM
209 *
210 * @see [Access Control Overview]{@link https://cloud.google.com/pubsub/access_control}
211 * @see [What is Cloud IAM?]{@link https://cloud.google.com/iam/}
212 *
213 * @example
214 * //-
215 * // Get the IAM policy for your subscription.
216 * //-
217 * subscription.iam.getPolicy((err, policy) => {
218 * console.log(policy);
219 * });
220 *
221 * //-
222 * // If the callback is omitted, we'll return a Promise.
223 * //-
224 * subscription.iam.getPolicy().then((data) => {
225 * const policy = data[0];
226 * const apiResponse = data[1];
227 * });
228 */
229 this.iam = new iam_1.IAM(pubsub, this.name);
230 this._subscriber = new subscriber_1.Subscriber(this, options);
231 this._subscriber
232 .on('error', err => this.emit('error', err))
233 .on('message', message => this.emit('message', message))
234 .on('close', () => this.emit('close'));
235 this._listen();
236 }
237 /**
238 * Indicates if the Subscription is open and receiving messages.
239 *
240 * @type {boolean}
241 */
242 get isOpen() {
243 return !!(this._subscriber && this._subscriber.isOpen);
244 }
245 /**
246 * @type {string}
247 */
248 get projectId() {
249 return (this.pubsub && this.pubsub.projectId) || '{{projectId}}';
250 }
251 /**
252 * Closes the Subscription, once this is called you will no longer receive
253 * message events unless you call {Subscription#open} or add new message
254 * listeners.
255 *
256 * @param {function} [callback] The callback function.
257 * @param {?error} callback.err An error returned while closing the
258 * Subscription.
259 *
260 * @example
261 * subscription.close(err => {
262 * if (err) {
263 * // Error handling omitted.
264 * }
265 * });
266 *
267 * // If the callback is omitted a Promise will be returned.
268 * subscription.close().then(() => {});
269 */
270 close(callback) {
271 this._subscriber.close().then(() => callback(), callback);
272 }
273 /**
274 * Create a subscription.
275 *
276 * @see [Subscriptions: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create}
277 *
278 * @throws {Error} If subscription name is omitted.
279 *
280 * @param {string} name The name of the subscription.
281 * @param {CreateSubscriptionRequest} [options] See a
282 * [Subscription
283 * resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions).
284 * @param {CreateSubscriptionCallback} [callback] Callback function.
285 * @returns {Promise<CreateSubscriptionResponse>}
286 *
287 * @example
288 * const {PubSub} = require('@google-cloud/pubsub');
289 * const pubsub = new PubSub();
290 *
291 * const topic = pubsub.topic('my-topic');
292 * const subscription = topic.subscription('newMessages');
293 * const callback = function(err, subscription, apiResponse) {};
294 *
295 * subscription.create(callback);
296 *
297 * @example <caption>With options</caption>
298 * subscription.create({
299 * ackDeadlineSeconds: 90
300 * }, callback);
301 *
302 * @example <caption>If the callback is omitted, we'll return a
303 * Promise.</caption> const [sub, apiResponse] = await subscription.create();
304 */
305 create(optsOrCallback, callback) {
306 if (!this.topic) {
307 throw new Error('Subscriptions can only be created when accessed through Topics');
308 }
309 const name = this.name.split('/').pop();
310 const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
311 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
312 this.pubsub.createSubscription(this.topic, name, options, (err, sub, resp) => {
313 if (err) {
314 callback(err, null, resp);
315 return;
316 }
317 Object.assign(this, sub);
318 callback(null, this, resp);
319 });
320 }
321 /**
322 * @typedef {array} CreateSnapshotResponse
323 * @property {Snapshot} 0 The new {@link Snapshot}.
324 * @property {object} 1 The full API response.
325 */
326 /**
327 * @callback CreateSnapshotCallback
328 * @param {?Error} err Request error, if any.
329 * @param {Snapshot} snapshot The new {@link Snapshot}.
330 * @param {object} apiResponse The full API response.
331 */
332 /**
333 * Create a snapshot with the given name.
334 *
335 * @param {string} name Name of the snapshot.
336 * @param {object} [gaxOpts] Request configuration options, outlined
337 * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
338 * @param {CreateSnapshotCallback} [callback] Callback function.
339 * @returns {Promise<CreateSnapshotResponse>}
340 *
341 * @example
342 * const {PubSub} = require('@google-cloud/pubsub');
343 * const pubsub = new PubSub();
344 *
345 * const topic = pubsub.topic('my-topic');
346 * const subscription = topic.subscription('my-subscription');
347 *
348 * const callback = (err, snapshot, apiResponse) => {
349 * if (!err) {
350 * // The snapshot was created successfully.
351 * }
352 * };
353 *
354 * subscription.createSnapshot('my-snapshot', callback);
355 *
356 * //-
357 * // If the callback is omitted, we'll return a Promise.
358 * //-
359 * subscription.createSnapshot('my-snapshot').then((data) => {
360 * const snapshot = data[0];
361 * const apiResponse = data[1];
362 * });
363 */
364 createSnapshot(name, optsOrCallback, callback) {
365 if (typeof name !== 'string') {
366 throw new Error('A name is required to create a snapshot.');
367 }
368 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
369 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
370 const snapshot = this.snapshot(name);
371 const reqOpts = {
372 name: snapshot.name,
373 subscription: this.name,
374 };
375 this.request({
376 client: 'SubscriberClient',
377 method: 'createSnapshot',
378 reqOpts,
379 gaxOpts,
380 }, (err, resp) => {
381 if (err) {
382 callback(err, null, resp);
383 return;
384 }
385 snapshot.metadata = resp;
386 callback(null, snapshot, resp);
387 });
388 }
389 /**
390 * Delete the subscription. Pull requests from the current subscription will
391 * be errored once unsubscription is complete.
392 *
393 * @see [Subscriptions: delete API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/delete}
394 *
395 * @param {object} [gaxOpts] Request configuration options, outlined
396 * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
397 * @param {function} [callback] The callback function.
398 * @param {?error} callback.err An error returned while making this
399 * request.
400 * @param {object} callback.apiResponse Raw API response.
401 *
402 * @example
403 * const {PubSub} = require('@google-cloud/pubsub');
404 * const pubsub = new PubSub();
405 *
406 * const topic = pubsub.topic('my-topic');
407 * const subscription = topic.subscription('my-subscription');
408 *
409 * subscription.delete((err, apiResponse) => {});
410 *
411 * //-
412 * // If the callback is omitted, we'll return a Promise.
413 * //-
414 * subscription.delete().then((data) => {
415 * const apiResponse = data[0];
416 * });
417 */
418 delete(optsOrCallback, callback) {
419 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
420 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
421 const reqOpts = {
422 subscription: this.name,
423 };
424 if (this.isOpen) {
425 this._subscriber.close();
426 }
427 this.request({
428 client: 'SubscriberClient',
429 method: 'deleteSubscription',
430 reqOpts,
431 gaxOpts,
432 }, callback);
433 }
434 /**
435 * @typedef {array} SubscriptionDetachedResponse
436 * @property {boolean} 0 Whether the subscription is detached.
437 */
438 /**
439 * @callback SubscriptionDetachedCallback
440 * @param {?Error} err Request error, if any.
441 * @param {boolean} exists Whether the subscription is detached.
442 */
443 /**
444 * Check if a subscription is detached.
445 *
446 * @param {SubscriptionDetachedCallback} [callback] Callback function.
447 * @returns {Promise<SubscriptionDetachedResponse>}
448 *
449 * @example
450 * const {PubSub} = require('@google-cloud/pubsub');
451 * const pubsub = new PubSub();
452 *
453 * const topic = pubsub.topic('my-topic');
454 * const subscription = topic.subscription('my-subscription');
455 *
456 * subscription.detached((err, exists) => {});
457 *
458 * //-
459 * // If the callback is omitted, we'll return a Promise.
460 * //-
461 * subscription.detached().then((data) => {
462 * const detached = data[0];
463 * });
464 */
465 detached(callback) {
466 this.getMetadata((err, metadata) => {
467 if (err) {
468 callback(err);
469 }
470 else {
471 callback(null, metadata.detached);
472 }
473 });
474 }
475 /**
476 * @typedef {array} SubscriptionExistsResponse
477 * @property {boolean} 0 Whether the subscription exists
478 */
479 /**
480 * @callback SubscriptionExistsCallback
481 * @param {?Error} err Request error, if any.
482 * @param {boolean} exists Whether the subscription exists.
483 */
484 /**
485 * Check if a subscription exists.
486 *
487 * @param {SubscriptionExistsCallback} [callback] Callback function.
488 * @returns {Promise<SubscriptionExistsResponse>}
489 *
490 * @example
491 * const {PubSub} = require('@google-cloud/pubsub');
492 * const pubsub = new PubSub();
493 *
494 * const topic = pubsub.topic('my-topic');
495 * const subscription = topic.subscription('my-subscription');
496 *
497 * subscription.exists((err, exists) => {});
498 *
499 * //-
500 * // If the callback is omitted, we'll return a Promise.
501 * //-
502 * subscription.exists().then((data) => {
503 * const exists = data[0];
504 * });
505 */
506 exists(callback) {
507 this.getMetadata(err => {
508 if (!err) {
509 callback(null, true);
510 return;
511 }
512 if (err.code === 5) {
513 callback(null, false);
514 return;
515 }
516 callback(err);
517 });
518 }
519 /**
520 * @typedef {array} GetSubscriptionResponse
521 * @property {Subscription} 0 The {@link Subscription}.
522 * @property {object} 1 The full API response.
523 */
524 /**
525 * @callback GetSubscriptionCallback
526 * @param {?Error} err Request error, if any.
527 * @param {Subscription} subscription The {@link Subscription}.
528 * @param {object} apiResponse The full API response.
529 */
530 /**
531 * Get a subscription if it exists.
532 *
533 * @param {object} [gaxOpts] Request configuration options, outlined
534 * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
535 * @param {boolean} [gaxOpts.autoCreate=false] Automatically create the
536 * subscription if it does not already exist.
537 * @param {GetSubscriptionCallback} [callback] Callback function.
538 * @returns {Promise<GetSubscriptionResponse>}
539 *
540 * @example
541 * const {PubSub} = require('@google-cloud/pubsub');
542 * const pubsub = new PubSub();
543 *
544 * const topic = pubsub.topic('my-topic');
545 * const subscription = topic.subscription('my-subscription');
546 *
547 * subscription.get((err, subscription, apiResponse) => {
548 * // The `subscription` data has been populated.
549 * });
550 *
551 * //-
552 * // If the callback is omitted, we'll return a Promise.
553 * //-
554 * subscription.get().then((data) => {
555 * const subscription = data[0];
556 * const apiResponse = data[1];
557 * });
558 */
559 get(optsOrCallback, callback) {
560 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
561 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
562 const autoCreate = !!gaxOpts.autoCreate && this.topic;
563 delete gaxOpts.autoCreate;
564 this.getMetadata(gaxOpts, (err, apiResponse) => {
565 if (!err) {
566 callback(null, this, apiResponse);
567 return;
568 }
569 if (err.code !== 5 || !autoCreate) {
570 callback(err, null, apiResponse);
571 return;
572 }
573 this.create({ gaxOpts }, callback);
574 });
575 }
576 /**
577 * @typedef {array} GetSubscriptionMetadataResponse
578 * @property {object} 0 The full API response.
579 */
580 /**
581 * @callback GetSubscriptionMetadataCallback
582 * @param {?Error} err Request error, if any.
583 * @param {object} apiResponse The full API response.
584 */
585 /**
586 * Fetches the subscriptions metadata.
587 *
588 * @param {object} [gaxOpts] Request configuration options, outlined
589 * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
590 * @param {GetSubscriptionMetadataCallback} [callback] Callback function.
591 * @returns {Promise<GetSubscriptionMetadataResponse>}
592 *
593 * @example
594 * const {PubSub} = require('@google-cloud/pubsub');
595 * const pubsub = new PubSub();
596 *
597 * const topic = pubsub.topic('my-topic');
598 * const subscription = topic.subscription('my-subscription');
599 *
600 * subscription.getMetadata((err, apiResponse) => {
601 * if (err) {
602 * // Error handling omitted.
603 * }
604 * });
605 *
606 * //-
607 * // If the callback is omitted, we'll return a Promise.
608 * //-
609 * subscription.getMetadata().then((data) => {
610 * const apiResponse = data[0];
611 * });
612 */
613 getMetadata(optsOrCallback, callback) {
614 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
615 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
616 const reqOpts = {
617 subscription: this.name,
618 };
619 this.request({
620 client: 'SubscriberClient',
621 method: 'getSubscription',
622 reqOpts,
623 gaxOpts,
624 }, (err, apiResponse) => {
625 if (!err) {
626 this.metadata = apiResponse;
627 }
628 callback(err, apiResponse);
629 });
630 }
631 /**
632 * @typedef {array} ModifyPushConfigResponse
633 * @property {object} 0 The full API response.
634 */
635 /**
636 * @callback ModifyPushConfigCallback
637 * @param {?Error} err Request error, if any.
638 * @param {object} apiResponse The full API response.
639 */
640 /**
641 * Modify the push config for the subscription.
642 *
643 * @param {object} config The push config.
644 * @param {string} config.pushEndpoint A URL locating the endpoint to which
645 * messages should be published.
646 * @param {object} config.attributes [PushConfig attributes](https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PushConfig).
647 * @param {object} config.oidcToken If specified, Pub/Sub will generate and
648 * attach an OIDC JWT token as an `Authorization` header in the HTTP
649 * request for every pushed message. This object should have the same
650 * structure as [OidcToken]{@link google.pubsub.v1.OidcToken}
651 * @param {object} [gaxOpts] Request configuration options, outlined
652 * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
653 * @param {ModifyPushConfigCallback} [callback] Callback function.
654 * @returns {Promise<ModifyPushConfigResponse>}
655 *
656 * @example
657 * const {PubSub} = require('@google-cloud/pubsub');
658 * const pubsub = new PubSub();
659 *
660 * const topic = pubsub.topic('my-topic');
661 * const subscription = topic.subscription('my-subscription');
662 *
663 * const pushConfig = {
664 * pushEndpoint: 'https://mydomain.com/push',
665 * attributes: {
666 * key: 'value'
667 * },
668 * oidcToken: {
669 * serviceAccountEmail: 'myproject@appspot.gserviceaccount.com',
670 * audience: 'myaudience'
671 * }
672 * };
673 *
674 * subscription.modifyPushConfig(pushConfig, (err, apiResponse) => {
675 * if (err) {
676 * // Error handling omitted.
677 * }
678 * });
679 *
680 * //-
681 * // If the callback is omitted, we'll return a Promise.
682 * //-
683 * subscription.modifyPushConfig(pushConfig).then((data) => {
684 * const apiResponse = data[0];
685 * });
686 */
687 modifyPushConfig(config, optsOrCallback, callback) {
688 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
689 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
690 const reqOpts = {
691 subscription: this.name,
692 pushConfig: config,
693 };
694 this.request({
695 client: 'SubscriberClient',
696 method: 'modifyPushConfig',
697 reqOpts,
698 gaxOpts,
699 }, callback);
700 }
701 /**
702 * Opens the Subscription to receive messages. In general this method
703 * shouldn't need to be called, unless you wish to receive messages after
704 * calling {@link Subscription#close}. Alternatively one could just assign a
705 * new `message` event listener which will also re-open the Subscription.
706 *
707 * @example
708 * subscription.on('message', message => message.ack());
709 *
710 * // Close the subscription.
711 * subscription.close(err => {
712 * if (err) {
713 * // Error handling omitted.
714 * }
715 *
716 * The subscription has been closed and messages will no longer be received.
717 * });
718 *
719 * // Resume receiving messages.
720 * subscription.open();
721 */
722 open() {
723 if (!this._subscriber.isOpen) {
724 this._subscriber.open();
725 }
726 }
727 /**
728 * @typedef {array} SeekResponse
729 * @property {object} 0 The full API response.
730 */
731 /**
732 * @callback SeekCallback
733 * @param {?Error} err Request error, if any.
734 * @param {object} apiResponse The full API response.
735 */
736 /**
737 * Seeks an existing subscription to a point in time or a given snapshot.
738 *
739 * @param {string|date} snapshot The point to seek to. This will accept the
740 * name of the snapshot or a Date object.
741 * @param {object} [gaxOpts] Request configuration options, outlined
742 * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
743 * @param {SeekCallback} [callback] Callback function.
744 * @returns {Promise<SeekResponse>}
745 *
746 * @example
747 * const callback = (err, resp) => {
748 * if (!err) {
749 * // Seek was successful.
750 * }
751 * };
752 *
753 * subscription.seek('my-snapshot', callback);
754 *
755 * //-
756 * // Alternatively, to specify a certain point in time, you can provide a
757 * Date
758 * // object.
759 * //-
760 * const date = new Date('October 21 2015');
761 *
762 * subscription.seek(date, callback);
763 */
764 seek(snapshot, optsOrCallback, callback) {
765 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
766 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
767 const reqOpts = {
768 subscription: this.name,
769 };
770 if (typeof snapshot === 'string') {
771 reqOpts.snapshot = snapshot_1.Snapshot.formatName_(this.pubsub.projectId, snapshot);
772 }
773 else if (Object.prototype.toString.call(snapshot) === '[object Date]') {
774 const dateMillis = snapshot.getTime();
775 reqOpts.time = {
776 seconds: Math.floor(dateMillis / 1000),
777 nanos: Math.floor(dateMillis % 1000) * 1000,
778 };
779 }
780 else {
781 throw new Error('Either a snapshot name or Date is needed to seek to.');
782 }
783 this.request({
784 client: 'SubscriberClient',
785 method: 'seek',
786 reqOpts,
787 gaxOpts,
788 }, callback);
789 }
790 /**
791 * @typedef {array} SetSubscriptionMetadataResponse
792 * @property {object} 0 The full API response.
793 */
794 /**
795 * @callback SetSubscriptionMetadataCallback
796 * @param {?Error} err Request error, if any.
797 * @param {object} apiResponse The full API response.
798 */
799 /**
800 * Update the subscription object.
801 *
802 * @param {object} metadata The subscription metadata.
803 * @param {object} [gaxOpts] Request configuration options, outlined
804 * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
805 * @param {SetSubscriptionMetadataCallback} [callback] Callback function.
806 * @returns {Promise<SetSubscriptionMetadataResponse>}
807 *
808 * @example
809 * const metadata = {
810 * key: 'value'
811 * };
812 *
813 * subscription.setMetadata(metadata, (err, apiResponse) => {
814 * if (err) {
815 * // Error handling omitted.
816 * }
817 * });
818 *
819 * //-
820 * // If the callback is omitted, we'll return a Promise.
821 * //-
822 * subscription.setMetadata(metadata).then((data) => {
823 * const apiResponse = data[0];
824 * });
825 */
826 setMetadata(metadata, optsOrCallback, callback) {
827 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
828 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
829 const subscription = Subscription.formatMetadata_(metadata);
830 const fields = Object.keys(subscription).map(snakeCase);
831 subscription.name = this.name;
832 const reqOpts = {
833 subscription,
834 updateMask: {
835 paths: fields,
836 },
837 };
838 this.request({
839 client: 'SubscriberClient',
840 method: 'updateSubscription',
841 reqOpts,
842 gaxOpts,
843 }, callback);
844 }
845 /**
846 * Sets the Subscription options.
847 *
848 * @param {SubscriberOptions} options The options.
849 */
850 setOptions(options) {
851 this._subscriber.setOptions(options);
852 }
853 /**
854 * Create a Snapshot object. See {@link Subscription#createSnapshot} to
855 * create a snapshot.
856 *
857 * @throws {Error} If a name is not provided.
858 *
859 * @param {string} name The name of the snapshot.
860 * @returns {Snapshot}
861 *
862 * @example
863 * const snapshot = subscription.snapshot('my-snapshot');
864 */
865 snapshot(name) {
866 return this.pubsub.snapshot.call(this, name);
867 }
868 /**
869 * Watches for incoming message event handlers and open/closes the
870 * subscriber as needed.
871 *
872 * @private
873 */
874 _listen() {
875 this.on('newListener', event => {
876 if (!this.isOpen && event === 'message') {
877 this._subscriber.open();
878 }
879 });
880 this.on('removeListener', () => {
881 if (this.isOpen && this.listenerCount('message') === 0) {
882 this._subscriber.close();
883 }
884 });
885 }
886 /*!
887 * Formats Subscription metadata.
888 *
889 * @private
890 */
891 static formatMetadata_(metadata) {
892 const formatted = extend(true, {}, metadata);
893 if (typeof metadata.messageRetentionDuration === 'number') {
894 formatted.messageRetentionDuration = {
895 seconds: metadata.messageRetentionDuration,
896 nanos: 0,
897 };
898 }
899 if (metadata.pushEndpoint) {
900 formatted.pushConfig = {
901 pushEndpoint: metadata.pushEndpoint,
902 };
903 delete formatted.pushEndpoint;
904 }
905 if (metadata.oidcToken) {
906 formatted.pushConfig = {
907 ...formatted.pushConfig,
908 oidcToken: metadata.oidcToken,
909 };
910 delete formatted.oidcToken;
911 }
912 return formatted;
913 }
914 /*!
915 * Format the name of a subscription. A subscription's full name is in the
916 * format of projects/{projectId}/subscriptions/{subName}.
917 *
918 * @private
919 */
920 static formatName_(projectId, name) {
921 // Simple check if the name is already formatted.
922 if (name.indexOf('/') > -1) {
923 return name;
924 }
925 return 'projects/' + projectId + '/subscriptions/' + name;
926 }
927}
exports.Subscription = Subscription;
/*! Developer Documentation
 *
 * Every async method (streams excepted) hands back a Promise whenever the
 * caller leaves out the callback. `open` and `snapshot` are excluded from
 * promisification.
 */
const promisifyOptions = { exclude: ['open', 'snapshot'] };
promisify_1.promisifyAll(Subscription, promisifyOptions);
937//# sourceMappingURL=subscription.js.map
\No newline at end of file