UNPKG

15.8 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2017 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Topic = void 0;
19const paginator_1 = require("@google-cloud/paginator");
20const iam_1 = require("./iam");
21const publisher_1 = require("./publisher");
22const flow_publisher_1 = require("./publisher/flow-publisher");
23const util_1 = require("./util");
24/**
25 * A Topic object allows you to interact with a Cloud Pub/Sub topic.
26 *
27 * @class
28 * @param {PubSub} pubsub PubSub object.
29 * @param {string} name Name of the topic.
30 * @param {PublishOptions} [options] Publisher configuration object.
31 *
32 * @example
33 * ```
34 * const {PubSub} = require('@google-cloud/pubsub');
35 * const pubsub = new PubSub();
36 *
37 * const topic = pubsub.topic('my-topic');
38 *
39 * ```
40 * @example To enable message ordering, set `enableMessageOrdering` to true. Please note that this does not persist to an actual topic.
41 * ```
42 * const topic = pubsub.topic('ordered-topic', {enableMessageOrdering: true});
43 * ```
44 */
45class Topic {
46 constructor(pubsub, name, options) {
47 this.getSubscriptionsStream = paginator_1.paginator.streamify('getSubscriptions');
48 /**
49 * The fully qualified name of this topic.
50 * @name Topic#name
51 * @type {string}
52 */
53 this.name = Topic.formatName_(pubsub.projectId, name);
54 this.publisher = new publisher_1.Publisher(this, options);
55 /**
56 * The parent {@link PubSub} instance of this topic instance.
57 * @name Topic#pubsub
58 * @type {PubSub}
59 */
60 /**
61 * The parent {@link PubSub} instance of this topic instance.
62 * @name Topic#parent
63 * @type {PubSub}
64 */
65 this.parent = this.pubsub = pubsub;
66 this.request = pubsub.request.bind(pubsub);
67 /**
68 * [IAM (Identity and Access
69 * Management)](https://cloud.google.com/pubsub/access_control) allows you
70 * to set permissions on individual resources and offers a wider range of
71 * roles: editor, owner, publisher, subscriber, and viewer. This gives you
72 * greater flexibility and allows you to set more fine-grained access
73 * control.
74 *
75 * *The IAM access control features described in this document are Beta,
76 * including the API methods to get and set IAM policies, and to test IAM
77 * permissions. Cloud Pub/Sub's use of IAM features is not covered by
78 * any SLA or deprecation policy, and may be subject to
79 * backward-incompatible changes.*
80 *
81 * @name Topic#iam
82 * @mixes IAM
83 *
84 * @see [Access Control Overview]{@link https://cloud.google.com/pubsub/access_control}
85 * @see [What is Cloud IAM?]{@link https://cloud.google.com/iam/}
86 *
87 * @example
88 * ```
89 * const {PubSub} = require('@google-cloud/pubsub');
90 * const pubsub = new PubSub();
91 *
92 * const topic = pubsub.topic('my-topic');
93 *
94 * //-
95 * // Get the IAM policy for your topic.
96 * //-
97 * topic.iam.getPolicy((err, policy) => {
98 * console.log(policy);
99 * });
100 *
101 * //-
102 * // If the callback is omitted, we'll return a Promise.
103 * //-
104 * topic.iam.getPolicy().then((data) => {
105 * const policy = data[0];
106 * const apiResponse = data[1];
107 * });
108 * ```
109 */
110 this.iam = new iam_1.IAM(pubsub, this.name);
111 }
112 flush(callback) {
113 // It doesn't matter here if callback is undefined; the Publisher
114 // flush() will handle it.
115 this.publisher.flush(callback);
116 }
117 create(optsOrCallback, callback) {
118 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
119 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
120 this.pubsub.createTopic(this.name, gaxOpts, callback);
121 }
122 createSubscription(name, optsOrCallback, callback) {
123 const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
124 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
125 this.pubsub.createSubscription(this, name, options, callback);
126 }
127 delete(optsOrCallback, callback) {
128 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
129 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
130 const reqOpts = {
131 topic: this.name,
132 };
133 this.request({
134 client: 'PublisherClient',
135 method: 'deleteTopic',
136 reqOpts,
137 gaxOpts: gaxOpts,
138 }, callback);
139 }
140 exists(callback) {
141 this.getMetadata(err => {
142 if (!err) {
143 callback(null, true);
144 return;
145 }
146 if (err.code === 5) {
147 callback(null, false);
148 return;
149 }
150 callback(err);
151 });
152 }
153 get(optsOrCallback, callback) {
154 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
155 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
156 const autoCreate = !!gaxOpts.autoCreate;
157 delete gaxOpts.autoCreate;
158 this.getMetadata(gaxOpts, (err, apiResponse) => {
159 if (!err) {
160 callback(null, this, apiResponse);
161 return;
162 }
163 if (err.code !== 5 || !autoCreate) {
164 callback(err, null, apiResponse);
165 return;
166 }
167 this.create(gaxOpts, callback);
168 });
169 }
170 getMetadata(optsOrCallback, callback) {
171 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
172 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
173 const reqOpts = {
174 topic: this.name,
175 };
176 this.request({
177 client: 'PublisherClient',
178 method: 'getTopic',
179 reqOpts,
180 gaxOpts: gaxOpts,
181 }, (err, apiResponse) => {
182 if (!err) {
183 this.metadata = apiResponse;
184 }
185 callback(err, apiResponse);
186 });
187 }
188 getSubscriptions(optsOrCallback, callback) {
189 const options = typeof optsOrCallback === 'object' ? optsOrCallback : {};
190 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
191 const reqOpts = Object.assign({
192 topic: this.name,
193 }, options);
194 delete reqOpts.gaxOpts;
195 delete reqOpts.autoPaginate;
196 const gaxOpts = Object.assign({
197 autoPaginate: options.autoPaginate,
198 }, options.gaxOpts);
199 this.request({
200 client: 'PublisherClient',
201 method: 'listTopicSubscriptions',
202 reqOpts,
203 gaxOpts,
204 }, (err, subNames, ...args) => {
205 let subscriptions;
206 if (subNames) {
207 subscriptions = subNames.map((sub) => this.subscription(sub));
208 }
209 callback(err, subscriptions, ...args);
210 });
211 }
212 publish(data, attrsOrCb, callback) {
213 const attributes = typeof attrsOrCb === 'object' ? attrsOrCb : {};
214 callback = typeof attrsOrCb === 'function' ? attrsOrCb : callback;
215 return this.publishMessage({ data, attributes }, callback);
216 }
217 publishJSON(json, attrsOrCb, callback) {
218 if (!json || typeof json !== 'object') {
219 throw new Error('First parameter should be an object.');
220 }
221 const attributes = typeof attrsOrCb === 'object' ? attrsOrCb : {};
222 callback = typeof attrsOrCb === 'function' ? attrsOrCb : callback;
223 return this.publishMessage({ json, attributes }, callback);
224 }
225 publishMessage(message, callback) {
226 // Make a copy to ensure that any changes we make to it will not
227 // propagate up to the user's data.
228 message = Object.assign({}, message);
229 if (message.json && typeof message.json === 'object') {
230 message.data = Buffer.from(JSON.stringify(message.json));
231 delete message.json;
232 }
233 return this.publisher.publishMessage(message, callback);
234 }
235 /**
236 * Creates a FlowControlledPublisher for this Topic.
237 *
238 * FlowControlledPublisher is a helper that lets you control how many messages
239 * are simultaneously queued to send, to avoid ballooning memory usage on
240 * a low bandwidth connection to Pub/Sub.
241 *
242 * Note that it's perfectly fine to create more than one on the same Topic.
243 * The actual flow control settings on the Topic will apply across all
244 * FlowControlledPublisher objects on that Topic.
245 *
246 * @returns {FlowControlledPublisher} The flow control helper.
247 */
248 flowControlled() {
249 return new flow_publisher_1.FlowControlledPublisher(this.publisher);
250 }
251 /**
252 * In the event that the client fails to publish an ordered message, all
253 * subsequent publish calls using the same ordering key will fail. Calling
254 * this method will disregard the publish failure, allowing the supplied
255 * ordering key to be used again in the future.
256 *
257 * @param {string} orderingKey The ordering key in question.
258 *
259 * @example
260 * ```
261 * const {PubSub} = require('@google-cloud/pubsub');
262 * const pubsub = new PubSub();
263 * const topic = pubsub.topic('my-topic', {messageOrdering: true});
264 *
265 * const orderingKey = 'foo';
266 * const data = Buffer.from('Hello, order!');
267 *
268 * topic.publishMessage({data, orderingKey}, err => {
269 * if (err) {
270 * topic.resumePublishing(orderingKey);
271 * }
272 * });
273 * ```
274 */
275 resumePublishing(orderingKey) {
276 this.publisher.resumePublishing(orderingKey);
277 }
278 setMetadata(options, optsOrCallback, callback) {
279 const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
280 callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
281 const topic = Object.assign({ name: this.name }, options);
282 const updateMask = { paths: Object.keys(options) };
283 const reqOpts = { topic, updateMask };
284 this.request({
285 client: 'PublisherClient',
286 method: 'updateTopic',
287 reqOpts,
288 gaxOpts,
289 }, callback);
290 }
291 /**
292 * Set the publisher options.
293 *
294 * @param {PublishOptions} options The publisher options.
295 *
296 * @example
297 * ```
298 * const {PubSub} = require('@google-cloud/pubsub');
299 * const pubsub = new PubSub();
300 *
301 * const topic = pubsub.topic('my-topic');
302 *
303 * topic.setPublishOptions({
304 * batching: {
305 * maxMilliseconds: 10
306 * }
307 * });
308 * ```
309 */
310 setPublishOptions(options) {
311 this.publisher.setOptions(options);
312 }
313 /**
314 * Get the default publisher options. These may be modified and passed
315 * back into {@link Topic#setPublishOptions}.
316 *
317 * @example
318 * ```
319 * const {PubSub} = require('@google-cloud/pubsub');
320 * const pubsub = new PubSub();
321 *
322 * const topic = pubsub.topic('my-topic');
323 *
324 * const defaults = topic.getPublishOptionDefaults();
325 * defaults.batching.maxMilliseconds = 10;
326 * topic.setPublishOptions(defaults);
327 * ```
328 */
329 getPublishOptionDefaults() {
330 // Generally I'd leave this as a static, but it'll be easier for users to
331 // get at when they're using the veneer objects.
332 return this.publisher.getOptionDefaults();
333 }
334 /**
335 * Create a Subscription object. This command by itself will not run any API
336 * requests. You will receive a {module:pubsub/subscription} object,
337 * which will allow you to interact with a subscription.
338 *
339 * @throws {Error} If subscription name is omitted.
340 *
341 * @param {string} name Name of the subscription.
342 * @param {SubscriberOptions} [options] Configuration object.
343 * @return {Subscription}
344 *
345 * @example
346 * ```
347 * const {PubSub} = require('@google-cloud/pubsub');
348 * const pubsub = new PubSub();
349 *
350 * const topic = pubsub.topic('my-topic');
351 * const subscription = topic.subscription('my-subscription');
352 *
353 * // Register a listener for `message` events.
354 * subscription.on('message', (message) => {
355 * // Called every time a message is received.
356 * // message.id = ID of the message.
357 * // message.ackId = ID used to acknowledge the message receival.
358 * // message.data = Contents of the message.
359 * // message.attributes = Attributes of the message.
360 * // message.publishTime = Timestamp when Pub/Sub received the message.
361 * });
362 * ```
363 */
364 subscription(name, options) {
365 options = options || {};
366 options.topic = this;
367 return this.pubsub.subscription(name, options);
368 }
369 /**
370 * Format the name of a topic. A Topic's full name is in the format of
371 * 'projects/{projectId}/topics/{topicName}'.
372 *
373 * @private
374 *
375 * @return {string}
376 */
377 static formatName_(projectId, name) {
378 // Simple check if the name is already formatted.
379 if (name.indexOf('/') > -1) {
380 return name;
381 }
382 return 'projects/' + projectId + '/topics/' + name;
383 }
384}
385exports.Topic = Topic;
386/**
387 * Get a list of the {module:pubsub/subscription} objects registered to this
388 * topic as a readable object stream.
389 *
390 * @method PubSub#getSubscriptionsStream
391 * @param {GetSubscriptionsRequest} [options] Configuration object. See
392 * {@link PubSub#getSubscriptions} for a complete list of options.
393 * @returns {ReadableStream} A readable stream of {@link Subscription} instances.
394 *
395 * @example
396 * ```
397 * const {PubSub} = require('@google-cloud/pubsub');
398 * const pubsub = new PubSub();
399 *
400 * const topic = pubsub.topic('my-topic');
401 *
402 * topic.getSubscriptionsStream()
403 * .on('error', console.error)
404 * .on('data', (subscription) => {
405 * // subscription is a Subscription object.
406 * })
407 * .on('end', () => {
408 * // All subscriptions retrieved.
409 * });
410 *
411 * //-
412 * // If you anticipate many results, you can end a stream early to prevent
413 * // unnecessary processing and API requests.
414 * //-
415 * topic.getSubscriptionsStream()
416 * .on('data', function(subscription) {
417 * this.end();
418 * });
419 * ```
420 */
421/*! Developer Documentation
422 *
423 * These methods can be agto-paginated.
424 */
425paginator_1.paginator.extend(Topic, ['getSubscriptions']);
426/*! Developer Documentation
427 *
428 * Existing async methods (except for streams) will return a Promise in the event
429 * that a callback is omitted. Future methods will not allow for a callback.
430 * (Use .then() on the returned Promise instead.)
431 */
432util_1.promisifySome(Topic, Topic.prototype, [
433 'flush',
434 'create',
435 'createSubscription',
436 'delete',
437 'exists',
438 'get',
439 'getMetadata',
440 'getSubscriptions',
441 'setMetadata',
442]);
443//# sourceMappingURL=topic.js.map
\No newline at end of file