UNPKG

19.8 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2014 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Subscription = void 0;
19const events_1 = require("events");
20const extend = require("extend");
21const snakeCase = require("lodash.snakecase");
22const iam_1 = require("./iam");
23const snapshot_1 = require("./snapshot");
24const subscriber_1 = require("./subscriber");
25const util_1 = require("./util");
26// JSDoc won't see these, so this is just to let you get typings
27// in your editor of choice.
28//
29// NOTE: These are commented out for now because we don't want to
30// break any existing clients that rely on not-entirely-correct
31// typings. We'll re-enable on the next major.
32/* export declare interface Subscription {
33 on(
34 event: 'message',
35 listener: (message: Message) => void
36 ): this;
37 on(
38 event: 'error',
39 listener: (error: StatusError) => void
40 ): this;
41 on(event: 'close', listener: () => void): this;
42
43 // Only used internally.
44 on(event: 'newListener', listener: Function): this;
45 on(event: 'removeListener', listener: Function): this;
46
47 // Catch-all. If you get an error about this line, it means you're
48 // using an unsupported event type or listener type.
49 on(event: string, listener: void): this;
50} */
51/**
52 * @typedef {object} ExpirationPolicy
53 * A policy that specifies the conditions for this subscription's expiration. A
54 * subscription is considered active as long as any connected subscriber is
55 * successfully consuming messages from the subscription or is issuing
56 * operations on the subscription. If expirationPolicy is not set, a default
57 * policy with ttl of 31 days will be used. The minimum allowed value for
58 * expirationPolicy.ttl is 1 day.
59 * @property {google.protobuf.Duration} ttl Specifies the "time-to-live"
60 * duration for an associated resource. The resource expires if it is not
61 * active for a period of `ttl`. The definition of "activity" depends on the
62 * type of the associated resource. The minimum and maximum allowed values
63 * for `ttl` depend on the type of the associated resource, as well. If
64 * `ttl` is not set, the associated resource never expires.
65 */
66/**
67 * A Subscription object will give you access to your Cloud Pub/Sub
68 * subscription.
69 *
70 * Subscriptions are sometimes retrieved when using various methods:
71 *
72 * - {@link PubSub#getSubscriptions}
73 * - {@link Topic#getSubscriptions}
74 *
75 * Subscription objects may be created directly with:
76 *
77 * - {@link PubSub#createSubscription}
78 * - {@link Topic#createSubscription}
79 *
80 * All Subscription objects are instances of an
81 * [EventEmitter](http://nodejs.org/api/events.html). The subscription will pull
82 * for messages automatically as long as there is at least one listener assigned
83 * for the `message` event. Available events:
84 *
85 * Upon receipt of a message:
86 * on(event: 'message', listener: (message: {@link Message}) => void): this;
87 *
88 * Upon receipt of an error:
89 * on(event: 'error', listener: (error: Error) => void): this;
90 *
91 * Upon the closing of the subscriber:
92 * on(event: 'close', listener: Function): this;
93 *
94 * By default Subscription objects allow you to process 100 messages at the same
95 * time. You can fine tune this value by adjusting the
96 * `options.flowControl.maxMessages` option.
97 *
98 * If your subscription is seeing more re-deliveries than preferable, you might
99 * try increasing your `options.ackDeadline` value or decreasing the
100 * `options.streamingOptions.maxStreams` value.
101 *
102 * Subscription objects handle ack management, by automatically extending the
103 * ack deadline while the message is being processed, to then issue the ack or
104 * nack of such message when the processing is done. **Note:** message
105 * redelivery is still possible.
106 *
107 * By default each {@link PubSub} instance can handle 100 open streams, with
108 * default options this translates to less than 20 Subscriptions per PubSub
109 * instance. If you wish to create more Subscriptions than that, you can either
110 * create multiple PubSub instances or lower the
111 * `options.streamingOptions.maxStreams` value on each Subscription object.
112 *
113 * @class
114 *
115 * @param {PubSub} pubsub PubSub object.
116 * @param {string} name The name of the subscription.
117 * @param {SubscriberOptions} [options] Options for handling messages.
118 *
119 * @example From {@link PubSub#getSubscriptions}
120 * ```
121 * const {PubSub} = require('@google-cloud/pubsub');
122 * const pubsub = new PubSub();
123 *
124 * pubsub.getSubscriptions((err, subscriptions) => {
125 * // `subscriptions` is an array of Subscription objects.
126 * });
127 *
128 * ```
129 * @example From {@link Topic#getSubscriptions}
130 * ```
131 * const topic = pubsub.topic('my-topic');
132 * topic.getSubscriptions((err, subscriptions) => {
133 * // `subscriptions` is an array of Subscription objects.
134 * });
135 *
136 * ```
137 * @example {@link Topic#createSubscription}
138 * ```
139 * const topic = pubsub.topic('my-topic');
140 * topic.createSubscription('new-subscription', (err, subscription) => {
141 * // `subscription` is a Subscription object.
142 * });
143 *
144 * ```
145 * @example {@link Topic#subscription}
146 * ```
147 * const topic = pubsub.topic('my-topic');
148 * const subscription = topic.subscription('my-subscription');
149 * // `subscription` is a Subscription object.
150 *
151 * ```
152 * @example Once you have obtained a subscription object, you may begin to register listeners. This will automatically trigger pulling for messages.
153 * ```
154 * // Register an error handler.
155 * subscription.on('error', (err) => {});
156 *
157 * // Register a close handler in case the subscriber closes unexpectedly
158 * subscription.on('close', () => {});
159 *
160 * // Register a listener for `message` events.
161 * function onMessage(message) {
162 * // Called every time a message is received.
163 *
164 * // message.id = ID of the message.
165 * // message.ackId = ID used to acknowledge the message receival.
166 * // message.data = Contents of the message.
167 * // message.attributes = Attributes of the message.
168 * // message.publishTime = Date when Pub/Sub received the message.
169 *
170 * // Ack the message:
171 * // message.ack();
172 *
173 * // This doesn't ack the message, but allows more messages to be retrieved
174 * // if your limit was hit or if you don't want to ack the message.
175 * // message.nack();
176 * }
177 * subscription.on('message', onMessage);
178 *
179 * // Remove the listener from receiving `message` events.
180 * subscription.removeListener('message', onMessage);
181 *
182 * ```
183 * @example To apply a fine level of flow control, consider the following configuration
184 * ```
185 * const subscription = topic.subscription('my-sub', {
186 * flowControl: {
187 * maxMessages: 1,
188 * // this tells the client to manage and lock any excess messages
189 * allowExcessMessages: false
190 * }
191 * });
192 * ```
193 */
194class Subscription extends events_1.EventEmitter {
195 constructor(pubsub, name, options) {
196 super();
197 options = options || {};
198 this.pubsub = pubsub;
199 this.request = pubsub.request.bind(pubsub);
200 this.name = Subscription.formatName_(this.projectId, name);
201 this.topic = options.topic;
202 /**
203 * [IAM (Identity and Access
204 * Management)](https://cloud.google.com/pubsub/access_control) allows you
205 * to set permissions on individual resources and offers a wider range of
206 * roles: editor, owner, publisher, subscriber, and viewer. This gives you
207 * greater flexibility and allows you to set more fine-grained access
208 * control.
209 *
210 * *The IAM access control features described in this document are Beta,
211 * including the API methods to get and set IAM policies, and to test IAM
212 * permissions. Cloud Pub/Sub's use of IAM features is not covered by
213 * any SLA or deprecation policy, and may be subject to
214 * backward-incompatible changes.*
215 *
216 * @name Subscription#iam
217 * @mixes IAM
218 *
219 * @see [Access Control Overview]{@link https://cloud.google.com/pubsub/access_control}
220 * @see [What is Cloud IAM?]{@link https://cloud.google.com/iam/}
221 *
222 * @example
223 * ```
224 * //-
225 * // Get the IAM policy for your subscription.
226 * //-
227 * subscription.iam.getPolicy((err, policy) => {
228 * console.log(policy);
229 * });
230 *
231 * //-
232 * // If the callback is omitted, we'll return a Promise.
233 * //-
234 * subscription.iam.getPolicy().then((data) => {
235 * const policy = data[0];
236 * const apiResponse = data[1];
237 * });
238 * ```
239 */
240 this.iam = new iam_1.IAM(pubsub, this.name);
241 this._subscriber = new subscriber_1.Subscriber(this, options);
242 this._subscriber
243 .on('error', err => this.emit('error', err))
244 .on('message', message => this.emit('message', message))
245 .on('close', () => this.emit('close'));
246 this._listen();
247 }
248 /**
249 * Indicates if the Subscription is open and receiving messages.
250 *
251 * @type {boolean}
252 */
253 get isOpen() {
254 return !!(this._subscriber && this._subscriber.isOpen);
255 }
256 /**
257 * @type {string}
258 */
259 get projectId() {
260 return (this.pubsub && this.pubsub.projectId) || '{{projectId}}';
261 }
262 close(callback) {
263 this._subscriber.close().then(() => callback(), callback);
264 }
265 create(optsOrCallback, callback) {
266 if (!this.topic) {
267 throw new Error('Subscriptions can only be created when accessed through Topics');
268 }
269 const name = this.name.split('/').pop();
270 const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
271 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
272 this.pubsub.createSubscription(this.topic, name, options, (err, sub, resp) => {
273 if (err) {
274 callback(err, null, resp);
275 return;
276 }
277 Object.assign(this, sub);
278 callback(null, this, resp);
279 });
280 }
281 createSnapshot(name, optsOrCallback, callback) {
282 if (typeof name !== 'string') {
283 throw new Error('A name is required to create a snapshot.');
284 }
285 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
286 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
287 const snapshot = this.snapshot(name);
288 const reqOpts = {
289 name: snapshot.name,
290 subscription: this.name,
291 };
292 this.request({
293 client: 'SubscriberClient',
294 method: 'createSnapshot',
295 reqOpts,
296 gaxOpts,
297 }, (err, resp) => {
298 if (err) {
299 callback(err, null, resp);
300 return;
301 }
302 snapshot.metadata = resp;
303 callback(null, snapshot, resp);
304 });
305 }
306 delete(optsOrCallback, callback) {
307 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
308 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
309 const reqOpts = {
310 subscription: this.name,
311 };
312 if (this.isOpen) {
313 this._subscriber.close();
314 }
315 this.request({
316 client: 'SubscriberClient',
317 method: 'deleteSubscription',
318 reqOpts,
319 gaxOpts,
320 }, callback);
321 }
322 detached(callback) {
323 this.getMetadata((err, metadata) => {
324 if (err) {
325 callback(err);
326 }
327 else {
328 callback(null, metadata.detached);
329 }
330 });
331 }
332 exists(callback) {
333 this.getMetadata(err => {
334 if (!err) {
335 callback(null, true);
336 return;
337 }
338 if (err.code === 5) {
339 callback(null, false);
340 return;
341 }
342 callback(err);
343 });
344 }
345 get(optsOrCallback, callback) {
346 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
347 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
348 const autoCreate = !!gaxOpts.autoCreate && this.topic;
349 delete gaxOpts.autoCreate;
350 this.getMetadata(gaxOpts, (err, apiResponse) => {
351 if (!err) {
352 callback(null, this, apiResponse);
353 return;
354 }
355 if (err.code !== 5 || !autoCreate) {
356 callback(err, null, apiResponse);
357 return;
358 }
359 this.create({ gaxOpts }, callback);
360 });
361 }
362 getMetadata(optsOrCallback, callback) {
363 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
364 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
365 const reqOpts = {
366 subscription: this.name,
367 };
368 this.request({
369 client: 'SubscriberClient',
370 method: 'getSubscription',
371 reqOpts,
372 gaxOpts,
373 }, (err, apiResponse) => {
374 if (!err) {
375 this.metadata = apiResponse;
376 }
377 callback(err, apiResponse);
378 });
379 }
380 modifyPushConfig(config, optsOrCallback, callback) {
381 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
382 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
383 const reqOpts = {
384 subscription: this.name,
385 pushConfig: config,
386 };
387 this.request({
388 client: 'SubscriberClient',
389 method: 'modifyPushConfig',
390 reqOpts,
391 gaxOpts,
392 }, callback);
393 }
394 /**
395 * Opens the Subscription to receive messages. In general this method
396 * shouldn't need to be called, unless you wish to receive messages after
397 * calling {@link Subscription#close}. Alternatively one could just assign a
398 * new `message` event listener which will also re-open the Subscription.
399 *
400 * @example
401 * ```
402 * subscription.on('message', message => message.ack());
403 *
404 * // Close the subscription.
405 * subscription.close(err => {
406 * if (err) {
407 * // Error handling omitted.
408 * }
409 *
410 * The subscription has been closed and messages will no longer be received.
411 * });
412 *
413 * // Resume receiving messages.
414 * subscription.open();
415 * ```
416 */
417 open() {
418 if (!this._subscriber.isOpen) {
419 this._subscriber.open();
420 }
421 }
422 seek(snapshot, optsOrCallback, callback) {
423 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
424 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
425 const reqOpts = {
426 subscription: this.name,
427 };
428 if (typeof snapshot === 'string') {
429 reqOpts.snapshot = snapshot_1.Snapshot.formatName_(this.pubsub.projectId, snapshot);
430 }
431 else if (Object.prototype.toString.call(snapshot) === '[object Date]') {
432 const dateMillis = snapshot.getTime();
433 reqOpts.time = {
434 seconds: Math.floor(dateMillis / 1000),
435 nanos: Math.floor(dateMillis % 1000) * 1000,
436 };
437 }
438 else {
439 throw new Error('Either a snapshot name or Date is needed to seek to.');
440 }
441 this.request({
442 client: 'SubscriberClient',
443 method: 'seek',
444 reqOpts,
445 gaxOpts,
446 }, callback);
447 }
448 setMetadata(metadata, optsOrCallback, callback) {
449 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
450 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
451 const subscription = Subscription.formatMetadata_(metadata);
452 const fields = Object.keys(subscription).map(snakeCase);
453 subscription.name = this.name;
454 const reqOpts = {
455 subscription,
456 updateMask: {
457 paths: fields,
458 },
459 };
460 this.request({
461 client: 'SubscriberClient',
462 method: 'updateSubscription',
463 reqOpts,
464 gaxOpts,
465 }, callback);
466 }
467 /**
468 * Sets the Subscription options.
469 *
470 * @param {SubscriberOptions} options The options.
471 */
472 setOptions(options) {
473 this._subscriber.setOptions(options);
474 }
475 /**
476 * Create a Snapshot object. See {@link Subscription#createSnapshot} to
477 * create a snapshot.
478 *
479 * @throws {Error} If a name is not provided.
480 *
481 * @param {string} name The name of the snapshot.
482 * @returns {Snapshot}
483 *
484 * @example
485 * ```
486 * const snapshot = subscription.snapshot('my-snapshot');
487 * ```
488 */
489 snapshot(name) {
490 return this.pubsub.snapshot.call(this, name);
491 }
492 /**
493 * Watches for incoming message event handlers and open/closes the
494 * subscriber as needed.
495 *
496 * @private
497 */
498 _listen() {
499 this.on('newListener', event => {
500 if (!this.isOpen && event === 'message') {
501 this._subscriber.open();
502 }
503 });
504 this.on('removeListener', () => {
505 if (this.isOpen && this.listenerCount('message') === 0) {
506 this._subscriber.close();
507 }
508 });
509 }
510 /*!
511 * Formats Subscription metadata.
512 *
513 * @private
514 */
515 static formatMetadata_(metadata) {
516 const formatted = extend(true, {}, metadata);
517 if (typeof metadata.messageRetentionDuration === 'number') {
518 formatted.messageRetentionDuration = {
519 seconds: metadata.messageRetentionDuration,
520 nanos: 0,
521 };
522 }
523 if (metadata.pushEndpoint) {
524 formatted.pushConfig = {
525 pushEndpoint: metadata.pushEndpoint,
526 };
527 delete formatted.pushEndpoint;
528 }
529 if (metadata.oidcToken) {
530 formatted.pushConfig = {
531 ...formatted.pushConfig,
532 oidcToken: metadata.oidcToken,
533 };
534 delete formatted.oidcToken;
535 }
536 return formatted;
537 }
538 /*!
539 * Format the name of a subscription. A subscription's full name is in the
540 * format of projects/{projectId}/subscriptions/{subName}.
541 *
542 * @private
543 */
544 static formatName_(projectId, name) {
545 // Simple check if the name is already formatted.
546 if (name.indexOf('/') > -1) {
547 return name;
548 }
549 return 'projects/' + projectId + '/subscriptions/' + name;
550 }
551}
552exports.Subscription = Subscription;
553/*! Developer Documentation
554 *
555 * All async methods (except for streams) will return a Promise in the event
556 * that a callback is omitted.
557 */
558util_1.promisifySome(Subscription, Subscription.prototype, [
559 'close',
560 'create',
561 'createSnapshot',
562 'delete',
563 'detached',
564 'exists',
565 'get',
566 'getMetadata',
567 'modifyPushConfig',
568 'seek',
569 'setMetadata',
570]);
571//# sourceMappingURL=subscription.js.map
\No newline at end of file