All files / confluent-kafka-javascript/lib/kafkajs/_kafka.js

80.64% Statements 25/31
63.63% Branches 14/22
100% Functions 4/4
80.64% Lines 25/31

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 1251x 1x 1x 1x 1x                 4x                           4x   4x 4x                       4x 4x   4x   4x         4x   4x                           1x 1x       1x                           2x 2x       2x                           1x 1x       1x       1x                  
const { Producer, CompressionTypes } = require('./_producer');
const { Consumer, PartitionAssigners } = require('./_consumer');
const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
const error = require('./_error');
const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');
 
/**
 * This class holds common configuration for clients, and an instance can be
 * used to create producers, consumers, or admin clients.
 * @memberof KafkaJS
 */
class Kafka {
  /**
   * Configuration shared by every client created from this instance.
   * @type{import("../../types/kafkajs").CommonConstructorConfig}
   */
  #commonClientConfig = {};

  /**
   * The configuration provided will be shared across all clients created using
   * this Kafka object.
   *
   * Properties listed within [CONFIGURATION.md]{@link https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md}
   * can be used as keys for this object.
   *
   * A number of KafkaJS-compatible configuration options can be provided as an object
   * with the `kafkaJS` key as illustrated in [MIGRATION.md]{@link https://github.com/confluentinc/confluent-kafka-javascript/blob/dev_early_access_development_branch/MIGRATION.md}.
   * @param {object} [config] - The common configuration for all clients created using this Kafka object.
   *                            A missing (null/undefined) value is treated as an empty configuration.
   * @throws {KafkaJSError} if `checkIfKafkaJsKeysPresent` reports a disallowed key in the common configuration.
   */
  constructor(config) {
    this.#commonClientConfig = config ?? {};

    const disallowedKey = checkIfKafkaJsKeysPresent('common', this.#commonClientConfig);
    if (disallowedKey !== null) {
      throw new error.KafkaJSError(CompatibilityErrorMessages.kafkaJSCommonKey(disallowedKey));
    }
  }

  /**
   * Validate a client-specific configuration, throwing if it contains a key
   * that `checkIfKafkaJsKeysPresent` reports as disallowed for that client type.
   * @param {'producer'|'consumer'|'admin'} clientType - The kind of client being created.
   * @param {object} [config] - The client-specific configuration; null/undefined is checked as `{}`.
   * @throws {KafkaJSError} if a disallowed key is reported.
   * @private
   */
  #assertNoDisallowedKeys(clientType, config) {
    const disallowedKey = checkIfKafkaJsKeysPresent(clientType, config ?? {});
    if (disallowedKey !== null) {
      throw new error.KafkaJSError(CompatibilityErrorMessages.kafkaJSClientKey(disallowedKey, clientType));
    }
  }

  /**
   * Merge the producer/consumer specific configuration with the common configuration.
   *
   * Top-level keys of `config` override the common configuration. The `kafkaJS`
   * sub-object is merged key by key (client-specific values win) rather than
   * being replaced wholesale. Neither `config` nor the common configuration is
   * modified: both are shallow-copied before merging.
   * @param {import("../../types/kafkajs").ProducerConstructorConfig|import("../../types/kafkajs").ConsumerConstructorConfig} config
   * @returns {(import("../../types/kafkajs").ProducerConstructorConfig & import("../../types/kafkajs").CommonConstructorConfig) | (import("../../types/kafkajs").ConsumerConstructorConfig & import("../../types/kafkajs").CommonConstructorConfig)}
   * @private
   */
  #mergeConfiguration(config) {
    // Object.assign skips null/undefined sources and always returns the target,
    // so these copies are plain objects even when the inputs are missing.
    config = Object.assign({}, config);
    const mergedConfig = Object.assign({}, this.#commonClientConfig);

    mergedConfig.kafkaJS = Object.assign({}, mergedConfig.kafkaJS);

    // `typeof null` is also 'object'; in that case Object.assign is a no-op and
    // the null entry is dropped so it cannot overwrite the common kafkaJS block below.
    if (typeof config.kafkaJS === 'object') {
      mergedConfig.kafkaJS = Object.assign(mergedConfig.kafkaJS, config.kafkaJS);
      delete config.kafkaJS;
    }

    Object.assign(mergedConfig, config);

    return mergedConfig;
  }

  /**
   * Creates a new producer.
   *
   * An object containing the configuration for the producer to be created. This
   * will be merged with the common configuration provided when creating this
   * Kafka object, and the same set of keys can be used.
   *
   * @param {object} config - The configuration for the producer to be created.
   * @returns {KafkaJS.Producer}
   * @throws {KafkaJSError} if the configuration contains a disallowed key.
   */
  producer(config) {
    this.#assertNoDisallowedKeys('producer', config);
    return new Producer(this.#mergeConfiguration(config));
  }

  /**
   * Creates a new consumer.
   *
   * An object containing the configuration for the consumer to be created. This
   * will be merged with the common configuration provided when creating this
   * Kafka object, and the same set of keys can be used.
   *
   * @param {object} config - The configuration for the consumer to be created.
   * @returns {KafkaJS.Consumer}
   * @throws {KafkaJSError} if the configuration contains a disallowed key.
   */
  consumer(config) {
    this.#assertNoDisallowedKeys('consumer', config);
    return new Consumer(this.#mergeConfiguration(config));
  }

  /**
   * Creates a new admin client.
   *
   * An object containing the configuration for the admin client to be created. This
   * will be merged with the common configuration provided when creating this
   * Kafka object, and the same set of keys can be used.
   *
   * @param {object} config - The configuration for the admin client to be created.
   * @returns {KafkaJS.Admin}
   * @throws {KafkaJSError} if the configuration contains a disallowed key.
   */
  admin(config) {
    this.#assertNoDisallowedKeys('admin', config);
    return new Admin(this.#mergeConfiguration(config));
  }
}
 
module.exports = {
  Kafka,
  ...error, logLevel,
  PartitionAssigners,
  PartitionAssignors: PartitionAssigners,
  CompressionTypes,
  ConsumerGroupStates,
  AclOperationTypes,
  IsolationLevel};