/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2022, Magnus Edenhill
 *               2023 Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rd.h"
#include "rdfloat.h"

#include <stdlib.h>
#include <ctype.h>
#include <stddef.h>

#include "rdkafka_int.h"
#include "rdkafka_feature.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_assignor.h"
#include "rdkafka_sasl_oauthbearer.h"
#if WITH_PLUGINS
#include "rdkafka_plugin.h"
#endif
#include "rdunittest.h"

#ifndef _WIN32
#include <netinet/tcp.h>
#else

#ifndef WIN32_MEAN_AND_LEAN
#define WIN32_MEAN_AND_LEAN
#endif
#include <windows.h>
#endif

struct rd_kafka_property {
        rd_kafka_conf_scope_t scope;
        const char *name;
        enum { _RK_C_STR,
               _RK_C_INT,
               _RK_C_DBL, /* Double */
               _RK_C_S2I, /* String to Integer mapping.
                           * Supports limited canonical str->int mappings
                           * using s2i[] */
               _RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */
               _RK_C_BOOL,
               _RK_C_PTR,     /* Only settable through special set functions */
               _RK_C_PATLIST, /* Pattern list */
               _RK_C_KSTR,    /* Kafka string */
               _RK_C_ALIAS, /* Alias: points to other property through .sdef */
               _RK_C_INTERNAL, /* Internal, don't expose to application */
               _RK_C_INVALID,  /* Invalid property, used to catch known
                                * but unsupported Java properties. */
        } type;
        int offset;
        const char *desc;
        int vmin;
        int vmax;
        int vdef;         /* Default value (int) */
        const char *sdef; /* Default value (string) */
        void *pdef;       /* Default value (pointer) */
        double ddef;      /* Default value (double) */
        double dmin;
        double dmax;
        struct {
                int val;
                const char *str;
                const char *unsupported; /**< Reason for value not being
                                          *   supported in this build. */
        } s2i[20];                       /* _RK_C_S2I and _RK_C_S2F */

        const char *unsupported; /**< Reason for propery not being supported
                                  *   in this build.
                                  *   Will be included in the conf_set()
                                  *   error string. */

        /* Value validator (STR) */
        int (*validate)(const struct rd_kafka_property *prop,
                        const char *val,
                        int ival);

        /* Configuration object constructors and destructor for use when
         * the property value itself is not used, or needs extra care. */
        void (*ctor)(int scope, void *pconf);
        void (*dtor)(int scope, void *pconf);
        void (*copy)(int scope,
                     void *pdst,
                     const void *psrc,
                     void *dstptr,
                     const void *srcptr,
                     size_t filter_cnt,
                     const char **filter);

        rd_kafka_conf_res_t (*set)(int scope,
                                   void *pconf,
                                   const char *name,
                                   const char *value,
                                   void *dstptr,
                                   rd_kafka_conf_set_mode_t set_mode,
                                   char *errstr,
                                   size_t errstr_size);
};


#define _RK(field)  offsetof(rd_kafka_conf_t, field)
#define _RKT(field) offsetof(rd_kafka_topic_conf_t, field)

#if WITH_SSL
#define _UNSUPPORTED_SSL .unsupported = NULL
#else
#define _UNSUPPORTED_SSL .unsupported = "OpenSSL not available at build time"
#endif

#if OPENSSL_VERSION_NUMBER >= 0x1000200fL && defined(WITH_SSL) &&              \
    !defined(LIBRESSL_VERSION_NUMBER)
#define _UNSUPPORTED_OPENSSL_1_0_2 .unsupported = NULL
#else
#define _UNSUPPORTED_OPENSSL_1_0_2                                             \
        .unsupported = "OpenSSL >= 1.0.2 not available at build time"
#endif

#if OPENSSL_VERSION_NUMBER >= 0x10100000 && defined(WITH_SSL) &&               \
    !defined(LIBRESSL_VERSION_NUMBER)
#define _UNSUPPORTED_OPENSSL_1_1_0 .unsupported = NULL
#else
#define _UNSUPPORTED_OPENSSL_1_1_0                                             \
        .unsupported = "OpenSSL >= 1.1.0 not available at build time"
#endif

#if WITH_SSL_ENGINE
#define _UNSUPPORTED_SSL_ENGINE .unsupported = NULL
#else
#define _UNSUPPORTED_SSL_ENGINE                                                \
        .unsupported = "OpenSSL >= 1.1.x not available at build time"
#endif

#if OPENSSL_VERSION_NUMBER >= 0x30000000 && defined(WITH_SSL)
#define _UNSUPPORTED_SSL_3 .unsupported = NULL
#else
#define _UNSUPPORTED_SSL_3                                                     \
        .unsupported = "OpenSSL >= 3.0.0 not available at build time"
#endif


#if WITH_ZLIB
#define _UNSUPPORTED_ZLIB .unsupported = NULL
#else
#define _UNSUPPORTED_ZLIB .unsupported = "zlib not available at build time"
#endif

#if WITH_SNAPPY
#define _UNSUPPORTED_SNAPPY .unsupported = NULL
#else
#define _UNSUPPORTED_SNAPPY .unsupported = "snappy not enabled at build time"
#endif

#if WITH_ZSTD
#define _UNSUPPORTED_ZSTD .unsupported = NULL
#else
#define _UNSUPPORTED_ZSTD .unsupported = "libzstd not available at build time"
#endif

#if WITH_CURL
#define _UNSUPPORTED_HTTP .unsupported = NULL
#else
#define _UNSUPPORTED_HTTP .unsupported = "libcurl not available at build time"
#endif

#if WITH_OAUTHBEARER_OIDC
#define _UNSUPPORTED_OIDC .unsupported = NULL
#else
#define _UNSUPPORTED_OIDC                                                      \
        .unsupported =                                                         \
            "OAuth/OIDC depends on libcurl and OpenSSL which were not "        \
            "available at build time"
#endif

#ifdef _WIN32
#define _UNSUPPORTED_WIN32_GSSAPI                                              \
        .unsupported =                                                         \
            "Kerberos keytabs are not supported on Windows, "                  \
            "instead the logged on "                                           \
            "user's credentials are used through native SSPI"
#else
#define _UNSUPPORTED_WIN32_GSSAPI .unsupported = NULL
#endif

#if defined(_WIN32) || defined(WITH_SASL_CYRUS)
#define _UNSUPPORTED_GSSAPI .unsupported = NULL
#else
#define _UNSUPPORTED_GSSAPI                                                    \
        .unsupported = "cyrus-sasl/libsasl2 not available at build time"
#endif

#define _UNSUPPORTED_OAUTHBEARER _UNSUPPORTED_SSL


static rd_kafka_conf_res_t
rd_kafka_anyconf_get0(const void *conf,
                      const struct rd_kafka_property *prop,
                      char *dest,
                      size_t *dest_size);



/**
 * @returns a unique index for property \p prop, using the byte position
 *          of the field.
 */
static RD_INLINE int rd_kafka_prop2idx(const struct rd_kafka_property *prop) {
        return prop->offset;
}



/**
 * @brief Set the property as modified.
 *
 * We do this by mapping the property's conf struct field byte offset
 * to a bit in a bit vector.
 * If the bit is set the property has been modified, otherwise it is
 * at its default unmodified value.
 *
 * \p is_modified 1: set as modified, 0: clear modified
 */
static void rd_kafka_anyconf_set_modified(void *conf,
                                          const struct rd_kafka_property *prop,
                                          int is_modified) {
        int idx                              = rd_kafka_prop2idx(prop);
        int bkt                              = idx / 64;
        uint64_t bit                         = (uint64_t)1 << (idx % 64);
        struct rd_kafka_anyconf_hdr *confhdr = conf;

        rd_assert(idx < RD_KAFKA_CONF_PROPS_IDX_MAX &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");

        if (is_modified)
                confhdr->modified[bkt] |= bit;
        else
                confhdr->modified[bkt] &= ~bit;
}

/**
 * @brief Clear is_modified for all properties.
 * @warning Does NOT clear/reset the value.
 */
static void rd_kafka_anyconf_clear_all_is_modified(void *conf) {
        struct rd_kafka_anyconf_hdr *confhdr = conf;

        memset(confhdr, 0, sizeof(*confhdr));
}


/**
 * @returns true of the property has been set/modified, else false.
 */
static rd_bool_t
rd_kafka_anyconf_is_modified(const void *conf,
                             const struct rd_kafka_property *prop) {
        int idx                                    = rd_kafka_prop2idx(prop);
        int bkt                                    = idx / 64;
        uint64_t bit                               = (uint64_t)1 << (idx % 64);
        const struct rd_kafka_anyconf_hdr *confhdr = conf;

        return !!(confhdr->modified[bkt] & bit);
}

/**
 * @returns true if any property in \p conf has been set/modified.
 */
static rd_bool_t rd_kafka_anyconf_is_any_modified(const void *conf) {
        const struct rd_kafka_anyconf_hdr *confhdr = conf;
        int i;

        for (i = 0; i < (int)RD_ARRAYSIZE(confhdr->modified); i++)
                if (confhdr->modified[i])
                        return rd_true;

        return rd_false;
}



/**
 * @brief Validate \p broker.version.fallback property.
 */
static int
rd_kafka_conf_validate_broker_version(const struct rd_kafka_property *prop,
                                      const char *val,
                                      int ival) {
        struct rd_kafka_ApiVersion *apis;
        size_t api_cnt;
        return rd_kafka_get_legacy_ApiVersions(val, &apis, &api_cnt, NULL);
}

/**
 * @brief Validate that string is a single item, without delimters (, space).
 */
static RD_UNUSED int
rd_kafka_conf_validate_single(const struct rd_kafka_property *prop,
                              const char *val,
                              int ival) {
        return !strchr(val, ',') && !strchr(val, ' ');
}

/**
 * @brief Validate builtin partitioner string
 */
static RD_UNUSED int
rd_kafka_conf_validate_partitioner(const struct rd_kafka_property *prop,
                                   const char *val,
                                   int ival) {
        return !strcmp(val, "random") || !strcmp(val, "consistent") ||
               !strcmp(val, "consistent_random") || !strcmp(val, "murmur2") ||
               !strcmp(val, "murmur2_random") || !strcmp(val, "fnv1a") ||
               !strcmp(val, "fnv1a_random");
}


/**
 * librdkafka configuration property definitions.
 */
static const struct rd_kafka_property rd_kafka_properties[] = {
    /* Global properties */
    {_RK_GLOBAL, "builtin.features", _RK_C_S2F, _RK(builtin_features),
     "Indicates the builtin features for this build of librdkafka. "
     "An application can either query this value or attempt to set it "
     "with its list of required features to check for library support.",
     0, 0x7fffffff, 0xffff,
     .s2i = {{0x1, "gzip", _UNSUPPORTED_ZLIB},
             {0x2, "snappy", _UNSUPPORTED_SNAPPY},
             {0x4, "ssl", _UNSUPPORTED_SSL},
             {0x8, "sasl"},
             {0x10, "regex"},
             {0x20, "lz4"},
             {0x40, "sasl_gssapi", _UNSUPPORTED_GSSAPI},
             {0x80, "sasl_plain"},
             {0x100, "sasl_scram", _UNSUPPORTED_SSL},
             {0x200, "plugins"
#if !WITH_PLUGINS
              ,
              .unsupported = "libdl/dlopen(3) not available at "
                             "build time"
#endif
             },
             {0x400, "zstd", _UNSUPPORTED_ZSTD},
             {0x800, "sasl_oauthbearer", _UNSUPPORTED_SSL},
             {0x1000, "http", _UNSUPPORTED_HTTP},
             {0x2000, "oidc", _UNSUPPORTED_OIDC},
             {0, NULL}}},
    {_RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str),
     "Client identifier.", .sdef = "rdkafka"},
    {_RK_GLOBAL | _RK_HIDDEN, "client.software.name", _RK_C_STR, _RK(sw_name),
     "Client software name as reported to broker version >= v2.4.0. "
     "Broker-side character restrictions apply, as of broker version "
     "v2.4.0 the allowed characters are `a-zA-Z0-9.-`. The local client "
     "will replace any other character with `-` and strip leading and "
     "trailing non-alphanumeric characters before tranmission to "
     "the broker. "
     "This property should only be set by high-level language "
     "librdkafka client bindings.",
     .sdef = "librdkafka"},
    {
        _RK_GLOBAL | _RK_HIDDEN,
        "client.software.version",
        _RK_C_STR,
        _RK(sw_version),
        "Client software version as reported to broker version >= v2.4.0. "
        "Broker-side character restrictions apply, as of broker version "
        "v2.4.0 the allowed characters are `a-zA-Z0-9.-`. The local client "
        "will replace any other character with `-` and strip leading and "
        "trailing non-alphanumeric characters before tranmission to "
        "the broker. "
        "This property should only be set by high-level language "
        "librdkafka client bindings."
        "If changing this property it is highly recommended to append the "
        "librdkafka version.",
    },
    {_RK_GLOBAL | _RK_HIGH, "metadata.broker.list", _RK_C_STR, _RK(brokerlist),
     "Initial list of brokers as a CSV list of broker host or host:port. "
     "The application may also use `rd_kafka_brokers_add()` to add "
     "brokers during runtime."},
    {_RK_GLOBAL | _RK_HIGH, "bootstrap.servers", _RK_C_ALIAS, 0,
     "See metadata.broker.list", .sdef = "metadata.broker.list"},
    {_RK_GLOBAL | _RK_MED, "message.max.bytes", _RK_C_INT, _RK(max_msg_size),
     "Maximum Kafka protocol request message size. "
     "Due to differing framing overhead between protocol versions the "
     "producer is unable to reliably enforce a strict max message limit "
     "at produce time and may exceed the maximum size by one message in "
     "protocol ProduceRequests, the broker will enforce the the topic's "
     "`max.message.bytes` limit (see Apache Kafka documentation).",
     1000, 1000000000, 1000000},
    {_RK_GLOBAL, "message.copy.max.bytes", _RK_C_INT, _RK(msg_copy_max_size),
     "Maximum size for message to be copied to buffer. "
     "Messages larger than this will be passed by reference (zero-copy) "
     "at the expense of larger iovecs.",
     0, 1000000000, 0xffff},
    {_RK_GLOBAL | _RK_MED, "receive.message.max.bytes", _RK_C_INT,
     _RK(recv_max_msg_size),
     "Maximum Kafka protocol response message size. "
     "This serves as a safety precaution to avoid memory exhaustion in "
     "case of protocol hickups. "
     "This value must be at least `fetch.max.bytes`  + 512 to allow "
     "for protocol overhead; the value is adjusted automatically "
     "unless the configuration property is explicitly set.",
     1000, INT_MAX, 100000000},
    {_RK_GLOBAL, "max.in.flight.requests.per.connection", _RK_C_INT,
     _RK(max_inflight),
     "Maximum number of in-flight requests per broker connection. "
     "This is a generic property applied to all broker communication, "
     "however it is primarily relevant to produce requests. "
     "In particular, note that other mechanisms limit the number "
     "of outstanding consumer fetch request per broker to one.",
     1, 1000000, 1000000},
    {_RK_GLOBAL, "max.in.flight", _RK_C_ALIAS,
     .sdef = "max.in.flight.requests.per.connection"},
    {_RK_GLOBAL | _RK_DEPRECATED | _RK_HIDDEN, "metadata.request.timeout.ms",
     _RK_C_INT, _RK(metadata_request_timeout_ms), "Not used.", 10, 900 * 1000,
     10},
    {_RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT,
     _RK(metadata_refresh_interval_ms),
     "Period of time in milliseconds at which topic and broker "
     "metadata is refreshed in order to proactively discover any new "
     "brokers, topics, partitions or partition leader changes. "
     "Use -1 to disable the intervalled refresh (not recommended). "
     "If there are no locally referenced topics "
     "(no topic objects created, no messages produced, "
     "no subscription or no assignment) then only the broker list will "
     "be refreshed every interval but no more often than every 10s.",
     -1, 3600 * 1000, 5 * 60 * 1000},
    {_RK_GLOBAL, "metadata.max.age.ms", _RK_C_INT, _RK(metadata_max_age_ms),
     "Metadata cache max age. "
     "Defaults to topic.metadata.refresh.interval.ms * 3",
     1, 24 * 3600 * 1000, 5 * 60 * 1000 * 3},
    {_RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
     _RK(metadata_refresh_fast_interval_ms),
     "When a topic loses its leader a new metadata request will be "
     "enqueued immediately and then with this initial interval, exponentially "
     "increasing upto `retry.backoff.max.ms`, "
     "until the topic metadata has been refreshed. "
     "If not set explicitly, it will be defaulted to `retry.backoff.ms`. "
     "This is used to recover quickly from transitioning leader brokers.",
     1, 60 * 1000, 100},
    {_RK_GLOBAL | _RK_DEPRECATED, "topic.metadata.refresh.fast.cnt", _RK_C_INT,
     _RK(metadata_refresh_fast_cnt), "No longer used.", 0, 1000, 10},
    {_RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
     _RK(metadata_refresh_sparse),
     "Sparse metadata requests (consumes less network bandwidth)", 0, 1, 1},
    {_RK_GLOBAL, "topic.metadata.propagation.max.ms", _RK_C_INT,
     _RK(metadata_propagation_max_ms),
     "Apache Kafka topic creation is asynchronous and it takes some "
     "time for a new topic to propagate throughout the cluster to all "
     "brokers. "
     "If a client requests topic metadata after manual topic creation but "
     "before the topic has been fully propagated to the broker the "
     "client is requesting metadata from, the topic will seem to be "
     "non-existent and the client will mark the topic as such, "
     "failing queued produced messages with `ERR__UNKNOWN_TOPIC`. "
     "This setting delays marking a topic as non-existent until the "
     "configured propagation max time has passed. "
     "The maximum propagation time is calculated from the time the "
     "topic is first referenced in the client, e.g., on produce().",
     0, 60 * 60 * 1000, 30 * 1000},
    {_RK_GLOBAL, "topic.blacklist", _RK_C_PATLIST, _RK(topic_blacklist),
     "Topic blacklist, a comma-separated list of regular expressions "
     "for matching topic names that should be ignored in "
     "broker metadata information as if the topics did not exist."},
    {_RK_GLOBAL | _RK_MED, "debug", _RK_C_S2F, _RK(debug),
     "A comma-separated list of debug contexts to enable. "
     "Detailed Producer debugging: broker,topic,msg. "
     "Consumer: consumer,cgrp,topic,fetch",
     .s2i = {{RD_KAFKA_DBG_GENERIC, "generic"},
             {RD_KAFKA_DBG_BROKER, "broker"},
             {RD_KAFKA_DBG_TOPIC, "topic"},
             {RD_KAFKA_DBG_METADATA, "metadata"},
             {RD_KAFKA_DBG_FEATURE, "feature"},
             {RD_KAFKA_DBG_QUEUE, "queue"},
             {RD_KAFKA_DBG_MSG, "msg"},
             {RD_KAFKA_DBG_PROTOCOL, "protocol"},
             {RD_KAFKA_DBG_CGRP, "cgrp"},
             {RD_KAFKA_DBG_SECURITY, "security"},
             {RD_KAFKA_DBG_FETCH, "fetch"},
             {RD_KAFKA_DBG_INTERCEPTOR, "interceptor"},
             {RD_KAFKA_DBG_PLUGIN, "plugin"},
             {RD_KAFKA_DBG_CONSUMER, "consumer"},
             {RD_KAFKA_DBG_ADMIN, "admin"},
             {RD_KAFKA_DBG_EOS, "eos"},
             {RD_KAFKA_DBG_MOCK, "mock"},
             {RD_KAFKA_DBG_ASSIGNOR, "assignor"},
             {RD_KAFKA_DBG_CONF, "conf"},
             {RD_KAFKA_DBG_ALL, "all"}}},
    {_RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
     "Default timeout for network requests. "
     "Producer: ProduceRequests will use the lesser value of "
     "`socket.timeout.ms` and remaining `message.timeout.ms` for the "
     "first message in the batch. "
     "Consumer: FetchRequests will use "
     "`fetch.wait.max.ms` + `socket.timeout.ms`. "
     "Admin: Admin requests will use `socket.timeout.ms` or explicitly "
     "set `rd_kafka_AdminOptions_set_operation_timeout()` value.",
     10, 300 * 1000, 60 * 1000},
    {_RK_GLOBAL | _RK_DEPRECATED, "socket.blocking.max.ms", _RK_C_INT,
     _RK(socket_blocking_max_ms), "No longer used.", 1, 60 * 1000, 1000},
    {_RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT, _RK(socket_sndbuf_size),
     "Broker socket send buffer size. System default is used if 0.", 0,
     100000000, 0},
    {_RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT,
     _RK(socket_rcvbuf_size),
     "Broker socket receive buffer size. System default is used if 0.", 0,
     100000000, 0},
    {_RK_GLOBAL, "socket.keepalive.enable", _RK_C_BOOL, _RK(socket_keepalive),
     "Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets", 0, 1, 0
#ifndef SO_KEEPALIVE
     ,
     .unsupported = "SO_KEEPALIVE not available at build time"
#endif
    },
    {_RK_GLOBAL, "socket.nagle.disable", _RK_C_BOOL, _RK(socket_nagle_disable),
     "Disable the Nagle algorithm (TCP_NODELAY) on broker sockets.", 0, 1, 0
#ifndef TCP_NODELAY
     ,
     .unsupported = "TCP_NODELAY not available at build time"
#endif
    },
    {_RK_GLOBAL, "socket.max.fails", _RK_C_INT, _RK(socket_max_fails),
     "Disconnect from broker when this number of send failures "
     "(e.g., timed out requests) is reached. Disable with 0. "
     "WARNING: It is highly recommended to leave this setting at "
     "its default value of 1 to avoid the client and broker to "
     "become desynchronized in case of request timeouts. "
     "NOTE: The connection is automatically re-established.",
     0, 1000000, 1},
    {_RK_GLOBAL, "broker.address.ttl", _RK_C_INT, _RK(broker_addr_ttl),
     "How long to cache the broker address resolving "
     "results (milliseconds).",
     0, 86400 * 1000, 1 * 1000},
    {_RK_GLOBAL, "broker.address.family", _RK_C_S2I, _RK(broker_addr_family),
     "Allowed broker IP address families: any, v4, v6", .vdef = AF_UNSPEC,
     .s2i =
         {
             {AF_UNSPEC, "any"},
             {AF_INET, "v4"},
             {AF_INET6, "v6"},
         }},
    {_RK_GLOBAL | _RK_MED, "socket.connection.setup.timeout.ms", _RK_C_INT,
     _RK(socket_connection_setup_timeout_ms),
     "Maximum time allowed for broker connection setup "
     "(TCP connection setup as well SSL and SASL handshake). "
     "If the connection to the broker is not fully functional after this "
     "the connection will be closed and retried.",
     1000, INT_MAX, 30 * 1000 /* 30s */},
    {_RK_GLOBAL | _RK_MED, "connections.max.idle.ms", _RK_C_INT,
     _RK(connections_max_idle_ms),
     "Close broker connections after the specified time of "
     "inactivity. "
     "Disable with 0. "
     "If this property is left at its default value some heuristics are "
     "performed to determine a suitable default value, this is currently "
     "limited to identifying brokers on Azure "
     "(see librdkafka issue #3109 for more info).",
     0, INT_MAX, 0},
    {_RK_GLOBAL | _RK_MED | _RK_HIDDEN, "enable.sparse.connections", _RK_C_BOOL,
     _RK(sparse_connections),
     "When enabled the client will only connect to brokers "
     "it needs to communicate with. When disabled the client "
     "will maintain connections to all brokers in the cluster.",
     0, 1, 1},
    {_RK_GLOBAL | _RK_DEPRECATED, "reconnect.backoff.jitter.ms", _RK_C_INT,
     _RK(reconnect_jitter_ms),
     "No longer used. See `reconnect.backoff.ms` and "
     "`reconnect.backoff.max.ms`.",
     0, 60 * 60 * 1000, 0},
    {_RK_GLOBAL | _RK_MED, "reconnect.backoff.ms", _RK_C_INT,
     _RK(reconnect_backoff_ms),
     "The initial time to wait before reconnecting to a broker "
     "after the connection has been closed. "
     "The time is increased exponentially until "
     "`reconnect.backoff.max.ms` is reached. "
     "-25% to +50% jitter is applied to each reconnect backoff. "
     "A value of 0 disables the backoff and reconnects immediately.",
     0, 60 * 60 * 1000, 100},
    {_RK_GLOBAL | _RK_MED, "reconnect.backoff.max.ms", _RK_C_INT,
     _RK(reconnect_backoff_max_ms),
     "The maximum time to wait before reconnecting to a broker "
     "after the connection has been closed.",
     0, 60 * 60 * 1000, 10 * 1000},
    {_RK_GLOBAL | _RK_HIGH, "statistics.interval.ms", _RK_C_INT,
     _RK(stats_interval_ms),
     "librdkafka statistics emit interval. The application also needs to "
     "register a stats callback using `rd_kafka_conf_set_stats_cb()`. "
     "The granularity is 1000ms. A value of 0 disables statistics.",
     0, 86400 * 1000, 0},
    {_RK_GLOBAL, "enabled_events", _RK_C_INT, _RK(enabled_events),
     "See `rd_kafka_conf_set_events()`", 0, 0x7fffffff, 0},
    {_RK_GLOBAL, "error_cb", _RK_C_PTR, _RK(error_cb),
     "Error callback (set with rd_kafka_conf_set_error_cb())"},
    {_RK_GLOBAL, "throttle_cb", _RK_C_PTR, _RK(throttle_cb),
     "Throttle callback (set with rd_kafka_conf_set_throttle_cb())"},
    {_RK_GLOBAL, "stats_cb", _RK_C_PTR, _RK(stats_cb),
     "Statistics callback (set with rd_kafka_conf_set_stats_cb())"},
    {_RK_GLOBAL, "log_cb", _RK_C_PTR, _RK(log_cb),
     "Log callback (set with rd_kafka_conf_set_log_cb())",
     .pdef = rd_kafka_log_print},
    {_RK_GLOBAL, "log_level", _RK_C_INT, _RK(log_level),
     "Logging level (syslog(3) levels)", 0, 7, 6},
    {_RK_GLOBAL, "log.queue", _RK_C_BOOL, _RK(log_queue),
     "Disable spontaneous log_cb from internal librdkafka "
     "threads, instead enqueue log messages on queue set with "
     "`rd_kafka_set_log_queue()` and serve log callbacks or "
     "events through the standard poll APIs. "
     "**NOTE**: Log messages will linger in a temporary queue "
     "until the log queue has been set.",
     0, 1, 0},
    {_RK_GLOBAL, "log.thread.name", _RK_C_BOOL, _RK(log_thread_name),
     "Print internal thread name in log messages "
     "(useful for debugging librdkafka internals)",
     0, 1, 1},
    {_RK_GLOBAL, "enable.random.seed", _RK_C_BOOL, _RK(enable_random_seed),
     "If enabled librdkafka will initialize the PRNG "
     "with srand(current_time.milliseconds) on the first invocation of "
     "rd_kafka_new() (required only if rand_r() is not available on your "
     "platform). "
     "If disabled the application must call srand() prior to calling "
     "rd_kafka_new().",
     0, 1, 1},
    {_RK_GLOBAL, "log.connection.close", _RK_C_BOOL, _RK(log_connection_close),
     "Log broker disconnects. "
     "It might be useful to turn this off when interacting with "
     "0.9 brokers with an aggressive `connections.max.idle.ms` value.",
     0, 1, 1},
    {_RK_GLOBAL, "background_event_cb", _RK_C_PTR, _RK(background_event_cb),
     "Background queue event callback "
     "(set with rd_kafka_conf_set_background_event_cb())"},
    {_RK_GLOBAL, "socket_cb", _RK_C_PTR, _RK(socket_cb),
     "Socket creation callback to provide race-free CLOEXEC",
     .pdef =
#ifdef __linux__
         rd_kafka_socket_cb_linux
#else
          rd_kafka_socket_cb_generic
#endif
    },
    {
        _RK_GLOBAL,
        "connect_cb",
        _RK_C_PTR,
        _RK(connect_cb),
        "Socket connect callback",
    },
    {
        _RK_GLOBAL,
        "closesocket_cb",
        _RK_C_PTR,
        _RK(closesocket_cb),
        "Socket close callback",
    },
    {_RK_GLOBAL, "open_cb", _RK_C_PTR, _RK(open_cb),
     "File open callback to provide race-free CLOEXEC",
     .pdef =
#ifdef __linux__
         rd_kafka_open_cb_linux
#else
          rd_kafka_open_cb_generic
#endif
    },
    {_RK_GLOBAL, "resolve_cb", _RK_C_PTR, _RK(resolve_cb),
     "Address resolution callback (set with rd_kafka_conf_set_resolve_cb())."},
    {_RK_GLOBAL, "opaque", _RK_C_PTR, _RK(opaque),
     "Application opaque (set with rd_kafka_conf_set_opaque())"},
    {_RK_GLOBAL, "default_topic_conf", _RK_C_PTR, _RK(topic_conf),
     "Default topic configuration for automatically subscribed topics"},
    {_RK_GLOBAL, "internal.termination.signal", _RK_C_INT, _RK(term_sig),
     "Signal that librdkafka will use to quickly terminate on "
     "rd_kafka_destroy(). If this signal is not set then there will be a "
     "delay before rd_kafka_wait_destroyed() returns true "
     "as internal threads are timing out their system calls. "
     "If this signal is set however the delay will be minimal. "
     "The application should mask this signal as an internal "
     "signal handler is installed.",
     0, 128, 0},
    {_RK_GLOBAL | _RK_HIGH, "api.version.request", _RK_C_BOOL,
     _RK(api_version_request),
     "Request broker's supported API versions to adjust functionality to "
     "available protocol features. If set to false, or the "
     "ApiVersionRequest fails, the fallback version "
     "`broker.version.fallback` will be used. "
     "**NOTE**: Depends on broker version >=0.10.0. If the request is not "
     "supported by (an older) broker the `broker.version.fallback` fallback is "
     "used.",
     0, 1, 1},
    {_RK_GLOBAL, "api.version.request.timeout.ms", _RK_C_INT,
     _RK(api_version_request_timeout_ms),
     "Timeout for broker API version requests.", 1, 5 * 60 * 1000, 10 * 1000},
    {_RK_GLOBAL | _RK_MED, "api.version.fallback.ms", _RK_C_INT,
     _RK(api_version_fallback_ms),
     "Dictates how long the `broker.version.fallback` fallback is used "
     "in the case the ApiVersionRequest fails. "
     "**NOTE**: The ApiVersionRequest is only issued when a new connection "
     "to the broker is made (such as after an upgrade).",
     0, 86400 * 7 * 1000, 0},

    {_RK_GLOBAL | _RK_MED, "broker.version.fallback", _RK_C_STR,
     _RK(broker_version_fallback),
     "Older broker versions (before 0.10.0) provide no way for a client to "
     "query "
     "for supported protocol features "
     "(ApiVersionRequest, see `api.version.request`) making it impossible "
     "for the client to know what features it may use. "
     "As a workaround a user may set this property to the expected broker "
     "version and the client will automatically adjust its feature set "
     "accordingly if the ApiVersionRequest fails (or is disabled). "
     "The fallback broker version will be used for `api.version.fallback.ms`. "
     "Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. "
     "Any other value >= 0.10, such as 0.10.2.1, "
     "enables ApiVersionRequests.",
     .sdef = "0.10.0", .validate = rd_kafka_conf_validate_broker_version},
    {_RK_GLOBAL, "allow.auto.create.topics", _RK_C_BOOL,
     _RK(allow_auto_create_topics),
     "Allow automatic topic creation on the broker when subscribing to "
     "or assigning non-existent topics. "
     "The broker must also be configured with "
     "`auto.create.topics.enable=true` for this configuration to "
     "take effect. "
     "Note: the default value (true) for the producer is "
     "different from the default value (false) for the consumer. "
     "Further, the consumer default value is different from the Java "
     "consumer (true), and this property is not supported by the Java "
     "producer. Requires broker version >= 0.11.0.0, for older broker "
     "versions only the broker configuration applies.",
     0, 1, 0},

    /* Security related global properties */
    {_RK_GLOBAL | _RK_HIGH, "security.protocol", _RK_C_S2I,
     _RK(security_protocol), "Protocol used to communicate with brokers.",
     .vdef = RD_KAFKA_PROTO_PLAINTEXT,
     .s2i  = {{RD_KAFKA_PROTO_PLAINTEXT, "plaintext"},
             {RD_KAFKA_PROTO_SSL, "ssl", _UNSUPPORTED_SSL},
             {RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext"},
             {RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl", _UNSUPPORTED_SSL},
             {0, NULL}}},

    {_RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR, _RK(ssl.cipher_suites),
     "A cipher suite is a named combination of authentication, "
     "encryption, MAC and key exchange algorithm used to negotiate the "
     "security settings for a network connection using TLS or SSL network "
     "protocol. See manual page for `ciphers(1)` and "
     "`SSL_CTX_set_cipher_list(3).",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl.curves.list", _RK_C_STR, _RK(ssl.curves_list),
     "The supported-curves extension in the TLS ClientHello message specifies "
     "the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client "
     "is willing to have the server use. See manual page for "
     "`SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required.",
     _UNSUPPORTED_OPENSSL_1_0_2},
    {_RK_GLOBAL, "ssl.sigalgs.list", _RK_C_STR, _RK(ssl.sigalgs_list),
     "The client uses the TLS ClientHello signature_algorithms extension "
     "to indicate to the server which signature/hash algorithm pairs "
     "may be used in digital signatures. See manual page for "
     "`SSL_CTX_set1_sigalgs_list(3)`. OpenSSL >= 1.0.2 required.",
     _UNSUPPORTED_OPENSSL_1_0_2},
    {_RK_GLOBAL | _RK_SENSITIVE, "ssl.key.location", _RK_C_STR,
     _RK(ssl.key_location),
     "Path to client's private key (PEM) used for authentication.",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL | _RK_SENSITIVE, "ssl.key.password", _RK_C_STR,
     _RK(ssl.key_password),
     "Private key passphrase (for use with `ssl.key.location` "
     "and `set_ssl_cert()`)",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL | _RK_SENSITIVE, "ssl.key.pem", _RK_C_STR, _RK(ssl.key_pem),
     "Client's private key string (PEM format) used for authentication.",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL | _RK_SENSITIVE, "ssl_key", _RK_C_INTERNAL, _RK(ssl.key),
     "Client's private key as set by rd_kafka_conf_set_ssl_cert()",
     .dtor = rd_kafka_conf_cert_dtor, .copy = rd_kafka_conf_cert_copy,
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl.certificate.location", _RK_C_STR, _RK(ssl.cert_location),
     "Path to client's public key (PEM) used for authentication.",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl.certificate.pem", _RK_C_STR, _RK(ssl.cert_pem),
     "Client's public key string (PEM format) used for authentication.",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl_certificate", _RK_C_INTERNAL, _RK(ssl.key),
     "Client's public key as set by rd_kafka_conf_set_ssl_cert()",
     .dtor = rd_kafka_conf_cert_dtor, .copy = rd_kafka_conf_cert_copy,
     _UNSUPPORTED_SSL},

    {_RK_GLOBAL, "ssl.ca.location", _RK_C_STR, _RK(ssl.ca_location),
     "File or directory path to CA certificate(s) for verifying "
     "the broker's key. "
     "Defaults: "
     "On Windows the system's CA certificates are automatically looked "
     "up in the Windows Root certificate store. "
     "On Mac OSX this configuration defaults to `probe`. "
     "It is recommended to install openssl using Homebrew, "
     "to provide CA certificates. "
     "On Linux install the distribution's ca-certificates package. "
     "If OpenSSL is statically linked or `ssl.ca.location` is set to "
     "`probe` a list of standard paths will be probed and the first one "
     "found will be used as the default CA certificate location path. "
     "If OpenSSL is dynamically linked the OpenSSL library's default "
     "path will be used (see `OPENSSLDIR` in `openssl version -a`).",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL | _RK_SENSITIVE, "ssl.ca.pem", _RK_C_STR, _RK(ssl.ca_pem),
     "CA certificate string (PEM format) for verifying the broker's key.",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl_ca", _RK_C_INTERNAL, _RK(ssl.ca),
     "CA certificate as set by rd_kafka_conf_set_ssl_cert()",
     .dtor = rd_kafka_conf_cert_dtor, .copy = rd_kafka_conf_cert_copy,
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl.ca.certificate.stores", _RK_C_STR,
     _RK(ssl.ca_cert_stores),
     "Comma-separated list of Windows Certificate stores to load "
     "CA certificates from. Certificates will be loaded in the same "
     "order as stores are specified. If no certificates can be loaded "
     "from any of the specified stores an error is logged and the "
     "OpenSSL library's default CA location is used instead. "
     "Store names are typically one or more of: MY, Root, Trust, CA.",
     .sdef = "Root",
#if !defined(_WIN32)
     .unsupported = "configuration only valid on Windows"
#endif
    },

    {_RK_GLOBAL, "ssl.crl.location", _RK_C_STR, _RK(ssl.crl_location),
     "Path to CRL for verifying broker's certificate validity.",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl.keystore.location", _RK_C_STR, _RK(ssl.keystore_location),
     "Path to client's keystore (PKCS#12) used for authentication.",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL | _RK_SENSITIVE, "ssl.keystore.password", _RK_C_STR,
     _RK(ssl.keystore_password), "Client's keystore (PKCS#12) password.",
     _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl.providers", _RK_C_STR, _RK(ssl.providers),
     "Comma-separated list of OpenSSL 3.0.x implementation providers. "
     "E.g., \"default,legacy\".",
     _UNSUPPORTED_SSL_3},
    {_RK_GLOBAL | _RK_DEPRECATED, "ssl.engine.location", _RK_C_STR,
     _RK(ssl.engine_location),
     "Path to OpenSSL engine library. OpenSSL >= 1.1.x required. "
     "DEPRECATED: OpenSSL engine support is deprecated and should be "
     "replaced by OpenSSL 3 providers.",
     _UNSUPPORTED_SSL_ENGINE},
    {_RK_GLOBAL, "ssl.engine.id", _RK_C_STR, _RK(ssl.engine_id),
     "OpenSSL engine id is the name used for loading engine.",
     .sdef = "dynamic", _UNSUPPORTED_SSL_ENGINE},
    {_RK_GLOBAL, "ssl_engine_callback_data", _RK_C_PTR,
     _RK(ssl.engine_callback_data),
     "OpenSSL engine callback data (set "
     "with rd_kafka_conf_set_engine_callback_data()).",
     _UNSUPPORTED_SSL_ENGINE},
    {_RK_GLOBAL, "enable.ssl.certificate.verification", _RK_C_BOOL,
     _RK(ssl.enable_verify),
     "Enable OpenSSL's builtin broker (server) certificate verification. "
     "This verification can be extended by the application by "
     "implementing a certificate_verify_cb.",
     0, 1, 1, _UNSUPPORTED_SSL},
    {_RK_GLOBAL, "ssl.endpoint.identification.algorithm", _RK_C_S2I,
     _RK(ssl.endpoint_identification),
     "Endpoint identification algorithm to validate broker "
     "hostname using broker certificate. "
     "https - Server (broker) hostname verification as "
     "specified in RFC2818. "
     "none - No endpoint verification. "
     "OpenSSL >= 1.0.2 required.",
     .vdef = RD_KAFKA_SSL_ENDPOINT_ID_HTTPS,
     .s2i  = {{RD_KAFKA_SSL_ENDPOINT_ID_NONE, "none"},
             {RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https"}},
     _UNSUPPORTED_OPENSSL_1_0_2},
    {_RK_GLOBAL, "ssl.certificate.verify_cb", _RK_C_PTR,
     _RK(ssl.cert_verify_cb),
     "Callback to verify the broker certificate chain.", _UNSUPPORTED_SSL},

    /* Point user in the right direction if they try to apply
     * Java client SSL / JAAS properties. */
    {_RK_GLOBAL, "ssl.truststore.location", _RK_C_INVALID, _RK(dummy),
     "Java TrustStores are not supported, use `ssl.ca.location` "
     "and a certificate file instead. "
     "See "
     "https://github.com/confluentinc/librdkafka/"
     "wiki/Using-SSL-with-librdkafka "
     "for more information."},
    {_RK_GLOBAL, "sasl.jaas.config", _RK_C_INVALID, _RK(dummy),
     "Java JAAS configuration is not supported, see "
     "https://github.com/confluentinc/librdkafka/"
     "wiki/Using-SASL-with-librdkafka "
     "for more information."},

    {_RK_GLOBAL | _RK_HIGH, "sasl.mechanisms", _RK_C_STR, _RK(sasl.mechanisms),
     "SASL mechanism to use for authentication. "
     "Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. "
     "**NOTE**: Despite the name only one mechanism must be configured.",
     .sdef = "GSSAPI", .validate = rd_kafka_conf_validate_single},
    {_RK_GLOBAL | _RK_HIGH, "sasl.mechanism", _RK_C_ALIAS,
     .sdef = "sasl.mechanisms"},
    {_RK_GLOBAL, "sasl.kerberos.service.name", _RK_C_STR,
     _RK(sasl.service_name),
     "Kerberos principal name that Kafka runs as, "
     "not including /hostname@REALM",
     .sdef = "kafka"},
    {_RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR, _RK(sasl.principal),
     "This client's Kerberos principal name. "
     "(Not supported on Windows, will use the logon user's principal).",
     .sdef = "kafkaclient"},
    {_RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR, _RK(sasl.kinit_cmd),
     "Shell command to refresh or acquire the client's Kerberos ticket. "
     "This command is executed on client creation and every "
     "sasl.kerberos.min.time.before.relogin (0=disable). "
     "%{config.prop.name} is replaced by corresponding config "
     "object value.",
     .sdef =
         /* First attempt to refresh, else acquire. */
     "kinit -R -t \"%{sasl.kerberos.keytab}\" "
     "-k %{sasl.kerberos.principal} || "
     "kinit -t \"%{sasl.kerberos.keytab}\" -k %{sasl.kerberos.principal}",
     _UNSUPPORTED_WIN32_GSSAPI},
    {_RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR, _RK(sasl.keytab),
     "Path to Kerberos keytab file. "
     "This configuration property is only used as a variable in "
     "`sasl.kerberos.kinit.cmd` as "
     "` ... -t \"%{sasl.kerberos.keytab}\"`.",
     _UNSUPPORTED_WIN32_GSSAPI},
    {_RK_GLOBAL, "sasl.kerberos.min.time.before.relogin", _RK_C_INT,
     _RK(sasl.relogin_min_time),
     "Minimum time in milliseconds between key refresh attempts. "
     "Disable automatic key refresh by setting this property to 0.",
     0, 86400 * 1000, 60 * 1000, _UNSUPPORTED_WIN32_GSSAPI},
    {_RK_GLOBAL | _RK_HIGH | _RK_SENSITIVE, "sasl.username", _RK_C_STR,
     _RK(sasl.username),
     "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms"},
    {_RK_GLOBAL | _RK_HIGH | _RK_SENSITIVE, "sasl.password", _RK_C_STR,
     _RK(sasl.password),
     "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism"},
    {_RK_GLOBAL | _RK_SENSITIVE, "sasl.oauthbearer.config", _RK_C_STR,
     _RK(sasl.oauthbearer_config),
     "SASL/OAUTHBEARER configuration. The format is "
     "implementation-dependent and must be parsed accordingly. The "
     "default unsecured token implementation (see "
     "https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes "
     "space-separated name=value pairs with valid names including "
     "principalClaimName, principal, scopeClaimName, scope, and "
     "lifeSeconds. The default value for principalClaimName is \"sub\", "
     "the default value for scopeClaimName is \"scope\", and the default "
     "value for lifeSeconds is 3600. The scope value is CSV format with "
     "the default value being no/empty scope. For example: "
     "`principalClaimName=azp principal=admin scopeClaimName=roles "
     "scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions "
     "can be communicated to the broker via "
     "`extension_NAME=value`. For example: "
     "`principal=admin extension_traceId=123`",
     _UNSUPPORTED_OAUTHBEARER},
    {_RK_GLOBAL, "enable.sasl.oauthbearer.unsecure.jwt", _RK_C_BOOL,
     _RK(sasl.enable_oauthbearer_unsecure_jwt),
     "Enable the builtin unsecure JWT OAUTHBEARER token handler "
     "if no oauthbearer_refresh_cb has been set. "
     "This builtin handler should only be used for development "
     "or testing, and not in production.",
     0, 1, 0, _UNSUPPORTED_OAUTHBEARER},
    {_RK_GLOBAL, "oauthbearer_token_refresh_cb", _RK_C_PTR,
     _RK(sasl.oauthbearer.token_refresh_cb),
     "SASL/OAUTHBEARER token refresh callback (set with "
     "rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by "
     "rd_kafka_poll(), et.al. "
     "This callback will be triggered when it is time to refresh "
     "the client's OAUTHBEARER token. "
     "Also see `rd_kafka_conf_enable_sasl_queue()`.",
     _UNSUPPORTED_OAUTHBEARER},
    {
        _RK_GLOBAL | _RK_HIDDEN,
        "enable_sasl_queue",
        _RK_C_BOOL,
        _RK(sasl.enable_callback_queue),
        "Enable the SASL callback queue "
        "(set with rd_kafka_conf_enable_sasl_queue()).",
        0,
        1,
        0,
    },
    {_RK_GLOBAL, "sasl.oauthbearer.method", _RK_C_S2I,
     _RK(sasl.oauthbearer.method),
     "Set to \"default\" or \"oidc\" to control which login method "
     "to be used. If set to \"oidc\", the following properties must also be "
     "be specified: "
     "`sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`, "
     "and `sasl.oauthbearer.token.endpoint.url`.",
     .vdef = RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT,
     .s2i  = {{RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT, "default"},
             {RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC, "oidc"}},
     _UNSUPPORTED_OIDC},
    {_RK_GLOBAL, "sasl.oauthbearer.client.id", _RK_C_STR,
     _RK(sasl.oauthbearer.client_id),
     "Public identifier for the application. "
     "Must be unique across all clients that the "
     "authorization server handles. "
     "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
     _UNSUPPORTED_OIDC},
    {_RK_GLOBAL, "sasl.oauthbearer.client.secret", _RK_C_STR,
     _RK(sasl.oauthbearer.client_secret),
     "Client secret only known to the application and the "
     "authorization server. This should be a sufficiently random string "
     "that is not guessable. "
     "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
     _UNSUPPORTED_OIDC},
    {_RK_GLOBAL, "sasl.oauthbearer.scope", _RK_C_STR,
     _RK(sasl.oauthbearer.scope),
     "Client use this to specify the scope of the access request to the "
     "broker. "
     "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
     _UNSUPPORTED_OIDC},
    {_RK_GLOBAL, "sasl.oauthbearer.extensions", _RK_C_STR,
     _RK(sasl.oauthbearer.extensions_str),
     "Allow additional information to be provided to the broker. "
     "Comma-separated list of key=value pairs. "
     "E.g., \"supportFeatureX=true,organizationId=sales-emea\"."
     "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
     _UNSUPPORTED_OIDC},
    {_RK_GLOBAL, "sasl.oauthbearer.token.endpoint.url", _RK_C_STR,
     _RK(sasl.oauthbearer.token_endpoint_url),
     "OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. "
     "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
     _UNSUPPORTED_OIDC},

    /* Plugins */
    {_RK_GLOBAL, "plugin.library.paths", _RK_C_STR, _RK(plugin_paths),
     "List of plugin libraries to load (; separated). "
     "The library search path is platform dependent (see dlopen(3) for "
     "Unix and LoadLibrary() for Windows). If no filename extension is "
     "specified the platform-specific extension (such as .dll or .so) "
     "will be appended automatically.",
#if WITH_PLUGINS
     .set = rd_kafka_plugins_conf_set
#else
          .unsupported = "libdl/dlopen(3) not available at build time"
#endif
    },

    /* Interceptors are added through specific API and not exposed
     * as configuration properties.
     * The interceptor property must be defined after plugin.library.paths
     * so that the plugin libraries are properly loaded before
     * interceptors are configured when duplicating configuration objects.*/
    {_RK_GLOBAL, "interceptors", _RK_C_INTERNAL, _RK(interceptors),
     "Interceptors added through rd_kafka_conf_interceptor_add_..() "
     "and any configuration handled by interceptors.",
     .ctor = rd_kafka_conf_interceptor_ctor,
     .dtor = rd_kafka_conf_interceptor_dtor,
     .copy = rd_kafka_conf_interceptor_copy},

    /* Test mocks. */
    {_RK_GLOBAL | _RK_HIDDEN, "test.mock.num.brokers", _RK_C_INT,
     _RK(mock.broker_cnt),
     "Number of mock brokers to create. "
     "This will automatically overwrite `bootstrap.servers` with the "
     "mock broker list.",
     0, 10000, 0},
    {_RK_GLOBAL | _RK_HIDDEN, "test.mock.broker.rtt", _RK_C_INT,
     _RK(mock.broker_rtt), "Simulated mock broker latency in milliseconds.", 0,
     60 * 60 * 1000 /*1h*/, 0},

    /* Unit test interfaces.
     * These are not part of the public API and may change at any time.
     * Only to be used by the librdkafka tests. */
    {_RK_GLOBAL | _RK_HIDDEN, "ut_handle_ProduceResponse", _RK_C_PTR,
     _RK(ut.handle_ProduceResponse),
     "ProduceResponse handler: "
     "rd_kafka_resp_err_t (*cb) (rd_kafka_t *rk, "
     "int32_t brokerid, uint64_t msgid, rd_kafka_resp_err_t err)"},

    /* Global consumer group properties */
    {_RK_GLOBAL | _RK_CGRP | _RK_HIGH, "group.id", _RK_C_STR, _RK(group_id_str),
     "Client group id string. All clients sharing the same group.id "
     "belong to the same group."},
    {_RK_GLOBAL | _RK_CGRP | _RK_MED, "group.instance.id", _RK_C_STR,
     _RK(group_instance_id),
     "Enable static group membership. "
     "Static group members are able to leave and rejoin a group "
     "within the configured `session.timeout.ms` without prompting a "
     "group rebalance. This should be used in combination with a larger "
     "`session.timeout.ms` to avoid group rebalances caused by transient "
     "unavailability (e.g. process restarts). "
     "Requires broker version >= 2.3.0."},
    {_RK_GLOBAL | _RK_CGRP | _RK_MED, "partition.assignment.strategy",
     _RK_C_STR, _RK(partition_assignment_strategy),
     "The name of one or more partition assignment strategies. The "
     "elected group leader will use a strategy supported by all "
     "members of the group to assign partitions to group members. If "
     "there is more than one eligible strategy, preference is "
     "determined by the order of this list (strategies earlier in the "
     "list have higher priority). "
     "Cooperative and non-cooperative (eager) strategies must not be "
     "mixed. "
     "Available strategies: range, roundrobin, cooperative-sticky.",
     .sdef = "range,roundrobin"},
    {_RK_GLOBAL | _RK_CGRP | _RK_HIGH, "session.timeout.ms", _RK_C_INT,
     _RK(group_session_timeout_ms),
     "Client group session and failure detection timeout. "
     "The consumer sends periodic heartbeats (heartbeat.interval.ms) "
     "to indicate its liveness to the broker. If no hearts are "
     "received by the broker for a group member within the "
     "session timeout, the broker will remove the consumer from "
     "the group and trigger a rebalance. "
     "The allowed range is configured with the **broker** configuration "
     "properties `group.min.session.timeout.ms` and "
     "`group.max.session.timeout.ms`. "
     "Also see `max.poll.interval.ms`.",
     1, 3600 * 1000, 45 * 1000},
    {_RK_GLOBAL | _RK_CGRP, "heartbeat.interval.ms", _RK_C_INT,
     _RK(group_heartbeat_intvl_ms),
     "Group session keepalive heartbeat interval.", 1, 3600 * 1000, 3 * 1000},
    {_RK_GLOBAL | _RK_CGRP, "group.protocol.type", _RK_C_KSTR,
     _RK(group_protocol_type),
     "Group protocol type. NOTE: Currently, the only supported group "
     "protocol type is `consumer`.",
     .sdef = "consumer"},
    {_RK_GLOBAL | _RK_CGRP, "coordinator.query.interval.ms", _RK_C_INT,
     _RK(coord_query_intvl_ms),
     "How often to query for the current client group coordinator. "
     "If the currently assigned coordinator is down the configured "
     "query interval will be divided by ten to more quickly recover "
     "in case of coordinator reassignment.",
     1, 3600 * 1000, 10 * 60 * 1000},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "max.poll.interval.ms", _RK_C_INT,
     _RK(max_poll_interval_ms),
     "Maximum allowed time between calls to consume messages "
     "(e.g., rd_kafka_consumer_poll()) for high-level consumers. "
     "If this interval is exceeded the consumer is considered failed "
     "and the group will rebalance in order to reassign the "
     "partitions to another consumer group member. "
     "Warning: Offset commits may be not possible at this point. "
     "Note: It is recommended to set `enable.auto.offset.store=false` "
     "for long-time processing applications and then explicitly store "
     "offsets (using offsets_store()) *after* message processing, to "
     "make sure offsets are not auto-committed prior to processing "
     "has finished. "
     "The interval is checked two times per second. "
     "See KIP-62 for more information.",
     1, 86400 * 1000, 300000},

    /* Global consumer properties */
    {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "enable.auto.commit", _RK_C_BOOL,
     _RK(enable_auto_commit),
     "Automatically and periodically commit offsets in the background. "
     "Note: setting this to false does not prevent the consumer from "
     "fetching previously committed start offsets. To circumvent this "
     "behaviour set specific start offsets per partition in the call "
     "to assign().",
     0, 1, 1},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "auto.commit.interval.ms", _RK_C_INT,
     _RK(auto_commit_interval_ms),
     "The frequency in milliseconds that the consumer offsets "
     "are committed (written) to offset storage. (0 = disable). "
     "This setting is used by the high-level consumer.",
     0, 86400 * 1000, 5 * 1000},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "enable.auto.offset.store",
     _RK_C_BOOL, _RK(enable_auto_offset_store),
     "Automatically store offset of last message provided to "
     "application. "
     "The offset store is an in-memory store of the next offset to "
     "(auto-)commit for each partition.",
     0, 1, 1},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "queued.min.messages", _RK_C_INT,
     _RK(queued_min_msgs),
     "Minimum number of messages per topic+partition "
     "librdkafka tries to maintain in the local consumer queue.",
     1, 10000000, 100000},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "queued.max.messages.kbytes",
     _RK_C_INT, _RK(queued_max_msg_kbytes),
     "Maximum number of kilobytes of queued pre-fetched messages "
     "in the local consumer queue. "
     "If using the high-level consumer this setting applies to the "
     "single consumer queue, regardless of the number of partitions. "
     "When using the legacy simple consumer or when separate "
     "partition queues are used this setting applies per partition. "
     "This value may be overshot by fetch.message.max.bytes. "
     "This property has higher priority than queued.min.messages.",
     1, INT_MAX / 1024, 0x10000 /*64MB*/},
    {_RK_GLOBAL | _RK_CONSUMER, "fetch.wait.max.ms", _RK_C_INT,
     _RK(fetch_wait_max_ms),
     "Maximum time the broker may wait to fill the Fetch response "
     "with fetch.min.bytes of messages.",
     0, 300 * 1000, 500},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.queue.backoff.ms", _RK_C_INT,
     _RK(fetch_queue_backoff_ms),
     "How long to postpone the next fetch request for a "
     "topic+partition in case the current fetch queue thresholds "
     "(queued.min.messages or queued.max.messages.kbytes) have "
     "been exceded. "
     "This property may need to be decreased if the queue thresholds are "
     "set low and the application is experiencing long (~1s) delays "
     "between messages. Low values may increase CPU utilization.",
     0, 300 * 1000, 1000},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.message.max.bytes", _RK_C_INT,
     _RK(fetch_msg_max_bytes),
     "Initial maximum number of bytes per topic+partition to request when "
     "fetching messages from the broker. "
     "If the client encounters a message larger than this value "
     "it will gradually try to increase it until the "
     "entire message can be fetched.",
     1, 1000000000, 1024 * 1024},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "max.partition.fetch.bytes",
     _RK_C_ALIAS, .sdef = "fetch.message.max.bytes"},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.max.bytes", _RK_C_INT,
     _RK(fetch_max_bytes),
     "Maximum amount of data the broker shall return for a Fetch request. "
     "Messages are fetched in batches by the consumer and if the first "
     "message batch in the first non-empty partition of the Fetch request "
     "is larger than this value, then the message batch will still be "
     "returned to ensure the consumer can make progress. "
     "The maximum message batch size accepted by the broker is defined "
     "via `message.max.bytes` (broker config) or "
     "`max.message.bytes` (broker topic config). "
     "`fetch.max.bytes` is automatically adjusted upwards to be "
     "at least `message.max.bytes` (consumer config).",
     0, INT_MAX - 512, 50 * 1024 * 1024 /* 50MB */},
    {_RK_GLOBAL | _RK_CONSUMER, "fetch.min.bytes", _RK_C_INT,
     _RK(fetch_min_bytes),
     "Minimum number of bytes the broker responds with. "
     "If fetch.wait.max.ms expires the accumulated data will "
     "be sent to the client regardless of this setting.",
     1, 100000000, 1},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.error.backoff.ms", _RK_C_INT,
     _RK(fetch_error_backoff_ms),
     "How long to postpone the next fetch request for a "
     "topic+partition in case of a fetch error.",
     0, 300 * 1000, 500},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_DEPRECATED, "offset.store.method",
     _RK_C_S2I, _RK(offset_store_method),
     "Offset commit store method: "
     "'file' - DEPRECATED: local file store (offset.store.path, et.al), "
     "'broker' - broker commit store "
     "(requires Apache Kafka 0.8.2 or later on the broker).",
     .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
     .s2i  = {{RD_KAFKA_OFFSET_METHOD_NONE, "none"},
             {RD_KAFKA_OFFSET_METHOD_FILE, "file"},
             {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "isolation.level", _RK_C_S2I,
     _RK(isolation_level),
     "Controls how to read messages written transactionally: "
     "`read_committed` - only return transactional messages which have "
     "been committed. `read_uncommitted` - return all messages, even "
     "transactional messages which have been aborted.",
     .vdef = RD_KAFKA_READ_COMMITTED,
     .s2i  = {{RD_KAFKA_READ_UNCOMMITTED, "read_uncommitted"},
             {RD_KAFKA_READ_COMMITTED, "read_committed"}}},
    {_RK_GLOBAL | _RK_CONSUMER, "consume_cb", _RK_C_PTR, _RK(consume_cb),
     "Message consume callback (set with rd_kafka_conf_set_consume_cb())"},
    {_RK_GLOBAL | _RK_CONSUMER, "rebalance_cb", _RK_C_PTR, _RK(rebalance_cb),
     "Called after consumer group has been rebalanced "
     "(set with rd_kafka_conf_set_rebalance_cb())"},
    {_RK_GLOBAL | _RK_CONSUMER, "offset_commit_cb", _RK_C_PTR,
     _RK(offset_commit_cb),
     "Offset commit result propagation callback. "
     "(set with rd_kafka_conf_set_offset_commit_cb())"},
    {_RK_GLOBAL | _RK_CONSUMER, "enable.partition.eof", _RK_C_BOOL,
     _RK(enable_partition_eof),
     "Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the "
     "consumer reaches the end of a partition.",
     0, 1, 0},
    {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "check.crcs", _RK_C_BOOL,
     _RK(check_crcs),
     "Verify CRC32 of consumed messages, ensuring no on-the-wire or "
     "on-disk corruption to the messages occurred. This check comes "
     "at slightly increased CPU usage.",
     0, 1, 0},
    {_RK_GLOBAL, "client.rack", _RK_C_KSTR, _RK(client_rack),
     "A rack identifier for this client. This can be any string value "
     "which indicates where this client is physically located. It "
     "corresponds with the broker config `broker.rack`.",
     .sdef = ""},

    /* Global producer properties */
    {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "transactional.id", _RK_C_STR,
     _RK(eos.transactional_id),
     "Enables the transactional producer. "
     "The transactional.id is used to identify the same transactional "
     "producer instance across process restarts. "
     "It allows the producer to guarantee that transactions corresponding "
     "to earlier instances of the same producer have been finalized "
     "prior to starting any new transactions, and that any "
     "zombie instances are fenced off. "
     "If no transactional.id is provided, then the producer is limited "
     "to idempotent delivery (if enable.idempotence is set). "
     "Requires broker version >= 0.11.0."},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "transaction.timeout.ms", _RK_C_INT,
     _RK(eos.transaction_timeout_ms),
     "The maximum amount of time in milliseconds that the transaction "
     "coordinator will wait for a transaction status update from the "
     "producer before proactively aborting the ongoing transaction. "
     "If this value is larger than the `transaction.max.timeout.ms` "
     "setting in the broker, the init_transactions() call will fail with "
     "ERR_INVALID_TRANSACTION_TIMEOUT. "
     "The transaction timeout automatically adjusts "
     "`message.timeout.ms` and `socket.timeout.ms`, unless explicitly "
     "configured in which case they must not exceed the "
     "transaction timeout (`socket.timeout.ms` must be at least 100ms "
     "lower than `transaction.timeout.ms`). "
     "This is also the default timeout value if no timeout (-1) is "
     "supplied to the transactional API methods.",
     1000, INT_MAX, 60000},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "enable.idempotence", _RK_C_BOOL,
     _RK(eos.idempotence),
     "When set to `true`, the producer will ensure that messages are "
     "successfully produced exactly once and in the original produce "
     "order. "
     "The following configuration properties are adjusted automatically "
     "(if not modified by the user) when idempotence is enabled: "
     "`max.in.flight.requests.per.connection=" RD_KAFKA_IDEMP_MAX_INFLIGHT_STR
     "` (must be less than or "
     "equal to " RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "), `retries=INT32_MAX` "
     "(must be greater than 0), `acks=all`, `queuing.strategy=fifo`. "
     "Producer instantation will fail if user-supplied configuration "
     "is incompatible.",
     0, 1, 0},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_EXPERIMENTAL, "enable.gapless.guarantee",
     _RK_C_BOOL, _RK(eos.gapless),
     "When set to `true`, any error that could result in a gap "
     "in the produced message series when a batch of messages fails, "
     "will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop "
     "the producer. "
     "Messages failing due to `message.timeout.ms` are not covered "
     "by this guarantee. "
     "Requires `enable.idempotence=true`.",
     0, 1, 0},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "queue.buffering.max.messages",
     _RK_C_INT, _RK(queue_buffering_max_msgs),
     "Maximum number of messages allowed on the producer queue. "
     "This queue is shared by all topics and partitions. A value of 0 disables "
     "this limit.",
     0, INT_MAX, 100000},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "queue.buffering.max.kbytes",
     _RK_C_INT, _RK(queue_buffering_max_kbytes),
     "Maximum total message size sum allowed on the producer queue. "
     "This queue is shared by all topics and partitions. "
     "This property has higher priority than queue.buffering.max.messages.",
     1, INT_MAX, 0x100000 /*1GB*/},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "queue.buffering.max.ms", _RK_C_DBL,
     _RK(buffering_max_ms_dbl),
     "Delay in milliseconds to wait for messages in the producer queue "
     "to accumulate before constructing message batches (MessageSets) to "
     "transmit to brokers. "
     "A higher value allows larger and more effective "
     "(less overhead, improved compression) batches of messages to "
     "accumulate at the expense of increased message delivery latency.",
     .dmin = 0, .dmax = 900.0 * 1000.0, .ddef = 5.0},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "linger.ms", _RK_C_ALIAS,
     .sdef = "queue.buffering.max.ms"},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "message.send.max.retries",
     _RK_C_INT, _RK(max_retries),
     "How many times to retry sending a failing Message. "
     "**Note:** retrying may cause reordering unless "
     "`enable.idempotence` is set to true.",
     0, INT32_MAX, INT32_MAX},
    {_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
     .sdef = "message.send.max.retries"},

    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT,
     _RK(retry_backoff_ms),
     "The backoff time in milliseconds before retrying a protocol request, "
     "this is the first backoff time, "
     "and will be backed off exponentially until number of retries is "
     "exhausted, and it's capped by retry.backoff.max.ms.",
     1, 300 * 1000, 100},

    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.max.ms", _RK_C_INT,
     _RK(retry_backoff_max_ms),
     "The max backoff time in milliseconds before retrying a protocol request, "
     "this is the atmost backoff allowed for exponentially backed off "
     "requests.",
     1, 300 * 1000, 1000},

    {_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",
     _RK_C_INT, _RK(queue_backpressure_thres),
     "The threshold of outstanding not yet transmitted broker requests "
     "needed to backpressure the producer's message accumulator. "
     "If the number of not yet transmitted requests equals or exceeds "
     "this number, produce request creation that would have otherwise "
     "been triggered (for example, in accordance with linger.ms) will be "
     "delayed. A lower number yields larger and more effective batches. "
     "A higher value can improve latency when using compression on slow "
     "machines.",
     1, 1000000, 1},

    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "compression.codec", _RK_C_S2I,
     _RK(compression_codec),
     "compression codec to use for compressing message sets. "
     "This is the default value for all topics, may be overridden by "
     "the topic configuration property `compression.codec`. ",
     .vdef = RD_KAFKA_COMPRESSION_NONE,
     .s2i  = {{RD_KAFKA_COMPRESSION_NONE, "none"},
             {RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB},
             {RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY},
             {RD_KAFKA_COMPRESSION_LZ4, "lz4"},
             {RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD},
             {0}}},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "compression.type", _RK_C_ALIAS,
     .sdef = "compression.codec"},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "batch.num.messages", _RK_C_INT,
     _RK(batch_num_messages),
     "Maximum number of messages batched in one MessageSet. "
     "The total MessageSet size is also limited by batch.size and "
     "message.max.bytes.",
     1, 1000000, 10000},
    {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "batch.size", _RK_C_INT,
     _RK(batch_size),
     "Maximum size (in bytes) of all messages batched in one MessageSet, "
     "including protocol framing overhead. "
     "This limit is applied after the first message has been added "
     "to the batch, regardless of the first message's size, this is to "
     "ensure that messages that exceed batch.size are produced. "
     "The total MessageSet size is also limited by batch.num.messages and "
     "message.max.bytes.",
     1, INT_MAX, 1000000},
    {_RK_GLOBAL | _RK_PRODUCER, "delivery.report.only.error", _RK_C_BOOL,
     _RK(dr_err_only), "Only provide delivery reports for failed messages.", 0,
     1, 0},
    {_RK_GLOBAL | _RK_PRODUCER, "dr_cb", _RK_C_PTR, _RK(dr_cb),
     "Delivery report callback (set with rd_kafka_conf_set_dr_cb())"},
    {_RK_GLOBAL | _RK_PRODUCER, "dr_msg_cb", _RK_C_PTR, _RK(dr_msg_cb),
     "Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())"},
    {_RK_GLOBAL | _RK_PRODUCER, "sticky.partitioning.linger.ms", _RK_C_INT,
     _RK(sticky_partition_linger_ms),
     "Delay in milliseconds to wait to assign new sticky partitions for "
     "each topic. "
     "By default, set to double the time of linger.ms. To disable sticky "
     "behavior, set to 0. "
     "This behavior affects messages with the key NULL in all cases, and "
     "messages with key lengths of zero when the consistent_random "
     "partitioner is in use. "
     "These messages would otherwise be assigned randomly. "
     "A higher value allows for more effective batching of these "
     "messages.",
     0, 900000, 10},
    {_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup),
     "Controls how the client uses DNS lookups. By default, when the lookup "
     "returns multiple IP addresses for a hostname, they will all be attempted "
     "for connection before the connection is considered failed. This applies "
     "to both bootstrap and advertised servers. If the value is set to "
     "`resolve_canonical_bootstrap_servers_only`, each entry will be resolved "
     "and expanded into a list of canonical names. NOTE: Default here is "
     "different from the Java client's default behavior, which connects only "
     "to the first IP address returned for a hostname. ",
     .vdef = RD_KAFKA_USE_ALL_DNS_IPS,
     .s2i  = {{RD_KAFKA_USE_ALL_DNS_IPS, "use_all_dns_ips"},
             {RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
              "resolve_canonical_bootstrap_servers_only"}}},


    /*
     * Topic properties
     */

    /* Topic producer properties */
    {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "request.required.acks", _RK_C_INT,
     _RKT(required_acks),
     "This field indicates the number of acknowledgements the leader "
     "broker must receive from ISR brokers before responding to the "
     "request: "
     "*0*=Broker does not send any response/ack to client, "
     "*-1* or *all*=Broker will block until message is committed by all "
     "in sync replicas (ISRs). If there are less than "
     "`min.insync.replicas` (broker configuration) in the ISR set the "
     "produce request will fail.",
     -1, 1000, -1,
     .s2i =
         {
             {-1, "all"},
         }},
    {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "acks", _RK_C_ALIAS,
     .sdef = "request.required.acks"},

    {_RK_TOPIC | _RK_PRODUCER | _RK_MED, "request.timeout.ms", _RK_C_INT,
     _RKT(request_timeout_ms),
     "The ack timeout of the producer request in milliseconds. "
     "This value is only enforced by the broker and relies "
     "on `request.required.acks` being != 0.",
     1, 900 * 1000, 30 * 1000},
    {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "message.timeout.ms", _RK_C_INT,
     _RKT(message_timeout_ms),
     "Local message timeout. "
     "This value is only enforced locally and limits the time a "
     "produced message waits for successful delivery. "
     "A time of 0 is infinite. "
     "This is the maximum time librdkafka may use to deliver a message "
     "(including retries). Delivery error occurs when either the retry "
     "count or the message timeout are exceeded. "
     "The message timeout is automatically adjusted to "
     "`transaction.timeout.ms` if `transactional.id` is configured.",
     0, INT32_MAX, 300 * 1000},
    {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "delivery.timeout.ms", _RK_C_ALIAS,
     .sdef = "message.timeout.ms"},
    {_RK_TOPIC | _RK_PRODUCER | _RK_DEPRECATED | _RK_EXPERIMENTAL,
     "queuing.strategy", _RK_C_S2I, _RKT(queuing_strategy),
     "Producer queuing strategy. FIFO preserves produce ordering, "
     "while LIFO prioritizes new messages.",
     .vdef = 0,
     .s2i  = {{RD_KAFKA_QUEUE_FIFO, "fifo"}, {RD_KAFKA_QUEUE_LIFO, "lifo"}}},
    {_RK_TOPIC | _RK_PRODUCER | _RK_DEPRECATED, "produce.offset.report",
     _RK_C_BOOL, _RKT(produce_offset_report), "No longer used.", 0, 1, 0},
    {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "partitioner", _RK_C_STR,
     _RKT(partitioner_str),
     "Partitioner: "
     "`random` - random distribution, "
     "`consistent` - CRC32 hash of key (Empty and NULL keys are mapped to "
     "single partition), "
     "`consistent_random` - CRC32 hash of key (Empty and NULL keys are "
     "randomly partitioned), "
     "`murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are "
     "mapped to single partition), "
     "`murmur2_random` - Java Producer compatible Murmur2 hash of key "
     "(NULL keys are randomly partitioned. This is functionally equivalent "
     "to the default partitioner in the Java Producer.), "
     "`fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), "
     "`fnv1a_random` - FNV-1a hash of key (NULL keys are randomly "
     "partitioned).",
     .sdef     = "consistent_random",
     .validate = rd_kafka_conf_validate_partitioner},
    {_RK_TOPIC | _RK_PRODUCER, "partitioner_cb", _RK_C_PTR, _RKT(partitioner),
     "Custom partitioner callback "
     "(set with rd_kafka_topic_conf_set_partitioner_cb())"},
    {_RK_TOPIC | _RK_PRODUCER | _RK_DEPRECATED | _RK_EXPERIMENTAL,
     "msg_order_cmp", _RK_C_PTR, _RKT(msg_order_cmp),
     "Message queue ordering comparator "
     "(set with rd_kafka_topic_conf_set_msg_order_cmp()). "
     "Also see `queuing.strategy`."},
    {_RK_TOPIC, "opaque", _RK_C_PTR, _RKT(opaque),
     "Application opaque (set with rd_kafka_topic_conf_set_opaque())"},
    {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "compression.codec", _RK_C_S2I,
     _RKT(compression_codec),
     "Compression codec to use for compressing message sets. "
     "inherit = inherit global compression.codec configuration.",
     .vdef = RD_KAFKA_COMPRESSION_INHERIT,
     .s2i  = {{RD_KAFKA_COMPRESSION_NONE, "none"},
             {RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB},
             {RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY},
             {RD_KAFKA_COMPRESSION_LZ4, "lz4"},
             {RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD},
             {RD_KAFKA_COMPRESSION_INHERIT, "inherit"},
             {0}}},
    {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "compression.type", _RK_C_ALIAS,
     .sdef = "compression.codec"},
    {_RK_TOPIC | _RK_PRODUCER | _RK_MED, "compression.level", _RK_C_INT,
     _RKT(compression_level),
     "Compression level parameter for algorithm selected by configuration "
     "property `compression.codec`. Higher values will result in better "
     "compression at the cost of more CPU usage. Usable range is "
     "algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; "
     "-1 = codec-dependent default compression level.",
     RD_KAFKA_COMPLEVEL_MIN, RD_KAFKA_COMPLEVEL_MAX,
     RD_KAFKA_COMPLEVEL_DEFAULT},


    /* Topic consumer properties */
    {_RK_TOPIC | _RK_CONSUMER | _RK_DEPRECATED, "auto.commit.enable",
     _RK_C_BOOL, _RKT(auto_commit),
     "[**LEGACY PROPERTY:** This property is used by the simple legacy "
     "consumer only. When using the high-level KafkaConsumer, the global "
     "`enable.auto.commit` property must be used instead]. "
     "If true, periodically commit offset of the last message handed "
     "to the application. This committed offset will be used when the "
     "process restarts to pick up where it left off. "
     "If false, the application will have to call "
     "`rd_kafka_offset_store()` to store an offset (optional). "
     "Offsets will be written to broker or local file according to "
     "offset.store.method.",
     0, 1, 1},
    {_RK_TOPIC | _RK_CONSUMER, "enable.auto.commit", _RK_C_ALIAS,
     .sdef = "auto.commit.enable"},
    {_RK_TOPIC | _RK_CONSUMER | _RK_HIGH, "auto.commit.interval.ms", _RK_C_INT,
     _RKT(auto_commit_interval_ms),
     "[**LEGACY PROPERTY:** This setting is used by the simple legacy "
     "consumer only. When using the high-level KafkaConsumer, the "
     "global `auto.commit.interval.ms` property must be used instead]. "
     "The frequency in milliseconds that the consumer offsets "
     "are committed (written) to offset storage.",
     10, 86400 * 1000, 60 * 1000},
    {_RK_TOPIC | _RK_CONSUMER | _RK_HIGH, "auto.offset.reset", _RK_C_S2I,
     _RKT(auto_offset_reset),
     "Action to take when there is no initial offset in offset store "
     "or the desired offset is out of range: "
     "'smallest','earliest' - automatically reset the offset to the smallest "
     "offset, "
     "'largest','latest' - automatically reset the offset to the largest "
     "offset, "
     "'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is "
     "retrieved by consuming messages and checking 'message->err'.",
     .vdef = RD_KAFKA_OFFSET_END,
     .s2i =
         {
             {RD_KAFKA_OFFSET_BEGINNING, "smallest"},
             {RD_KAFKA_OFFSET_BEGINNING, "earliest"},
             {RD_KAFKA_OFFSET_BEGINNING, "beginning"},
             {RD_KAFKA_OFFSET_END, "largest"},
             {RD_KAFKA_OFFSET_END, "latest"},
             {RD_KAFKA_OFFSET_END, "end"},
             {RD_KAFKA_OFFSET_INVALID, "error"},
         }},
    {_RK_TOPIC | _RK_CONSUMER | _RK_DEPRECATED, "offset.store.path", _RK_C_STR,
     _RKT(offset_store_path),
     "Path to local file for storing offsets. If the path is a directory "
     "a filename will be automatically generated in that directory based "
     "on the topic and partition. "
     "File-based offset storage will be removed in a future version.",
     .sdef = "."},

    {_RK_TOPIC | _RK_CONSUMER | _RK_DEPRECATED, "offset.store.sync.interval.ms",
     _RK_C_INT, _RKT(offset_store_sync_interval_ms),
     "fsync() interval for the offset file, in milliseconds. "
     "Use -1 to disable syncing, and 0 for immediate sync after "
     "each write. "
     "File-based offset storage will be removed in a future version.",
     -1, 86400 * 1000, -1},

    {_RK_TOPIC | _RK_CONSUMER | _RK_DEPRECATED, "offset.store.method",
     _RK_C_S2I, _RKT(offset_store_method),
     "Offset commit store method: "
     "'file' - DEPRECATED: local file store (offset.store.path, et.al), "
     "'broker' - broker commit store "
     "(requires \"group.id\" to be configured and "
     "Apache Kafka 0.8.2 or later on the broker.).",
     .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
     .s2i  = {{RD_KAFKA_OFFSET_METHOD_FILE, "file"},
             {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}},

    {_RK_TOPIC | _RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT,
     _RKT(consume_callback_max_msgs),
     "Maximum number of messages to dispatch in "
     "one `rd_kafka_consume_callback*()` call (0 = unlimited)",
     0, 1000000, 0},

    {0, /* End */}};

/**
 * @returns the property object for \p name in \p scope, or NULL if not found.
 * @remark does not work with interceptor configs.
 */
const struct rd_kafka_property *rd_kafka_conf_prop_find(int scope,
                                                        const char *name) {
        const struct rd_kafka_property *prop;

restart:
        for (prop = rd_kafka_properties; prop->name; prop++) {

                if (!(prop->scope & scope))
                        continue;

                if (strcmp(prop->name, name))
                        continue;

                if (prop->type == _RK_C_ALIAS) {
                        /* Caller supplied an alias, restart
                         * search for real name. */
                        name = prop->sdef;
                        goto restart;
                }

                return prop;
        }

        return NULL;
}

/**
 * @returns rd_true if property has been set/modified, else rd_false.
 *
 * @warning Asserts if the property does not exist.
 */
rd_bool_t rd_kafka_conf_is_modified(const rd_kafka_conf_t *conf,
                                    const char *name) {
        const struct rd_kafka_property *prop;

        if (!(prop = rd_kafka_conf_prop_find(_RK_GLOBAL, name)))
                RD_BUG("Configuration property \"%s\" does not exist", name);

        return rd_kafka_anyconf_is_modified(conf, prop);
}


/**
 * @returns true if property has been set/modified, else 0.
 *
 * @warning Asserts if the property does not exist.
 */
static rd_bool_t
rd_kafka_topic_conf_is_modified(const rd_kafka_topic_conf_t *conf,
                                const char *name) {
        const struct rd_kafka_property *prop;

        if (!(prop = rd_kafka_conf_prop_find(_RK_TOPIC, name)))
                RD_BUG("Topic configuration property \"%s\" does not exist",
                       name);

        return rd_kafka_anyconf_is_modified(conf, prop);
}



static rd_kafka_conf_res_t
rd_kafka_anyconf_set_prop0(int scope,
                           void *conf,
                           const struct rd_kafka_property *prop,
                           const char *istr,
                           int ival,
                           rd_kafka_conf_set_mode_t set_mode,
                           char *errstr,
                           size_t errstr_size) {
        rd_kafka_conf_res_t res;

#define _RK_PTR(TYPE, BASE, OFFSET) (TYPE)(void *)(((char *)(BASE)) + (OFFSET))

        /* Try interceptors first (only for GLOBAL config) */
        if (scope & _RK_GLOBAL) {
                if (prop->type == _RK_C_PTR || prop->type == _RK_C_INTERNAL)
                        res = RD_KAFKA_CONF_UNKNOWN;
                else
                        res = rd_kafka_interceptors_on_conf_set(
                            conf, prop->name, istr, errstr, errstr_size);
                if (res != RD_KAFKA_CONF_UNKNOWN)
                        return res;
        }


        if (prop->set) {
                /* Custom setter */

                res = prop->set(scope, conf, prop->name, istr,
                                _RK_PTR(void *, conf, prop->offset), set_mode,
                                errstr, errstr_size);

                if (res != RD_KAFKA_CONF_OK)
                        return res;

                /* FALLTHRU so that property value is set. */
        }

        switch (prop->type) {
        case _RK_C_STR: {
                char **str = _RK_PTR(char **, conf, prop->offset);
                if (*str)
                        rd_free(*str);
                if (istr)
                        *str = rd_strdup(istr);
                else
                        *str = prop->sdef ? rd_strdup(prop->sdef) : NULL;
                break;
        }
        case _RK_C_KSTR: {
                rd_kafkap_str_t **kstr =
                    _RK_PTR(rd_kafkap_str_t **, conf, prop->offset);
                if (*kstr)
                        rd_kafkap_str_destroy(*kstr);
                if (istr)
                        *kstr = rd_kafkap_str_new(istr, -1);
                else
                        *kstr = prop->sdef ? rd_kafkap_str_new(prop->sdef, -1)
                                           : NULL;
                break;
        }
        case _RK_C_PTR:
                *_RK_PTR(const void **, conf, prop->offset) = istr;
                break;
        case _RK_C_BOOL:
        case _RK_C_INT:
        case _RK_C_S2I:
        case _RK_C_S2F: {
                int *val = _RK_PTR(int *, conf, prop->offset);

                if (prop->type == _RK_C_S2F) {
                        switch (set_mode) {
                        case _RK_CONF_PROP_SET_REPLACE:
                                *val = ival;
                                break;
                        case _RK_CONF_PROP_SET_ADD:
                                *val |= ival;
                                break;
                        case _RK_CONF_PROP_SET_DEL:
                                *val &= ~ival;
                                break;
                        }
                } else {
                        /* Single assignment */
                        *val = ival;
                }
                break;
        }
        case _RK_C_DBL: {
                double *val = _RK_PTR(double *, conf, prop->offset);
                if (istr) {
                        char *endptr;
                        double new_val = strtod(istr, &endptr);
                        /* This is verified in set_prop() */
                        rd_assert(endptr != istr);
                        *val = new_val;
                } else
                        *val = prop->ddef;
                break;
        }

        case _RK_C_PATLIST: {
                /* Split comma-separated list into individual regex expressions
                 * that are verified and then append to the provided list. */
                rd_kafka_pattern_list_t **plist;

                plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);

                if (*plist)
                        rd_kafka_pattern_list_destroy(*plist);

                if (istr) {
                        if (!(*plist = rd_kafka_pattern_list_new(
                                  istr, errstr, (int)errstr_size)))
                                return RD_KAFKA_CONF_INVALID;
                } else
                        *plist = NULL;

                break;
        }

        case _RK_C_INTERNAL:
                /* Probably handled by setter */
                break;

        default:
                rd_kafka_assert(NULL, !*"unknown conf type");
        }


        rd_kafka_anyconf_set_modified(conf, prop, 1 /*modified*/);
        return RD_KAFKA_CONF_OK;
}


/**
 * @brief Find s2i (string-to-int mapping) entry and return its array index,
 *        or -1 on miss.
 */
static int rd_kafka_conf_s2i_find(const struct rd_kafka_property *prop,
                                  const char *value) {
        int j;

        for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
                if (prop->s2i[j].str && !rd_strcasecmp(prop->s2i[j].str, value))
                        return j;
        }

        return -1;
}


/**
 * @brief Set configuration property.
 *
 * @param allow_specific Allow rd_kafka_*conf_set_..() to be set,
 *        such as rd_kafka_conf_set_log_cb().
 *        Should not be allowed from the conf_set() string interface.
 */
static rd_kafka_conf_res_t
rd_kafka_anyconf_set_prop(int scope,
                          void *conf,
                          const struct rd_kafka_property *prop,
                          const char *value,
                          int allow_specific,
                          char *errstr,
                          size_t errstr_size) {
        int ival;

        if (prop->unsupported) {
                rd_snprintf(errstr, errstr_size,
                            "Configuration property \"%s\" not supported "
                            "in this build: %s",
                            prop->name, prop->unsupported);
                return RD_KAFKA_CONF_INVALID;
        }

        switch (prop->type) {
        case _RK_C_STR:
                /* Left-trim string(likes) */
                if (value)
                        while (isspace((int)*value))
                                value++;

                /* FALLTHRU */
        case _RK_C_KSTR:
                if (prop->s2i[0].str) {
                        int match;

                        if (!value || (match = rd_kafka_conf_s2i_find(
                                           prop, value)) == -1) {
                                rd_snprintf(errstr, errstr_size,
                                            "Invalid value for "
                                            "configuration property \"%s\": "
                                            "%s",
                                            prop->name, value);
                                return RD_KAFKA_CONF_INVALID;
                        }

                        /* Replace value string with canonical form */
                        value = prop->s2i[match].str;
                }
                /* FALLTHRU */
        case _RK_C_PATLIST:
                if (prop->validate &&
                    (!value || !prop->validate(prop, value, -1))) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value for "
                                    "configuration property \"%s\": %s",
                                    prop->name, value);
                        return RD_KAFKA_CONF_INVALID;
                }

                return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
                                                  _RK_CONF_PROP_SET_REPLACE,
                                                  errstr, errstr_size);

        case _RK_C_PTR:
                /* Allow hidden internal unit test properties to
                 * be set from generic conf_set() interface. */
                if (!allow_specific && !(prop->scope & _RK_HIDDEN)) {
                        rd_snprintf(errstr, errstr_size,
                                    "Property \"%s\" must be set through "
                                    "dedicated .._set_..() function",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }
                return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
                                                  _RK_CONF_PROP_SET_REPLACE,
                                                  errstr, errstr_size);

        case _RK_C_BOOL:
                if (!value) {
                        rd_snprintf(errstr, errstr_size,
                                    "Bool configuration property \"%s\" cannot "
                                    "be set to empty value",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }


                if (!rd_strcasecmp(value, "true") ||
                    !rd_strcasecmp(value, "t") || !strcmp(value, "1"))
                        ival = 1;
                else if (!rd_strcasecmp(value, "false") ||
                         !rd_strcasecmp(value, "f") || !strcmp(value, "0"))
                        ival = 0;
                else {
                        rd_snprintf(errstr, errstr_size,
                                    "Expected bool value for \"%s\": "
                                    "true or false",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
                                           _RK_CONF_PROP_SET_REPLACE, errstr,
                                           errstr_size);
                return RD_KAFKA_CONF_OK;

        case _RK_C_INT: {
                const char *end;

                if (!value) {
                        rd_snprintf(errstr, errstr_size,
                                    "Integer configuration "
                                    "property \"%s\" cannot be set "
                                    "to empty value",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                ival = (int)strtol(value, (char **)&end, 0);
                if (end == value) {
                        /* Non numeric, check s2i for string mapping */
                        int match = rd_kafka_conf_s2i_find(prop, value);

                        if (match == -1) {
                                rd_snprintf(errstr, errstr_size,
                                            "Invalid value for "
                                            "configuration property \"%s\"",
                                            prop->name);
                                return RD_KAFKA_CONF_INVALID;
                        }

                        if (prop->s2i[match].unsupported) {
                                rd_snprintf(errstr, errstr_size,
                                            "Unsupported value \"%s\" for "
                                            "configuration property \"%s\": %s",
                                            value, prop->name,
                                            prop->s2i[match].unsupported);
                                return RD_KAFKA_CONF_INVALID;
                        }

                        ival = prop->s2i[match].val;
                }

                if (ival < prop->vmin || ival > prop->vmax) {
                        rd_snprintf(errstr, errstr_size,
                                    "Configuration property \"%s\" value "
                                    "%i is outside allowed range %i..%i\n",
                                    prop->name, ival, prop->vmin, prop->vmax);
                        return RD_KAFKA_CONF_INVALID;
                }

                rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
                                           _RK_CONF_PROP_SET_REPLACE, errstr,
                                           errstr_size);
                return RD_KAFKA_CONF_OK;
        }

        case _RK_C_DBL: {
                const char *end;
                double dval;

                if (!value) {
                        rd_snprintf(errstr, errstr_size,
                                    "Float configuration "
                                    "property \"%s\" cannot be set "
                                    "to empty value",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                dval = strtod(value, (char **)&end);
                if (end == value) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value for "
                                    "configuration property \"%s\"",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                if (dval < prop->dmin || dval > prop->dmax) {
                        rd_snprintf(errstr, errstr_size,
                                    "Configuration property \"%s\" value "
                                    "%g is outside allowed range %g..%g\n",
                                    prop->name, dval, prop->dmin, prop->dmax);
                        return RD_KAFKA_CONF_INVALID;
                }

                rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
                                           _RK_CONF_PROP_SET_REPLACE, errstr,
                                           errstr_size);
                return RD_KAFKA_CONF_OK;
        }

        case _RK_C_S2I:
        case _RK_C_S2F: {
                int j;
                const char *next;

                if (!value) {
                        rd_snprintf(errstr, errstr_size,
                                    "Configuration "
                                    "property \"%s\" cannot be set "
                                    "to empty value",
                                    prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }

                next = value;
                while (next && *next) {
                        const char *s, *t;
                        rd_kafka_conf_set_mode_t set_mode =
                            _RK_CONF_PROP_SET_ADD; /* S2F */

                        s = next;

                        if (prop->type == _RK_C_S2F && (t = strchr(s, ','))) {
                                /* CSV flag field */
                                next = t + 1;
                        } else {
                                /* Single string */
                                t    = s + strlen(s);
                                next = NULL;
                        }


                        /* Left trim */
                        while (s < t && isspace((int)*s))
                                s++;

                        /* Right trim */
                        while (t > s && isspace((int)*t))
                                t--;

                        /* S2F: +/- prefix */
                        if (prop->type == _RK_C_S2F) {
                                if (*s == '+') {
                                        set_mode = _RK_CONF_PROP_SET_ADD;
                                        s++;
                                } else if (*s == '-') {
                                        set_mode = _RK_CONF_PROP_SET_DEL;
                                        s++;
                                }
                        }

                        /* Empty string? */
                        if (s == t)
                                continue;

                        /* Match string to s2i table entry */
                        for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
                                int new_val;

                                if (!prop->s2i[j].str)
                                        continue;

                                if (strlen(prop->s2i[j].str) ==
                                        (size_t)(t - s) &&
                                    !rd_strncasecmp(prop->s2i[j].str, s,
                                                    (int)(t - s)))
                                        new_val = prop->s2i[j].val;
                                else
                                        continue;

                                if (prop->s2i[j].unsupported) {
                                        rd_snprintf(
                                            errstr, errstr_size,
                                            "Unsupported value \"%.*s\" "
                                            "for configuration property "
                                            "\"%s\": %s",
                                            (int)(t - s), s, prop->name,
                                            prop->s2i[j].unsupported);
                                        return RD_KAFKA_CONF_INVALID;
                                }

                                rd_kafka_anyconf_set_prop0(
                                    scope, conf, prop, value, new_val, set_mode,
                                    errstr, errstr_size);

                                if (prop->type == _RK_C_S2F) {
                                        /* Flags: OR it in: do next */
                                        break;
                                } else {
                                        /* Single assignment */
                                        return RD_KAFKA_CONF_OK;
                                }
                        }

                        /* S2F: Good match: continue with next */
                        if (j < (int)RD_ARRAYSIZE(prop->s2i))
                                continue;

                        /* No match */
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value \"%.*s\" for "
                                    "configuration property \"%s\"",
                                    (int)(t - s), s, prop->name);
                        return RD_KAFKA_CONF_INVALID;
                }
                return RD_KAFKA_CONF_OK;
        }

        case _RK_C_INTERNAL:
                rd_snprintf(errstr, errstr_size,
                            "Internal property \"%s\" not settable",
                            prop->name);
                return RD_KAFKA_CONF_INVALID;

        case _RK_C_INVALID:
                rd_snprintf(errstr, errstr_size, "%s", prop->desc);
                return RD_KAFKA_CONF_INVALID;

        default:
                rd_kafka_assert(NULL, !*"unknown conf type");
        }

        /* not reachable */
        return RD_KAFKA_CONF_INVALID;
}



static void rd_kafka_defaultconf_set(int scope, void *conf) {
        const struct rd_kafka_property *prop;

        for (prop = rd_kafka_properties; prop->name; prop++) {
                if (!(prop->scope & scope))
                        continue;

                if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
                        continue;

                if (prop->ctor)
                        prop->ctor(scope, conf);

                if (prop->sdef || prop->vdef || prop->pdef ||
                    !rd_dbl_zero(prop->ddef))
                        rd_kafka_anyconf_set_prop0(
                            scope, conf, prop,
                            prop->sdef ? prop->sdef : prop->pdef, prop->vdef,
                            _RK_CONF_PROP_SET_REPLACE, NULL, 0);
        }
}

rd_kafka_conf_t *rd_kafka_conf_new(void) {
        rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf));
        rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*conf) &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
        rd_kafka_defaultconf_set(_RK_GLOBAL, conf);
        rd_kafka_anyconf_clear_all_is_modified(conf);
        return conf;
}

rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void) {
        rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
        rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*tconf) &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
        rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
        rd_kafka_anyconf_clear_all_is_modified(tconf);
        return tconf;
}


static int rd_kafka_anyconf_set(int scope,
                                void *conf,
                                const char *name,
                                const char *value,
                                char *errstr,
                                size_t errstr_size) {
        char estmp[1];
        const struct rd_kafka_property *prop;
        rd_kafka_conf_res_t res;

        if (!errstr) {
                errstr      = estmp;
                errstr_size = 0;
        }

        if (value && !*value)
                value = NULL;

        /* Try interceptors first (only for GLOBAL config for now) */
        if (scope & _RK_GLOBAL) {
                res = rd_kafka_interceptors_on_conf_set(
                    (rd_kafka_conf_t *)conf, name, value, errstr, errstr_size);
                /* Handled (successfully or not) by interceptor. */
                if (res != RD_KAFKA_CONF_UNKNOWN)
                        return res;
        }

        /* Then global config */


        for (prop = rd_kafka_properties; prop->name; prop++) {

                if (!(prop->scope & scope))
                        continue;

                if (strcmp(prop->name, name))
                        continue;

                if (prop->type == _RK_C_ALIAS)
                        return rd_kafka_anyconf_set(scope, conf, prop->sdef,
                                                    value, errstr, errstr_size);

                return rd_kafka_anyconf_set_prop(scope, conf, prop, value,
                                                 0 /*don't allow specifics*/,
                                                 errstr, errstr_size);
        }

        rd_snprintf(errstr, errstr_size,
                    "No such configuration property: \"%s\"", name);

        return RD_KAFKA_CONF_UNKNOWN;
}


/**
 * @brief Set a rd_kafka_*_conf_set_...() specific property, such as
 *        rd_kafka_conf_set_error_cb().
 *
 * @warning Will not call interceptor's on_conf_set.
 * @warning Asserts if \p name is not known or value is incorrect.
 *
 * Implemented as a macro to have rd_assert() print the original function.
 */

#define rd_kafka_anyconf_set_internal(SCOPE, CONF, NAME, VALUE)                \
        do {                                                                   \
                const struct rd_kafka_property *_prop;                         \
                rd_kafka_conf_res_t _res;                                      \
                _prop = rd_kafka_conf_prop_find(SCOPE, NAME);                  \
                rd_assert(_prop && * "invalid property name");                 \
                _res = rd_kafka_anyconf_set_prop(                              \
                    SCOPE, CONF, _prop, (const void *)VALUE,                   \
                    1 /*allow-specifics*/, NULL, 0);                           \
                rd_assert(_res == RD_KAFKA_CONF_OK);                           \
        } while (0)


rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
                                      const char *name,
                                      const char *value,
                                      char *errstr,
                                      size_t errstr_size) {
        rd_kafka_conf_res_t res;

        res = rd_kafka_anyconf_set(_RK_GLOBAL, conf, name, value, errstr,
                                   errstr_size);
        if (res != RD_KAFKA_CONF_UNKNOWN)
                return res;

        /* Fallthru:
         * If the global property was unknown, try setting it on the
         * default topic config. */
        if (!conf->topic_conf) {
                /* Create topic config, might be over-written by application
                 * later. */
                rd_kafka_conf_set_default_topic_conf(conf,
                                                     rd_kafka_topic_conf_new());
        }

        return rd_kafka_topic_conf_set(conf->topic_conf, name, value, errstr,
                                       errstr_size);
}


rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
                                            const char *name,
                                            const char *value,
                                            char *errstr,
                                            size_t errstr_size) {
        if (!strncmp(name, "topic.", strlen("topic.")))
                name += strlen("topic.");

        return rd_kafka_anyconf_set(_RK_TOPIC, conf, name, value, errstr,
                                    errstr_size);
}


/**
 * @brief Overwrites the contents of \p str up until but not including
 *        the nul-term.
 */
void rd_kafka_desensitize_str(char *str) {
        size_t len;
        static const char redacted[] = "(REDACTED)";

#ifdef _WIN32
        len = strlen(str);
        SecureZeroMemory(str, len);
#else
        volatile char *volatile s;

        for (s = str; *s; s++)
                *s = '\0';

        len = (size_t)(s - str);
#endif

        if (len > sizeof(redacted))
                memcpy(str, redacted, sizeof(redacted));
}



/**
 * @brief Overwrite the value of \p prop, if sensitive.
 */
static RD_INLINE void
rd_kafka_anyconf_prop_desensitize(int scope,
                                  void *conf,
                                  const struct rd_kafka_property *prop) {
        if (likely(!(prop->scope & _RK_SENSITIVE)))
                return;

        switch (prop->type) {
        case _RK_C_STR: {
                char **str = _RK_PTR(char **, conf, prop->offset);
                if (*str)
                        rd_kafka_desensitize_str(*str);
                break;
        }

        case _RK_C_INTERNAL:
                /* This is typically a pointer to something, the
                 * _RK_SENSITIVE flag is set to get it redacted in
                 * ..dump_dbg(), but we don't have to desensitize
                 * anything here. */
                break;

        default:
                rd_assert(!*"BUG: Don't know how to desensitize prop type");
                break;
        }
}


/**
 * @brief Desensitize all sensitive properties in \p conf
 */
static void rd_kafka_anyconf_desensitize(int scope, void *conf) {
        const struct rd_kafka_property *prop;

        for (prop = rd_kafka_properties; prop->name; prop++) {
                if (!(prop->scope & scope))
                        continue;

                rd_kafka_anyconf_prop_desensitize(scope, conf, prop);
        }
}

/**
 * @brief Overwrite the values of sensitive properties
 */
void rd_kafka_conf_desensitize(rd_kafka_conf_t *conf) {
        if (conf->topic_conf)
                rd_kafka_anyconf_desensitize(_RK_TOPIC, conf->topic_conf);
        rd_kafka_anyconf_desensitize(_RK_GLOBAL, conf);
}

/**
 * @brief Overwrite the values of sensitive properties
 */
void rd_kafka_topic_conf_desensitize(rd_kafka_topic_conf_t *tconf) {
        rd_kafka_anyconf_desensitize(_RK_TOPIC, tconf);
}


static void rd_kafka_anyconf_clear(int scope,
                                   void *conf,
                                   const struct rd_kafka_property *prop) {

        rd_kafka_anyconf_prop_desensitize(scope, conf, prop);

        switch (prop->type) {
        case _RK_C_STR: {
                char **str = _RK_PTR(char **, conf, prop->offset);

                if (*str) {
                        if (prop->set) {
                                prop->set(scope, conf, prop->name, NULL, *str,
                                          _RK_CONF_PROP_SET_DEL, NULL, 0);
                                /* FALLTHRU */
                        }
                        rd_free(*str);
                        *str = NULL;
                }
        } break;

        case _RK_C_KSTR: {
                rd_kafkap_str_t **kstr =
                    _RK_PTR(rd_kafkap_str_t **, conf, prop->offset);
                if (*kstr) {
                        rd_kafkap_str_destroy(*kstr);
                        *kstr = NULL;
                }
        } break;

        case _RK_C_PATLIST: {
                rd_kafka_pattern_list_t **plist;
                plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
                if (*plist) {
                        rd_kafka_pattern_list_destroy(*plist);
                        *plist = NULL;
                }
        } break;

        case _RK_C_PTR:
                if (_RK_PTR(void *, conf, prop->offset) != NULL) {
                        if (!strcmp(prop->name, "default_topic_conf")) {
                                rd_kafka_topic_conf_t **tconf;

                                tconf = _RK_PTR(rd_kafka_topic_conf_t **, conf,
                                                prop->offset);
                                if (*tconf) {
                                        rd_kafka_topic_conf_destroy(*tconf);
                                        *tconf = NULL;
                                }
                        }
                }
                break;

        default:
                break;
        }

        if (prop->dtor)
                prop->dtor(scope, conf);
}

void rd_kafka_anyconf_destroy(int scope, void *conf) {
        const struct rd_kafka_property *prop;

        /* Call on_conf_destroy() interceptors */
        if (scope == _RK_GLOBAL)
                rd_kafka_interceptors_on_conf_destroy(conf);

        for (prop = rd_kafka_properties; prop->name; prop++) {
                if (!(prop->scope & scope))
                        continue;

                rd_kafka_anyconf_clear(scope, conf, prop);
        }
}


void rd_kafka_conf_destroy(rd_kafka_conf_t *conf) {
        rd_kafka_anyconf_destroy(_RK_GLOBAL, conf);
        // FIXME: partition_assignors
        rd_free(conf);
}

void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf) {
        rd_kafka_anyconf_destroy(_RK_TOPIC, topic_conf);
        rd_free(topic_conf);
}



static void rd_kafka_anyconf_copy(int scope,
                                  void *dst,
                                  const void *src,
                                  size_t filter_cnt,
                                  const char **filter) {
        const struct rd_kafka_property *prop;

        for (prop = rd_kafka_properties; prop->name; prop++) {
                const char *val = NULL;
                int ival        = 0;
                char *valstr;
                size_t valsz;
                size_t fi;
                size_t nlen;

                if (!(prop->scope & scope))
                        continue;

                if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
                        continue;

                /* Skip properties that have not been set,
                 * unless it is an internal one which requires
                 * extra logic, such as the interceptors. */
                if (!rd_kafka_anyconf_is_modified(src, prop) &&
                    prop->type != _RK_C_INTERNAL)
                        continue;

                /* Apply filter, if any. */
                nlen = strlen(prop->name);
                for (fi = 0; fi < filter_cnt; fi++) {
                        size_t flen = strlen(filter[fi]);
                        if (nlen >= flen &&
                            !strncmp(filter[fi], prop->name, flen))
                                break;
                }
                if (fi < filter_cnt)
                        continue; /* Filter matched */

                switch (prop->type) {
                case _RK_C_STR:
                case _RK_C_PTR:
                        val = *_RK_PTR(const char **, src, prop->offset);

                        if (!strcmp(prop->name, "default_topic_conf") && val)
                                val = (void *)rd_kafka_topic_conf_dup(
                                    (const rd_kafka_topic_conf_t *)(void *)val);
                        break;
                case _RK_C_KSTR: {
                        rd_kafkap_str_t **kstr =
                            _RK_PTR(rd_kafkap_str_t **, src, prop->offset);
                        if (*kstr)
                                val = (*kstr)->str;
                        break;
                }

                case _RK_C_BOOL:
                case _RK_C_INT:
                case _RK_C_S2I:
                case _RK_C_S2F:
                        ival = *_RK_PTR(const int *, src, prop->offset);

                        /* Get string representation of configuration value. */
                        valsz = 0;
                        rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
                        valstr = rd_alloca(valsz);
                        rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
                        val = valstr;
                        break;
                case _RK_C_DBL:
                        /* Get string representation of configuration value. */
                        valsz = 0;
                        rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
                        valstr = rd_alloca(valsz);
                        rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
                        val = valstr;
                        break;
                case _RK_C_PATLIST: {
                        const rd_kafka_pattern_list_t **plist;
                        plist = _RK_PTR(const rd_kafka_pattern_list_t **, src,
                                        prop->offset);
                        if (*plist)
                                val = (*plist)->rkpl_orig;
                        break;
                }
                case _RK_C_INTERNAL:
                        /* Handled by ->copy() below. */
                        break;
                default:
                        continue;
                }

                if (prop->copy)
                        prop->copy(scope, dst, src,
                                   _RK_PTR(void *, dst, prop->offset),
                                   _RK_PTR(const void *, src, prop->offset),
                                   filter_cnt, filter);

                rd_kafka_anyconf_set_prop0(scope, dst, prop, val, ival,
                                           _RK_CONF_PROP_SET_REPLACE, NULL, 0);
        }
}


rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf) {
        rd_kafka_conf_t *new = rd_kafka_conf_new();

        rd_kafka_interceptors_on_conf_dup(new, conf, 0, NULL);

        rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, 0, NULL);

        return new;
}

rd_kafka_conf_t *rd_kafka_conf_dup_filter(const rd_kafka_conf_t *conf,
                                          size_t filter_cnt,
                                          const char **filter) {
        rd_kafka_conf_t *new = rd_kafka_conf_new();

        rd_kafka_interceptors_on_conf_dup(new, conf, filter_cnt, filter);

        rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, filter_cnt, filter);

        return new;
}


rd_kafka_topic_conf_t *
rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t *conf) {
        rd_kafka_topic_conf_t *new = rd_kafka_topic_conf_new();

        rd_kafka_anyconf_copy(_RK_TOPIC, new, conf, 0, NULL);

        return new;
}

rd_kafka_topic_conf_t *rd_kafka_default_topic_conf_dup(rd_kafka_t *rk) {
        if (rk->rk_conf.topic_conf)
                return rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
        else
                return rd_kafka_topic_conf_new();
}

void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events) {
        char tmp[32];
        rd_snprintf(tmp, sizeof(tmp), "%d", events);
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "enabled_events", tmp);
}

void rd_kafka_conf_set_background_event_cb(
    rd_kafka_conf_t *conf,
    void (*event_cb)(rd_kafka_t *rk, rd_kafka_event_t *rkev, void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "background_event_cb",
                                      event_cb);
}


void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
                             void (*dr_cb)(rd_kafka_t *rk,
                                           void *payload,
                                           size_t len,
                                           rd_kafka_resp_err_t err,
                                           void *opaque,
                                           void *msg_opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_cb", dr_cb);
}


void rd_kafka_conf_set_dr_msg_cb(
    rd_kafka_conf_t *conf,
    void (*dr_msg_cb)(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage,
                      void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_msg_cb", dr_msg_cb);
}


void rd_kafka_conf_set_consume_cb(
    rd_kafka_conf_t *conf,
    void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "consume_cb",
                                      consume_cb);
}

void rd_kafka_conf_set_rebalance_cb(
    rd_kafka_conf_t *conf,
    void (*rebalance_cb)(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "rebalance_cb",
                                      rebalance_cb);
}

void rd_kafka_conf_set_offset_commit_cb(
    rd_kafka_conf_t *conf,
    void (*offset_commit_cb)(rd_kafka_t *rk,
                             rd_kafka_resp_err_t err,
                             rd_kafka_topic_partition_list_t *offsets,
                             void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "offset_commit_cb",
                                      offset_commit_cb);
}



void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
                                void (*error_cb)(rd_kafka_t *rk,
                                                 int err,
                                                 const char *reason,
                                                 void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "error_cb", error_cb);
}


void rd_kafka_conf_set_throttle_cb(rd_kafka_conf_t *conf,
                                   void (*throttle_cb)(rd_kafka_t *rk,
                                                       const char *broker_name,
                                                       int32_t broker_id,
                                                       int throttle_time_ms,
                                                       void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "throttle_cb",
                                      throttle_cb);
}


void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
                              void (*log_cb)(const rd_kafka_t *rk,
                                             int level,
                                             const char *fac,
                                             const char *buf)) {
#if !WITH_SYSLOG
        if (log_cb == rd_kafka_log_syslog)
                rd_assert(!*"syslog support not enabled in this build");
#endif
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "log_cb", log_cb);
}


void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
                                int (*stats_cb)(rd_kafka_t *rk,
                                                char *json,
                                                size_t json_len,
                                                void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "stats_cb", stats_cb);
}

void rd_kafka_conf_set_oauthbearer_token_refresh_cb(
    rd_kafka_conf_t *conf,
    void (*oauthbearer_token_refresh_cb)(rd_kafka_t *rk,
                                         const char *oauthbearer_config,
                                         void *opaque)) {
#if WITH_SASL_OAUTHBEARER
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf,
                                      "oauthbearer_token_refresh_cb",
                                      oauthbearer_token_refresh_cb);
#endif
}

void rd_kafka_conf_enable_sasl_queue(rd_kafka_conf_t *conf, int enable) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "enable_sasl_queue",
                                      (enable ? "true" : "false"));
}

void rd_kafka_conf_set_socket_cb(
    rd_kafka_conf_t *conf,
    int (*socket_cb)(int domain, int type, int protocol, void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "socket_cb", socket_cb);
}

void rd_kafka_conf_set_connect_cb(rd_kafka_conf_t *conf,
                                  int (*connect_cb)(int sockfd,
                                                    const struct sockaddr *addr,
                                                    int addrlen,
                                                    const char *id,
                                                    void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "connect_cb",
                                      connect_cb);
}

void rd_kafka_conf_set_closesocket_cb(rd_kafka_conf_t *conf,
                                      int (*closesocket_cb)(int sockfd,
                                                            void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "closesocket_cb",
                                      closesocket_cb);
}



#ifndef _WIN32
void rd_kafka_conf_set_open_cb(rd_kafka_conf_t *conf,
                               int (*open_cb)(const char *pathname,
                                              int flags,
                                              mode_t mode,
                                              void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "open_cb", open_cb);
}
#endif

void rd_kafka_conf_set_resolve_cb(
    rd_kafka_conf_t *conf,
    int (*resolve_cb)(const char *node,
                      const char *service,
                      const struct addrinfo *hints,
                      struct addrinfo **res,
                      void *opaque)) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "resolve_cb",
                                      resolve_cb);
}

rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert_verify_cb(
    rd_kafka_conf_t *conf,
    int (*ssl_cert_verify_cb)(rd_kafka_t *rk,
                              const char *broker_name,
                              int32_t broker_id,
                              int *x509_set_error,
                              int depth,
                              const char *buf,
                              size_t size,
                              char *errstr,
                              size_t errstr_size,
                              void *opaque)) {
#if !WITH_SSL
        return RD_KAFKA_CONF_INVALID;
#else
        rd_kafka_anyconf_set_internal(
            _RK_GLOBAL, conf, "ssl.certificate.verify_cb", ssl_cert_verify_cb);
        return RD_KAFKA_CONF_OK;
#endif
}


void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque) {
        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "opaque", opaque);
}


void rd_kafka_conf_set_engine_callback_data(rd_kafka_conf_t *conf,
                                            void *callback_data) {
        rd_kafka_anyconf_set_internal(
            _RK_GLOBAL, conf, "ssl_engine_callback_data", callback_data);
}


void rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t *conf,
                                          rd_kafka_topic_conf_t *tconf) {
        if (conf->topic_conf) {
                if (rd_kafka_anyconf_is_any_modified(conf->topic_conf))
                        conf->warn.default_topic_conf_overwritten = rd_true;
                rd_kafka_topic_conf_destroy(conf->topic_conf);
        }

        rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "default_topic_conf",
                                      tconf);
}

rd_kafka_topic_conf_t *
rd_kafka_conf_get_default_topic_conf(rd_kafka_conf_t *conf) {
        return conf->topic_conf;
}


void rd_kafka_topic_conf_set_partitioner_cb(
    rd_kafka_topic_conf_t *topic_conf,
    int32_t (*partitioner)(const rd_kafka_topic_t *rkt,
                           const void *keydata,
                           size_t keylen,
                           int32_t partition_cnt,
                           void *rkt_opaque,
                           void *msg_opaque)) {
        rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "partitioner_cb",
                                      partitioner);
}

void rd_kafka_topic_conf_set_msg_order_cmp(
    rd_kafka_topic_conf_t *topic_conf,
    int (*msg_order_cmp)(const rd_kafka_message_t *a,
                         const rd_kafka_message_t *b)) {
        rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "msg_order_cmp",
                                      msg_order_cmp);
}

void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *topic_conf,
                                    void *opaque) {
        rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "opaque", opaque);
}



/**
 * @brief Convert flags \p ival to csv-string using S2F property \p prop.
 *
 * This function has two modes: size query and write.
 * To query for needed size call with dest==NULL,
 * to write to buffer of size dest_size call with dest!=NULL.
 *
 * An \p ival of -1 means all.
 *
 * @param include_unsupported Include flag values that are unsupported
 *                            due to missing dependencies at build time.
 *
 * @returns the number of bytes written to \p dest (if not NULL), else the
 *          total number of bytes needed.
 *
 */
static size_t rd_kafka_conf_flags2str(char *dest,
                                      size_t dest_size,
                                      const char *delim,
                                      const struct rd_kafka_property *prop,
                                      int ival,
                                      rd_bool_t include_unsupported) {
        size_t of = 0;
        int j;

        if (dest && dest_size > 0)
                *dest = '\0';

        /* Phase 1: scan for set flags, accumulate needed size.
         * Phase 2: write to dest */
        for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i) && prop->s2i[j].str; j++) {
                if (prop->type == _RK_C_S2F && ival != -1 &&
                    (ival & prop->s2i[j].val) != prop->s2i[j].val)
                        continue;
                else if (prop->type == _RK_C_S2I && ival != -1 &&
                         prop->s2i[j].val != ival)
                        continue;
                else if (prop->s2i[j].unsupported && !include_unsupported)
                        continue;

                if (!dest)
                        of += strlen(prop->s2i[j].str) + (of > 0 ? 1 : 0);
                else {
                        size_t r;
                        r = rd_snprintf(dest + of, dest_size - of, "%s%s",
                                        of > 0 ? delim : "", prop->s2i[j].str);
                        if (r > dest_size - of) {
                                r = dest_size - of;
                                break;
                        }
                        of += r;
                }
        }

        return of + 1 /*nul*/;
}


/**
 * Return "original"(re-created) configuration value string
 */
static rd_kafka_conf_res_t
rd_kafka_anyconf_get0(const void *conf,
                      const struct rd_kafka_property *prop,
                      char *dest,
                      size_t *dest_size) {
        char tmp[22];
        const char *val = NULL;
        size_t val_len  = 0;
        int j;

        switch (prop->type) {
        case _RK_C_STR:
                val = *_RK_PTR(const char **, conf, prop->offset);
                break;

        case _RK_C_KSTR: {
                const rd_kafkap_str_t **kstr =
                    _RK_PTR(const rd_kafkap_str_t **, conf, prop->offset);
                if (*kstr)
                        val = (*kstr)->str;
                break;
        }

        case _RK_C_PTR:
                val = *_RK_PTR(const void **, conf, prop->offset);
                if (val) {
                        rd_snprintf(tmp, sizeof(tmp), "%p", (void *)val);
                        val = tmp;
                }
                break;

        case _RK_C_BOOL:
                val = (*_RK_PTR(int *, conf, prop->offset) ? "true" : "false");
                break;

        case _RK_C_INT:
                rd_snprintf(tmp, sizeof(tmp), "%i",
                            *_RK_PTR(int *, conf, prop->offset));
                val = tmp;
                break;

        case _RK_C_DBL:
                rd_snprintf(tmp, sizeof(tmp), "%g",
                            *_RK_PTR(double *, conf, prop->offset));
                val = tmp;
                break;

        case _RK_C_S2I:
                for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
                        if (prop->s2i[j].val ==
                            *_RK_PTR(int *, conf, prop->offset)) {
                                val = prop->s2i[j].str;
                                break;
                        }
                }
                break;

        case _RK_C_S2F: {
                const int ival = *_RK_PTR(const int *, conf, prop->offset);

                val_len = rd_kafka_conf_flags2str(dest, dest ? *dest_size : 0,
                                                  ",", prop, ival,
                                                  rd_false /*only supported*/);
                if (dest) {
                        val_len = 0;
                        val     = dest;
                        dest    = NULL;
                }
                break;
        }

        case _RK_C_PATLIST: {
                const rd_kafka_pattern_list_t **plist;
                plist = _RK_PTR(const rd_kafka_pattern_list_t **, conf,
                                prop->offset);
                if (*plist)
                        val = (*plist)->rkpl_orig;
                break;
        }

        default:
                break;
        }

        if (val_len) {
                *dest_size = val_len + 1;
                return RD_KAFKA_CONF_OK;
        }

        if (!val)
                return RD_KAFKA_CONF_INVALID;

        val_len = strlen(val);

        if (dest) {
                size_t use_len = RD_MIN(val_len, (*dest_size) - 1);
                memcpy(dest, val, use_len);
                dest[use_len] = '\0';
        }

        /* Return needed size */
        *dest_size = val_len + 1;

        return RD_KAFKA_CONF_OK;
}


static rd_kafka_conf_res_t rd_kafka_anyconf_get(int scope,
                                                const void *conf,
                                                const char *name,
                                                char *dest,
                                                size_t *dest_size) {
        const struct rd_kafka_property *prop;

        for (prop = rd_kafka_properties; prop->name; prop++) {

                if (!(prop->scope & scope) || strcmp(prop->name, name))
                        continue;

                if (prop->type == _RK_C_ALIAS)
                        return rd_kafka_anyconf_get(scope, conf, prop->sdef,
                                                    dest, dest_size);

                if (rd_kafka_anyconf_get0(conf, prop, dest, dest_size) ==
                    RD_KAFKA_CONF_OK)
                        return RD_KAFKA_CONF_OK;
        }

        return RD_KAFKA_CONF_UNKNOWN;
}

rd_kafka_conf_res_t rd_kafka_topic_conf_get(const rd_kafka_topic_conf_t *conf,
                                            const char *name,
                                            char *dest,
                                            size_t *dest_size) {
        return rd_kafka_anyconf_get(_RK_TOPIC, conf, name, dest, dest_size);
}

rd_kafka_conf_res_t rd_kafka_conf_get(const rd_kafka_conf_t *conf,
                                      const char *name,
                                      char *dest,
                                      size_t *dest_size) {
        rd_kafka_conf_res_t res;
        res = rd_kafka_anyconf_get(_RK_GLOBAL, conf, name, dest, dest_size);
        if (res != RD_KAFKA_CONF_UNKNOWN || !conf->topic_conf)
                return res;

        /* Fallthru:
         * If the global property was unknown, try getting it from the
         * default topic config, if any. */
        return rd_kafka_topic_conf_get(conf->topic_conf, name, dest, dest_size);
}


static const char **rd_kafka_anyconf_dump(int scope,
                                          const void *conf,
                                          size_t *cntp,
                                          rd_bool_t only_modified,
                                          rd_bool_t redact_sensitive) {
        const struct rd_kafka_property *prop;
        char **arr;
        int cnt = 0;

        arr = rd_calloc(sizeof(char *), RD_ARRAYSIZE(rd_kafka_properties) * 2);

        for (prop = rd_kafka_properties; prop->name; prop++) {
                char *val = NULL;
                size_t val_size;

                if (!(prop->scope & scope))
                        continue;

                if (only_modified && !rd_kafka_anyconf_is_modified(conf, prop))
                        continue;

                /* Skip aliases, show original property instead.
                 * Skip invalids. */
                if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
                        continue;

                if (redact_sensitive && (prop->scope & _RK_SENSITIVE)) {
                        val = rd_strdup("[redacted]");
                } else {
                        /* Query value size */
                        if (rd_kafka_anyconf_get0(conf, prop, NULL,
                                                  &val_size) !=
                            RD_KAFKA_CONF_OK)
                                continue;

                        /* Get value */
                        val = rd_malloc(val_size);
                        rd_kafka_anyconf_get0(conf, prop, val, &val_size);
                }

                arr[cnt++] = rd_strdup(prop->name);
                arr[cnt++] = val;
        }

        *cntp = cnt;

        return (const char **)arr;
}


const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp) {
        return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp, rd_false /*all*/,
                                     rd_false /*don't redact*/);
}

const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf,
                                      size_t *cntp) {
        return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp, rd_false /*all*/,
                                     rd_false /*don't redact*/);
}

void rd_kafka_conf_dump_free(const char **arr, size_t cnt) {
        char **_arr = (char **)arr;
        unsigned int i;

        for (i = 0; i < cnt; i++)
                if (_arr[i])
                        rd_free(_arr[i]);

        rd_free(_arr);
}



/**
 * @brief Dump configured properties to debug log.
 */
void rd_kafka_anyconf_dump_dbg(rd_kafka_t *rk,
                               int scope,
                               const void *conf,
                               const char *description) {
        const char **arr;
        size_t cnt;
        size_t i;

        arr =
            rd_kafka_anyconf_dump(scope, conf, &cnt, rd_true /*modified only*/,
                                  rd_true /*redact sensitive*/);
        if (cnt > 0)
                rd_kafka_dbg(rk, CONF, "CONF", "%s:", description);
        for (i = 0; i < cnt; i += 2)
                rd_kafka_dbg(rk, CONF, "CONF", "  %s = %s", arr[i], arr[i + 1]);

        rd_kafka_conf_dump_free(arr, cnt);
}

void rd_kafka_conf_properties_show(FILE *fp) {
        const struct rd_kafka_property *prop0;
        int last = 0;
        int j;
        char tmp[512];
        const char *dash80 =
            "----------------------------------------"
            "----------------------------------------";

        for (prop0 = rd_kafka_properties; prop0->name; prop0++) {
                const char *typeinfo = "";
                const char *importance;
                const struct rd_kafka_property *prop = prop0;

                /* Skip hidden properties. */
                if (prop->scope & _RK_HIDDEN)
                        continue;

                /* Skip invalid properties. */
                if (prop->type == _RK_C_INVALID)
                        continue;

                if (!(prop->scope & last)) {
                        fprintf(fp, "%s## %s configuration properties\n\n",
                                last ? "\n\n" : "",
                                prop->scope == _RK_GLOBAL ? "Global" : "Topic");

                        fprintf(fp,
                                "%-40s | %3s | %-15s | %13s | %-10s | %-25s\n"
                                "%.*s-|-%.*s-|-%.*s-|-%.*s:|-%.*s-| -%.*s\n",
                                "Property", "C/P", "Range", "Default",
                                "Importance", "Description", 40, dash80, 3,
                                dash80, 15, dash80, 13, dash80, 10, dash80, 25,
                                dash80);

                        last = prop->scope & (_RK_GLOBAL | _RK_TOPIC);
                }

                fprintf(fp, "%-40s | ", prop->name);

                /* For aliases, use the aliased property from here on
                 * so that the alias property shows up with proper
                 * ranges, defaults, etc. */
                if (prop->type == _RK_C_ALIAS) {
                        prop = rd_kafka_conf_prop_find(prop->scope, prop->sdef);
                        rd_assert(prop && *"BUG: "
                                  "alias points to unknown config property");
                }

                fprintf(fp, "%3s | ",
                        (!(prop->scope & _RK_PRODUCER) ==
                                 !(prop->scope & _RK_CONSUMER)
                             ? " * "
                             : ((prop->scope & _RK_PRODUCER) ? " P " : " C ")));

                switch (prop->type) {
                case _RK_C_STR:
                case _RK_C_KSTR:
                        typeinfo = "string";
                case _RK_C_PATLIST:
                        if (prop->type == _RK_C_PATLIST)
                                typeinfo = "pattern list";
                        if (prop->s2i[0].str) {
                                rd_kafka_conf_flags2str(
                                    tmp, sizeof(tmp), ", ", prop, -1,
                                    rd_true /*include unsupported*/);
                                fprintf(fp, "%-15s | %13s", tmp,
                                        prop->sdef ? prop->sdef : "");
                        } else {
                                fprintf(fp, "%-15s | %13s", "",
                                        prop->sdef ? prop->sdef : "");
                        }
                        break;
                case _RK_C_BOOL:
                        typeinfo = "boolean";
                        fprintf(fp, "%-15s | %13s", "true, false",
                                prop->vdef ? "true" : "false");
                        break;
                case _RK_C_INT:
                        typeinfo = "integer";
                        rd_snprintf(tmp, sizeof(tmp), "%d .. %d", prop->vmin,
                                    prop->vmax);
                        fprintf(fp, "%-15s | %13i", tmp, prop->vdef);
                        break;
                case _RK_C_DBL:
                        typeinfo = "float"; /* more user-friendly than double */
                        rd_snprintf(tmp, sizeof(tmp), "%g .. %g", prop->dmin,
                                    prop->dmax);
                        fprintf(fp, "%-15s | %13g", tmp, prop->ddef);
                        break;
                case _RK_C_S2I:
                        typeinfo = "enum value";
                        rd_kafka_conf_flags2str(
                            tmp, sizeof(tmp), ", ", prop, -1,
                            rd_true /*include unsupported*/);
                        fprintf(fp, "%-15s | ", tmp);

                        for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
                                if (prop->s2i[j].val == prop->vdef) {
                                        fprintf(fp, "%13s", prop->s2i[j].str);
                                        break;
                                }
                        }
                        if (j == RD_ARRAYSIZE(prop->s2i))
                                fprintf(fp, "%13s", " ");
                        break;

                case _RK_C_S2F:
                        typeinfo = "CSV flags";
                        /* Dont duplicate builtin.features value in
                         * both Range and Default */
                        if (!strcmp(prop->name, "builtin.features"))
                                *tmp = '\0';
                        else
                                rd_kafka_conf_flags2str(
                                    tmp, sizeof(tmp), ", ", prop, -1,
                                    rd_true /*include unsupported*/);
                        fprintf(fp, "%-15s | ", tmp);
                        rd_kafka_conf_flags2str(
                            tmp, sizeof(tmp), ", ", prop, prop->vdef,
                            rd_true /*include unsupported*/);
                        fprintf(fp, "%13s", tmp);

                        break;
                case _RK_C_PTR:
                case _RK_C_INTERNAL:
                        typeinfo = "see dedicated API";
                        /* FALLTHRU */
                default:
                        fprintf(fp, "%-15s | %-13s", "", " ");
                        break;
                }

                if (prop->scope & _RK_HIGH)
                        importance = "high";
                else if (prop->scope & _RK_MED)
                        importance = "medium";
                else
                        importance = "low";

                fprintf(fp, " | %-10s | ", importance);

                if (prop->scope & _RK_EXPERIMENTAL)
                        fprintf(fp,
                                "**EXPERIMENTAL**: "
                                "subject to change or removal. ");

                if (prop->scope & _RK_DEPRECATED)
                        fprintf(fp, "**DEPRECATED** ");

                /* If the original property is an alias, prefix the
                 * description saying so. */
                if (prop0->type == _RK_C_ALIAS)
                        fprintf(fp, "Alias for `%s`: ", prop0->sdef);

                fprintf(fp, "%s <br>*Type: %s*\n", prop->desc, typeinfo);
        }
        fprintf(fp, "\n");
        fprintf(fp, "### C/P legend: C = Consumer, P = Producer, * = both\n");
}



/**
 * @name Configuration value methods
 *
 * @remark This generic interface will eventually replace the config property
 *         used above.
 * @{
 */


/**
 * @brief Set up an INT confval.
 *
 * @oaram name Property name, must be a const static string (will not be copied)
 */
void rd_kafka_confval_init_int(rd_kafka_confval_t *confval,
                               const char *name,
                               int vmin,
                               int vmax,
                               int vdef) {
        confval->name       = name;
        confval->is_enabled = 1;
        confval->valuetype  = RD_KAFKA_CONFVAL_INT;
        confval->u.INT.vmin = vmin;
        confval->u.INT.vmax = vmax;
        confval->u.INT.vdef = vdef;
        confval->u.INT.v    = vdef;
}

/**
 * @brief Set up a PTR confval.
 *
 * @oaram name Property name, must be a const static string (will not be copied)
 */
void rd_kafka_confval_init_ptr(rd_kafka_confval_t *confval, const char *name) {
        confval->name       = name;
        confval->is_enabled = 1;
        confval->valuetype  = RD_KAFKA_CONFVAL_PTR;
        confval->u.PTR      = NULL;
}

/**
 * @brief Set up but disable an intval, attempt to set this confval will fail.
 *
 * @oaram name Property name, must be a const static string (will not be copied)
 */
void rd_kafka_confval_disable(rd_kafka_confval_t *confval, const char *name) {
        confval->name       = name;
        confval->is_enabled = 0;
}

/**
 * @brief Set confval's value to \p valuep, verifying the passed
 *        \p valuetype matches (or can be cast to) \p confval's type.
 *
 * @param dispname is the display name for the configuration value and is
 *        included in error strings.
 * @param valuep is a pointer to the value, or NULL to revert to default.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the new value was set, or
 *          RD_KAFKA_RESP_ERR__INVALID_ARG if the value was of incorrect type,
 *          out of range, or otherwise not a valid value.
 */
rd_kafka_resp_err_t rd_kafka_confval_set_type(rd_kafka_confval_t *confval,
                                              rd_kafka_confval_type_t valuetype,
                                              const void *valuep,
                                              char *errstr,
                                              size_t errstr_size) {

        if (!confval->is_enabled) {
                rd_snprintf(errstr, errstr_size,
                            "\"%s\" is not supported for this operation",
                            confval->name);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        switch (confval->valuetype) {
        case RD_KAFKA_CONFVAL_INT: {
                int v;
                const char *end;

                if (!valuep) {
                        /* Revert to default */
                        confval->u.INT.v = confval->u.INT.vdef;
                        confval->is_set  = 0;
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

                switch (valuetype) {
                case RD_KAFKA_CONFVAL_INT:
                        v = *(const int *)valuep;
                        break;
                case RD_KAFKA_CONFVAL_STR:
                        v = (int)strtol((const char *)valuep, (char **)&end, 0);
                        if (end == (const char *)valuep) {
                                rd_snprintf(errstr, errstr_size,
                                            "Invalid value type for \"%s\": "
                                            "expecting integer",
                                            confval->name);
                                return RD_KAFKA_RESP_ERR__INVALID_TYPE;
                        }
                        break;
                default:
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value type for \"%s\": "
                                    "expecting integer",
                                    confval->name);
                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
                }


                if ((confval->u.INT.vmin || confval->u.INT.vmax) &&
                    (v < confval->u.INT.vmin || v > confval->u.INT.vmax)) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value type for \"%s\": "
                                    "expecting integer in range %d..%d",
                                    confval->name, confval->u.INT.vmin,
                                    confval->u.INT.vmax);
                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
                }

                confval->u.INT.v = v;
                confval->is_set  = 1;
        } break;

        case RD_KAFKA_CONFVAL_STR: {
                size_t vlen;
                const char *v = (const char *)valuep;

                if (!valuep) {
                        confval->is_set = 0;
                        if (confval->u.STR.vdef)
                                confval->u.STR.v =
                                    rd_strdup(confval->u.STR.vdef);
                        else
                                confval->u.STR.v = NULL;
                }

                if (valuetype != RD_KAFKA_CONFVAL_STR) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value type for \"%s\": "
                                    "expecting string",
                                    confval->name);
                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
                }

                vlen = strlen(v);
                if ((confval->u.STR.minlen || confval->u.STR.maxlen) &&
                    (vlen < confval->u.STR.minlen ||
                     vlen > confval->u.STR.maxlen)) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid value for \"%s\": "
                                    "expecting string with length "
                                    "%" PRIusz "..%" PRIusz,
                                    confval->name, confval->u.STR.minlen,
                                    confval->u.STR.maxlen);
                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
                }

                if (confval->u.STR.v)
                        rd_free(confval->u.STR.v);

                confval->u.STR.v = rd_strdup(v);
        } break;

        case RD_KAFKA_CONFVAL_PTR:
                confval->u.PTR = (void *)valuep;
                break;

        default:
                RD_NOTREACHED();
                return RD_KAFKA_RESP_ERR__NOENT;
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


int rd_kafka_confval_get_int(const rd_kafka_confval_t *confval) {
        rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_INT);
        return confval->u.INT.v;
}


const char *rd_kafka_confval_get_str(const rd_kafka_confval_t *confval) {
        rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_STR);
        return confval->u.STR.v;
}

void *rd_kafka_confval_get_ptr(const rd_kafka_confval_t *confval) {
        rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_PTR);
        return confval->u.PTR;
}


#define _is_alphanum(C)                                                        \
        (((C) >= 'a' && (C) <= 'z') || ((C) >= 'A' && (C) <= 'Z') ||           \
         ((C) >= '0' && (C) <= '9'))

/**
 * @returns true if the string is KIP-511 safe, else false.
 */
static rd_bool_t rd_kafka_sw_str_is_safe(const char *str) {
        const char *s;

        if (!*str)
                return rd_true;

        for (s = str; *s; s++) {
                int c = (int)*s;

                if (unlikely(!(_is_alphanum(c) || c == '-' || c == '.')))
                        return rd_false;
        }

        /* Verify that the string begins and ends with a-zA-Z0-9 */
        if (!_is_alphanum(*str))
                return rd_false;
        if (!_is_alphanum(*(s - 1)))
                return rd_false;

        return rd_true;
}


/**
 * @brief Sanitize KIP-511 software name/version strings in-place,
 *        replacing unaccepted characters with "-".
 *
 * @warning The \p str is modified in-place.
 */
static void rd_kafka_sw_str_sanitize_inplace(char *str) {
        char *s = str, *d = str;

        /* Strip any leading non-alphanums */
        while (!_is_alphanum(*s))
                s++;

        for (; *s; s++) {
                int c = (int)*s;

                if (unlikely(!(_is_alphanum(c) || c == '-' || c == '.')))
                        *d = '-';
                else
                        *d = *s;
                d++;
        }

        *d = '\0';

        /* Strip any trailing non-alphanums */
        for (d = d - 1; d >= str && !_is_alphanum(*d); d--)
                *d = '\0';
}

#undef _is_alphanum


/**
 * @brief Create a staggered array of key-value pairs from
 *        an array of "key=value" strings (typically from rd_string_split()).
 *
 * The output array will have element 0 being key0 and element 1 being
 * value0. Element 2 being key1 and element 3 being value1, and so on.
 * E.g.:
 *  input   { "key0=value0", "key1=value1" } incnt=2
 *  returns { "key0", "value0", "key1", "value1" } cntp=4
 *
 * @returns NULL on error (no '=' separator), or a newly allocated array
 *          on success. The array count is returned in \p cntp.
 *          The returned pointer must be freed with rd_free().
 */
char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp) {
        size_t i;
        char **out, *p;
        size_t lens   = 0;
        size_t outcnt = 0;

        /* First calculate total length needed for key-value strings. */
        for (i = 0; i < incnt; i++) {
                const char *t = strchr(input[i], '=');

                /* No "=", or "=" at beginning of string. */
                if (!t || t == input[i])
                        return NULL;

                /* Length of key, '=' (will be \0), value, and \0 */
                lens += strlen(input[i]) + 1;
        }

        /* Allocate array along with elements in one go */
        out = rd_malloc((sizeof(*out) * incnt * 2) + lens);
        p   = (char *)(&out[incnt * 2]);

        for (i = 0; i < incnt; i++) {
                const char *t   = strchr(input[i], '=');
                size_t namelen  = (size_t)(t - input[i]);
                size_t valuelen = strlen(t + 1);

                /* Copy name */
                out[outcnt++] = p;
                memcpy(p, input[i], namelen);
                p += namelen;
                *(p++) = '\0';

                /* Copy value */
                out[outcnt++] = p;
                memcpy(p, t + 1, valuelen + 1);
                p += valuelen;
                *(p++) = '\0';
        }


        *cntp = outcnt;
        return out;
}


/**
 * @brief Verify configuration \p conf is
 *        correct/non-conflicting and finalize the configuration
 *        settings for use.
 *
 * @returns an error string if configuration is incorrect, else NULL.
 */
const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
                                   rd_kafka_conf_t *conf) {
        const char *errstr;

        if (!conf->sw_name)
                rd_kafka_conf_set(conf, "client.software.name", "librdkafka",
                                  NULL, 0);
        if (!conf->sw_version)
                rd_kafka_conf_set(conf, "client.software.version",
                                  rd_kafka_version_str(), NULL, 0);

        /* The client.software.name and .version are sent to the broker
         * with the ApiVersionRequest starting with AK 2.4.0 (KIP-511).
         * These strings need to be sanitized or the broker will reject them,
         * so modify them in-place here. */
        rd_assert(conf->sw_name && conf->sw_version);
        rd_kafka_sw_str_sanitize_inplace(conf->sw_name);
        rd_kafka_sw_str_sanitize_inplace(conf->sw_version);

        /* Verify mandatory configuration */
        if (!conf->socket_cb)
                return "Mandatory config property `socket_cb` not set";

        if (!conf->open_cb)
                return "Mandatory config property `open_cb` not set";

#if WITH_SSL
        if (conf->ssl.keystore_location && !conf->ssl.keystore_password)
                return "`ssl.keystore.password` is mandatory when "
                       "`ssl.keystore.location` is set";
        if (conf->ssl.ca && (conf->ssl.ca_location || conf->ssl.ca_pem))
                return "`ssl.ca.location` or `ssl.ca.pem`, and memory-based "
                       "set_ssl_cert(CERT_CA) are mutually exclusive.";
#ifdef __APPLE__
        else if (!conf->ssl.ca && !conf->ssl.ca_location && !conf->ssl.ca_pem)
                /* Default ssl.ca.location to 'probe' on OSX */
                rd_kafka_conf_set(conf, "ssl.ca.location", "probe", NULL, 0);
#endif
#endif

#if WITH_SASL_OAUTHBEARER
        if (!rd_strcasecmp(conf->sasl.mechanisms, "OAUTHBEARER")) {
                if (conf->sasl.enable_oauthbearer_unsecure_jwt &&
                    conf->sasl.oauthbearer.token_refresh_cb)
                        return "`enable.sasl.oauthbearer.unsecure.jwt` and "
                               "`oauthbearer_token_refresh_cb` are "
                               "mutually exclusive";

                if (conf->sasl.enable_oauthbearer_unsecure_jwt &&
                    conf->sasl.oauthbearer.method ==
                        RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC)
                        return "`enable.sasl.oauthbearer.unsecure.jwt` and "
                               "`sasl.oauthbearer.method=oidc` are "
                               "mutually exclusive";

                if (conf->sasl.oauthbearer.method ==
                    RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC) {
                        if (!conf->sasl.oauthbearer.client_id)
                                return "`sasl.oauthbearer.client.id` is "
                                       "mandatory when "
                                       "`sasl.oauthbearer.method=oidc` is set";

                        if (!conf->sasl.oauthbearer.client_secret) {
                                return "`sasl.oauthbearer.client.secret` is "
                                       "mandatory when "
                                       "`sasl.oauthbearer.method=oidc` is set";
                        }

                        if (!conf->sasl.oauthbearer.token_endpoint_url) {
                                return "`sasl.oauthbearer.token.endpoint.url` "
                                       "is mandatory when "
                                       "`sasl.oauthbearer.method=oidc` is set";
                        }
                }

                /* Enable background thread for the builtin OIDC handler,
                 * unless a refresh callback has been set. */
                if (conf->sasl.oauthbearer.method ==
                        RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
                    !conf->sasl.oauthbearer.token_refresh_cb) {
                        conf->enabled_events |= RD_KAFKA_EVENT_BACKGROUND;
                        conf->sasl.enable_callback_queue = 1;
                }
        }

#endif

        if (cltype == RD_KAFKA_CONSUMER) {

                /* Automatically adjust `fetch.max.bytes` to be >=
                 * `message.max.bytes` and <= `queued.max.message.kbytes`
                 * unless set by user. */
                if (rd_kafka_conf_is_modified(conf, "fetch.max.bytes")) {
                        if (conf->fetch_max_bytes < conf->max_msg_size)
                                return "`fetch.max.bytes` must be >= "
                                       "`message.max.bytes`";
                } else {
                        conf->fetch_max_bytes =
                            RD_MAX(RD_MIN(conf->fetch_max_bytes,
                                          conf->queued_max_msg_kbytes * 1024),
                                   conf->max_msg_size);
                }

                /* Automatically adjust 'receive.message.max.bytes' to
                 * be 512 bytes larger than 'fetch.max.bytes' to have enough
                 * room for protocol framing (including topic name), unless
                 * set by user. */
                if (rd_kafka_conf_is_modified(conf,
                                              "receive.message.max.bytes")) {
                        if (conf->fetch_max_bytes + 512 >
                            conf->recv_max_msg_size)
                                return "`receive.message.max.bytes` must be >= "
                                       "`fetch.max.bytes` + 512";
                } else {
                        conf->recv_max_msg_size =
                            RD_MAX(conf->recv_max_msg_size,
                                   conf->fetch_max_bytes + 512);
                }

                if (conf->max_poll_interval_ms < conf->group_session_timeout_ms)
                        return "`max.poll.interval.ms`must be >= "
                               "`session.timeout.ms`";

                /* Simplifies rd_kafka_is_idempotent() which is producer-only */
                conf->eos.idempotence = 0;

        } else if (cltype == RD_KAFKA_PRODUCER) {
                if (conf->eos.transactional_id) {
                        if (!conf->eos.idempotence) {
                                /* Auto enable idempotence unless
                                 * explicitly disabled */
                                if (rd_kafka_conf_is_modified(
                                        conf, "enable.idempotence"))
                                        return "`transactional.id` requires "
                                               "`enable.idempotence=true`";

                                conf->eos.idempotence = rd_true;
                        }

                        /* Make sure at least one request can be sent
                         * before the transaction times out. */
                        if (!rd_kafka_conf_is_modified(conf,
                                                       "socket.timeout.ms"))
                                conf->socket_timeout_ms = RD_MAX(
                                    conf->eos.transaction_timeout_ms - 100,
                                    900);
                        else if (conf->eos.transaction_timeout_ms + 100 <
                                 conf->socket_timeout_ms)
                                return "`socket.timeout.ms` must be set <= "
                                       "`transaction.timeout.ms` + 100";
                }

                if (conf->eos.idempotence) {
                        /* Adjust configuration values for idempotent producer*/

                        if (rd_kafka_conf_is_modified(conf, "max.in.flight")) {
                                if (conf->max_inflight >
                                    RD_KAFKA_IDEMP_MAX_INFLIGHT)
                                        return "`max.in.flight` must be "
                                               "set "
                                               "<="
                                               " " RD_KAFKA_IDEMP_MAX_INFLIGHT_STR
                                               " when `enable.idempotence` "
                                               "is true";
                        } else {
                                conf->max_inflight =
                                    RD_MIN(conf->max_inflight,
                                           RD_KAFKA_IDEMP_MAX_INFLIGHT);
                        }


                        if (rd_kafka_conf_is_modified(conf, "retries")) {
                                if (conf->max_retries < 1)
                                        return "`retries` must be set >= 1 "
                                               "when `enable.idempotence` is "
                                               "true";
                        } else {
                                conf->max_retries = INT32_MAX;
                        }


                        if (rd_kafka_conf_is_modified(
                                conf,
                                "queue.buffering.backpressure.threshold") &&
                            conf->queue_backpressure_thres > 1)
                                return "`queue.buffering.backpressure."
                                       "threshold` "
                                       "must be set to 1 when "
                                       "`enable.idempotence` is true";
                        else
                                conf->queue_backpressure_thres = 1;

                        /* acks=all and queuing.strategy are set
                         * in topic_conf_finalize() */

                } else {
                        if (conf->eos.gapless &&
                            rd_kafka_conf_is_modified(
                                conf, "enable.gapless.guarantee"))
                                return "`enable.gapless.guarantee` requires "
                                       "`enable.idempotence` to be enabled";
                }

                if (!rd_kafka_conf_is_modified(conf,
                                               "sticky.partitioning.linger.ms"))
                        conf->sticky_partition_linger_ms = (int)RD_MIN(
                            900000, (rd_ts_t)(2 * conf->buffering_max_ms_dbl));
        }


        if (!rd_kafka_conf_is_modified(conf, "metadata.max.age.ms") &&
            conf->metadata_refresh_interval_ms > 0)
                conf->metadata_max_age_ms =
                    conf->metadata_refresh_interval_ms * 3;

        if (conf->reconnect_backoff_max_ms < conf->reconnect_backoff_ms)
                return "`reconnect.backoff.max.ms` must be >= "
                       "`reconnect.max.ms`";

        if (conf->sparse_connections) {
                /* Set sparse connection random selection interval to
                 * 10 < reconnect.backoff.ms / 2 < 1000. */
                conf->sparse_connect_intvl =
                    RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000));
        }
        if (!rd_kafka_conf_is_modified(
                conf, "topic.metadata.refresh.fast.interval.ms"))
                conf->metadata_refresh_fast_interval_ms =
                    conf->retry_backoff_ms;

        if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&
            conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) {
                /* Issue #3109:
                 * Default connections.max.idle.ms to <4 minutes on Azure. */
                conf->connections_max_idle_ms = (4 * 60 - 10) * 1000;
        }

        if (!rd_kafka_conf_is_modified(conf, "allow.auto.create.topics")) {
                /* Consumer: Do not allow auto create by default.
                 * Producer: Allow auto create by default. */
                if (cltype == RD_KAFKA_CONSUMER)
                        conf->allow_auto_create_topics = rd_false;
                else if (cltype == RD_KAFKA_PRODUCER)
                        conf->allow_auto_create_topics = rd_true;
        }

        /* Finalize and verify the default.topic.config */
        if (conf->topic_conf) {

                if (cltype == RD_KAFKA_PRODUCER) {
                        rd_kafka_topic_conf_t *tconf = conf->topic_conf;

                        if (tconf->message_timeout_ms != 0 &&
                            (double)tconf->message_timeout_ms <=
                                conf->buffering_max_ms_dbl) {
                                if (rd_kafka_conf_is_modified(conf,
                                                              "linger.ms"))
                                        return "`message.timeout.ms` must be "
                                               "greater than `linger.ms`";
                                else /* Auto adjust linger.ms to be lower
                                      * than message.timeout.ms */
                                        conf->buffering_max_ms_dbl =
                                            (double)tconf->message_timeout_ms -
                                            0.1;
                        }
                }

                errstr = rd_kafka_topic_conf_finalize(cltype, conf,
                                                      conf->topic_conf);
                if (errstr)
                        return errstr;
        }

        /* Convert double linger.ms to internal int microseconds after
         * finalizing default_topic_conf since it may
         * update buffering_max_ms_dbl. */
        conf->buffering_max_us = (rd_ts_t)(conf->buffering_max_ms_dbl * 1000);


        return NULL;
}


/**
 * @brief Verify topic configuration \p tconf is
 *        correct/non-conflicting and finalize the configuration
 *        settings for use.
 *
 * @returns an error string if configuration is incorrect, else NULL.
 */
const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
                                         const rd_kafka_conf_t *conf,
                                         rd_kafka_topic_conf_t *tconf) {

        if (cltype != RD_KAFKA_PRODUCER)
                return NULL;

        if (conf->eos.idempotence) {
                /* Ensure acks=all */
                if (rd_kafka_topic_conf_is_modified(tconf, "acks")) {
                        if (tconf->required_acks != -1)
                                return "`acks` must be set to `all` when "
                                       "`enable.idempotence` is true";
                } else {
                        tconf->required_acks = -1; /* all */
                }

                /* Ensure FIFO queueing */
                if (rd_kafka_topic_conf_is_modified(tconf,
                                                    "queuing.strategy")) {
                        if (tconf->queuing_strategy != RD_KAFKA_QUEUE_FIFO)
                                return "`queuing.strategy` must be set to "
                                       "`fifo` when `enable.idempotence` is "
                                       "true";
                } else {
                        tconf->queuing_strategy = RD_KAFKA_QUEUE_FIFO;
                }

                /* Ensure message.timeout.ms <= transaction.timeout.ms */
                if (conf->eos.transactional_id) {
                        if (!rd_kafka_topic_conf_is_modified(
                                tconf, "message.timeout.ms"))
                                tconf->message_timeout_ms =
                                    conf->eos.transaction_timeout_ms;
                        else if (tconf->message_timeout_ms >
                                 conf->eos.transaction_timeout_ms)
                                return "`message.timeout.ms` must be set <= "
                                       "`transaction.timeout.ms`";
                }
        }

        if (tconf->message_timeout_ms != 0 &&
            (double)tconf->message_timeout_ms <= conf->buffering_max_ms_dbl &&
            rd_kafka_conf_is_modified(conf, "linger.ms"))
                return "`message.timeout.ms` must be greater than `linger.ms`";

        return NULL;
}


/**
 * @brief Log warnings for set deprecated or experimental
 *        configuration properties.
 * @returns the number of warnings logged.
 */
static int rd_kafka_anyconf_warn_deprecated(rd_kafka_t *rk,
                                            rd_kafka_conf_scope_t scope,
                                            const void *conf) {
        const struct rd_kafka_property *prop;
        int warn_type =
            rk->rk_type == RD_KAFKA_PRODUCER ? _RK_CONSUMER : _RK_PRODUCER;
        int warn_on = _RK_DEPRECATED | _RK_EXPERIMENTAL | warn_type;

        int cnt = 0;

        for (prop = rd_kafka_properties; prop->name; prop++) {
                int match = prop->scope & warn_on;

                if (likely(!(prop->scope & scope) || !match))
                        continue;

                if (likely(!rd_kafka_anyconf_is_modified(conf, prop)))
                        continue;

                if (match != warn_type)
                        rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                                     "Configuration property %s is %s%s%s: %s",
                                     prop->name,
                                     match & _RK_DEPRECATED ? "deprecated" : "",
                                     match == warn_on ? " and " : "",
                                     match & _RK_EXPERIMENTAL ? "experimental"
                                                              : "",
                                     prop->desc);

                if (match & warn_type)
                        rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                                     "Configuration property %s "
                                     "is a %s property and will be ignored by "
                                     "this %s instance",
                                     prop->name,
                                     warn_type == _RK_PRODUCER ? "producer"
                                                               : "consumer",
                                     warn_type == _RK_PRODUCER ? "consumer"
                                                               : "producer");

                cnt++;
        }

        return cnt;
}


/**
 * @brief Log configuration warnings (deprecated configuration properties,
 *        unrecommended combinations, etc).
 *
 * @returns the number of warnings logged.
 *
 * @locality any
 * @locks none
 */
int rd_kafka_conf_warn(rd_kafka_t *rk) {
        int cnt = 0;

        cnt = rd_kafka_anyconf_warn_deprecated(rk, _RK_GLOBAL, &rk->rk_conf);
        if (rk->rk_conf.topic_conf)
                cnt += rd_kafka_anyconf_warn_deprecated(rk, _RK_TOPIC,
                                                        rk->rk_conf.topic_conf);

        if (rk->rk_conf.warn.default_topic_conf_overwritten)
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Topic configuration properties set in the "
                             "global configuration were overwritten by "
                             "explicitly setting a default_topic_conf: "
                             "recommend not using set_default_topic_conf");

        /* Additional warnings */
        if (rk->rk_conf.retry_backoff_ms > rk->rk_conf.retry_backoff_max_ms) {
                rd_kafka_log(
                    rk, LOG_WARNING, "CONFWARN",
                    "Configuration `retry.backoff.ms` with value %d is greater "
                    "than configuration `retry.backoff.max.ms` with value %d. "
                    "A static backoff with value `retry.backoff.max.ms` will "
                    "be applied.",
                    rk->rk_conf.retry_backoff_ms,
                    rk->rk_conf.retry_backoff_max_ms);
        }

        if (rd_kafka_conf_is_modified(
                &rk->rk_conf, "topic.metadata.refresh.fast.interval.ms") &&
            rk->rk_conf.metadata_refresh_fast_interval_ms >
                rk->rk_conf.retry_backoff_max_ms) {
                rd_kafka_log(
                    rk, LOG_WARNING, "CONFWARN",
                    "Configuration `topic.metadata.refresh.fast.interval.ms` "
                    "with value %d is greater than configuration "
                    "`retry.backoff.max.ms` with value %d. "
                    "A static backoff with value `retry.backoff.max.ms` will "
                    "be applied.",
                    rk->rk_conf.metadata_refresh_fast_interval_ms,
                    rk->rk_conf.retry_backoff_max_ms);
        }
        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                if (rk->rk_conf.fetch_wait_max_ms + 1000 >
                    rk->rk_conf.socket_timeout_ms)
                        rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                                     "Configuration property "
                                     "`fetch.wait.max.ms` (%d) should be "
                                     "set lower than `socket.timeout.ms` (%d) "
                                     "by at least 1000ms to avoid blocking "
                                     "and timing out sub-sequent requests",
                                     rk->rk_conf.fetch_wait_max_ms,
                                     rk->rk_conf.socket_timeout_ms);
        }

        if (rd_kafka_conf_is_modified(&rk->rk_conf, "sasl.mechanisms") &&
            !(rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
              rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT)) {
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Configuration property `sasl.mechanism` set to "
                             "`%s` but `security.protocol` is not configured "
                             "for SASL: recommend setting "
                             "`security.protocol` to SASL_SSL or "
                             "SASL_PLAINTEXT",
                             rk->rk_conf.sasl.mechanisms);
        }

        if (rd_kafka_conf_is_modified(&rk->rk_conf, "sasl.username") &&
            !(!strncmp(rk->rk_conf.sasl.mechanisms, "SCRAM", 5) ||
              !strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")))
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Configuration property `sasl.username` only "
                             "applies when `sasl.mechanism` is set to "
                             "PLAIN or SCRAM-SHA-..");

        if (rd_kafka_conf_is_modified(&rk->rk_conf, "client.software.name") &&
            !rd_kafka_sw_str_is_safe(rk->rk_conf.sw_name))
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Configuration property `client.software.name` "
                             "may only contain 'a-zA-Z0-9.-', other characters "
                             "will be replaced with '-'");

        if (rd_kafka_conf_is_modified(&rk->rk_conf,
                                      "client.software.version") &&
            !rd_kafka_sw_str_is_safe(rk->rk_conf.sw_version))
                rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
                             "Configuration property `client.software.verison` "
                             "may only contain 'a-zA-Z0-9.-', other characters "
                             "will be replaced with '-'");

        if (rd_atomic32_get(&rk->rk_broker_cnt) == 0)
                rd_kafka_log(rk, LOG_NOTICE, "CONFWARN",
                             "No `bootstrap.servers` configured: "
                             "client will not be able to connect "
                             "to Kafka cluster");

        return cnt;
}


const rd_kafka_conf_t *rd_kafka_conf(rd_kafka_t *rk) {
        return &rk->rk_conf;
}


/**
 * @brief Unittests
 */
int unittest_conf(void) {
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *tconf;
        rd_kafka_conf_res_t res, res2;
        char errstr[128];
        int iteration;
        const struct rd_kafka_property *prop;
        char readval[512];
        size_t readlen;
        const char *errstr2;

        conf  = rd_kafka_conf_new();
        tconf = rd_kafka_topic_conf_new();

        res = rd_kafka_conf_set(conf, "unknown.thing", "foo", errstr,
                                sizeof(errstr));
        RD_UT_ASSERT(res == RD_KAFKA_CONF_UNKNOWN, "fail");
        RD_UT_ASSERT(*errstr, "fail");

        for (iteration = 0; iteration < 5; iteration++) {
                int cnt;


                /* Iterations:
                 *  0 - Check is_modified
                 *  1 - Set every other config property, read back and verify.
                 *  2 - Check is_modified.
                 *  3 - Set all config properties, read back and verify.
                 *  4 - Check is_modified. */
                for (prop = rd_kafka_properties, cnt = 0; prop->name;
                     prop++, cnt++) {
                        const char *val;
                        char tmp[64];
                        int odd    = cnt & 1;
                        int do_set = iteration == 3 || (iteration == 1 && odd);
                        rd_bool_t is_modified;
                        int exp_is_modified =
                            !prop->unsupported &&
                            (iteration >= 3 ||
                             (iteration > 0 && (do_set || odd)));

                        readlen = sizeof(readval);

                        /* Avoid some special configs */
                        if (!strcmp(prop->name, "plugin.library.paths") ||
                            !strcmp(prop->name, "builtin.features"))
                                continue;

                        switch (prop->type) {
                        case _RK_C_STR:
                        case _RK_C_KSTR:
                        case _RK_C_PATLIST:
                                if (prop->sdef)
                                        val = prop->sdef;
                                else
                                        val = "test";
                                break;

                        case _RK_C_BOOL:
                                val = "true";
                                break;

                        case _RK_C_INT:
                                rd_snprintf(tmp, sizeof(tmp), "%d", prop->vdef);
                                val = tmp;
                                break;

                        case _RK_C_DBL:
                                rd_snprintf(tmp, sizeof(tmp), "%g", prop->ddef);
                                val = tmp;
                                break;

                        case _RK_C_S2F:
                        case _RK_C_S2I:
                                val = prop->s2i[0].str;
                                break;

                        case _RK_C_PTR:
                        case _RK_C_ALIAS:
                        case _RK_C_INVALID:
                        case _RK_C_INTERNAL:
                        default:
                                continue;
                        }


                        if (prop->scope & _RK_GLOBAL) {
                                if (do_set)
                                        res = rd_kafka_conf_set(
                                            conf, prop->name, val, errstr,
                                            sizeof(errstr));

                                res2 = rd_kafka_conf_get(conf, prop->name,
                                                         readval, &readlen);

                                is_modified =
                                    rd_kafka_conf_is_modified(conf, prop->name);


                        } else if (prop->scope & _RK_TOPIC) {
                                if (do_set)
                                        res = rd_kafka_topic_conf_set(
                                            tconf, prop->name, val, errstr,
                                            sizeof(errstr));

                                res2 = rd_kafka_topic_conf_get(
                                    tconf, prop->name, readval, &readlen);

                                is_modified = rd_kafka_topic_conf_is_modified(
                                    tconf, prop->name);

                        } else {
                                RD_NOTREACHED();
                        }



                        if (do_set && prop->unsupported) {
                                RD_UT_ASSERT(res == RD_KAFKA_CONF_INVALID,
                                             "conf_set %s should've failed "
                                             "with CONF_INVALID, not %d: %s",
                                             prop->name, res, errstr);

                        } else if (do_set) {
                                RD_UT_ASSERT(res == RD_KAFKA_CONF_OK,
                                             "conf_set %s failed: %d: %s",
                                             prop->name, res, errstr);
                                RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK,
                                             "conf_get %s failed: %d",
                                             prop->name, res2);

                                RD_UT_ASSERT(!strcmp(readval, val),
                                             "conf_get %s "
                                             "returned \"%s\": "
                                             "expected \"%s\"",
                                             prop->name, readval, val);

                                RD_UT_ASSERT(is_modified,
                                             "Property %s was set but "
                                             "is_modified=%d",
                                             prop->name, is_modified);
                        }

                        assert(is_modified == exp_is_modified);
                        RD_UT_ASSERT(is_modified == exp_is_modified,
                                     "Property %s is_modified=%d, "
                                     "exp_is_modified=%d "
                                     "(iter %d, odd %d, do_set %d)",
                                     prop->name, is_modified, exp_is_modified,
                                     iteration, odd, do_set);
                }
        }

        /* Set an alias and make sure is_modified() works for it. */
        res = rd_kafka_conf_set(conf, "max.in.flight", "19", NULL, 0);
        RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);

        RD_UT_ASSERT(rd_kafka_conf_is_modified(conf, "max.in.flight") ==
                         rd_true,
                     "fail");
        RD_UT_ASSERT(rd_kafka_conf_is_modified(
                         conf, "max.in.flight.requests.per.connection") ==
                         rd_true,
                     "fail");

        rd_kafka_conf_destroy(conf);
        rd_kafka_topic_conf_destroy(tconf);


        /* Verify that software.client.* string-safing works */
        conf = rd_kafka_conf_new();
        res  = rd_kafka_conf_set(conf, "client.software.name",
                                " .~aba. va! !.~~", NULL, 0);
        RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);
        res = rd_kafka_conf_set(conf, "client.software.version",
                                "!1.2.3.4.5!!! a", NULL, 0);
        RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);

        errstr2 = rd_kafka_conf_finalize(RD_KAFKA_PRODUCER, conf);
        RD_UT_ASSERT(!errstr2, "conf_finalize() failed: %s", errstr2);

        readlen = sizeof(readval);
        res2 =
            rd_kafka_conf_get(conf, "client.software.name", readval, &readlen);
        RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, "%d", res2);
        RD_UT_ASSERT(!strcmp(readval, "aba.-va"),
                     "client.software.* safification failed: \"%s\"", readval);
        RD_UT_SAY("Safified client.software.name=\"%s\"", readval);

        readlen = sizeof(readval);
        res2    = rd_kafka_conf_get(conf, "client.software.version", readval,
                                 &readlen);
        RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, "%d", res2);
        RD_UT_ASSERT(!strcmp(readval, "1.2.3.4.5----a"),
                     "client.software.* safification failed: \"%s\"", readval);
        RD_UT_SAY("Safified client.software.version=\"%s\"", readval);

        rd_kafka_conf_destroy(conf);

        RD_UT_PASS();
}

/**@}*/
