Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | 1x 1x 1x 1x 1x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 1x 1x 1x 2x 2x 2x 1x 1x 1x 1x | const { Producer, CompressionTypes } = require('./_producer'); const { Consumer, PartitionAssigners } = require('./_consumer'); const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin'); const error = require('./_error'); const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common'); /** * This class holds common configuration for clients, and an instance can be * used to create producers, consumers, or admin clients. * @memberof KafkaJS */ class Kafka { /** @type{import("../../types/kafkajs").CommonConstructorConfig} */ #commonClientConfig = {}; /** * The configuration provided will be shared across all clients created using * this Kafka object. * * Properties listed within [CONFIGURATION.md]{@link https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md} * can be used as keys for this object. * * A number of KafkaJS-compatible configuration options can be provided as an object * with the `kafkaJS` key as illustrated in [MIGRATION.md]{@link https://github.com/confluentinc/confluent-kafka-javascript/blob/dev_early_access_development_branch/MIGRATION.md}. * @param {object} config - The common configuration for all clients created using this Kafka object. */ constructor(config) { this.#commonClientConfig = config ?? {}; const disallowedKey = checkIfKafkaJsKeysPresent('common', this.#commonClientConfig); Iif (disallowedKey !== null) { throw new error.KafkaJSError(CompatibilityErrorMessages.kafkaJSCommonKey(disallowedKey)); } } /** * Merge the producer/consumer specific configuration with the common configuration. * @param {import("../../types/kafkajs").ProducerConstructorConfig|import("../../types/kafkajs").ConsumerConstructorConfig} config * @returns {(import("../../types/kafkajs").ProducerConstructorConfig & import("../../types/kafkajs").CommonConstructorConfig) | (import("../../types/kafkajs").ConsumerConstructorConfig & import("../../types/kafkajs").CommonConstructorConfig)} * @private */ #mergeConfiguration(config) { config = Object.assign({}, config) ?? {}; const mergedConfig = Object.assign({}, this.#commonClientConfig); mergedConfig.kafkaJS = Object.assign({}, mergedConfig.kafkaJS) ?? {}; Iif (typeof config.kafkaJS === 'object') { mergedConfig.kafkaJS = Object.assign(mergedConfig.kafkaJS, config.kafkaJS); delete config.kafkaJS; } Object.assign(mergedConfig, config); return mergedConfig; } /** * Creates a new producer. * * An object containing the configuration for the producer to be created. This * will be merged with the common configuration provided when creating the * {@link RdKafka.Kafka} object, and the same set of keys can be used. * * @param {object} config - The configuration for the producer to be created. * @returns {KafkaJS.Producer} */ producer(config) { const disallowedKey = checkIfKafkaJsKeysPresent('producer', config ?? {}); Iif (disallowedKey !== null) { throw new error.KafkaJSError(CompatibilityErrorMessages.kafkaJSClientKey(disallowedKey, 'producer')); } return new Producer(this.#mergeConfiguration(config)); } /** * Creates a new consumer. * * An object containing the configuration for the consumer to be created. This * will be merged with the common configuration provided when creating the * {@link RdKafka.Kafka} object, and the same set of keys can be used. * * @param {object} config - The configuration for the consumer to be created. * @returns {KafkaJS.Consumer} */ consumer(config) { const disallowedKey = checkIfKafkaJsKeysPresent('consumer', config ?? {}); Iif (disallowedKey !== null) { throw new error.KafkaJSError(CompatibilityErrorMessages.kafkaJSClientKey(disallowedKey, 'consumer')); } return new Consumer(this.#mergeConfiguration(config)); } /** * Creates a new admin client. * * An object containing the configuration for the admin client to be created. This * will be merged with the common configuration provided when creating the * {@link RdKafka.Kafka} object, and the same set of keys can be used. * * @param {object} config - The configuration for the admin client to be created. * @returns {KafkaJS.Admin} */ admin(config) { const disallowedKey = checkIfKafkaJsKeysPresent('admin', config ?? {}); Iif (disallowedKey !== null) { throw new error.KafkaJSError(CompatibilityErrorMessages.kafkaJSClientKey(disallowedKey, 'admin')); } return new Admin(this.#mergeConfiguration(config)); } } module.exports = { Kafka, ...error, logLevel, PartitionAssigners, PartitionAssignors: PartitionAssigners, CompressionTypes, ConsumerGroupStates, AclOperationTypes, IsolationLevel}; |