1 | ;
|
2 | /*!
|
3 | * Copyright 2014 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.PubSub = void 0;
|
19 | const paginator_1 = require("@google-cloud/paginator");
|
20 | const projectify_1 = require("@google-cloud/projectify");
|
21 | const extend = require("extend");
|
22 | const google_auth_library_1 = require("google-auth-library");
|
23 | const gax = require("google-gax");
|
24 | // eslint-disable-next-line @typescript-eslint/no-var-requires
|
25 | const PKG = require('../../package.json');
|
26 | // eslint-disable-next-line @typescript-eslint/no-var-requires
|
27 | const v1 = require('./v1');
|
28 | const util_1 = require("./util");
|
29 | const schema_1 = require("./schema");
|
30 | const snapshot_1 = require("./snapshot");
|
31 | const subscription_1 = require("./subscription");
|
32 | const topic_1 = require("./topic");
|
33 | /**
|
34 | * Project ID placeholder.
|
35 | * @type {string}
|
36 | * @private
|
37 | */
|
38 | const PROJECT_ID_PLACEHOLDER = '{{projectId}}';
|
39 | /**
|
40 | * @typedef {object} ClientConfig
|
41 | * @property {string} [projectId] The project ID from the Google Developer's
|
42 | * Console, e.g. 'grape-spaceship-123'. We will also check the environment
|
43 | * variable `GCLOUD_PROJECT` for your project ID. If your app is running in
|
44 | * an environment which supports {@link
|
45 | * https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
|
46 | * Application Default Credentials}, your project ID will be detected
|
47 | * automatically.
|
48 | * @property {string} [keyFilename] Full path to the a .json, .pem, or .p12 key
|
49 | * downloaded from the Google Developers Console. If you provide a path to a
|
50 | * JSON file, the `projectId` option above is not necessary. NOTE: .pem and
|
51 | * .p12 require you to specify the `email` option as well.
|
52 | * @property {string} [apiEndpoint] The `apiEndpoint` from options will set the
|
53 | * host. If not set, the `PUBSUB_EMULATOR_HOST` environment variable from the
|
54 | * gcloud SDK is honored. We also check the `CLOUD_API_ENDPOINT_OVERRIDES_PUBSUB`
|
55 | * environment variable used by `gcloud alpha pubsub`. Otherwise the actual API
|
56 | * endpoint will be used. Note that if the URL doesn't end in '.googleapis.com',
|
57 | * we will assume that it's an emulator and disable strict SSL checks.
|
58 | * @property {string} [email] Account email address. Required when using a .pem
|
59 | * or .p12 keyFilename.
|
60 | * @property {object} [credentials] Credentials object.
|
61 | * @property {string} [credentials.client_email]
|
62 | * @property {string} [credentials.private_key]
|
63 | * @property {boolean} [autoRetry=true] Automatically retry requests if the
|
64 | * response is related to rate limits or certain intermittent server errors.
|
65 | * We will exponentially backoff subsequent requests by default.
|
66 | * @property {Constructor} [promise] Custom promise module to use instead of
|
67 | * native Promises.
|
68 | */
|
69 | /**
|
70 | * [Cloud Pub/Sub](https://developers.google.com/pubsub/overview) is a
|
71 | * reliable, many-to-many, asynchronous messaging service from Cloud
|
72 | * Platform.
|
73 | *
|
74 | * @class
|
75 | *
|
76 | * @see [Cloud Pub/Sub overview]{@link https://developers.google.com/pubsub/overview}
|
77 | *
|
78 | * @param {ClientConfig} [options] Configuration options.
|
79 | *
|
80 | * @example Import the client library
|
81 | * ```
|
82 | * const {PubSub} = require('@google-cloud/pubsub');
|
83 | *
|
84 | * ```
|
85 | * @example Create a client that uses <a href="https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application">Application Default Credentials (ADC)</a>:
|
86 | * ```
|
87 | * const pubsub = new PubSub();
|
88 | *
|
89 | * ```
|
90 | * @example Create a client with <a href="https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually">explicit credentials</a>:
|
91 | * ```
|
92 | * const pubsub = new PubSub({
|
93 | * projectId: 'your-project-id',
|
94 | * keyFilename: '/path/to/keyfile.json'
|
95 | * });
|
96 | *
|
97 | * ```
|
98 | * @example <caption>include:samples/quickstart.js</caption>
|
99 | * region_tag:pubsub_quickstart_create_topic
|
100 | * Full quickstart example:
|
101 | */
|
102 | class PubSub {
|
103 | constructor(options) {
|
104 | this.getSubscriptionsStream = paginator_1.paginator.streamify('getSubscriptions');
|
105 | this.getSnapshotsStream = paginator_1.paginator.streamify('getSnapshots');
|
106 | this.getTopicsStream = paginator_1.paginator.streamify('getTopics');
|
107 | this.isOpen = true;
|
108 | options = options || {};
|
109 | // Determine what scopes are needed.
|
110 | // It is the union of the scopes on both clients.
|
111 | const clientClasses = [v1.SubscriberClient, v1.PublisherClient];
|
112 | const allScopes = {};
|
113 | for (const clientClass of clientClasses) {
|
114 | for (const scope of clientClass.scopes) {
|
115 | allScopes[scope] = true;
|
116 | }
|
117 | }
|
118 | this.options = Object.assign({
|
119 | libName: 'gccl',
|
120 | libVersion: PKG.version,
|
121 | scopes: Object.keys(allScopes),
|
122 | }, options);
|
123 | /**
|
124 | * @name PubSub#isEmulator
|
125 | * @type {boolean}
|
126 | */
|
127 | this.isEmulator = false;
|
128 | this.determineBaseUrl_();
|
129 | this.api = {};
|
130 | this.auth = new google_auth_library_1.GoogleAuth(this.options);
|
131 | this.projectId = this.options.projectId || PROJECT_ID_PLACEHOLDER;
|
132 | if (this.projectId !== PROJECT_ID_PLACEHOLDER) {
|
133 | this.name = PubSub.formatName_(this.projectId);
|
134 | }
|
135 | }
|
136 | /**
|
137 | * Returns true if we have actually resolved the full project name.
|
138 | *
|
139 | * @returns {boolean} true if the name is resolved.
|
140 | */
|
141 | get isIdResolved() {
|
142 | return this.projectId.indexOf(PROJECT_ID_PLACEHOLDER) < 0;
|
143 | }
|
144 | close(callback) {
|
145 | const definedCallback = callback || (() => { });
|
146 | if (this.isOpen) {
|
147 | this.isOpen = false;
|
148 | this.closeAllClients_()
|
149 | .then(() => { var _a; return (_a = this.schemaClient) === null || _a === void 0 ? void 0 : _a.close(); })
|
150 | .then(() => {
|
151 | definedCallback(null);
|
152 | })
|
153 | .catch(definedCallback);
|
154 | }
|
155 | else {
|
156 | definedCallback(null);
|
157 | }
|
158 | }
|
159 | /**
|
160 | * Create a schema in the project.
|
161 | *
|
162 | * @see [Schemas: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/create}
|
163 | * @see {@link Schema#create}
|
164 | *
|
165 | * @throws {Error} If a schema ID or name is not provided.
|
166 | * @throws {Error} If an invalid SchemaType is provided.
|
167 | * @throws {Error} If an invalid schema definition is provided.
|
168 | *
|
169 | * @param {string} schemaId The name or ID of the subscription.
|
170 | * @param {SchemaType} type The type of the schema (Protobuf, Avro, etc).
|
171 | * @param {string} definition The text describing the schema in terms of the type.
|
172 | * @param {object} [options] Request configuration options, outlined
|
173 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
174 | * @returns {Promise<Schema>}
|
175 | *
|
176 | * @example Create a schema.
|
177 | * ```
|
178 | * const {PubSub} = require('@google-cloud/pubsub');
|
179 | * const pubsub = new PubSub();
|
180 | *
|
181 | * await pubsub.createSchema(
|
182 | * 'messageType',
|
183 | * SchemaTypes.Avro,
|
184 | * '{...avro definition...}'
|
185 | * );
|
186 | * ```
|
187 | */
|
188 | async createSchema(schemaId, type, definition, gaxOpts) {
|
189 | // This populates projectId for us.
|
190 | await this.getClientConfig();
|
191 | const schemaName = schema_1.Schema.formatName_(this.projectId, schemaId);
|
192 | const request = {
|
193 | parent: this.name,
|
194 | schemaId,
|
195 | schema: {
|
196 | name: schemaName,
|
197 | type,
|
198 | definition,
|
199 | },
|
200 | };
|
201 | const client = await this.getSchemaClient_();
|
202 | await client.createSchema(request, gaxOpts);
|
203 | return new schema_1.Schema(this, schemaName);
|
204 | }
|
205 | createSubscription(topic, name, optionsOrCallback, callback) {
|
206 | if (typeof topic !== 'string' && !(topic instanceof topic_1.Topic)) {
|
207 | throw new Error('A Topic is required for a new subscription.');
|
208 | }
|
209 | if (typeof name !== 'string') {
|
210 | throw new Error('A subscription name is required.');
|
211 | }
|
212 | if (typeof topic === 'string') {
|
213 | topic = this.topic(topic);
|
214 | }
|
215 | let options = typeof optionsOrCallback === 'object'
|
216 | ? optionsOrCallback
|
217 | : {};
|
218 | callback =
|
219 | typeof optionsOrCallback === 'function' ? optionsOrCallback : callback;
|
220 | // Make a deep copy of options to not pollute caller object.
|
221 | options = extend(true, {}, options);
|
222 | const gaxOpts = options.gaxOpts;
|
223 | const flowControl = options.flowControl;
|
224 | delete options.gaxOpts;
|
225 | delete options.flowControl;
|
226 | const metadata = subscription_1.Subscription.formatMetadata_(options);
|
227 | let subscriptionCtorOptions = flowControl ? { flowControl } : {};
|
228 | subscriptionCtorOptions = Object.assign(subscriptionCtorOptions, metadata);
|
229 | const subscription = this.subscription(name, subscriptionCtorOptions);
|
230 | const reqOpts = Object.assign(metadata, {
|
231 | topic: topic.name,
|
232 | name: subscription.name,
|
233 | });
|
234 | this.request({
|
235 | client: 'SubscriberClient',
|
236 | method: 'createSubscription',
|
237 | reqOpts,
|
238 | gaxOpts,
|
239 | }, (err, resp) => {
|
240 | if (err) {
|
241 | callback(err, null, resp);
|
242 | return;
|
243 | }
|
244 | subscription.metadata = resp;
|
245 | callback(null, subscription, resp);
|
246 | });
|
247 | }
|
248 | createTopic(name, optsOrCallback, callback) {
|
249 | const reqOpts = typeof name === 'string'
|
250 | ? {
|
251 | name,
|
252 | }
|
253 | : name;
|
254 | // We don't allow a blank name, but this will let topic() handle that case.
|
255 | const topic = this.topic(reqOpts.name || '');
|
256 | // Topic#constructor might have canonicalized the name.
|
257 | reqOpts.name = topic.name;
|
258 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
259 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
260 | this.request({
|
261 | client: 'PublisherClient',
|
262 | method: 'createTopic',
|
263 | reqOpts,
|
264 | gaxOpts,
|
265 | }, (err, resp) => {
|
266 | if (err) {
|
267 | callback(err, null, resp);
|
268 | return;
|
269 | }
|
270 | topic.metadata = resp;
|
271 | callback(null, topic, resp);
|
272 | });
|
273 | }
|
274 | detachSubscription(name, optsOrCallback, callback) {
|
275 | if (typeof name !== 'string') {
|
276 | throw new Error('A subscription name is required.');
|
277 | }
|
278 | const sub = this.subscription(name);
|
279 | const reqOpts = {
|
280 | subscription: sub.name,
|
281 | };
|
282 | const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
283 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
284 | this.request({
|
285 | client: 'PublisherClient',
|
286 | method: 'detachSubscription',
|
287 | reqOpts,
|
288 | gaxOpts: gaxOpts,
|
289 | }, callback);
|
290 | }
|
291 | /**
|
292 | * Determine the appropriate endpoint to use for API requests, first trying
|
293 | * the `apiEndpoint` parameter. If that isn't set, we try the Pub/Sub emulator
|
294 | * environment variable (PUBSUB_EMULATOR_HOST). If that is also null, we try
|
295 | * the standard `gcloud alpha pubsub` environment variable
|
296 | * (CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB). Otherwise the default production
|
297 | * API is used.
|
298 | *
|
299 | * Note that if the URL doesn't end in '.googleapis.com', we will assume that
|
300 | * it's an emulator and disable strict SSL checks.
|
301 | *
|
302 | * @private
|
303 | */
|
304 | determineBaseUrl_() {
|
305 | // We allow an override from the client object options, or from
|
306 | // one of these variables. The CLOUDSDK variable is provided for
|
307 | // compatibility with the `gcloud alpha` utility.
|
308 | const gcloudVarName = 'CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB';
|
309 | const emulatorVarName = 'PUBSUB_EMULATOR_HOST';
|
310 | const apiEndpoint = this.options.apiEndpoint ||
|
311 | process.env[emulatorVarName] ||
|
312 | process.env[gcloudVarName];
|
313 | if (!apiEndpoint) {
|
314 | return;
|
315 | }
|
316 | // Parse the URL into a hostname and port, if possible.
|
317 | const leadingProtocol = new RegExp('^https?://');
|
318 | const trailingSlashes = new RegExp('/*$');
|
319 | const baseUrlParts = apiEndpoint
|
320 | .replace(leadingProtocol, '')
|
321 | .replace(trailingSlashes, '')
|
322 | .split(':');
|
323 | this.options.servicePath = baseUrlParts[0];
|
324 | if (!baseUrlParts[1]) {
|
325 | // No port was given -- figure it out from the protocol.
|
326 | if (apiEndpoint.startsWith('https')) {
|
327 | this.options.port = 443;
|
328 | }
|
329 | else if (apiEndpoint.startsWith('http')) {
|
330 | this.options.port = 80;
|
331 | }
|
332 | else {
|
333 | this.options.port = undefined;
|
334 | }
|
335 | }
|
336 | else {
|
337 | this.options.port = parseInt(baseUrlParts[1], 10);
|
338 | }
|
339 | // If this looks like a GCP URL of some kind, don't go into emulator
|
340 | // mode. Otherwise, supply a fake SSL provider so a real cert isn't
|
341 | // required for running the emulator.
|
342 | const officialUrlMatch = this.options.servicePath.endsWith('.googleapis.com');
|
343 | if (!officialUrlMatch) {
|
344 | const grpcInstance = this.options.grpc || gax.grpc;
|
345 | this.options.sslCreds = grpcInstance.credentials.createInsecure();
|
346 | this.isEmulator = true;
|
347 | }
|
348 | if (!this.options.projectId && process.env.PUBSUB_PROJECT_ID) {
|
349 | this.options.projectId = process.env.PUBSUB_PROJECT_ID;
|
350 | }
|
351 | }
|
352 | /**
|
353 | * Get a list of schemas associated with your project.
|
354 | *
|
355 | * The returned AsyncIterable will resolve to {@link google.pubsub.v1.ISchema} objects.
|
356 | *
|
357 | * This method returns an async iterable. These objects can be adapted
|
358 | * to work in a Promise/then framework, as well as with callbacks, but
|
359 | * this discussion is considered out of scope for these docs.
|
360 | *
|
361 | * @see [Schemas: list API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/list}
|
362 | * @see [More about async iterators]{@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of}
|
363 | *
|
364 | * @param {google.pubsub.v1.SchemaView} [view] The type of schema objects
|
365 | * requested, which should be an enum value from {@link SchemaViews}. Defaults
|
366 | * to Full.
|
367 | * @param {object} [options] Request configuration options, outlined
|
368 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
369 | * @returns {AsyncIterable<ISchema>}
|
370 | *
|
371 | * @example
|
372 | * ```
|
373 | * for await (const s of pubsub.listSchemas()) {
|
374 | * const moreInfo = await s.get();
|
375 | * }
|
376 | * ```
|
377 | */
|
378 | async *listSchemas(view = schema_1.SchemaViews.Basic, options) {
|
379 | const client = await this.getSchemaClient_();
|
380 | const query = {
|
381 | parent: this.name,
|
382 | view,
|
383 | };
|
384 | for await (const s of client.listSchemasAsync(query, options)) {
|
385 | yield s;
|
386 | }
|
387 | }
|
388 | getSnapshots(optsOrCallback, callback) {
|
389 | const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
390 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
391 | const reqOpts = Object.assign({
|
392 | project: PubSub.formatName_(this.projectId),
|
393 | }, options);
|
394 | delete reqOpts.gaxOpts;
|
395 | delete reqOpts.autoPaginate;
|
396 | const gaxOpts = Object.assign({
|
397 | autoPaginate: options.autoPaginate,
|
398 | }, options.gaxOpts);
|
399 | this.request({
|
400 | client: 'SubscriberClient',
|
401 | method: 'listSnapshots',
|
402 | reqOpts,
|
403 | gaxOpts,
|
404 | }, (err, rawSnapshots, ...args) => {
|
405 | let snapshots;
|
406 | if (rawSnapshots) {
|
407 | snapshots = rawSnapshots.map((snapshot) => {
|
408 | const snapshotInstance = this.snapshot(snapshot.name);
|
409 | snapshotInstance.metadata = snapshot;
|
410 | return snapshotInstance;
|
411 | });
|
412 | }
|
413 | callback(err, snapshots, ...args);
|
414 | });
|
415 | }
|
416 | getSubscriptions(optsOrCallback, callback) {
|
417 | const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
418 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
419 | let topic = options.topic;
|
420 | if (topic) {
|
421 | if (!(topic instanceof topic_1.Topic)) {
|
422 | topic = this.topic(topic);
|
423 | }
|
424 | return topic.getSubscriptions(options, callback);
|
425 | }
|
426 | const reqOpts = Object.assign({}, options);
|
427 | reqOpts.project = 'projects/' + this.projectId;
|
428 | delete reqOpts.gaxOpts;
|
429 | delete reqOpts.autoPaginate;
|
430 | const gaxOpts = Object.assign({
|
431 | autoPaginate: options.autoPaginate,
|
432 | }, options.gaxOpts);
|
433 | this.request({
|
434 | client: 'SubscriberClient',
|
435 | method: 'listSubscriptions',
|
436 | reqOpts,
|
437 | gaxOpts,
|
438 | }, (err, rawSubs, ...args) => {
|
439 | let subscriptions;
|
440 | if (rawSubs) {
|
441 | subscriptions = rawSubs.map((sub) => {
|
442 | const subscriptionInstance = this.subscription(sub.name);
|
443 | subscriptionInstance.metadata = sub;
|
444 | return subscriptionInstance;
|
445 | });
|
446 | }
|
447 | callback(err, subscriptions, ...args);
|
448 | });
|
449 | }
|
450 | getTopics(optsOrCallback, callback) {
|
451 | const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
|
452 | callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
|
453 | const reqOpts = Object.assign({
|
454 | project: 'projects/' + this.projectId,
|
455 | }, options);
|
456 | delete reqOpts.gaxOpts;
|
457 | delete reqOpts.autoPaginate;
|
458 | const gaxOpts = Object.assign({
|
459 | autoPaginate: options.autoPaginate,
|
460 | }, options.gaxOpts);
|
461 | this.request({
|
462 | client: 'PublisherClient',
|
463 | method: 'listTopics',
|
464 | reqOpts,
|
465 | gaxOpts,
|
466 | }, (err, rawTopics, ...args) => {
|
467 | let topics;
|
468 | if (rawTopics) {
|
469 | topics = rawTopics.map(topic => {
|
470 | const topicInstance = this.topic(topic.name);
|
471 | topicInstance.metadata = topic;
|
472 | return topicInstance;
|
473 | });
|
474 | }
|
475 | callback(err, topics, ...args);
|
476 | });
|
477 | }
|
478 | /**
|
479 | * Retrieve a client configuration, suitable for passing into a GAPIC
|
480 | * 'v1' class constructor. This will fill out projectId, emulator URLs,
|
481 | * and so forth.
|
482 | *
|
483 | * @returns {Promise<ClientConfig>} the filled client configuration.
|
484 | */
|
485 | async getClientConfig() {
|
486 | if (!this.projectId || this.projectId === PROJECT_ID_PLACEHOLDER) {
|
487 | let projectId;
|
488 | try {
|
489 | projectId = await this.auth.getProjectId();
|
490 | }
|
491 | catch (e) {
|
492 | if (!this.isEmulator) {
|
493 | throw e;
|
494 | }
|
495 | projectId = '';
|
496 | }
|
497 | this.projectId = projectId;
|
498 | this.name = PubSub.formatName_(this.projectId);
|
499 | this.options.projectId = projectId;
|
500 | }
|
501 | return this.options;
|
502 | }
|
503 | /**
|
504 | * Gets a schema client, creating one if needed.
|
505 | * @private
|
506 | */
|
507 | async getSchemaClient_() {
|
508 | if (!this.schemaClient) {
|
509 | const options = await this.getClientConfig();
|
510 | this.schemaClient = new v1.SchemaServiceClient(options);
|
511 | }
|
512 | return this.schemaClient;
|
513 | }
|
514 | /**
|
515 | * Callback function to PubSub.getClient_().
|
516 | * @private
|
517 | * @callback GetClientCallback
|
518 | * @param err - Error, if any.
|
519 | * @param gaxClient - The gax client specified in RequestConfig.client.
|
520 | * Typed any since it's importing Javascript source.
|
521 | */
|
522 | /**
|
523 | * Get the PubSub client object.
|
524 | *
|
525 | * @private
|
526 | *
|
527 | * @param {object} config Configuration object.
|
528 | * @param {object} config.gaxOpts GAX options.
|
529 | * @param {function} config.method The gax method to call.
|
530 | * @param {object} config.reqOpts Request options.
|
531 | * @param {function} [callback] The callback function.
|
532 | */
|
533 | getClient_(config, callback) {
|
534 | this.getClientAsync_(config).then(client => callback(null, client), callback);
|
535 | }
|
536 | /**
|
537 | * Get the PubSub client object.
|
538 | *
|
539 | * @private
|
540 | *
|
541 | * @param {object} config Configuration object.
|
542 | * @param {object} config.gaxOpts GAX options.
|
543 | * @param {function} config.method The gax method to call.
|
544 | * @param {object} config.reqOpts Request options.
|
545 | * @returns {Promise}
|
546 | */
|
547 | async getClientAsync_(config) {
|
548 | // Make sure we've got a fully created config with projectId and such.
|
549 | const options = await this.getClientConfig();
|
550 | let gaxClient = this.api[config.client];
|
551 | if (!gaxClient) {
|
552 | // Lazily instantiate client.
|
553 | gaxClient = new v1[config.client](options);
|
554 | this.api[config.client] = gaxClient;
|
555 | }
|
556 | return gaxClient;
|
557 | }
|
558 | /**
|
559 | * Close all open client objects.
|
560 | *
|
561 | * @private
|
562 | *
|
563 | * @returns {Promise}
|
564 | */
|
565 | async closeAllClients_() {
|
566 | const promises = [];
|
567 | for (const clientConfig of Object.keys(this.api)) {
|
568 | const gaxClient = this.api[clientConfig];
|
569 | promises.push(gaxClient.close());
|
570 | delete this.api[clientConfig];
|
571 | }
|
572 | await Promise.all(promises);
|
573 | }
|
574 | /**
|
575 | * Funnel all API requests through this method, to be sure we have a project
|
576 | * ID.
|
577 | *
|
578 | * @private
|
579 | *
|
580 | * @param {object} config Configuration object.
|
581 | * @param {object} config.gaxOpts GAX options.
|
582 | * @param {function} config.method The gax method to call.
|
583 | * @param {object} config.reqOpts Request options.
|
584 | * @param {function} [callback] The callback function.
|
585 | */
|
586 | request(config, callback) {
|
587 | // This prevents further requests, in case any publishers were hanging around.
|
588 | if (!this.isOpen) {
|
589 | const statusObject = {
|
590 | code: 0,
|
591 | details: 'Cannot use a closed PubSub object.',
|
592 | metadata: null,
|
593 | };
|
594 | const err = new Error(statusObject.details);
|
595 | Object.assign(err, statusObject);
|
596 | callback(err);
|
597 | return;
|
598 | }
|
599 | this.getClient_(config, (err, client) => {
|
600 | if (err) {
|
601 | callback(err);
|
602 | return;
|
603 | }
|
604 | let reqOpts = extend(true, {}, config.reqOpts);
|
605 | reqOpts = projectify_1.replaceProjectIdToken(reqOpts, this.projectId);
|
606 | client[config.method](reqOpts, config.gaxOpts, callback);
|
607 | });
|
608 | }
|
609 | /**
|
610 | * Create a Schema object, representing a schema within the project.
|
611 | * See {@link PubSub#createSchema} or {@link Schema#create} to create a schema.
|
612 | *
|
613 | * @throws {Error} If a name is not provided.
|
614 | *
|
615 | * @param {string} name The ID or name of the schema.
|
616 | * @returns {Schema} A {@link Schema} instance.
|
617 | *
|
618 | * @example
|
619 | * ```
|
620 | * const {PubSub} = require('@google-cloud/pubsub');
|
621 | * const pubsub = new PubSub();
|
622 | *
|
623 | * const schema = pubsub.schema('my-schema');
|
624 | * ```
|
625 | */
|
626 | schema(idOrName) {
|
627 | return new schema_1.Schema(this, idOrName);
|
628 | }
|
629 | /**
|
630 | * Create a Snapshot object. See {@link Subscription#createSnapshot} to
|
631 | * create a snapshot.
|
632 | *
|
633 | * @throws {Error} If a name is not provided.
|
634 | *
|
635 | * @param {string} name The name of the snapshot.
|
636 | * @returns {Snapshot} A {@link Snapshot} instance.
|
637 | *
|
638 | * @example
|
639 | * ```
|
640 | * const {PubSub} = require('@google-cloud/pubsub');
|
641 | * const pubsub = new PubSub();
|
642 | *
|
643 | * const snapshot = pubsub.snapshot('my-snapshot');
|
644 | * ```
|
645 | */
|
646 | snapshot(name) {
|
647 | if (typeof name !== 'string') {
|
648 | throw new Error('You must supply a valid name for the snapshot.');
|
649 | }
|
650 | return new snapshot_1.Snapshot(this, name);
|
651 | }
|
652 | /**
|
653 | * Create a Subscription object. This command by itself will not run any API
|
654 | * requests. You will receive a {@link Subscription} object,
|
655 | * which will allow you to interact with a subscription.
|
656 | *
|
657 | * @throws {Error} If subscription name is omitted.
|
658 | *
|
659 | * @param {string} name Name of the subscription.
|
660 | * @param {SubscriberOptions} [options] Configuration object.
|
661 | * @returns {Subscription} A {@link Subscription} instance.
|
662 | *
|
663 | * @example
|
664 | * ```
|
665 | * const {PubSub} = require('@google-cloud/pubsub');
|
666 | * const pubsub = new PubSub();
|
667 | *
|
668 | * const subscription = pubsub.subscription('my-subscription');
|
669 | *
|
670 | * // Register a listener for `message` events.
|
671 | * subscription.on('message', function(message) {
|
672 | * // Called every time a message is received.
|
673 | * // message.id = ID of the message.
|
674 | * // message.ackId = ID used to acknowledge the message receival.
|
675 | * // message.data = Contents of the message.
|
676 | * // message.attributes = Attributes of the message.
|
677 | * // message.publishTime = Date when Pub/Sub received the message.
|
678 | * });
|
679 | * ```
|
680 | */
|
681 | subscription(name, options) {
|
682 | if (!name) {
|
683 | throw new Error('A name must be specified for a subscription.');
|
684 | }
|
685 | return new subscription_1.Subscription(this, name, options);
|
686 | }
|
687 | /**
|
688 | * Create a Topic object. See {@link PubSub#createTopic} to create a topic.
|
689 | *
|
690 | * @throws {Error} If a name is not provided.
|
691 | *
|
692 | * @param {string} name The name of the topic.
|
693 | * @param {PublishOptions} [options] Publisher configuration object.
|
694 | * @returns {Topic} A {@link Topic} instance.
|
695 | *
|
696 | * @example
|
697 | * ```
|
698 | * const {PubSub} = require('@google-cloud/pubsub');
|
699 | * const pubsub = new PubSub();
|
700 | *
|
701 | * const topic = pubsub.topic('my-topic');
|
702 | * ```
|
703 | */
|
704 | topic(name, options) {
|
705 | if (!name) {
|
706 | throw new Error('A name must be specified for a topic.');
|
707 | }
|
708 | return new topic_1.Topic(this, name, options);
|
709 | }
|
710 | /**
|
711 | * Validate a schema definition.
|
712 | *
|
713 | * @see [Schemas: validateSchema API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/validate}
|
714 | *
|
715 | * @throws {Error} if the validation fails.
|
716 | *
|
717 | * @param {ISchema} schema The schema definition you wish to validate.
|
718 | * @param {object} [options] Request configuration options, outlined
|
719 | * here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.
|
720 | * @returns {Promise<void>}
|
721 | */
|
722 | async validateSchema(schema, gaxOpts) {
|
723 | const client = await this.getSchemaClient_();
|
724 | await client.validateSchema({
|
725 | parent: this.name,
|
726 | schema,
|
727 | }, gaxOpts);
|
728 | }
|
729 | /*!
|
730 | * Format the name of a project. A project's full name is in the
|
731 | * format of projects/{projectId}.
|
732 | *
|
733 | * The GAPIC client should do this for us, but since we maintain
|
734 | * names rather than IDs, this is simpler.
|
735 | *
|
736 | * @private
|
737 | */
|
738 | static formatName_(name) {
|
739 | if (typeof name !== 'string') {
|
740 | throw new Error('A name is required to identify a project.');
|
741 | }
|
742 | // Simple check if the name is already formatted.
|
743 | if (name.indexOf('/') > -1) {
|
744 | return name;
|
745 | }
|
746 | return `projects/${name}`;
|
747 | }
|
748 | }
|
749 | exports.PubSub = PubSub;
|
750 | /**
|
751 | * Get a list of the {@link Snapshot} objects as a readable object stream.
|
752 | *
|
753 | * @method PubSub#getSnapshotsStream
|
754 | * @param {GetSnapshotsRequest} [options] Configuration object. See
|
755 | * {@link PubSub#getSnapshots} for a complete list of options.
|
756 | * @returns {ReadableStream} A readable stream of {@link Snapshot} instances.
|
757 | *
|
758 | * @example
|
759 | * ```
|
760 | * const {PubSub} = require('@google-cloud/pubsub');
|
761 | * const pubsub = new PubSub();
|
762 | *
|
763 | * pubsub.getSnapshotsStream()
|
764 | * .on('error', console.error)
|
765 | * .on('data', function(snapshot) {
|
766 | * // snapshot is a Snapshot object.
|
767 | * })
|
768 | * .on('end', function() {
|
769 | * // All snapshots retrieved.
|
770 | * });
|
771 | *
|
772 | * //-
|
773 | * // If you anticipate many results, you can end a stream early to prevent
|
774 | * // unnecessary processing and API requests.
|
775 | * //-
|
776 | * pubsub.getSnapshotsStream()
|
777 | * .on('data', function(snapshot) {
|
778 | * this.end();
|
779 | * });
|
780 | * ```
|
781 | */
|
782 | /**
|
783 | * Get a list of the {@link Subscription} objects registered to all of
|
784 | * your project's topics as a readable object stream.
|
785 | *
|
786 | * @method PubSub#getSubscriptionsStream
|
787 | * @param {GetSubscriptionsRequest} [options] Configuration object. See
|
788 | * {@link PubSub#getSubscriptions} for a complete list of options.
|
789 | * @returns {ReadableStream} A readable stream of {@link Subscription} instances.
|
790 | *
|
791 | * @example
|
792 | * ```
|
793 | * const {PubSub} = require('@google-cloud/pubsub');
|
794 | * const pubsub = new PubSub();
|
795 | *
|
796 | * pubsub.getSubscriptionsStream()
|
797 | * .on('error', console.error)
|
798 | * .on('data', function(subscription) {
|
799 | * // subscription is a Subscription object.
|
800 | * })
|
801 | * .on('end', function() {
|
802 | * // All subscriptions retrieved.
|
803 | * });
|
804 | *
|
805 | * //-
|
806 | * // If you anticipate many results, you can end a stream early to prevent
|
807 | * // unnecessary processing and API requests.
|
808 | * //-
|
809 | * pubsub.getSubscriptionsStream()
|
810 | * .on('data', function(subscription) {
|
811 | * this.end();
|
812 | * });
|
813 | * ```
|
814 | */
|
815 | /**
|
816 | * Get a list of the {module:pubsub/topic} objects registered to your project as
|
817 | * a readable object stream.
|
818 | *
|
819 | * @method PubSub#getTopicsStream
|
820 | * @param {GetTopicsRequest} [options] Configuration object. See
|
821 | * {@link PubSub#getTopics} for a complete list of options.
|
822 | * @returns {ReadableStream} A readable stream of {@link Topic} instances.
|
823 | *
|
824 | * @example
|
825 | * ```
|
826 | * const {PubSub} = require('@google-cloud/pubsub');
|
827 | * const pubsub = new PubSub();
|
828 | *
|
829 | * pubsub.getTopicsStream()
|
830 | * .on('error', console.error)
|
831 | * .on('data', function(topic) {
|
832 | * // topic is a Topic object.
|
833 | * })
|
834 | * .on('end', function() {
|
835 | * // All topics retrieved.
|
836 | * });
|
837 | *
|
838 | * //-
|
839 | * // If you anticipate many results, you can end a stream early to prevent
|
840 | * // unnecessary processing and API requests.
|
841 | * //-
|
842 | * pubsub.getTopicsStream()
|
843 | * .on('data', function(topic) {
|
844 | * this.end();
|
845 | * });
|
846 | * ```
|
847 | */
|
848 | /*! Developer Documentation
|
849 | *
|
850 | * These methods can be auto-paginated.
|
851 | */
|
852 | paginator_1.paginator.extend(PubSub, ['getSnapshots', 'getSubscriptions', 'getTopics']);
|
853 | /*! Developer Documentation
|
854 | *
|
855 | * Existing async methods (except for streams) will return a Promise in the event
|
856 | * that a callback is omitted. Future methods will not allow for a callback.
|
857 | * (Use .then() on the returned Promise instead.)
|
858 | */
|
859 | util_1.promisifySome(PubSub, PubSub.prototype, [
|
860 | 'close',
|
861 | 'createSubscription',
|
862 | 'createTopic',
|
863 | 'detachSubscription',
|
864 | 'getSnapshots',
|
865 | 'getSubscriptions',
|
866 | 'getTopics',
|
867 | ]);
|
868 | //# sourceMappingURL=pubsub.js.map |
\ | No newline at end of file |