1 | ;
|
2 | /*!
|
3 | * Copyright 2014 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.PubSub = void 0;
|
19 | const paginator_1 = require("@google-cloud/paginator");
|
20 | const projectify_1 = require("@google-cloud/projectify");
|
21 | const extend = require("extend");
|
22 | const google_auth_library_1 = require("google-auth-library");
|
23 | const gax = require("google-gax");
|
24 | // eslint-disable-next-line @typescript-eslint/no-var-requires
|
25 | const PKG = require('../../package.json');
|
26 | // eslint-disable-next-line @typescript-eslint/no-var-requires
|
27 | const v1 = require('./v1');
|
28 | const util_1 = require("./util");
|
29 | const schema_1 = require("./schema");
|
30 | const snapshot_1 = require("./snapshot");
|
31 | const subscription_1 = require("./subscription");
|
32 | const topic_1 = require("./topic");
|
33 | /**
|
34 | * Project ID placeholder.
|
35 | * @type {string}
|
36 | * @private
|
37 | */
|
38 | const PROJECT_ID_PLACEHOLDER = '{{projectId}}';
|
39 | /**
|
40 | * @typedef {object} ClientConfig
|
41 | * @property {string} [projectId] The project ID from the Google Developer's
|
42 | * Console, e.g. 'grape-spaceship-123'. We will also check the environment
|
43 | * variable `GCLOUD_PROJECT` for your project ID. If your app is running in
|
44 | * an environment which supports {@link
|
45 | * https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
|
46 | * Application Default Credentials}, your project ID will be detected
|
47 | * automatically.
|
48 | * @property {string} [keyFilename] Full path to the a .json, .pem, or .p12 key
|
49 | * downloaded from the Google Developers Console. If you provide a path to a
|
50 | * JSON file, the `projectId` option above is not necessary. NOTE: .pem and
|
51 | * .p12 require you to specify the `email` option as well.
|
52 | * @property {string} [apiEndpoint] The `apiEndpoint` from options will set the
|
53 | * host. If not set, the `PUBSUB_EMULATOR_HOST` environment variable from the
|
54 | * gcloud SDK is honored. We also check the `CLOUD_API_ENDPOINT_OVERRIDES_PUBSUB`
|
55 | * environment variable used by `gcloud alpha pubsub`. Otherwise the actual API
|
56 | * endpoint will be used. Note that if the URL doesn't end in '.googleapis.com',
|
57 | * we will assume that it's an emulator and disable strict SSL checks.
|
58 | * @property {string} [email] Account email address. Required when using a .pem
|
59 | * or .p12 keyFilename.
|
60 | * @property {object} [credentials] Credentials object.
|
61 | * @property {string} [credentials.client_email]
|
62 | * @property {string} [credentials.private_key]
|
63 | * @property {boolean} [autoRetry=true] Automatically retry requests if the
|
64 | * response is related to rate limits or certain intermittent server errors.
|
65 | * We will exponentially backoff subsequent requests by default.
|
66 | * @property {Constructor} [promise] Custom promise module to use instead of
|
67 | * native Promises.
|
68 | */
|
69 | /**
|
70 | * [Cloud Pub/Sub](https://developers.google.com/pubsub/overview) is a
|
71 | * reliable, many-to-many, asynchronous messaging service from Cloud
|
72 | * Platform.
|
73 | *
|
74 | * @class
|
75 | *
|
76 | * @see [Cloud Pub/Sub overview]{@link https://developers.google.com/pubsub/overview}
|
77 | *
|
78 | * @param {ClientConfig} [options] Configuration options.
|
79 | *
|
80 | * @example Import the client library
|
81 | * ```
|
82 | * const {PubSub} = require('@google-cloud/pubsub');
|
83 | *
|
84 | * ```
|
85 | * @example Create a client that uses <a href="https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application">Application Default Credentials (ADC)</a>:
|
86 | * ```
|
87 | * const pubsub = new PubSub();
|
88 | *
|
89 | * ```
|
90 | * @example Create a client with <a href="https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually">explicit credentials</a>:
|
91 | * ```
|
92 | * const pubsub = new PubSub({
|
93 | * projectId: 'your-project-id',
|
94 | * keyFilename: '/path/to/keyfile.json'
|
95 | * });
|
96 | *
|
97 | * ```
|
98 | * @example <caption>include:samples/quickstart.js</caption>
|
99 | * region_tag:pubsub_quickstart_create_topic
|
100 | * Full quickstart example:
|
101 | */
|
102 | class PubSub {
|
103 | constructor(options) {
|
104 | this.getSubscriptionsStream = paginator_1.paginator.streamify('getSubscriptions');
|
105 | this.getSnapshotsStream = paginator_1.paginator.streamify('getSnapshots');
|
106 | this.getTopicsStream = paginator_1.paginator.streamify('getTopics');
|
107 | this.isOpen = true;
|
108 | options = Object.assign({}, options || {});
|
109 | // Needed for potentially large responses that may come from using exactly-once delivery.
|
110 | // This will get passed down to grpc client objects.
|
111 | const maxMetadataSize = 'grpc.max_metadata_size';
|
112 | // eslint-disable-next-line @typescript-eslint/no-explicit-any
|
113 | const optionsAny = options;
|
114 | if (optionsAny[maxMetadataSize] === undefined) {
|
115 | optionsAny[maxMetadataSize] = 4 * 1024 * 1024; // 4 MiB
|
116 | }
|
117 | // Determine what scopes are needed.
|
118 | // It is the union of the scopes on both clients.
|
119 | const clientClasses = [v1.SubscriberClient, v1.PublisherClient];
|
120 | const allScopes = {};
|
121 | for (const clientClass of clientClasses) {
|
122 | for (const scope of clientClass.scopes) {
|
123 | allScopes[scope] = true;
|
124 | }
|
125 | }
|
126 | this.options = Object.assign({
|
127 | libName: 'gccl',
|
128 | libVersion: PKG.version,
|
129 | scopes: Object.keys(allScopes),
|
130 | }, options);
|
131 | /**
|
132 | * @name PubSub#isEmulator
|
133 | * @type {boolean}
|
134 | */
|
135 | this.isEmulator = false;
|
136 | this.determineBaseUrl_();
|
137 | this.api = {};
|
138 | this.auth = new google_auth_library_1.GoogleAuth(this.options);
|
139 | this.projectId = this.options.projectId || PROJECT_ID_PLACEHOLDER;
|
140 | if (this.projectId !== PROJECT_ID_PLACEHOLDER) {
|
141 | this.name = PubSub.formatName_(this.projectId);
|
142 | }
|
143 | }
|
144 | /**
|
145 | * Returns true if we have actually resolved the full project name.
|
146 | *
|
147 | * @returns {boolean} true if the name is resolved.
|
148 | */
|
149 | get isIdResolved() {
|
150 | return this.projectId.indexOf(PROJECT_ID_PLACEHOLDER) < 0;
|
151 | }
|
152 | close(callback) {
|
153 | const definedCallback = callback || (() => { });
|
154 | if (this.isOpen) {
|
155 | this.isOpen = false;
|
156 | this.closeAllClients_()
|
157 | .then(() => { var _a; return (_a = this.schemaClient) === null || _a === void 0 ? void 0 : _a.close(); })
|
158 | .then(() => {
|
159 | definedCallback(null);
|
160 | })
|
161 | .catch(definedCallback);
|
162 | }
|
163 | else {
|
164 | definedCallback(null);
|
165 | }
|
166 | }
|
167 | /**
|
168 | * Create a schema in the project.
|
169 | *
|
170 | * @see [Schemas: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/create}
|
171 | * @see {@link Schema#create}
|
172 | *
|
173 | * @throws {Error} If a schema ID or name is not provided.
|
174 | * @throws {Error} If an invalid SchemaType is provided.
|
175 | * @throws {Error} If an invalid schema definition is provided.
|
176 | *
|
177 | * @param {string} schemaId The name or ID of the subscription.
|
178 | * @param {SchemaType} type The type of the schema (Protobuf, Avro, etc).
|
179 | * @param {string} definition The text describing the schema in terms of the type.
|
180 | * @param {object} [options] Request configuration options, outlined
|
181 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
182 | * @returns {Promise<Schema>}
|
183 | *
|
184 | * @example Create a schema.
|
185 | * ```
|
186 | * const {PubSub} = require('@google-cloud/pubsub');
|
187 | * const pubsub = new PubSub();
|
188 | *
|
189 | * await pubsub.createSchema(
|
190 | * 'messageType',
|
191 | * SchemaTypes.Avro,
|
192 | * '{...avro definition...}'
|
193 | * );
|
194 | * ```
|
195 | */
|
196 | async createSchema(schemaId, type, definition, gaxOpts) {
|
197 | // This populates projectId for us.
|
198 | await this.getClientConfig();
|
199 | const schemaName = schema_1.Schema.formatName_(this.projectId, schemaId);
|
200 | const request = {
|
201 | parent: this.name,
|
202 | schemaId,
|
203 | schema: {
|
204 | name: schemaName,
|
205 | type,
|
206 | definition,
|
207 | },
|
208 | };
|
209 | const client = await this.getSchemaClient_();
|
210 | await client.createSchema(request, gaxOpts);
|
211 | return new schema_1.Schema(this, schemaName);
|
212 | }
|
213 | createSubscription(topic, name, optionsOrCallback, callback) {
|
214 | if (typeof topic !== 'string' && !(topic instanceof topic_1.Topic)) {
|
215 | throw new Error('A Topic is required for a new subscription.');
|
216 | }
|
217 | if (typeof name !== 'string') {
|
218 | throw new Error('A subscription name is required.');
|
219 | }
|
220 | if (typeof topic === 'string') {
|
221 | topic = this.topic(topic);
|
222 | }
|
223 | let options = typeof optionsOrCallback === 'object'
|
224 | ? optionsOrCallback
|
225 | : {};
|
226 | callback =
|
227 | typeof optionsOrCallback === 'function' ? optionsOrCallback : callback;
|
228 | // Make a deep copy of options to not pollute caller object.
|
229 | options = extend(true, {}, options);
|
230 | const gaxOpts = options.gaxOpts;
|
231 | const flowControl = options.flowControl;
|
232 | delete options.gaxOpts;
|
233 | delete options.flowControl;
|
234 | const metadata = subscription_1.Subscription.formatMetadata_(options);
|
235 | let subscriptionCtorOptions = flowControl ? { flowControl } : {};
|
236 | subscriptionCtorOptions = Object.assign(subscriptionCtorOptions, metadata);
|
237 | const subscription = this.subscription(name, subscriptionCtorOptions);
|
238 | const reqOpts = Object.assign(metadata, {
|
239 | topic: topic.name,
|
240 | name: subscription.name,
|
241 | });
|
242 | this.request({
|
243 | client: 'SubscriberClient',
|
244 | method: 'createSubscription',
|
245 | reqOpts,
|
246 | gaxOpts,
|
247 | }, (err, resp) => {
|
248 | if (err) {
|
249 | callback(err, null, resp);
|
250 | return;
|
251 | }
|
252 | subscription.metadata = resp;
|
253 | callback(null, subscription, resp);
|
254 | });
|
255 | }
|
256 | createTopic(name, optsOrCallback, callback) {
|
257 | const reqOpts = typeof name === 'string'
|
258 | ? {
|
259 | name,
|
260 | }
|
261 | : name;
|
262 | // We don't allow a blank name, but this will let topic() handle that case.
|
263 | const topic = this.topic(reqOpts.name || '');
|
264 | // Topic#constructor might have canonicalized the name.
|
265 | reqOpts.name = topic.name;
|
266 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
267 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
268 | this.request({
|
269 | client: 'PublisherClient',
|
270 | method: 'createTopic',
|
271 | reqOpts,
|
272 | gaxOpts,
|
273 | }, (err, resp) => {
|
274 | if (err) {
|
275 | callback(err, null, resp);
|
276 | return;
|
277 | }
|
278 | topic.metadata = resp;
|
279 | callback(null, topic, resp);
|
280 | });
|
281 | }
|
282 | detachSubscription(name, optsOrCallback, callback) {
|
283 | if (typeof name !== 'string') {
|
284 | throw new Error('A subscription name is required.');
|
285 | }
|
286 | const sub = this.subscription(name);
|
287 | const reqOpts = {
|
288 | subscription: sub.name,
|
289 | };
|
290 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
291 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
292 | this.request({
|
293 | client: 'PublisherClient',
|
294 | method: 'detachSubscription',
|
295 | reqOpts,
|
296 | gaxOpts: gaxOpts,
|
297 | }, callback);
|
298 | }
|
299 | /**
|
300 | * Determine the appropriate endpoint to use for API requests, first trying
|
301 | * the `apiEndpoint` parameter. If that isn't set, we try the Pub/Sub emulator
|
302 | * environment variable (PUBSUB_EMULATOR_HOST). If that is also null, we try
|
303 | * the standard `gcloud alpha pubsub` environment variable
|
304 | * (CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB). Otherwise the default production
|
305 | * API is used.
|
306 | *
|
307 | * Note that if the URL doesn't end in '.googleapis.com', we will assume that
|
308 | * it's an emulator and disable strict SSL checks.
|
309 | *
|
310 | * @private
|
311 | */
|
312 | determineBaseUrl_() {
|
313 | // We allow an override from the client object options, or from
|
314 | // one of these variables. The CLOUDSDK variable is provided for
|
315 | // compatibility with the `gcloud alpha` utility.
|
316 | const gcloudVarName = 'CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB';
|
317 | const emulatorVarName = 'PUBSUB_EMULATOR_HOST';
|
318 | const apiEndpoint = this.options.apiEndpoint ||
|
319 | process.env[emulatorVarName] ||
|
320 | process.env[gcloudVarName];
|
321 | if (!apiEndpoint) {
|
322 | return;
|
323 | }
|
324 | // Parse the URL into a hostname and port, if possible.
|
325 | const leadingProtocol = new RegExp('^https?://');
|
326 | const trailingSlashes = new RegExp('/*$');
|
327 | const baseUrlParts = apiEndpoint
|
328 | .replace(leadingProtocol, '')
|
329 | .replace(trailingSlashes, '')
|
330 | .split(':');
|
331 | this.options.servicePath = baseUrlParts[0];
|
332 | if (!baseUrlParts[1]) {
|
333 | // No port was given -- figure it out from the protocol.
|
334 | if (apiEndpoint.startsWith('https')) {
|
335 | this.options.port = 443;
|
336 | }
|
337 | else if (apiEndpoint.startsWith('http')) {
|
338 | this.options.port = 80;
|
339 | }
|
340 | else {
|
341 | this.options.port = undefined;
|
342 | }
|
343 | }
|
344 | else {
|
345 | this.options.port = parseInt(baseUrlParts[1], 10);
|
346 | }
|
347 | // If this looks like a GCP URL of some kind, don't go into emulator
|
348 | // mode. Otherwise, supply a fake SSL provider so a real cert isn't
|
349 | // required for running the emulator.
|
350 | const officialUrlMatch = this.options.servicePath.endsWith('.googleapis.com');
|
351 | if (!officialUrlMatch) {
|
352 | const grpcInstance = this.options.grpc || gax.grpc;
|
353 | this.options.sslCreds = grpcInstance.credentials.createInsecure();
|
354 | this.isEmulator = true;
|
355 | }
|
356 | if (!this.options.projectId && process.env.PUBSUB_PROJECT_ID) {
|
357 | this.options.projectId = process.env.PUBSUB_PROJECT_ID;
|
358 | }
|
359 | }
|
360 | /**
|
361 | * Get a list of schemas associated with your project.
|
362 | *
|
363 | * The returned AsyncIterable will resolve to {@link google.pubsub.v1.ISchema} objects.
|
364 | *
|
365 | * This method returns an async iterable. These objects can be adapted
|
366 | * to work in a Promise/then framework, as well as with callbacks, but
|
367 | * this discussion is considered out of scope for these docs.
|
368 | *
|
369 | * @see [Schemas: list API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/list}
|
370 | * @see [More about async iterators]{@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of}
|
371 | *
|
372 | * @param {google.pubsub.v1.SchemaView} [view] The type of schema objects
|
373 | * requested, which should be an enum value from {@link SchemaViews}. Defaults
|
374 | * to Full.
|
375 | * @param {object} [options] Request configuration options, outlined
|
376 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
377 | * @returns {AsyncIterable<ISchema>}
|
378 | *
|
379 | * @example
|
380 | * ```
|
381 | * for await (const s of pubsub.listSchemas()) {
|
382 | * const moreInfo = await s.get();
|
383 | * }
|
384 | * ```
|
385 | */
|
386 | async *listSchemas(view = schema_1.SchemaViews.Basic, options) {
|
387 | const client = await this.getSchemaClient_();
|
388 | const query = {
|
389 | parent: this.name,
|
390 | view,
|
391 | };
|
392 | for await (const s of client.listSchemasAsync(query, options)) {
|
393 | yield s;
|
394 | }
|
395 | }
|
396 | getSnapshots(optsOrCallback, callback) {
|
397 | const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
398 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
399 | const reqOpts = Object.assign({
|
400 | project: PubSub.formatName_(this.projectId),
|
401 | }, options);
|
402 | delete reqOpts.gaxOpts;
|
403 | delete reqOpts.autoPaginate;
|
404 | const gaxOpts = Object.assign({
|
405 | autoPaginate: options.autoPaginate,
|
406 | }, options.gaxOpts);
|
407 | this.request({
|
408 | client: 'SubscriberClient',
|
409 | method: 'listSnapshots',
|
410 | reqOpts,
|
411 | gaxOpts,
|
412 | }, (err, rawSnapshots, ...args) => {
|
413 | let snapshots;
|
414 | if (rawSnapshots) {
|
415 | snapshots = rawSnapshots.map((snapshot) => {
|
416 | const snapshotInstance = this.snapshot(snapshot.name);
|
417 | snapshotInstance.metadata = snapshot;
|
418 | return snapshotInstance;
|
419 | });
|
420 | }
|
421 | callback(err, snapshots, ...args);
|
422 | });
|
423 | }
|
424 | getSubscriptions(optsOrCallback, callback) {
|
425 | const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
426 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
427 | let topic = options.topic;
|
428 | if (topic) {
|
429 | if (!(topic instanceof topic_1.Topic)) {
|
430 | topic = this.topic(topic);
|
431 | }
|
432 | return topic.getSubscriptions(options, callback);
|
433 | }
|
434 | const reqOpts = Object.assign({}, options);
|
435 | reqOpts.project = 'projects/' + this.projectId;
|
436 | delete reqOpts.gaxOpts;
|
437 | delete reqOpts.autoPaginate;
|
438 | const gaxOpts = Object.assign({
|
439 | autoPaginate: options.autoPaginate,
|
440 | }, options.gaxOpts);
|
441 | this.request({
|
442 | client: 'SubscriberClient',
|
443 | method: 'listSubscriptions',
|
444 | reqOpts,
|
445 | gaxOpts,
|
446 | }, (err, rawSubs, ...args) => {
|
447 | let subscriptions;
|
448 | if (rawSubs) {
|
449 | subscriptions = rawSubs.map((sub) => {
|
450 | const subscriptionInstance = this.subscription(sub.name);
|
451 | subscriptionInstance.metadata = sub;
|
452 | return subscriptionInstance;
|
453 | });
|
454 | }
|
455 | callback(err, subscriptions, ...args);
|
456 | });
|
457 | }
|
458 | getTopics(optsOrCallback, callback) {
|
459 | const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
460 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
461 | const reqOpts = Object.assign({
|
462 | project: 'projects/' + this.projectId,
|
463 | }, options);
|
464 | delete reqOpts.gaxOpts;
|
465 | delete reqOpts.autoPaginate;
|
466 | const gaxOpts = Object.assign({
|
467 | autoPaginate: options.autoPaginate,
|
468 | }, options.gaxOpts);
|
469 | this.request({
|
470 | client: 'PublisherClient',
|
471 | method: 'listTopics',
|
472 | reqOpts,
|
473 | gaxOpts,
|
474 | }, (err, rawTopics, ...args) => {
|
475 | let topics;
|
476 | if (rawTopics) {
|
477 | topics = rawTopics.map(topic => {
|
478 | const topicInstance = this.topic(topic.name);
|
479 | topicInstance.metadata = topic;
|
480 | return topicInstance;
|
481 | });
|
482 | }
|
483 | callback(err, topics, ...args);
|
484 | });
|
485 | }
|
486 | /**
|
487 | * Retrieve a client configuration, suitable for passing into a GAPIC
|
488 | * 'v1' class constructor. This will fill out projectId, emulator URLs,
|
489 | * and so forth.
|
490 | *
|
491 | * @returns {Promise<ClientConfig>} the filled client configuration.
|
492 | */
|
493 | async getClientConfig() {
|
494 | if (!this.projectId || this.projectId === PROJECT_ID_PLACEHOLDER) {
|
495 | let projectId;
|
496 | try {
|
497 | projectId = await this.auth.getProjectId();
|
498 | }
|
499 | catch (e) {
|
500 | if (!this.isEmulator) {
|
501 | throw e;
|
502 | }
|
503 | projectId = '';
|
504 | }
|
505 | this.projectId = projectId;
|
506 | this.name = PubSub.formatName_(this.projectId);
|
507 | this.options.projectId = projectId;
|
508 | }
|
509 | return this.options;
|
510 | }
|
511 | /**
|
512 | * Gets a schema client, creating one if needed.
|
513 | * @private
|
514 | */
|
515 | async getSchemaClient_() {
|
516 | if (!this.schemaClient) {
|
517 | const options = await this.getClientConfig();
|
518 | this.schemaClient = new v1.SchemaServiceClient(options);
|
519 | }
|
520 | return this.schemaClient;
|
521 | }
|
522 | /**
|
523 | * Callback function to PubSub.getClient_().
|
524 | * @private
|
525 | * @callback GetClientCallback
|
526 | * @param err - Error, if any.
|
527 | * @param gaxClient - The gax client specified in RequestConfig.client.
|
528 | * Typed any since it's importing Javascript source.
|
529 | */
|
530 | /**
|
531 | * Get the PubSub client object.
|
532 | *
|
533 | * @private
|
534 | *
|
535 | * @param {object} config Configuration object.
|
536 | * @param {object} config.gaxOpts GAX options.
|
537 | * @param {function} config.method The gax method to call.
|
538 | * @param {object} config.reqOpts Request options.
|
539 | * @param {function} [callback] The callback function.
|
540 | */
|
541 | getClient_(config, callback) {
|
542 | this.getClientAsync_(config).then(client => callback(null, client), callback);
|
543 | }
|
544 | /**
|
545 | * Get the PubSub client object.
|
546 | *
|
547 | * @private
|
548 | *
|
549 | * @param {object} config Configuration object.
|
550 | * @param {object} config.gaxOpts GAX options.
|
551 | * @param {function} config.method The gax method to call.
|
552 | * @param {object} config.reqOpts Request options.
|
553 | * @returns {Promise}
|
554 | */
|
555 | async getClientAsync_(config) {
|
556 | // Make sure we've got a fully created config with projectId and such.
|
557 | const options = await this.getClientConfig();
|
558 | let gaxClient = this.api[config.client];
|
559 | if (!gaxClient) {
|
560 | // Lazily instantiate client.
|
561 | gaxClient = new v1[config.client](options);
|
562 | this.api[config.client] = gaxClient;
|
563 | }
|
564 | return gaxClient;
|
565 | }
|
566 | /**
|
567 | * Close all open client objects.
|
568 | *
|
569 | * @private
|
570 | *
|
571 | * @returns {Promise}
|
572 | */
|
573 | async closeAllClients_() {
|
574 | const promises = [];
|
575 | for (const clientConfig of Object.keys(this.api)) {
|
576 | const gaxClient = this.api[clientConfig];
|
577 | promises.push(gaxClient.close());
|
578 | delete this.api[clientConfig];
|
579 | }
|
580 | await Promise.all(promises);
|
581 | }
|
582 | /**
|
583 | * Funnel all API requests through this method, to be sure we have a project
|
584 | * ID.
|
585 | *
|
586 | * @private
|
587 | *
|
588 | * @param {object} config Configuration object.
|
589 | * @param {object} config.gaxOpts GAX options.
|
590 | * @param {function} config.method The gax method to call.
|
591 | * @param {object} config.reqOpts Request options.
|
592 | * @param {function} [callback] The callback function.
|
593 | */
|
594 | request(config, callback) {
|
595 | // This prevents further requests, in case any publishers were hanging around.
|
596 | if (!this.isOpen) {
|
597 | const statusObject = {
|
598 | code: 0,
|
599 | details: 'Cannot use a closed PubSub object.',
|
600 | metadata: null,
|
601 | };
|
602 | const err = new Error(statusObject.details);
|
603 | Object.assign(err, statusObject);
|
604 | callback(err);
|
605 | return;
|
606 | }
|
607 | this.getClient_(config, (err, client) => {
|
608 | if (err) {
|
609 | callback(err);
|
610 | return;
|
611 | }
|
612 | let reqOpts = extend(true, {}, config.reqOpts);
|
613 | reqOpts = projectify_1.replaceProjectIdToken(reqOpts, this.projectId);
|
614 | client[config.method](reqOpts, config.gaxOpts, callback);
|
615 | });
|
616 | }
|
617 | /**
|
618 | * Create a Schema object, representing a schema within the project.
|
619 | * See {@link PubSub#createSchema} or {@link Schema#create} to create a schema.
|
620 | *
|
621 | * @throws {Error} If a name is not provided.
|
622 | *
|
623 | * @param {string} name The ID or name of the schema.
|
624 | * @returns {Schema} A {@link Schema} instance.
|
625 | *
|
626 | * @example
|
627 | * ```
|
628 | * const {PubSub} = require('@google-cloud/pubsub');
|
629 | * const pubsub = new PubSub();
|
630 | *
|
631 | * const schema = pubsub.schema('my-schema');
|
632 | * ```
|
633 | */
|
634 | schema(idOrName) {
|
635 | return new schema_1.Schema(this, idOrName);
|
636 | }
|
637 | /**
|
638 | * Create a Snapshot object. See {@link Subscription#createSnapshot} to
|
639 | * create a snapshot.
|
640 | *
|
641 | * @throws {Error} If a name is not provided.
|
642 | *
|
643 | * @param {string} name The name of the snapshot.
|
644 | * @returns {Snapshot} A {@link Snapshot} instance.
|
645 | *
|
646 | * @example
|
647 | * ```
|
648 | * const {PubSub} = require('@google-cloud/pubsub');
|
649 | * const pubsub = new PubSub();
|
650 | *
|
651 | * const snapshot = pubsub.snapshot('my-snapshot');
|
652 | * ```
|
653 | */
|
654 | snapshot(name) {
|
655 | if (typeof name !== 'string') {
|
656 | throw new Error('You must supply a valid name for the snapshot.');
|
657 | }
|
658 | return new snapshot_1.Snapshot(this, name);
|
659 | }
|
660 | /**
|
661 | * Create a Subscription object. This command by itself will not run any API
|
662 | * requests. You will receive a {@link Subscription} object,
|
663 | * which will allow you to interact with a subscription.
|
664 | *
|
665 | * @throws {Error} If subscription name is omitted.
|
666 | *
|
667 | * @param {string} name Name of the subscription.
|
668 | * @param {SubscriberOptions} [options] Configuration object.
|
669 | * @returns {Subscription} A {@link Subscription} instance.
|
670 | *
|
671 | * @example
|
672 | * ```
|
673 | * const {PubSub} = require('@google-cloud/pubsub');
|
674 | * const pubsub = new PubSub();
|
675 | *
|
676 | * const subscription = pubsub.subscription('my-subscription');
|
677 | *
|
678 | * // Register a listener for `message` events.
|
679 | * subscription.on('message', function(message) {
|
680 | * // Called every time a message is received.
|
681 | * // message.id = ID of the message.
|
682 | * // message.ackId = ID used to acknowledge the message receival.
|
683 | * // message.data = Contents of the message.
|
684 | * // message.attributes = Attributes of the message.
|
685 | * // message.publishTime = Date when Pub/Sub received the message.
|
686 | * });
|
687 | * ```
|
688 | */
|
689 | subscription(name, options) {
|
690 | if (!name) {
|
691 | throw new Error('A name must be specified for a subscription.');
|
692 | }
|
693 | return new subscription_1.Subscription(this, name, options);
|
694 | }
|
695 | /**
|
696 | * Create a Topic object. See {@link PubSub#createTopic} to create a topic.
|
697 | *
|
698 | * @throws {Error} If a name is not provided.
|
699 | *
|
700 | * @param {string} name The name of the topic.
|
701 | * @param {PublishOptions} [options] Publisher configuration object.
|
702 | * @returns {Topic} A {@link Topic} instance.
|
703 | *
|
704 | * @example
|
705 | * ```
|
706 | * const {PubSub} = require('@google-cloud/pubsub');
|
707 | * const pubsub = new PubSub();
|
708 | *
|
709 | * const topic = pubsub.topic('my-topic');
|
710 | * ```
|
711 | */
|
712 | topic(name, options) {
|
713 | if (!name) {
|
714 | throw new Error('A name must be specified for a topic.');
|
715 | }
|
716 | return new topic_1.Topic(this, name, options);
|
717 | }
|
718 | /**
|
719 | * Validate a schema definition.
|
720 | *
|
721 | * @see [Schemas: validateSchema API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/validate}
|
722 | *
|
723 | * @throws {Error} if the validation fails.
|
724 | *
|
725 | * @param {ISchema} schema The schema definition you wish to validate.
|
726 | * @param {object} [options] Request configuration options, outlined
|
727 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
728 | * @returns {Promise<void>}
|
729 | */
|
730 | async validateSchema(schema, gaxOpts) {
|
731 | const client = await this.getSchemaClient_();
|
732 | await client.validateSchema({
|
733 | parent: this.name,
|
734 | schema,
|
735 | }, gaxOpts);
|
736 | }
|
737 | /*!
|
738 | * Format the name of a project. A project's full name is in the
|
739 | * format of projects/{projectId}.
|
740 | *
|
741 | * The GAPIC client should do this for us, but since we maintain
|
742 | * names rather than IDs, this is simpler.
|
743 | *
|
744 | * @private
|
745 | */
|
746 | static formatName_(name) {
|
747 | if (typeof name !== 'string') {
|
748 | throw new Error('A name is required to identify a project.');
|
749 | }
|
750 | // Simple check if the name is already formatted.
|
751 | if (name.indexOf('/') > -1) {
|
752 | return name;
|
753 | }
|
754 | return `projects/${name}`;
|
755 | }
|
756 | }
|
757 | exports.PubSub = PubSub;
|
758 | /**
|
759 | * Get a list of the {@link Snapshot} objects as a readable object stream.
|
760 | *
|
761 | * @method PubSub#getSnapshotsStream
|
762 | * @param {GetSnapshotsRequest} [options] Configuration object. See
|
763 | * {@link PubSub#getSnapshots} for a complete list of options.
|
764 | * @returns {ReadableStream} A readable stream of {@link Snapshot} instances.
|
765 | *
|
766 | * @example
|
767 | * ```
|
768 | * const {PubSub} = require('@google-cloud/pubsub');
|
769 | * const pubsub = new PubSub();
|
770 | *
|
771 | * pubsub.getSnapshotsStream()
|
772 | * .on('error', console.error)
|
773 | * .on('data', function(snapshot) {
|
774 | * // snapshot is a Snapshot object.
|
775 | * })
|
776 | * .on('end', function() {
|
777 | * // All snapshots retrieved.
|
778 | * });
|
779 | *
|
780 | * //-
|
781 | * // If you anticipate many results, you can end a stream early to prevent
|
782 | * // unnecessary processing and API requests.
|
783 | * //-
|
784 | * pubsub.getSnapshotsStream()
|
785 | * .on('data', function(snapshot) {
|
786 | * this.end();
|
787 | * });
|
788 | * ```
|
789 | */
|
790 | /**
|
791 | * Get a list of the {@link Subscription} objects registered to all of
|
792 | * your project's topics as a readable object stream.
|
793 | *
|
794 | * @method PubSub#getSubscriptionsStream
|
795 | * @param {GetSubscriptionsRequest} [options] Configuration object. See
|
796 | * {@link PubSub#getSubscriptions} for a complete list of options.
|
797 | * @returns {ReadableStream} A readable stream of {@link Subscription} instances.
|
798 | *
|
799 | * @example
|
800 | * ```
|
801 | * const {PubSub} = require('@google-cloud/pubsub');
|
802 | * const pubsub = new PubSub();
|
803 | *
|
804 | * pubsub.getSubscriptionsStream()
|
805 | * .on('error', console.error)
|
806 | * .on('data', function(subscription) {
|
807 | * // subscription is a Subscription object.
|
808 | * })
|
809 | * .on('end', function() {
|
810 | * // All subscriptions retrieved.
|
811 | * });
|
812 | *
|
813 | * //-
|
814 | * // If you anticipate many results, you can end a stream early to prevent
|
815 | * // unnecessary processing and API requests.
|
816 | * //-
|
817 | * pubsub.getSubscriptionsStream()
|
818 | * .on('data', function(subscription) {
|
819 | * this.end();
|
820 | * });
|
821 | * ```
|
822 | */
|
823 | /**
|
824 | * Get a list of the {module:pubsub/topic} objects registered to your project as
|
825 | * a readable object stream.
|
826 | *
|
827 | * @method PubSub#getTopicsStream
|
828 | * @param {GetTopicsRequest} [options] Configuration object. See
|
829 | * {@link PubSub#getTopics} for a complete list of options.
|
830 | * @returns {ReadableStream} A readable stream of {@link Topic} instances.
|
831 | *
|
832 | * @example
|
833 | * ```
|
834 | * const {PubSub} = require('@google-cloud/pubsub');
|
835 | * const pubsub = new PubSub();
|
836 | *
|
837 | * pubsub.getTopicsStream()
|
838 | * .on('error', console.error)
|
839 | * .on('data', function(topic) {
|
840 | * // topic is a Topic object.
|
841 | * })
|
842 | * .on('end', function() {
|
843 | * // All topics retrieved.
|
844 | * });
|
845 | *
|
846 | * //-
|
847 | * // If you anticipate many results, you can end a stream early to prevent
|
848 | * // unnecessary processing and API requests.
|
849 | * //-
|
850 | * pubsub.getTopicsStream()
|
851 | * .on('data', function(topic) {
|
852 | * this.end();
|
853 | * });
|
854 | * ```
|
855 | */
|
856 | /*! Developer Documentation
|
857 | *
|
858 | * These methods can be auto-paginated.
|
859 | */
|
860 | paginator_1.paginator.extend(PubSub, ['getSnapshots', 'getSubscriptions', 'getTopics']);
|
861 | /*! Developer Documentation
|
862 | *
|
863 | * Existing async methods (except for streams) will return a Promise in the event
|
864 | * that a callback is omitted. Future methods will not allow for a callback.
|
865 | * (Use .then() on the returned Promise instead.)
|
866 | */
|
867 | util_1.promisifySome(PubSub, PubSub.prototype, [
|
868 | 'close',
|
869 | 'createSubscription',
|
870 | 'createTopic',
|
871 | 'detachSubscription',
|
872 | 'getSnapshots',
|
873 | 'getSubscriptions',
|
874 | 'getTopics',
|
875 | ]);
|
876 | //# sourceMappingURL=pubsub.js.map |
\ | No newline at end of file |