1 | ;
|
2 | /*!
|
3 | * Copyright 2014 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.Subscription = void 0;
|
19 | const promisify_1 = require("@google-cloud/promisify");
|
20 | const events_1 = require("events");
|
21 | const extend = require("extend");
|
22 | const snakeCase = require("lodash.snakecase");
|
23 | const iam_1 = require("./iam");
|
24 | const snapshot_1 = require("./snapshot");
|
25 | const subscriber_1 = require("./subscriber");
|
26 | // JSDoc won't see these, so this is just to let you get typings
|
27 | // in your editor of choice.
|
28 | //
|
29 | // NOTE: These are commented out for now because we don't want to
|
30 | // break any existing clients that rely on not-entirely-correct
|
31 | // typings. We'll re-enable on the next major.
|
32 | /* export declare interface Subscription {
|
33 | on(
|
34 | event: 'message',
|
35 | listener: (message: Message) => void
|
36 | ): this;
|
37 | on(
|
38 | event: 'error',
|
39 | listener: (error: StatusError) => void
|
40 | ): this;
|
41 | on(event: 'close', listener: () => void): this;
|
42 |
|
43 | // Only used internally.
|
44 | on(event: 'newListener', listener: Function): this;
|
45 | on(event: 'removeListener', listener: Function): this;
|
46 |
|
47 | // Catch-all. If you get an error about this line, it means you're
|
48 | // using an unsupported event type or listener type.
|
49 | on(event: string, listener: void): this;
|
50 | } */
|
51 | /**
|
52 | * @typedef {object} ExpirationPolicy
|
53 | * A policy that specifies the conditions for this subscription's expiration. A
|
54 | * subscription is considered active as long as any connected subscriber is
|
55 | * successfully consuming messages from the subscription or is issuing
|
56 | * operations on the subscription. If expirationPolicy is not set, a default
|
57 | * policy with ttl of 31 days will be used. The minimum allowed value for
|
58 | * expirationPolicy.ttl is 1 day.
|
59 | * @property {google.protobuf.Duration} ttl Specifies the "time-to-live"
|
60 | * duration for an associated resource. The resource expires if it is not
|
61 | * active for a period of `ttl`. The definition of "activity" depends on the
|
62 | * type of the associated resource. The minimum and maximum allowed values
|
63 | * for `ttl` depend on the type of the associated resource, as well. If
|
64 | * `ttl` is not set, the associated resource never expires.
|
65 | */
|
66 | /**
|
67 | * A Subscription object will give you access to your Cloud Pub/Sub
|
68 | * subscription.
|
69 | *
|
70 | * Subscriptions are sometimes retrieved when using various methods:
|
71 | *
|
72 | * - {@link PubSub#getSubscriptions}
|
73 | * - {@link Topic#getSubscriptions}
|
74 | *
|
75 | * Subscription objects may be created directly with:
|
76 | *
|
77 | * - {@link PubSub#createSubscription}
|
78 | * - {@link Topic#createSubscription}
|
79 | *
|
80 | * All Subscription objects are instances of an
|
81 | * [EventEmitter](http://nodejs.org/api/events.html). The subscription will pull
|
82 | * for messages automatically as long as there is at least one listener assigned
|
83 | * for the `message` event. Available events:
|
84 | *
|
85 | * Upon receipt of a message:
|
86 | * on(event: 'message', listener: (message: {@link Message}) => void): this;
|
87 | *
|
88 | * Upon receipt of an error:
|
89 | * on(event: 'error', listener: (error: Error) => void): this;
|
90 | *
|
91 | * Upon the closing of the subscriber:
|
92 | * on(event: 'close', listener: Function): this;
|
93 | *
|
94 | * By default Subscription objects allow you to process 100 messages at the same
|
95 | * time. You can fine tune this value by adjusting the
|
96 | * `options.flowControl.maxMessages` option.
|
97 | *
|
98 | * If your subscription is seeing more re-deliveries than preferable, you might
|
99 | * try increasing your `options.ackDeadline` value or decreasing the
|
100 | * `options.streamingOptions.maxStreams` value.
|
101 | *
|
102 | * Subscription objects handle ack management, by automatically extending the
|
103 | * ack deadline while the message is being processed, to then issue the ack or
|
104 | * nack of such message when the processing is done. **Note:** message
|
105 | * redelivery is still possible.
|
106 | *
|
107 | * By default each {@link PubSub} instance can handle 100 open streams, with
|
108 | * default options this translates to less than 20 Subscriptions per PubSub
|
109 | * instance. If you wish to create more Subscriptions than that, you can either
|
110 | * create multiple PubSub instances or lower the
|
111 | * `options.streamingOptions.maxStreams` value on each Subscription object.
|
112 | *
|
113 | * @class
|
114 | *
|
115 | * @param {PubSub} pubsub PubSub object.
|
116 | * @param {string} name The name of the subscription.
|
117 | * @param {SubscriberOptions} [options] Options for handling messages.
|
118 | *
|
119 | * @example <caption>From {@link PubSub#getSubscriptions}</caption>
|
120 | * const {PubSub} = require('@google-cloud/pubsub');
|
121 | * const pubsub = new PubSub();
|
122 | *
|
123 | * pubsub.getSubscriptions((err, subscriptions) => {
|
124 | * // `subscriptions` is an array of Subscription objects.
|
125 | * });
|
126 | *
|
127 | * @example <caption>From {@link Topic#getSubscriptions}</caption>
|
128 | * const topic = pubsub.topic('my-topic');
|
129 | * topic.getSubscriptions((err, subscriptions) => {
|
130 | * // `subscriptions` is an array of Subscription objects.
|
131 | * });
|
132 | *
|
133 | * @example <caption>{@link Topic#createSubscription}</caption>
|
134 | * const topic = pubsub.topic('my-topic');
|
135 | * topic.createSubscription('new-subscription', (err, subscription) => {
|
136 | * // `subscription` is a Subscription object.
|
137 | * });
|
138 | *
|
139 | * @example <caption>{@link Topic#subscription}</caption>
|
140 | * const topic = pubsub.topic('my-topic');
|
141 | * const subscription = topic.subscription('my-subscription');
|
142 | * // `subscription` is a Subscription object.
|
143 | *
|
144 | * @example <caption>Once you have obtained a subscription object, you may begin
|
145 | * to register listeners. This will automatically trigger pulling for messages.
|
146 | * </caption>
|
147 | * // Register an error handler.
|
148 | * subscription.on('error', (err) => {});
|
149 | *
|
150 | * // Register a close handler in case the subscriber closes unexpectedly
|
151 | * subscription.on('close', () => {});
|
152 | *
|
153 | * // Register a listener for `message` events.
|
154 | * function onMessage(message) {
|
155 | * // Called every time a message is received.
|
156 | *
|
157 | * // message.id = ID of the message.
|
158 | * // message.ackId = ID used to acknowledge the message receival.
|
159 | * // message.data = Contents of the message.
|
160 | * // message.attributes = Attributes of the message.
|
161 | * // message.publishTime = Date when Pub/Sub received the message.
|
162 | *
|
163 | * // Ack the message:
|
164 | * // message.ack();
|
165 | *
|
166 | * // This doesn't ack the message, but allows more messages to be retrieved
|
167 | * // if your limit was hit or if you don't want to ack the message.
|
168 | * // message.nack();
|
169 | * }
|
170 | * subscription.on('message', onMessage);
|
171 | *
|
172 | * // Remove the listener from receiving `message` events.
|
173 | * subscription.removeListener('message', onMessage);
|
174 | *
|
175 | * @example <caption>To apply a fine level of flow control, consider the
|
176 | * following configuration</caption>
|
177 | * const subscription = topic.subscription('my-sub', {
|
178 | * flowControl: {
|
179 | * maxMessages: 1,
|
180 | * // this tells the client to manage and lock any excess messages
|
181 | * allowExcessMessages: false
|
182 | * }
|
183 | * });
|
184 | */
|
185 | class Subscription extends events_1.EventEmitter {
|
186 | constructor(pubsub, name, options) {
|
187 | super();
|
188 | options = options || {};
|
189 | this.pubsub = pubsub;
|
190 | this.request = pubsub.request.bind(pubsub);
|
191 | this.name = Subscription.formatName_(this.projectId, name);
|
192 | this.topic = options.topic;
|
193 | /**
|
194 | * [IAM (Identity and Access
|
195 | * Management)](https://cloud.google.com/pubsub/access_control) allows you
|
196 | * to set permissions on individual resources and offers a wider range of
|
197 | * roles: editor, owner, publisher, subscriber, and viewer. This gives you
|
198 | * greater flexibility and allows you to set more fine-grained access
|
199 | * control.
|
200 | *
|
201 | * *The IAM access control features described in this document are Beta,
|
202 | * including the API methods to get and set IAM policies, and to test IAM
|
203 | * permissions. Cloud Pub/Sub's use of IAM features is not covered by
|
204 | * any SLA or deprecation policy, and may be subject to
|
205 | * backward-incompatible changes.*
|
206 | *
|
207 | * @name Subscription#iam
|
208 | * @mixes IAM
|
209 | *
|
210 | * @see [Access Control Overview]{@link https://cloud.google.com/pubsub/access_control}
|
211 | * @see [What is Cloud IAM?]{@link https://cloud.google.com/iam/}
|
212 | *
|
213 | * @example
|
214 | * //-
|
215 | * // Get the IAM policy for your subscription.
|
216 | * //-
|
217 | * subscription.iam.getPolicy((err, policy) => {
|
218 | * console.log(policy);
|
219 | * });
|
220 | *
|
221 | * //-
|
222 | * // If the callback is omitted, we'll return a Promise.
|
223 | * //-
|
224 | * subscription.iam.getPolicy().then((data) => {
|
225 | * const policy = data[0];
|
226 | * const apiResponse = data[1];
|
227 | * });
|
228 | */
|
229 | this.iam = new iam_1.IAM(pubsub, this.name);
|
230 | this._subscriber = new subscriber_1.Subscriber(this, options);
|
231 | this._subscriber
|
232 | .on('error', err => this.emit('error', err))
|
233 | .on('message', message => this.emit('message', message))
|
234 | .on('close', () => this.emit('close'));
|
235 | this._listen();
|
236 | }
|
237 | /**
|
238 | * Indicates if the Subscription is open and receiving messages.
|
239 | *
|
240 | * @type {boolean}
|
241 | */
|
242 | get isOpen() {
|
243 | return !!(this._subscriber && this._subscriber.isOpen);
|
244 | }
|
245 | /**
|
246 | * @type {string}
|
247 | */
|
248 | get projectId() {
|
249 | return (this.pubsub && this.pubsub.projectId) || '{{projectId}}';
|
250 | }
|
251 | /**
|
252 | * Closes the Subscription, once this is called you will no longer receive
|
253 | * message events unless you call {Subscription#open} or add new message
|
254 | * listeners.
|
255 | *
|
256 | * @param {function} [callback] The callback function.
|
257 | * @param {?error} callback.err An error returned while closing the
|
258 | * Subscription.
|
259 | *
|
260 | * @example
|
261 | * subscription.close(err => {
|
262 | * if (err) {
|
263 | * // Error handling omitted.
|
264 | * }
|
265 | * });
|
266 | *
|
267 | * // If the callback is omitted a Promise will be returned.
|
268 | * subscription.close().then(() => {});
|
269 | */
|
270 | close(callback) {
|
271 | this._subscriber.close().then(() => callback(), callback);
|
272 | }
|
273 | /**
|
274 | * Create a subscription.
|
275 | *
|
276 | * @see [Subscriptions: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create}
|
277 | *
|
278 | * @throws {Error} If subscription name is omitted.
|
279 | *
|
280 | * @param {string} name The name of the subscription.
|
281 | * @param {CreateSubscriptionRequest} [options] See a
|
282 | * [Subscription
|
283 | * resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions).
|
284 | * @param {CreateSubscriptionCallback} [callback] Callback function.
|
285 | * @returns {Promise<CreateSubscriptionResponse>}
|
286 | *
|
287 | * @example
|
288 | * const {PubSub} = require('@google-cloud/pubsub');
|
289 | * const pubsub = new PubSub();
|
290 | *
|
291 | * const topic = pubsub.topic('my-topic');
|
292 | * const subscription = topic.subscription('newMessages');
|
293 | * const callback = function(err, subscription, apiResponse) {};
|
294 | *
|
295 | * subscription.create(callback);
|
296 | *
|
297 | * @example <caption>With options</caption>
|
298 | * subscription.create({
|
299 | * ackDeadlineSeconds: 90
|
300 | * }, callback);
|
301 | *
|
302 | * @example <caption>If the callback is omitted, we'll return a
|
303 | * Promise.</caption> const [sub, apiResponse] = await subscription.create();
|
304 | */
|
305 | create(optsOrCallback, callback) {
|
306 | if (!this.topic) {
|
307 | throw new Error('Subscriptions can only be created when accessed through Topics');
|
308 | }
|
309 | const name = this.name.split('/').pop();
|
310 | const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
311 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
312 | this.pubsub.createSubscription(this.topic, name, options, (err, sub, resp) => {
|
313 | if (err) {
|
314 | callback(err, null, resp);
|
315 | return;
|
316 | }
|
317 | Object.assign(this, sub);
|
318 | callback(null, this, resp);
|
319 | });
|
320 | }
|
321 | /**
|
322 | * @typedef {array} CreateSnapshotResponse
|
323 | * @property {Snapshot} 0 The new {@link Snapshot}.
|
324 | * @property {object} 1 The full API response.
|
325 | */
|
326 | /**
|
327 | * @callback CreateSnapshotCallback
|
328 | * @param {?Error} err Request error, if any.
|
329 | * @param {Snapshot} snapshot The new {@link Snapshot}.
|
330 | * @param {object} apiResponse The full API response.
|
331 | */
|
332 | /**
|
333 | * Create a snapshot with the given name.
|
334 | *
|
335 | * @param {string} name Name of the snapshot.
|
336 | * @param {object} [gaxOpts] Request configuration options, outlined
|
337 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
338 | * @param {CreateSnapshotCallback} [callback] Callback function.
|
339 | * @returns {Promise<CreateSnapshotResponse>}
|
340 | *
|
341 | * @example
|
342 | * const {PubSub} = require('@google-cloud/pubsub');
|
343 | * const pubsub = new PubSub();
|
344 | *
|
345 | * const topic = pubsub.topic('my-topic');
|
346 | * const subscription = topic.subscription('my-subscription');
|
347 | *
|
348 | * const callback = (err, snapshot, apiResponse) => {
|
349 | * if (!err) {
|
350 | * // The snapshot was created successfully.
|
351 | * }
|
352 | * };
|
353 | *
|
354 | * subscription.createSnapshot('my-snapshot', callback);
|
355 | *
|
356 | * //-
|
357 | * // If the callback is omitted, we'll return a Promise.
|
358 | * //-
|
359 | * subscription.createSnapshot('my-snapshot').then((data) => {
|
360 | * const snapshot = data[0];
|
361 | * const apiResponse = data[1];
|
362 | * });
|
363 | */
|
364 | createSnapshot(name, optsOrCallback, callback) {
|
365 | if (typeof name !== 'string') {
|
366 | throw new Error('A name is required to create a snapshot.');
|
367 | }
|
368 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
369 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
370 | const snapshot = this.snapshot(name);
|
371 | const reqOpts = {
|
372 | name: snapshot.name,
|
373 | subscription: this.name,
|
374 | };
|
375 | this.request({
|
376 | client: 'SubscriberClient',
|
377 | method: 'createSnapshot',
|
378 | reqOpts,
|
379 | gaxOpts,
|
380 | }, (err, resp) => {
|
381 | if (err) {
|
382 | callback(err, null, resp);
|
383 | return;
|
384 | }
|
385 | snapshot.metadata = resp;
|
386 | callback(null, snapshot, resp);
|
387 | });
|
388 | }
|
389 | /**
|
390 | * Delete the subscription. Pull requests from the current subscription will
|
391 | * be errored once unsubscription is complete.
|
392 | *
|
393 | * @see [Subscriptions: delete API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/delete}
|
394 | *
|
395 | * @param {object} [gaxOpts] Request configuration options, outlined
|
396 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
397 | * @param {function} [callback] The callback function.
|
398 | * @param {?error} callback.err An error returned while making this
|
399 | * request.
|
400 | * @param {object} callback.apiResponse Raw API response.
|
401 | *
|
402 | * @example
|
403 | * const {PubSub} = require('@google-cloud/pubsub');
|
404 | * const pubsub = new PubSub();
|
405 | *
|
406 | * const topic = pubsub.topic('my-topic');
|
407 | * const subscription = topic.subscription('my-subscription');
|
408 | *
|
409 | * subscription.delete((err, apiResponse) => {});
|
410 | *
|
411 | * //-
|
412 | * // If the callback is omitted, we'll return a Promise.
|
413 | * //-
|
414 | * subscription.delete().then((data) => {
|
415 | * const apiResponse = data[0];
|
416 | * });
|
417 | */
|
418 | delete(optsOrCallback, callback) {
|
419 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
420 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
421 | const reqOpts = {
|
422 | subscription: this.name,
|
423 | };
|
424 | if (this.isOpen) {
|
425 | this._subscriber.close();
|
426 | }
|
427 | this.request({
|
428 | client: 'SubscriberClient',
|
429 | method: 'deleteSubscription',
|
430 | reqOpts,
|
431 | gaxOpts,
|
432 | }, callback);
|
433 | }
|
434 | /**
|
435 | * @typedef {array} SubscriptionDetachedResponse
|
436 | * @property {boolean} 0 Whether the subscription is detached.
|
437 | */
|
438 | /**
|
439 | * @callback SubscriptionDetachedCallback
|
440 | * @param {?Error} err Request error, if any.
|
441 | * @param {boolean} exists Whether the subscription is detached.
|
442 | */
|
443 | /**
|
444 | * Check if a subscription is detached.
|
445 | *
|
446 | * @param {SubscriptionDetachedCallback} [callback] Callback function.
|
447 | * @returns {Promise<SubscriptionDetachedResponse>}
|
448 | *
|
449 | * @example
|
450 | * const {PubSub} = require('@google-cloud/pubsub');
|
451 | * const pubsub = new PubSub();
|
452 | *
|
453 | * const topic = pubsub.topic('my-topic');
|
454 | * const subscription = topic.subscription('my-subscription');
|
455 | *
|
456 | * subscription.detached((err, exists) => {});
|
457 | *
|
458 | * //-
|
459 | * // If the callback is omitted, we'll return a Promise.
|
460 | * //-
|
461 | * subscription.detached().then((data) => {
|
462 | * const detached = data[0];
|
463 | * });
|
464 | */
|
465 | detached(callback) {
|
466 | this.getMetadata((err, metadata) => {
|
467 | if (err) {
|
468 | callback(err);
|
469 | }
|
470 | else {
|
471 | callback(null, metadata.detached);
|
472 | }
|
473 | });
|
474 | }
|
475 | /**
|
476 | * @typedef {array} SubscriptionExistsResponse
|
477 | * @property {boolean} 0 Whether the subscription exists
|
478 | */
|
479 | /**
|
480 | * @callback SubscriptionExistsCallback
|
481 | * @param {?Error} err Request error, if any.
|
482 | * @param {boolean} exists Whether the subscription exists.
|
483 | */
|
484 | /**
|
485 | * Check if a subscription exists.
|
486 | *
|
487 | * @param {SubscriptionExistsCallback} [callback] Callback function.
|
488 | * @returns {Promise<SubscriptionExistsResponse>}
|
489 | *
|
490 | * @example
|
491 | * const {PubSub} = require('@google-cloud/pubsub');
|
492 | * const pubsub = new PubSub();
|
493 | *
|
494 | * const topic = pubsub.topic('my-topic');
|
495 | * const subscription = topic.subscription('my-subscription');
|
496 | *
|
497 | * subscription.exists((err, exists) => {});
|
498 | *
|
499 | * //-
|
500 | * // If the callback is omitted, we'll return a Promise.
|
501 | * //-
|
502 | * subscription.exists().then((data) => {
|
503 | * const exists = data[0];
|
504 | * });
|
505 | */
|
506 | exists(callback) {
|
507 | this.getMetadata(err => {
|
508 | if (!err) {
|
509 | callback(null, true);
|
510 | return;
|
511 | }
|
512 | if (err.code === 5) {
|
513 | callback(null, false);
|
514 | return;
|
515 | }
|
516 | callback(err);
|
517 | });
|
518 | }
|
519 | /**
|
520 | * @typedef {array} GetSubscriptionResponse
|
521 | * @property {Subscription} 0 The {@link Subscription}.
|
522 | * @property {object} 1 The full API response.
|
523 | */
|
524 | /**
|
525 | * @callback GetSubscriptionCallback
|
526 | * @param {?Error} err Request error, if any.
|
527 | * @param {Subscription} subscription The {@link Subscription}.
|
528 | * @param {object} apiResponse The full API response.
|
529 | */
|
530 | /**
|
531 | * Get a subscription if it exists.
|
532 | *
|
533 | * @param {object} [gaxOpts] Request configuration options, outlined
|
534 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
535 | * @param {boolean} [gaxOpts.autoCreate=false] Automatically create the
|
536 | * subscription if it does not already exist.
|
537 | * @param {GetSubscriptionCallback} [callback] Callback function.
|
538 | * @returns {Promise<GetSubscriptionResponse>}
|
539 | *
|
540 | * @example
|
541 | * const {PubSub} = require('@google-cloud/pubsub');
|
542 | * const pubsub = new PubSub();
|
543 | *
|
544 | * const topic = pubsub.topic('my-topic');
|
545 | * const subscription = topic.subscription('my-subscription');
|
546 | *
|
547 | * subscription.get((err, subscription, apiResponse) => {
|
548 | * // The `subscription` data has been populated.
|
549 | * });
|
550 | *
|
551 | * //-
|
552 | * // If the callback is omitted, we'll return a Promise.
|
553 | * //-
|
554 | * subscription.get().then((data) => {
|
555 | * const subscription = data[0];
|
556 | * const apiResponse = data[1];
|
557 | * });
|
558 | */
|
559 | get(optsOrCallback, callback) {
|
560 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
561 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
562 | const autoCreate = !!gaxOpts.autoCreate && this.topic;
|
563 | delete gaxOpts.autoCreate;
|
564 | this.getMetadata(gaxOpts, (err, apiResponse) => {
|
565 | if (!err) {
|
566 | callback(null, this, apiResponse);
|
567 | return;
|
568 | }
|
569 | if (err.code !== 5 || !autoCreate) {
|
570 | callback(err, null, apiResponse);
|
571 | return;
|
572 | }
|
573 | this.create({ gaxOpts }, callback);
|
574 | });
|
575 | }
|
576 | /**
|
577 | * @typedef {array} GetSubscriptionMetadataResponse
|
578 | * @property {object} 0 The full API response.
|
579 | */
|
580 | /**
|
581 | * @callback GetSubscriptionMetadataCallback
|
582 | * @param {?Error} err Request error, if any.
|
583 | * @param {object} apiResponse The full API response.
|
584 | */
|
585 | /**
|
586 | * Fetches the subscriptions metadata.
|
587 | *
|
588 | * @param {object} [gaxOpts] Request configuration options, outlined
|
589 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
590 | * @param {GetSubscriptionMetadataCallback} [callback] Callback function.
|
591 | * @returns {Promise<GetSubscriptionMetadataResponse>}
|
592 | *
|
593 | * @example
|
594 | * const {PubSub} = require('@google-cloud/pubsub');
|
595 | * const pubsub = new PubSub();
|
596 | *
|
597 | * const topic = pubsub.topic('my-topic');
|
598 | * const subscription = topic.subscription('my-subscription');
|
599 | *
|
600 | * subscription.getMetadata((err, apiResponse) => {
|
601 | * if (err) {
|
602 | * // Error handling omitted.
|
603 | * }
|
604 | * });
|
605 | *
|
606 | * //-
|
607 | * // If the callback is omitted, we'll return a Promise.
|
608 | * //-
|
609 | * subscription.getMetadata().then((data) => {
|
610 | * const apiResponse = data[0];
|
611 | * });
|
612 | */
|
613 | getMetadata(optsOrCallback, callback) {
|
614 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
615 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
616 | const reqOpts = {
|
617 | subscription: this.name,
|
618 | };
|
619 | this.request({
|
620 | client: 'SubscriberClient',
|
621 | method: 'getSubscription',
|
622 | reqOpts,
|
623 | gaxOpts,
|
624 | }, (err, apiResponse) => {
|
625 | if (!err) {
|
626 | this.metadata = apiResponse;
|
627 | }
|
628 | callback(err, apiResponse);
|
629 | });
|
630 | }
|
631 | /**
|
632 | * @typedef {array} ModifyPushConfigResponse
|
633 | * @property {object} 0 The full API response.
|
634 | */
|
635 | /**
|
636 | * @callback ModifyPushConfigCallback
|
637 | * @param {?Error} err Request error, if any.
|
638 | * @param {object} apiResponse The full API response.
|
639 | */
|
640 | /**
|
641 | * Modify the push config for the subscription.
|
642 | *
|
643 | * @param {object} config The push config.
|
644 | * @param {string} config.pushEndpoint A URL locating the endpoint to which
|
645 | * messages should be published.
|
646 | * @param {object} config.attributes [PushConfig attributes](https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PushConfig).
|
647 | * @param {object} config.oidcToken If specified, Pub/Sub will generate and
|
648 | * attach an OIDC JWT token as an `Authorization` header in the HTTP
|
649 | * request for every pushed message. This object should have the same
|
650 | * structure as [OidcToken]{@link google.pubsub.v1.OidcToken}
|
651 | * @param {object} [gaxOpts] Request configuration options, outlined
|
652 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
653 | * @param {ModifyPushConfigCallback} [callback] Callback function.
|
654 | * @returns {Promise<ModifyPushConfigResponse>}
|
655 | *
|
656 | * @example
|
657 | * const {PubSub} = require('@google-cloud/pubsub');
|
658 | * const pubsub = new PubSub();
|
659 | *
|
660 | * const topic = pubsub.topic('my-topic');
|
661 | * const subscription = topic.subscription('my-subscription');
|
662 | *
|
663 | * const pushConfig = {
|
664 | * pushEndpoint: 'https://mydomain.com/push',
|
665 | * attributes: {
|
666 | * key: 'value'
|
667 | * },
|
668 | * oidcToken: {
|
669 | * serviceAccountEmail: 'myproject@appspot.gserviceaccount.com',
|
670 | * audience: 'myaudience'
|
671 | * }
|
672 | * };
|
673 | *
|
674 | * subscription.modifyPushConfig(pushConfig, (err, apiResponse) => {
|
675 | * if (err) {
|
676 | * // Error handling omitted.
|
677 | * }
|
678 | * });
|
679 | *
|
680 | * //-
|
681 | * // If the callback is omitted, we'll return a Promise.
|
682 | * //-
|
683 | * subscription.modifyPushConfig(pushConfig).then((data) => {
|
684 | * const apiResponse = data[0];
|
685 | * });
|
686 | */
|
687 | modifyPushConfig(config, optsOrCallback, callback) {
|
688 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
689 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
690 | const reqOpts = {
|
691 | subscription: this.name,
|
692 | pushConfig: config,
|
693 | };
|
694 | this.request({
|
695 | client: 'SubscriberClient',
|
696 | method: 'modifyPushConfig',
|
697 | reqOpts,
|
698 | gaxOpts,
|
699 | }, callback);
|
700 | }
|
701 | /**
|
702 | * Opens the Subscription to receive messages. In general this method
|
703 | * shouldn't need to be called, unless you wish to receive messages after
|
704 | * calling {@link Subscription#close}. Alternatively one could just assign a
|
705 | * new `message` event listener which will also re-open the Subscription.
|
706 | *
|
707 | * @example
|
708 | * subscription.on('message', message => message.ack());
|
709 | *
|
710 | * // Close the subscription.
|
711 | * subscription.close(err => {
|
712 | * if (err) {
|
713 | * // Error handling omitted.
|
714 | * }
|
715 | *
|
716 | * The subscription has been closed and messages will no longer be received.
|
717 | * });
|
718 | *
|
719 | * // Resume receiving messages.
|
720 | * subscription.open();
|
721 | */
|
722 | open() {
|
723 | if (!this._subscriber.isOpen) {
|
724 | this._subscriber.open();
|
725 | }
|
726 | }
|
727 | /**
|
728 | * @typedef {array} SeekResponse
|
729 | * @property {object} 0 The full API response.
|
730 | */
|
731 | /**
|
732 | * @callback SeekCallback
|
733 | * @param {?Error} err Request error, if any.
|
734 | * @param {object} apiResponse The full API response.
|
735 | */
|
736 | /**
|
737 | * Seeks an existing subscription to a point in time or a given snapshot.
|
738 | *
|
739 | * @param {string|date} snapshot The point to seek to. This will accept the
|
740 | * name of the snapshot or a Date object.
|
741 | * @param {object} [gaxOpts] Request configuration options, outlined
|
742 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
743 | * @param {SeekCallback} [callback] Callback function.
|
744 | * @returns {Promise<SeekResponse>}
|
745 | *
|
746 | * @example
|
747 | * const callback = (err, resp) => {
|
748 | * if (!err) {
|
749 | * // Seek was successful.
|
750 | * }
|
751 | * };
|
752 | *
|
753 | * subscription.seek('my-snapshot', callback);
|
754 | *
|
755 | * //-
|
756 | * // Alternatively, to specify a certain point in time, you can provide a
|
757 | * Date
|
758 | * // object.
|
759 | * //-
|
760 | * const date = new Date('October 21 2015');
|
761 | *
|
762 | * subscription.seek(date, callback);
|
763 | */
|
764 | seek(snapshot, optsOrCallback, callback) {
|
765 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
766 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
767 | const reqOpts = {
|
768 | subscription: this.name,
|
769 | };
|
770 | if (typeof snapshot === 'string') {
|
771 | reqOpts.snapshot = snapshot_1.Snapshot.formatName_(this.pubsub.projectId, snapshot);
|
772 | }
|
773 | else if (Object.prototype.toString.call(snapshot) === '[object Date]') {
|
774 | const dateMillis = snapshot.getTime();
|
775 | reqOpts.time = {
|
776 | seconds: Math.floor(dateMillis / 1000),
|
777 | nanos: Math.floor(dateMillis % 1000) * 1000,
|
778 | };
|
779 | }
|
780 | else {
|
781 | throw new Error('Either a snapshot name or Date is needed to seek to.');
|
782 | }
|
783 | this.request({
|
784 | client: 'SubscriberClient',
|
785 | method: 'seek',
|
786 | reqOpts,
|
787 | gaxOpts,
|
788 | }, callback);
|
789 | }
|
790 | /**
|
791 | * @typedef {array} SetSubscriptionMetadataResponse
|
792 | * @property {object} 0 The full API response.
|
793 | */
|
794 | /**
|
795 | * @callback SetSubscriptionMetadataCallback
|
796 | * @param {?Error} err Request error, if any.
|
797 | * @param {object} apiResponse The full API response.
|
798 | */
|
799 | /**
|
800 | * Update the subscription object.
|
801 | *
|
802 | * @param {object} metadata The subscription metadata.
|
803 | * @param {object} [gaxOpts] Request configuration options, outlined
|
804 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
805 | * @param {SetSubscriptionMetadataCallback} [callback] Callback function.
|
806 | * @returns {Promise<SetSubscriptionMetadataResponse>}
|
807 | *
|
808 | * @example
|
809 | * const metadata = {
|
810 | * key: 'value'
|
811 | * };
|
812 | *
|
813 | * subscription.setMetadata(metadata, (err, apiResponse) => {
|
814 | * if (err) {
|
815 | * // Error handling omitted.
|
816 | * }
|
817 | * });
|
818 | *
|
819 | * //-
|
820 | * // If the callback is omitted, we'll return a Promise.
|
821 | * //-
|
822 | * subscription.setMetadata(metadata).then((data) => {
|
823 | * const apiResponse = data[0];
|
824 | * });
|
825 | */
|
826 | setMetadata(metadata, optsOrCallback, callback) {
|
827 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
828 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
829 | const subscription = Subscription.formatMetadata_(metadata);
|
830 | const fields = Object.keys(subscription).map(snakeCase);
|
831 | subscription.name = this.name;
|
832 | const reqOpts = {
|
833 | subscription,
|
834 | updateMask: {
|
835 | paths: fields,
|
836 | },
|
837 | };
|
838 | this.request({
|
839 | client: 'SubscriberClient',
|
840 | method: 'updateSubscription',
|
841 | reqOpts,
|
842 | gaxOpts,
|
843 | }, callback);
|
844 | }
|
845 | /**
|
846 | * Sets the Subscription options.
|
847 | *
|
848 | * @param {SubscriberOptions} options The options.
|
849 | */
|
850 | setOptions(options) {
|
851 | this._subscriber.setOptions(options);
|
852 | }
|
853 | /**
|
854 | * Create a Snapshot object. See {@link Subscription#createSnapshot} to
|
855 | * create a snapshot.
|
856 | *
|
857 | * @throws {Error} If a name is not provided.
|
858 | *
|
859 | * @param {string} name The name of the snapshot.
|
860 | * @returns {Snapshot}
|
861 | *
|
862 | * @example
|
863 | * const snapshot = subscription.snapshot('my-snapshot');
|
864 | */
|
865 | snapshot(name) {
|
866 | return this.pubsub.snapshot.call(this, name);
|
867 | }
|
868 | /**
|
869 | * Watches for incoming message event handlers and open/closes the
|
870 | * subscriber as needed.
|
871 | *
|
872 | * @private
|
873 | */
|
874 | _listen() {
|
875 | this.on('newListener', event => {
|
876 | if (!this.isOpen && event === 'message') {
|
877 | this._subscriber.open();
|
878 | }
|
879 | });
|
880 | this.on('removeListener', () => {
|
881 | if (this.isOpen && this.listenerCount('message') === 0) {
|
882 | this._subscriber.close();
|
883 | }
|
884 | });
|
885 | }
|
886 | /*!
|
887 | * Formats Subscription metadata.
|
888 | *
|
889 | * @private
|
890 | */
|
891 | static formatMetadata_(metadata) {
|
892 | const formatted = extend(true, {}, metadata);
|
893 | if (typeof metadata.messageRetentionDuration === 'number') {
|
894 | formatted.messageRetentionDuration = {
|
895 | seconds: metadata.messageRetentionDuration,
|
896 | nanos: 0,
|
897 | };
|
898 | }
|
899 | if (metadata.pushEndpoint) {
|
900 | formatted.pushConfig = {
|
901 | pushEndpoint: metadata.pushEndpoint,
|
902 | };
|
903 | delete formatted.pushEndpoint;
|
904 | }
|
905 | if (metadata.oidcToken) {
|
906 | formatted.pushConfig = {
|
907 | ...formatted.pushConfig,
|
908 | oidcToken: metadata.oidcToken,
|
909 | };
|
910 | delete formatted.oidcToken;
|
911 | }
|
912 | return formatted;
|
913 | }
|
914 | /*!
|
915 | * Format the name of a subscription. A subscription's full name is in the
|
916 | * format of projects/{projectId}/subscriptions/{subName}.
|
917 | *
|
918 | * @private
|
919 | */
|
920 | static formatName_(projectId, name) {
|
921 | // Simple check if the name is already formatted.
|
922 | if (name.indexOf('/') > -1) {
|
923 | return name;
|
924 | }
|
925 | return 'projects/' + projectId + '/subscriptions/' + name;
|
926 | }
|
927 | }
|
exports.Subscription = Subscription;
/*! Developer Documentation
 *
 * Every async method (streams excepted) hands back a Promise whenever the
 * caller leaves out the callback. The methods named below are passed to
 * `promisifyAll` in its `exclude` list.
 */
const promisifyExclusions = ['open', 'snapshot'];
promisify_1.promisifyAll(Subscription, { exclude: promisifyExclusions });
|
937 | //# sourceMappingURL=subscription.js.map |
\ | No newline at end of file |