UNPKG

32.5 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2014 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.PubSub = void 0;
19const paginator_1 = require("@google-cloud/paginator");
20const projectify_1 = require("@google-cloud/projectify");
21const extend = require("extend");
22const google_auth_library_1 = require("google-auth-library");
23const gax = require("google-gax");
24// eslint-disable-next-line @typescript-eslint/no-var-requires
25const PKG = require('../../package.json');
26// eslint-disable-next-line @typescript-eslint/no-var-requires
27const v1 = require('./v1');
28const util_1 = require("./util");
29const schema_1 = require("./schema");
30const snapshot_1 = require("./snapshot");
31const subscription_1 = require("./subscription");
32const topic_1 = require("./topic");
33/**
34 * Project ID placeholder.
35 * @type {string}
36 * @private
37 */
38const PROJECT_ID_PLACEHOLDER = '{{projectId}}';
/**
 * @typedef {object} ClientConfig
 * @property {string} [projectId] The project ID from the Google Developer's
 *     Console, e.g. 'grape-spaceship-123'. We will also check the environment
 *     variable `GCLOUD_PROJECT` for your project ID. If your app is running in
 *     an environment which supports {@link
 *     https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
 *     Application Default Credentials}, your project ID will be detected
 *     automatically.
 * @property {string} [keyFilename] Full path to a .json, .pem, or .p12 key
 *     downloaded from the Google Developers Console. If you provide a path to a
 *     JSON file, the `projectId` option above is not necessary. NOTE: .pem and
 *     .p12 require you to specify the `email` option as well.
 * @property {string} [apiEndpoint] The `apiEndpoint` from options will set the
 *     host. If not set, the `PUBSUB_EMULATOR_HOST` environment variable from the
 *     gcloud SDK is honored. We also check the
 *     `CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB` environment variable used by
 *     `gcloud alpha pubsub`. Otherwise the actual API endpoint will be used.
 *     Note that if the URL doesn't end in '.googleapis.com', we will assume
 *     that it's an emulator and disable strict SSL checks.
 * @property {string} [email] Account email address. Required when using a .pem
 *     or .p12 keyFilename.
 * @property {object} [credentials] Credentials object.
 * @property {string} [credentials.client_email]
 * @property {string} [credentials.private_key]
 * @property {boolean} [autoRetry=true] Automatically retry requests if the
 *     response is related to rate limits or certain intermittent server errors.
 *     We will exponentially backoff subsequent requests by default.
 * @property {Constructor} [promise] Custom promise module to use instead of
 *     native Promises.
 */
/**
 * [Cloud Pub/Sub](https://developers.google.com/pubsub/overview) is a
 * reliable, many-to-many, asynchronous messaging service from Cloud
 * Platform.
 *
 * @class
 *
 * @see [Cloud Pub/Sub overview]{@link https://developers.google.com/pubsub/overview}
 *
 * @param {ClientConfig} [options] Configuration options.
 *
 * @example Import the client library
 * ```
 * const {PubSub} = require('@google-cloud/pubsub');
 *
 * ```
 * @example Create a client that uses <a href="https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application">Application Default Credentials (ADC)</a>:
 * ```
 * const pubsub = new PubSub();
 *
 * ```
 * @example Create a client with <a href="https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually">explicit credentials</a>:
 * ```
 * const pubsub = new PubSub({
 *   projectId: 'your-project-id',
 *   keyFilename: '/path/to/keyfile.json'
 * });
 *
 * ```
 * @example <caption>include:samples/quickstart.js</caption>
 * region_tag:pubsub_quickstart_create_topic
 * Full quickstart example:
 */
class PubSub {
    constructor(options) {
        this.getSubscriptionsStream = paginator_1.paginator.streamify('getSubscriptions');
        this.getSnapshotsStream = paginator_1.paginator.streamify('getSnapshots');
        this.getTopicsStream = paginator_1.paginator.streamify('getTopics');
        this.isOpen = true;
        // Work on a shallow copy so the caller's options object is not modified.
        options = Object.assign({}, options || {});
        // Needed for potentially large responses that may come from using exactly-once delivery.
        // This will get passed down to grpc client objects.
        const maxMetadataSize = 'grpc.max_metadata_size';
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const optionsAny = options;
        if (optionsAny[maxMetadataSize] === undefined) {
            optionsAny[maxMetadataSize] = 4 * 1024 * 1024; // 4 MiB
        }
        // Determine what scopes are needed.
        // It is the union of the scopes on both clients.
        const clientClasses = [v1.SubscriberClient, v1.PublisherClient];
        const allScopes = {};
        for (const clientClass of clientClasses) {
            for (const scope of clientClass.scopes) {
                allScopes[scope] = true;
            }
        }
        this.options = Object.assign({
            libName: 'gccl',
            libVersion: PKG.version,
            scopes: Object.keys(allScopes),
        }, options);
        /**
         * @name PubSub#isEmulator
         * @type {boolean}
         */
        this.isEmulator = false;
        this.determineBaseUrl_();
        // Lazily created GAPIC clients, keyed by client class name.
        this.api = {};
        this.auth = new google_auth_library_1.GoogleAuth(this.options);
        this.projectId = this.options.projectId || PROJECT_ID_PLACEHOLDER;
        if (this.projectId !== PROJECT_ID_PLACEHOLDER) {
            this.name = PubSub.formatName_(this.projectId);
        }
    }
    /**
     * Returns true if we have actually resolved the full project name.
     *
     * @returns {boolean} true if the name is resolved.
     */
    get isIdResolved() {
        return this.projectId.indexOf(PROJECT_ID_PLACEHOLDER) < 0;
    }
    /**
     * Close every cached client (including the schema client, if one was
     * created) and mark this object as closed so that further calls to
     * {@link PubSub#request} fail. Closing an already closed object just
     * invokes the callback.
     *
     * @param {function} [callback] Called with `null` on success or with the
     *     error raised while closing the clients.
     */
    close(callback) {
        const definedCallback = callback || (() => { });
        if (!this.isOpen) {
            definedCallback(null);
            return;
        }
        this.isOpen = false;
        this.closeAllClients_()
            .then(() => {
                const schemaClient = this.schemaClient;
                return schemaClient === null || schemaClient === undefined
                    ? undefined
                    : schemaClient.close();
            })
            // Two-argument then(): an exception thrown by the callback itself
            // must not be routed back into the same callback a second time.
            .then(() => {
                definedCallback(null);
            }, definedCallback);
    }
    /**
     * Create a schema in the project.
     *
     * @see [Schemas: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/create}
     * @see {@link Schema#create}
     *
     * @throws {Error} If a schema ID or name is not provided.
     * @throws {Error} If an invalid SchemaType is provided.
     * @throws {Error} If an invalid schema definition is provided.
     *
     * @param {string} schemaId The name or ID of the schema.
     * @param {SchemaType} type The type of the schema (Protobuf, Avro, etc).
     * @param {string} definition The text describing the schema in terms of the type.
     * @param {object} [gaxOpts] Request configuration options, outlined
     *     here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
     * @returns {Promise<Schema>}
     *
     * @example Create a schema.
     * ```
     * const {PubSub} = require('@google-cloud/pubsub');
     * const pubsub = new PubSub();
     *
     * await pubsub.createSchema(
     *   'messageType',
     *   SchemaTypes.Avro,
     *   '{...avro definition...}'
     * );
     * ```
     */
    async createSchema(schemaId, type, definition, gaxOpts) {
        // This populates projectId for us.
        await this.getClientConfig();
        const schemaName = schema_1.Schema.formatName_(this.projectId, schemaId);
        const request = {
            parent: this.name,
            schemaId,
            schema: {
                name: schemaName,
                type,
                definition,
            },
        };
        const client = await this.getSchemaClient_();
        await client.createSchema(request, gaxOpts);
        return new schema_1.Schema(this, schemaName);
    }
    /**
     * Create a subscription to a topic.
     *
     * @throws {Error} If `topic` is neither a string nor a {@link Topic}.
     * @throws {Error} If `name` is not a string.
     *
     * @param {Topic|string} topic The topic (or topic name) to subscribe to.
     * @param {string} name The name of the subscription.
     * @param {object|function} [optionsOrCallback] Subscription options, or the
     *     callback. `gaxOpts` is forwarded as the request's call options and
     *     `flowControl` is handed to the Subscription object; the remaining
     *     fields are converted with `Subscription.formatMetadata_` and sent
     *     in the request.
     * @param {function} [callback] Called with `(err, subscription, apiResponse)`.
     */
    createSubscription(topic, name, optionsOrCallback, callback) {
        if (typeof topic !== 'string' && !(topic instanceof topic_1.Topic)) {
            throw new Error('A Topic is required for a new subscription.');
        }
        if (typeof name !== 'string') {
            throw new Error('A subscription name is required.');
        }
        if (typeof topic === 'string') {
            topic = this.topic(topic);
        }
        let options = typeof optionsOrCallback === 'object'
            ? optionsOrCallback
            : {};
        callback =
            typeof optionsOrCallback === 'function' ? optionsOrCallback : callback;
        // Make a deep copy of options to not pollute caller object.
        options = extend(true, {}, options);
        const gaxOpts = options.gaxOpts;
        const flowControl = options.flowControl;
        delete options.gaxOpts;
        delete options.flowControl;
        const metadata = subscription_1.Subscription.formatMetadata_(options);
        let subscriptionCtorOptions = flowControl ? { flowControl } : {};
        subscriptionCtorOptions = Object.assign(subscriptionCtorOptions, metadata);
        const subscription = this.subscription(name, subscriptionCtorOptions);
        const reqOpts = Object.assign(metadata, {
            topic: topic.name,
            name: subscription.name,
        });
        this.request({
            client: 'SubscriberClient',
            method: 'createSubscription',
            reqOpts,
            gaxOpts,
        }, (err, resp) => {
            if (err) {
                callback(err, null, resp);
                return;
            }
            subscription.metadata = resp;
            callback(null, subscription, resp);
        });
    }
    /**
     * Create a topic with the given name or metadata.
     *
     * @throws {Error} If no topic name is provided (raised by
     *     {@link PubSub#topic}).
     *
     * @param {string|object} name The topic name, or a topic metadata object
     *     carrying a `name` property. A metadata object is copied, never
     *     modified.
     * @param {object|function} [optsOrCallback] Request configuration options
     *     (gax call options), or the callback.
     * @param {function} [callback] Called with `(err, topic, apiResponse)`.
     */
    createTopic(name, optsOrCallback, callback) {
        // Copy caller-supplied metadata: the name is rewritten below and that
        // rewrite must not leak into the caller's object.
        const reqOpts = typeof name === 'string'
            ? {
                name,
            }
            : Object.assign({}, name);
        // We don't allow a blank name, but this will let topic() handle that case.
        const topic = this.topic(reqOpts.name || '');
        // Topic#constructor might have canonicalized the name.
        reqOpts.name = topic.name;
        const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
        callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
        this.request({
            client: 'PublisherClient',
            method: 'createTopic',
            reqOpts,
            gaxOpts,
        }, (err, resp) => {
            if (err) {
                callback(err, null, resp);
                return;
            }
            topic.metadata = resp;
            callback(null, topic, resp);
        });
    }
    /**
     * Detach a subscription from its topic.
     *
     * @throws {Error} If `name` is not a string.
     *
     * @param {string} name The name of the subscription to detach.
     * @param {object|function} [optsOrCallback] Request configuration options
     *     (gax call options), or the callback.
     * @param {function} [callback] Forwarded unchanged to
     *     {@link PubSub#request}.
     */
    detachSubscription(name, optsOrCallback, callback) {
        if (typeof name !== 'string') {
            throw new Error('A subscription name is required.');
        }
        const sub = this.subscription(name);
        const reqOpts = {
            subscription: sub.name,
        };
        const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
        callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
        this.request({
            client: 'PublisherClient',
            method: 'detachSubscription',
            reqOpts,
            gaxOpts: gaxOpts,
        }, callback);
    }
    /**
     * Determine the appropriate endpoint to use for API requests, first trying
     * the `apiEndpoint` parameter. If that isn't set, we try the Pub/Sub emulator
     * environment variable (PUBSUB_EMULATOR_HOST). If that is also null, we try
     * the standard `gcloud alpha pubsub` environment variable
     * (CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB). Otherwise the default production
     * API is used.
     *
     * Note that if the URL doesn't end in '.googleapis.com', we will assume that
     * it's an emulator and disable strict SSL checks.
     *
     * @private
     */
    determineBaseUrl_() {
        // We allow an override from the client object options, or from
        // one of these variables. The CLOUDSDK variable is provided for
        // compatibility with the `gcloud alpha` utility.
        const gcloudVarName = 'CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB';
        const emulatorVarName = 'PUBSUB_EMULATOR_HOST';
        const apiEndpoint = this.options.apiEndpoint ||
            process.env[emulatorVarName] ||
            process.env[gcloudVarName];
        if (!apiEndpoint) {
            return;
        }
        // Parse the URL into a hostname and port, if possible.
        const leadingProtocol = new RegExp('^(https?)://');
        const trailingSlashes = new RegExp('/*$');
        const protocolMatch = leadingProtocol.exec(apiEndpoint);
        const baseUrlParts = apiEndpoint
            .replace(leadingProtocol, '')
            .replace(trailingSlashes, '')
            .split(':');
        this.options.servicePath = baseUrlParts[0];
        if (!baseUrlParts[1]) {
            // No port was given -- figure it out from the protocol. Only an
            // explicit `scheme://` prefix counts; a bare host name that merely
            // begins with "http" carries no protocol information.
            if (!protocolMatch) {
                this.options.port = undefined;
            }
            else if (protocolMatch[1] === 'https') {
                this.options.port = 443;
            }
            else {
                this.options.port = 80;
            }
        }
        else {
            this.options.port = parseInt(baseUrlParts[1], 10);
        }
        // If this looks like a GCP URL of some kind, don't go into emulator
        // mode. Otherwise, supply a fake SSL provider so a real cert isn't
        // required for running the emulator.
        const officialUrlMatch = this.options.servicePath.endsWith('.googleapis.com');
        if (!officialUrlMatch) {
            const grpcInstance = this.options.grpc || gax.grpc;
            this.options.sslCreds = grpcInstance.credentials.createInsecure();
            this.isEmulator = true;
        }
        if (!this.options.projectId && process.env.PUBSUB_PROJECT_ID) {
            this.options.projectId = process.env.PUBSUB_PROJECT_ID;
        }
    }
    /**
     * Get a list of schemas associated with your project.
     *
     * The returned AsyncIterable will resolve to {@link google.pubsub.v1.ISchema} objects.
     *
     * This method returns an async iterable. These objects can be adapted
     * to work in a Promise/then framework, as well as with callbacks, but
     * this discussion is considered out of scope for these docs.
     *
     * @see [Schemas: list API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/list}
     * @see [More about async iterators]{@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of}
     *
     * @param {google.pubsub.v1.SchemaView} [view] The type of schema objects
     *     requested, which should be an enum value from {@link SchemaViews}.
     *     Defaults to Basic.
     * @param {object} [options] Request configuration options, outlined
     *     here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
     * @returns {AsyncIterable<ISchema>}
     *
     * @example
     * ```
     * for await (const s of pubsub.listSchemas()) {
     *   console.log(s.name);
     * }
     * ```
     */
    async *listSchemas(view = schema_1.SchemaViews.Basic, options) {
        const client = await this.getSchemaClient_();
        const query = {
            parent: this.name,
            view,
        };
        for await (const s of client.listSchemasAsync(query, options)) {
            yield s;
        }
    }
    /**
     * Get a list of snapshots in the project.
     *
     * @param {object|function} [optsOrCallback] Query options, or the
     *     callback. `gaxOpts` and `autoPaginate` are moved into the call
     *     options; all other fields are sent as request fields.
     * @param {function} [callback] Called with `(err, snapshots, ...rest)`
     *     where `snapshots` are {@link Snapshot} instances whose `metadata`
     *     holds the raw API object.
     */
    getSnapshots(optsOrCallback, callback) {
        const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
        callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
        const reqOpts = Object.assign({
            project: PubSub.formatName_(this.projectId),
        }, options);
        delete reqOpts.gaxOpts;
        delete reqOpts.autoPaginate;
        const gaxOpts = Object.assign({
            autoPaginate: options.autoPaginate,
        }, options.gaxOpts);
        this.request({
            client: 'SubscriberClient',
            method: 'listSnapshots',
            reqOpts,
            gaxOpts,
        }, (err, rawSnapshots, ...args) => {
            let snapshots;
            if (rawSnapshots) {
                snapshots = rawSnapshots.map((snapshot) => {
                    const snapshotInstance = this.snapshot(snapshot.name);
                    snapshotInstance.metadata = snapshot;
                    return snapshotInstance;
                });
            }
            callback(err, snapshots, ...args);
        });
    }
    /**
     * Get a list of subscriptions in the project. When `options.topic` is
     * given, the call is delegated to that topic's `getSubscriptions`.
     *
     * @param {object|function} [optsOrCallback] Query options, or the
     *     callback. `gaxOpts` and `autoPaginate` are moved into the call
     *     options; all other fields are sent as request fields.
     * @param {function} [callback] Called with `(err, subscriptions, ...rest)`
     *     where `subscriptions` are {@link Subscription} instances whose
     *     `metadata` holds the raw API object.
     */
    getSubscriptions(optsOrCallback, callback) {
        const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
        callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
        let topic = options.topic;
        if (topic) {
            if (!(topic instanceof topic_1.Topic)) {
                topic = this.topic(topic);
            }
            return topic.getSubscriptions(options, callback);
        }
        const reqOpts = Object.assign({}, options);
        // Same project-path helper as getSnapshots, so an already formatted
        // project name is not prefixed twice.
        reqOpts.project = PubSub.formatName_(this.projectId);
        delete reqOpts.gaxOpts;
        delete reqOpts.autoPaginate;
        const gaxOpts = Object.assign({
            autoPaginate: options.autoPaginate,
        }, options.gaxOpts);
        this.request({
            client: 'SubscriberClient',
            method: 'listSubscriptions',
            reqOpts,
            gaxOpts,
        }, (err, rawSubs, ...args) => {
            let subscriptions;
            if (rawSubs) {
                subscriptions = rawSubs.map((sub) => {
                    const subscriptionInstance = this.subscription(sub.name);
                    subscriptionInstance.metadata = sub;
                    return subscriptionInstance;
                });
            }
            callback(err, subscriptions, ...args);
        });
    }
    /**
     * Get a list of topics in the project.
     *
     * @param {object|function} [optsOrCallback] Query options, or the
     *     callback. `gaxOpts` and `autoPaginate` are moved into the call
     *     options; all other fields are sent as request fields.
     * @param {function} [callback] Called with `(err, topics, ...rest)` where
     *     `topics` are {@link Topic} instances whose `metadata` holds the raw
     *     API object.
     */
    getTopics(optsOrCallback, callback) {
        const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
        callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
        const reqOpts = Object.assign({
            // Same project-path helper as getSnapshots.
            project: PubSub.formatName_(this.projectId),
        }, options);
        delete reqOpts.gaxOpts;
        delete reqOpts.autoPaginate;
        const gaxOpts = Object.assign({
            autoPaginate: options.autoPaginate,
        }, options.gaxOpts);
        this.request({
            client: 'PublisherClient',
            method: 'listTopics',
            reqOpts,
            gaxOpts,
        }, (err, rawTopics, ...args) => {
            let topics;
            if (rawTopics) {
                topics = rawTopics.map(topic => {
                    const topicInstance = this.topic(topic.name);
                    topicInstance.metadata = topic;
                    return topicInstance;
                });
            }
            callback(err, topics, ...args);
        });
    }
    /**
     * Retrieve a client configuration, suitable for passing into a GAPIC
     * 'v1' class constructor. This will fill out projectId, emulator URLs,
     * and so forth.
     *
     * @returns {Promise<ClientConfig>} the filled client configuration.
     */
    async getClientConfig() {
        if (!this.projectId || this.projectId === PROJECT_ID_PLACEHOLDER) {
            let projectId;
            try {
                projectId = await this.auth.getProjectId();
            }
            catch (e) {
                // Only emulator mode tolerates a failed project ID lookup.
                if (!this.isEmulator) {
                    throw e;
                }
                projectId = '';
            }
            this.projectId = projectId;
            this.name = PubSub.formatName_(this.projectId);
            this.options.projectId = projectId;
        }
        return this.options;
    }
    /**
     * Gets a schema client, creating one if needed.
     * @private
     */
    async getSchemaClient_() {
        if (!this.schemaClient) {
            const options = await this.getClientConfig();
            // Re-check after the await: a concurrent caller may have created
            // the client meanwhile, and overwriting it would leave that
            // instance without anything to close it.
            if (!this.schemaClient) {
                this.schemaClient = new v1.SchemaServiceClient(options);
            }
        }
        return this.schemaClient;
    }
    /**
     * Callback function to PubSub.getClient_().
     * @private
     * @callback GetClientCallback
     * @param err - Error, if any.
     * @param gaxClient - The gax client specified in RequestConfig.client.
     *     Typed any since it's importing Javascript source.
     */
    /**
     * Get the PubSub client object.
     *
     * @private
     *
     * @param {object} config Configuration object.
     * @param {object} config.gaxOpts GAX options.
     * @param {function} config.method The gax method to call.
     * @param {object} config.reqOpts Request options.
     * @param {function} [callback] The callback function.
     */
    getClient_(config, callback) {
        this.getClientAsync_(config).then(client => callback(null, client), callback);
    }
    /**
     * Get the PubSub client object.
     *
     * @private
     *
     * @param {object} config Configuration object.
     * @param {object} config.gaxOpts GAX options.
     * @param {function} config.method The gax method to call.
     * @param {object} config.reqOpts Request options.
     * @returns {Promise}
     */
    async getClientAsync_(config) {
        // Make sure we've got a fully created config with projectId and such.
        const options = await this.getClientConfig();
        let gaxClient = this.api[config.client];
        if (!gaxClient) {
            // Lazily instantiate client.
            gaxClient = new v1[config.client](options);
            this.api[config.client] = gaxClient;
        }
        return gaxClient;
    }
    /**
     * Close all open client objects.
     *
     * @private
     *
     * @returns {Promise}
     */
    async closeAllClients_() {
        const promises = [];
        for (const clientConfig of Object.keys(this.api)) {
            const gaxClient = this.api[clientConfig];
            promises.push(gaxClient.close());
            delete this.api[clientConfig];
        }
        await Promise.all(promises);
    }
    /**
     * Funnel all API requests through this method, to be sure we have a project
     * ID.
     *
     * @private
     *
     * @param {object} config Configuration object.
     * @param {object} config.gaxOpts GAX options.
     * @param {function} config.method The gax method to call.
     * @param {object} config.reqOpts Request options.
     * @param {function} [callback] The callback function.
     */
    request(config, callback) {
        // This prevents further requests, in case any publishers were hanging around.
        if (!this.isOpen) {
            const statusObject = {
                code: 0,
                details: 'Cannot use a closed PubSub object.',
                metadata: null,
            };
            const err = new Error(statusObject.details);
            Object.assign(err, statusObject);
            callback(err);
            return;
        }
        this.getClient_(config, (err, client) => {
            if (err) {
                callback(err);
                return;
            }
            // Deep-copy the request before substituting the project ID token
            // so the caller's reqOpts stays untouched.
            let reqOpts = extend(true, {}, config.reqOpts);
            reqOpts = projectify_1.replaceProjectIdToken(reqOpts, this.projectId);
            client[config.method](reqOpts, config.gaxOpts, callback);
        });
    }
    /**
     * Create a Schema object, representing a schema within the project.
     * See {@link PubSub#createSchema} or {@link Schema#create} to create a schema.
     *
     * @throws {Error} If a name is not provided.
     *
     * @param {string} idOrName The ID or name of the schema.
     * @returns {Schema} A {@link Schema} instance.
     *
     * @example
     * ```
     * const {PubSub} = require('@google-cloud/pubsub');
     * const pubsub = new PubSub();
     *
     * const schema = pubsub.schema('my-schema');
     * ```
     */
    schema(idOrName) {
        return new schema_1.Schema(this, idOrName);
    }
    /**
     * Create a Snapshot object. See {@link Subscription#createSnapshot} to
     * create a snapshot.
     *
     * @throws {Error} If a name is not provided.
     *
     * @param {string} name The name of the snapshot.
     * @returns {Snapshot} A {@link Snapshot} instance.
     *
     * @example
     * ```
     * const {PubSub} = require('@google-cloud/pubsub');
     * const pubsub = new PubSub();
     *
     * const snapshot = pubsub.snapshot('my-snapshot');
     * ```
     */
    snapshot(name) {
        if (typeof name !== 'string') {
            throw new Error('You must supply a valid name for the snapshot.');
        }
        return new snapshot_1.Snapshot(this, name);
    }
    /**
     * Create a Subscription object. This command by itself will not run any API
     * requests. You will receive a {@link Subscription} object,
     * which will allow you to interact with a subscription.
     *
     * @throws {Error} If subscription name is omitted.
     *
     * @param {string} name Name of the subscription.
     * @param {SubscriberOptions} [options] Configuration object.
     * @returns {Subscription} A {@link Subscription} instance.
     *
     * @example
     * ```
     * const {PubSub} = require('@google-cloud/pubsub');
     * const pubsub = new PubSub();
     *
     * const subscription = pubsub.subscription('my-subscription');
     *
     * // Register a listener for `message` events.
     * subscription.on('message', function(message) {
     *   // Called every time a message is received.
     *   // message.id = ID of the message.
     *   // message.ackId = ID used to acknowledge the message receival.
     *   // message.data = Contents of the message.
     *   // message.attributes = Attributes of the message.
     *   // message.publishTime = Date when Pub/Sub received the message.
     * });
     * ```
     */
    subscription(name, options) {
        if (!name) {
            throw new Error('A name must be specified for a subscription.');
        }
        return new subscription_1.Subscription(this, name, options);
    }
    /**
     * Create a Topic object. See {@link PubSub#createTopic} to create a topic.
     *
     * @throws {Error} If a name is not provided.
     *
     * @param {string} name The name of the topic.
     * @param {PublishOptions} [options] Publisher configuration object.
     * @returns {Topic} A {@link Topic} instance.
     *
     * @example
     * ```
     * const {PubSub} = require('@google-cloud/pubsub');
     * const pubsub = new PubSub();
     *
     * const topic = pubsub.topic('my-topic');
     * ```
     */
    topic(name, options) {
        if (!name) {
            throw new Error('A name must be specified for a topic.');
        }
        return new topic_1.Topic(this, name, options);
    }
    /**
     * Validate a schema definition.
     *
     * @see [Schemas: validateSchema API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/validate}
     *
     * @throws {Error} if the validation fails.
     *
     * @param {ISchema} schema The schema definition you wish to validate.
     * @param {object} [gaxOpts] Request configuration options, outlined
     *     here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
     * @returns {Promise<void>}
     */
    async validateSchema(schema, gaxOpts) {
        const client = await this.getSchemaClient_();
        await client.validateSchema({
            parent: this.name,
            schema,
        }, gaxOpts);
    }
    /*!
     * Format the name of a project. A project's full name is in the
     * format of projects/{projectId}.
     *
     * The GAPIC client should do this for us, but since we maintain
     * names rather than IDs, this is simpler.
     *
     * @private
     */
    static formatName_(name) {
        if (typeof name !== 'string') {
            throw new Error('A name is required to identify a project.');
        }
        // Simple check if the name is already formatted.
        if (name.indexOf('/') > -1) {
            return name;
        }
        return `projects/${name}`;
    }
}
757exports.PubSub = PubSub;
758/**
759 * Get a list of the {@link Snapshot} objects as a readable object stream.
760 *
761 * @method PubSub#getSnapshotsStream
762 * @param {GetSnapshotsRequest} [options] Configuration object. See
763 * {@link PubSub#getSnapshots} for a complete list of options.
764 * @returns {ReadableStream} A readable stream of {@link Snapshot} instances.
765 *
766 * @example
767 * ```
768 * const {PubSub} = require('@google-cloud/pubsub');
769 * const pubsub = new PubSub();
770 *
771 * pubsub.getSnapshotsStream()
772 * .on('error', console.error)
773 * .on('data', function(snapshot) {
774 * // snapshot is a Snapshot object.
775 * })
776 * .on('end', function() {
777 * // All snapshots retrieved.
778 * });
779 *
780 * //-
781 * // If you anticipate many results, you can end a stream early to prevent
782 * // unnecessary processing and API requests.
783 * //-
784 * pubsub.getSnapshotsStream()
785 * .on('data', function(snapshot) {
786 * this.end();
787 * });
788 * ```
789 */
790/**
791 * Get a list of the {@link Subscription} objects registered to all of
792 * your project's topics as a readable object stream.
793 *
794 * @method PubSub#getSubscriptionsStream
795 * @param {GetSubscriptionsRequest} [options] Configuration object. See
796 * {@link PubSub#getSubscriptions} for a complete list of options.
797 * @returns {ReadableStream} A readable stream of {@link Subscription} instances.
798 *
799 * @example
800 * ```
801 * const {PubSub} = require('@google-cloud/pubsub');
802 * const pubsub = new PubSub();
803 *
804 * pubsub.getSubscriptionsStream()
805 * .on('error', console.error)
806 * .on('data', function(subscription) {
807 * // subscription is a Subscription object.
808 * })
809 * .on('end', function() {
810 * // All subscriptions retrieved.
811 * });
812 *
813 * //-
814 * // If you anticipate many results, you can end a stream early to prevent
815 * // unnecessary processing and API requests.
816 * //-
817 * pubsub.getSubscriptionsStream()
818 * .on('data', function(subscription) {
819 * this.end();
820 * });
821 * ```
822 */
/**
 * Get a list of the {@link Topic} objects registered to your project as
 * a readable object stream.
826 *
827 * @method PubSub#getTopicsStream
828 * @param {GetTopicsRequest} [options] Configuration object. See
829 * {@link PubSub#getTopics} for a complete list of options.
830 * @returns {ReadableStream} A readable stream of {@link Topic} instances.
831 *
832 * @example
833 * ```
834 * const {PubSub} = require('@google-cloud/pubsub');
835 * const pubsub = new PubSub();
836 *
837 * pubsub.getTopicsStream()
838 * .on('error', console.error)
839 * .on('data', function(topic) {
840 * // topic is a Topic object.
841 * })
842 * .on('end', function() {
843 * // All topics retrieved.
844 * });
845 *
846 * //-
847 * // If you anticipate many results, you can end a stream early to prevent
848 * // unnecessary processing and API requests.
849 * //-
850 * pubsub.getTopicsStream()
851 * .on('data', function(topic) {
852 * this.end();
853 * });
854 * ```
855 */
856/*! Developer Documentation
857 *
858 * These methods can be auto-paginated.
859 */
// Callback-style list methods that gain auto-pagination support.
const paginatedMethodNames = ['getSnapshots', 'getSubscriptions', 'getTopics'];
paginator_1.paginator.extend(PubSub, paginatedMethodNames);
/*! Developer Documentation
 *
 * Existing async methods (except for streams) will return a Promise in the event
 * that a callback is omitted. Future methods will not allow for a callback.
 * (Use .then() on the returned Promise instead.)
 */
const promisifiedMethodNames = [
    'close',
    'createSubscription',
    'createTopic',
    'detachSubscription',
    'getSnapshots',
    'getSubscriptions',
    'getTopics',
];
util_1.promisifySome(PubSub, PubSub.prototype, promisifiedMethodNames);
876//# sourceMappingURL=pubsub.js.map
\No newline at end of file