UNPKG

20.2 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2014 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Subscription = void 0;
19const events_1 = require("events");
20const extend = require("extend");
21const snakeCase = require("lodash.snakecase");
22const iam_1 = require("./iam");
23const snapshot_1 = require("./snapshot");
24const subscriber_1 = require("./subscriber");
25const util_1 = require("./util");
26// JSDoc won't see these, so this is just to let you get typings
27// in your editor of choice.
28//
29// NOTE: These are commented out for now because we don't want to
30// break any existing clients that rely on not-entirely-correct
31// typings. We'll re-enable on the next major.
32/* export declare interface Subscription {
33 on(
34 event: 'message',
35 listener: (message: Message) => void
36 ): this;
37 on(
38 event: 'error',
39 listener: (error: StatusError) => void
40 ): this;
41 on(event: 'close', listener: () => void): this;
42 on(event: 'debug', listener: (error: StatusError) => void): this;
43
44 // Only used internally.
45 on(event: 'newListener', listener: Function): this;
46 on(event: 'removeListener', listener: Function): this;
47
48 // Catch-all. If you get an error about this line, it means you're
49 // using an unsupported event type or listener type.
50 on(event: string, listener: void): this;
51} */
52/**
53 * @typedef {object} ExpirationPolicy
54 * A policy that specifies the conditions for this subscription's expiration. A
55 * subscription is considered active as long as any connected subscriber is
56 * successfully consuming messages from the subscription or is issuing
57 * operations on the subscription. If expirationPolicy is not set, a default
58 * policy with ttl of 31 days will be used. The minimum allowed value for
59 * expirationPolicy.ttl is 1 day.
60 * @property {google.protobuf.Duration} ttl Specifies the "time-to-live"
61 * duration for an associated resource. The resource expires if it is not
62 * active for a period of `ttl`. The definition of "activity" depends on the
63 * type of the associated resource. The minimum and maximum allowed values
64 * for `ttl` depend on the type of the associated resource, as well. If
65 * `ttl` is not set, the associated resource never expires.
66 */
67/**
68 * A Subscription object will give you access to your Cloud Pub/Sub
69 * subscription.
70 *
71 * Subscriptions are sometimes retrieved when using various methods:
72 *
73 * - {@link PubSub#getSubscriptions}
74 * - {@link Topic#getSubscriptions}
75 *
76 * Subscription objects may be created directly with:
77 *
78 * - {@link PubSub#createSubscription}
79 * - {@link Topic#createSubscription}
80 *
81 * All Subscription objects are instances of an
82 * [EventEmitter](http://nodejs.org/api/events.html). The subscription will pull
83 * for messages automatically as long as there is at least one listener assigned
84 * for the `message` event. Available events:
85 *
86 * Upon receipt of a message:
87 * on(event: 'message', listener: (message: {@link Message}) => void): this;
88 *
89 * Upon receipt of an error:
90 * on(event: 'error', listener: (error: Error) => void): this;
91 *
92 * Upon receipt of a (non-fatal) debug warning:
93 * on(event: 'debug', listener: (error: Error) => void): this;
94 *
95 * Upon the closing of the subscriber:
96 * on(event: 'close', listener: Function): this;
97 *
98 * By default Subscription objects allow you to process 100 messages at the same
99 * time. You can fine tune this value by adjusting the
100 * `options.flowControl.maxMessages` option.
101 *
102 * If your subscription is seeing more re-deliveries than preferable, you might
103 * try increasing your `options.ackDeadline` value or decreasing the
104 * `options.streamingOptions.maxStreams` value.
105 *
106 * Subscription objects handle ack management, by automatically extending the
107 * ack deadline while the message is being processed, to then issue the ack or
108 * nack of such message when the processing is done. **Note:** message
109 * redelivery is still possible.
110 *
111 * By default each {@link PubSub} instance can handle 100 open streams, with
112 * default options this translates to less than 20 Subscriptions per PubSub
113 * instance. If you wish to create more Subscriptions than that, you can either
114 * create multiple PubSub instances or lower the
115 * `options.streamingOptions.maxStreams` value on each Subscription object.
116 *
117 * @class
118 *
119 * @param {PubSub} pubsub PubSub object.
120 * @param {string} name The name of the subscription.
121 * @param {SubscriberOptions} [options] Options for handling messages.
122 *
123 * @example From {@link PubSub#getSubscriptions}
124 * ```
125 * const {PubSub} = require('@google-cloud/pubsub');
126 * const pubsub = new PubSub();
127 *
128 * pubsub.getSubscriptions((err, subscriptions) => {
129 * // `subscriptions` is an array of Subscription objects.
130 * });
131 *
132 * ```
133 * @example From {@link Topic#getSubscriptions}
134 * ```
135 * const topic = pubsub.topic('my-topic');
136 * topic.getSubscriptions((err, subscriptions) => {
137 * // `subscriptions` is an array of Subscription objects.
138 * });
139 *
140 * ```
141 * @example {@link Topic#createSubscription}
142 * ```
143 * const topic = pubsub.topic('my-topic');
144 * topic.createSubscription('new-subscription', (err, subscription) => {
145 * // `subscription` is a Subscription object.
146 * });
147 *
148 * ```
149 * @example {@link Topic#subscription}
150 * ```
151 * const topic = pubsub.topic('my-topic');
152 * const subscription = topic.subscription('my-subscription');
153 * // `subscription` is a Subscription object.
154 *
155 * ```
156 * @example Once you have obtained a subscription object, you may begin to register listeners. This will automatically trigger pulling for messages.
157 * ```
158 * // Register an error handler.
159 * subscription.on('error', (err) => {});
160 *
161 * // Register a debug handler, to catch non-fatal errors.
162 * subscription.on('debug', (err) => { console.error(err); });
163 *
164 * // Register a close handler in case the subscriber closes unexpectedly
165 * subscription.on('close', () => {});
166 *
167 * // Register a listener for `message` events.
168 * function onMessage(message) {
169 * // Called every time a message is received.
170 *
171 * // message.id = ID of the message.
172 * // message.ackId = ID used to acknowledge the message receival.
173 * // message.data = Contents of the message.
174 * // message.attributes = Attributes of the message.
175 * // message.publishTime = Date when Pub/Sub received the message.
176 *
177 * // Ack the message:
178 * // message.ack();
179 *
180 * // This doesn't ack the message, but allows more messages to be retrieved
181 * // if your limit was hit or if you don't want to ack the message.
182 * // message.nack();
183 * }
184 * subscription.on('message', onMessage);
185 *
186 * // Remove the listener from receiving `message` events.
187 * subscription.removeListener('message', onMessage);
188 *
189 * ```
190 * @example To apply a fine level of flow control, consider the following configuration
191 * ```
192 * const subscription = topic.subscription('my-sub', {
193 * flowControl: {
194 * maxMessages: 1,
195 * // this tells the client to manage and lock any excess messages
196 * allowExcessMessages: false
197 * }
198 * });
199 * ```
200 */
201class Subscription extends events_1.EventEmitter {
202 constructor(pubsub, name, options) {
203 super();
204 options = options || {};
205 this.pubsub = pubsub;
206 this.request = pubsub.request.bind(pubsub);
207 this.name = Subscription.formatName_(this.projectId, name);
208 this.topic = options.topic;
209 /**
210 * [IAM (Identity and Access
211 * Management)](https://cloud.google.com/pubsub/access_control) allows you
212 * to set permissions on individual resources and offers a wider range of
213 * roles: editor, owner, publisher, subscriber, and viewer. This gives you
214 * greater flexibility and allows you to set more fine-grained access
215 * control.
216 *
217 * *The IAM access control features described in this document are Beta,
218 * including the API methods to get and set IAM policies, and to test IAM
219 * permissions. Cloud Pub/Sub's use of IAM features is not covered by
220 * any SLA or deprecation policy, and may be subject to
221 * backward-incompatible changes.*
222 *
223 * @name Subscription#iam
224 * @mixes IAM
225 *
226 * @see [Access Control Overview]{@link https://cloud.google.com/pubsub/access_control}
227 * @see [What is Cloud IAM?]{@link https://cloud.google.com/iam/}
228 *
229 * @example
230 * ```
231 * //-
232 * // Get the IAM policy for your subscription.
233 * //-
234 * subscription.iam.getPolicy((err, policy) => {
235 * console.log(policy);
236 * });
237 *
238 * //-
239 * // If the callback is omitted, we'll return a Promise.
240 * //-
241 * subscription.iam.getPolicy().then((data) => {
242 * const policy = data[0];
243 * const apiResponse = data[1];
244 * });
245 * ```
246 */
247 this.iam = new iam_1.IAM(pubsub, this.name);
248 this._subscriber = new subscriber_1.Subscriber(this, options);
249 this._subscriber
250 .on('error', err => this.emit('error', err))
251 .on('debug', err => this.emit('debug', err))
252 .on('message', message => this.emit('message', message))
253 .on('close', () => this.emit('close'));
254 this._listen();
255 }
256 /**
257 * Indicates if the Subscription is open and receiving messages.
258 *
259 * @type {boolean}
260 */
261 get isOpen() {
262 return !!(this._subscriber && this._subscriber.isOpen);
263 }
264 /**
265 * @type {string}
266 */
267 get projectId() {
268 return (this.pubsub && this.pubsub.projectId) || '{{projectId}}';
269 }
270 close(callback) {
271 this._subscriber.close().then(() => callback(), callback);
272 }
273 create(optsOrCallback, callback) {
274 if (!this.topic) {
275 throw new Error('Subscriptions can only be created when accessed through Topics');
276 }
277 const name = this.name.split('/').pop();
278 const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
279 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
280 this.pubsub.createSubscription(this.topic, name, options, (err, sub, resp) => {
281 if (err) {
282 callback(err, null, resp);
283 return;
284 }
285 Object.assign(this, sub);
286 callback(null, this, resp);
287 });
288 }
289 createSnapshot(name, optsOrCallback, callback) {
290 if (typeof name !== 'string') {
291 throw new Error('A name is required to create a snapshot.');
292 }
293 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
294 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
295 const snapshot = this.snapshot(name);
296 const reqOpts = {
297 name: snapshot.name,
298 subscription: this.name,
299 };
300 this.request({
301 client: 'SubscriberClient',
302 method: 'createSnapshot',
303 reqOpts,
304 gaxOpts,
305 }, (err, resp) => {
306 if (err) {
307 callback(err, null, resp);
308 return;
309 }
310 snapshot.metadata = resp;
311 callback(null, snapshot, resp);
312 });
313 }
314 delete(optsOrCallback, callback) {
315 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
316 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
317 const reqOpts = {
318 subscription: this.name,
319 };
320 if (this.isOpen) {
321 this._subscriber.close();
322 }
323 this.request({
324 client: 'SubscriberClient',
325 method: 'deleteSubscription',
326 reqOpts,
327 gaxOpts,
328 }, callback);
329 }
330 detached(callback) {
331 this.getMetadata((err, metadata) => {
332 if (err) {
333 callback(err);
334 }
335 else {
336 callback(null, metadata.detached);
337 }
338 });
339 }
340 exists(callback) {
341 this.getMetadata(err => {
342 if (!err) {
343 callback(null, true);
344 return;
345 }
346 if (err.code === 5) {
347 callback(null, false);
348 return;
349 }
350 callback(err);
351 });
352 }
353 get(optsOrCallback, callback) {
354 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
355 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
356 const autoCreate = !!gaxOpts.autoCreate && this.topic;
357 delete gaxOpts.autoCreate;
358 this.getMetadata(gaxOpts, (err, apiResponse) => {
359 if (!err) {
360 callback(null, this, apiResponse);
361 return;
362 }
363 if (err.code !== 5 || !autoCreate) {
364 callback(err, null, apiResponse);
365 return;
366 }
367 this.create({ gaxOpts }, callback);
368 });
369 }
370 getMetadata(optsOrCallback, callback) {
371 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
372 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
373 const reqOpts = {
374 subscription: this.name,
375 };
376 this.request({
377 client: 'SubscriberClient',
378 method: 'getSubscription',
379 reqOpts,
380 gaxOpts,
381 }, (err, apiResponse) => {
382 if (!err) {
383 this.metadata = apiResponse;
384 }
385 callback(err, apiResponse);
386 });
387 }
388 modifyPushConfig(config, optsOrCallback, callback) {
389 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
390 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
391 const reqOpts = {
392 subscription: this.name,
393 pushConfig: config,
394 };
395 this.request({
396 client: 'SubscriberClient',
397 method: 'modifyPushConfig',
398 reqOpts,
399 gaxOpts,
400 }, callback);
401 }
402 /**
403 * Opens the Subscription to receive messages. In general this method
404 * shouldn't need to be called, unless you wish to receive messages after
405 * calling {@link Subscription#close}. Alternatively one could just assign a
406 * new `message` event listener which will also re-open the Subscription.
407 *
408 * @example
409 * ```
410 * subscription.on('message', message => message.ack());
411 *
412 * // Close the subscription.
413 * subscription.close(err => {
414 * if (err) {
415 * // Error handling omitted.
416 * }
417 *
418 * The subscription has been closed and messages will no longer be received.
419 * });
420 *
421 * // Resume receiving messages.
422 * subscription.open();
423 * ```
424 */
425 open() {
426 if (!this._subscriber.isOpen) {
427 this._subscriber.open();
428 }
429 }
430 seek(snapshot, optsOrCallback, callback) {
431 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
432 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
433 const reqOpts = {
434 subscription: this.name,
435 };
436 if (typeof snapshot === 'string') {
437 reqOpts.snapshot = snapshot_1.Snapshot.formatName_(this.pubsub.projectId, snapshot);
438 }
439 else if (Object.prototype.toString.call(snapshot) === '[object Date]') {
440 const dateMillis = snapshot.getTime();
441 reqOpts.time = {
442 seconds: Math.floor(dateMillis / 1000),
443 nanos: Math.floor(dateMillis % 1000) * 1000,
444 };
445 }
446 else {
447 throw new Error('Either a snapshot name or Date is needed to seek to.');
448 }
449 this.request({
450 client: 'SubscriberClient',
451 method: 'seek',
452 reqOpts,
453 gaxOpts,
454 }, callback);
455 }
456 setMetadata(metadata, optsOrCallback, callback) {
457 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
458 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
459 const subscription = Subscription.formatMetadata_(metadata);
460 const fields = Object.keys(subscription).map(snakeCase);
461 subscription.name = this.name;
462 const reqOpts = {
463 subscription,
464 updateMask: {
465 paths: fields,
466 },
467 };
468 this.request({
469 client: 'SubscriberClient',
470 method: 'updateSubscription',
471 reqOpts,
472 gaxOpts,
473 }, callback);
474 }
475 /**
476 * Sets the Subscription options.
477 *
478 * @param {SubscriberOptions} options The options.
479 */
480 setOptions(options) {
481 this._subscriber.setOptions(options);
482 }
483 /**
484 * Create a Snapshot object. See {@link Subscription#createSnapshot} to
485 * create a snapshot.
486 *
487 * @throws {Error} If a name is not provided.
488 *
489 * @param {string} name The name of the snapshot.
490 * @returns {Snapshot}
491 *
492 * @example
493 * ```
494 * const snapshot = subscription.snapshot('my-snapshot');
495 * ```
496 */
497 snapshot(name) {
498 return this.pubsub.snapshot.call(this, name);
499 }
500 /**
501 * Watches for incoming message event handlers and open/closes the
502 * subscriber as needed.
503 *
504 * @private
505 */
506 _listen() {
507 this.on('newListener', event => {
508 if (!this.isOpen && event === 'message') {
509 this._subscriber.open();
510 }
511 });
512 this.on('removeListener', () => {
513 if (this.isOpen && this.listenerCount('message') === 0) {
514 this._subscriber.close();
515 }
516 });
517 }
518 /*!
519 * Formats Subscription metadata.
520 *
521 * @private
522 */
523 static formatMetadata_(metadata) {
524 const formatted = extend(true, {}, metadata);
525 if (typeof metadata.messageRetentionDuration === 'number') {
526 formatted.messageRetentionDuration = {
527 seconds: metadata.messageRetentionDuration,
528 nanos: 0,
529 };
530 }
531 if (metadata.pushEndpoint) {
532 formatted.pushConfig = {
533 pushEndpoint: metadata.pushEndpoint,
534 };
535 delete formatted.pushEndpoint;
536 }
537 if (metadata.oidcToken) {
538 formatted.pushConfig = {
539 ...formatted.pushConfig,
540 oidcToken: metadata.oidcToken,
541 };
542 delete formatted.oidcToken;
543 }
544 return formatted;
545 }
546 /*!
547 * Format the name of a subscription. A subscription's full name is in the
548 * format of projects/{projectId}/subscriptions/{subName}.
549 *
550 * @private
551 */
552 static formatName_(projectId, name) {
553 // Simple check if the name is already formatted.
554 if (name.indexOf('/') > -1) {
555 return name;
556 }
557 return 'projects/' + projectId + '/subscriptions/' + name;
558 }
559}
560exports.Subscription = Subscription;
/*! Developer Documentation
 *
 * Each async method named below (streams are excluded) is promisified: when
 * the caller omits the callback, the method returns a Promise instead.
 */
const promisifiedMethods = [
  'close',
  'create',
  'createSnapshot',
  'delete',
  'detached',
  'exists',
  'get',
  'getMetadata',
  'modifyPushConfig',
  'seek',
  'setMetadata',
];
util_1.promisifySome(Subscription, Subscription.prototype, promisifiedMethods);
579//# sourceMappingURL=subscription.js.map
\No newline at end of file