/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2014-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_CONF_H_
#define _RDKAFKA_CONF_H_

#include "rdlist.h"
#include "rdkafka_cert.h"

#if WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000 &&                        \
    !defined(OPENSSL_IS_BORINGSSL)
#define WITH_SSL_ENGINE 1
/* Deprecated in OpenSSL 3 */
#include <openssl/engine.h>
#endif /* WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000 */

/**
 * Forward declarations
 */
struct rd_kafka_transport_s;


/**
 * MessageSet compression codecs
 */
typedef enum {
        RD_KAFKA_COMPRESSION_NONE,
        RD_KAFKA_COMPRESSION_GZIP   = RD_KAFKA_MSG_ATTR_GZIP,
        RD_KAFKA_COMPRESSION_SNAPPY = RD_KAFKA_MSG_ATTR_SNAPPY,
        RD_KAFKA_COMPRESSION_LZ4    = RD_KAFKA_MSG_ATTR_LZ4,
        RD_KAFKA_COMPRESSION_ZSTD   = RD_KAFKA_MSG_ATTR_ZSTD,
        RD_KAFKA_COMPRESSION_INHERIT, /* Inherit setting from global conf */
        RD_KAFKA_COMPRESSION_NUM
} rd_kafka_compression_t;

static RD_INLINE RD_UNUSED const char *
rd_kafka_compression2str(rd_kafka_compression_t compr) {
        static const char *names[RD_KAFKA_COMPRESSION_NUM] = {
            [RD_KAFKA_COMPRESSION_NONE]    = "none",
            [RD_KAFKA_COMPRESSION_GZIP]    = "gzip",
            [RD_KAFKA_COMPRESSION_SNAPPY]  = "snappy",
            [RD_KAFKA_COMPRESSION_LZ4]     = "lz4",
            [RD_KAFKA_COMPRESSION_ZSTD]    = "zstd",
            [RD_KAFKA_COMPRESSION_INHERIT] = "inherit"};
        static RD_TLS char ret[32];

        if ((int)compr < 0 || compr >= RD_KAFKA_COMPRESSION_NUM) {
                rd_snprintf(ret, sizeof(ret), "codec0x%x?", (int)compr);
                return ret;
        }

        return names[compr];
}

/**
 * MessageSet compression levels
 */
typedef enum {
        RD_KAFKA_COMPLEVEL_DEFAULT    = -1,
        RD_KAFKA_COMPLEVEL_MIN        = -1,
        RD_KAFKA_COMPLEVEL_GZIP_MAX   = 9,
        RD_KAFKA_COMPLEVEL_LZ4_MAX    = 12,
        RD_KAFKA_COMPLEVEL_SNAPPY_MAX = 0,
        RD_KAFKA_COMPLEVEL_ZSTD_MAX   = 22,
        RD_KAFKA_COMPLEVEL_MAX        = 12
} rd_kafka_complevel_t;

typedef enum {
        RD_KAFKA_PROTO_PLAINTEXT,
        RD_KAFKA_PROTO_SSL,
        RD_KAFKA_PROTO_SASL_PLAINTEXT,
        RD_KAFKA_PROTO_SASL_SSL,
        RD_KAFKA_PROTO_NUM,
} rd_kafka_secproto_t;


typedef enum {
        RD_KAFKA_CONFIGURED,
        RD_KAFKA_LEARNED,
        RD_KAFKA_INTERNAL,
        RD_KAFKA_LOGICAL
} rd_kafka_confsource_t;

static RD_INLINE RD_UNUSED const char *
rd_kafka_confsource2str(rd_kafka_confsource_t source) {
        static const char *names[] = {"configured", "learned", "internal",
                                      "logical"};

        return names[source];
}


typedef enum {
        _RK_GLOBAL       = 0x1,
        _RK_PRODUCER     = 0x2,
        _RK_CONSUMER     = 0x4,
        _RK_TOPIC        = 0x8,
        _RK_CGRP         = 0x10,
        _RK_DEPRECATED   = 0x20,
        _RK_HIDDEN       = 0x40,
        _RK_HIGH         = 0x80,  /* High Importance */
        _RK_MED          = 0x100, /* Medium Importance */
        _RK_EXPERIMENTAL = 0x200, /* Experimental (unsupported) property */
        _RK_SENSITIVE    = 0x400  /* The configuration property's value
                                   * might contain sensitive information. */
} rd_kafka_conf_scope_t;

/**< While the client groups is a generic concept, it is currently
 *   only implemented for consumers in librdkafka. */
#define _RK_CGRP _RK_CONSUMER

typedef enum {
        _RK_CONF_PROP_SET_REPLACE, /* Replace current value (default) */
        _RK_CONF_PROP_SET_ADD,     /* Add value (S2F) */
        _RK_CONF_PROP_SET_DEL      /* Remove value (S2F) */
} rd_kafka_conf_set_mode_t;



typedef enum {
        RD_KAFKA_OFFSET_METHOD_NONE,
        RD_KAFKA_OFFSET_METHOD_FILE,
        RD_KAFKA_OFFSET_METHOD_BROKER
} rd_kafka_offset_method_t;

typedef enum {
        RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT,
        RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC
} rd_kafka_oauthbearer_method_t;

typedef enum {
        RD_KAFKA_SSL_ENDPOINT_ID_NONE,
        RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */
} rd_kafka_ssl_endpoint_id_t;

typedef enum {
        RD_KAFKA_USE_ALL_DNS_IPS,
        RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
} rd_kafka_client_dns_lookup_t;

/* Increase in steps of 64 as needed.
 * This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33)

/**
 * @struct rd_kafka_anyconf_t
 * @brief The anyconf header must be the first field in the
 *        rd_kafka_conf_t and rd_kafka_topic_conf_t structs.
 *        It provides a way to track which property has been modified.
 */
struct rd_kafka_anyconf_hdr {
        uint64_t modified[RD_KAFKA_CONF_PROPS_IDX_MAX / 64];
};


/**
 * Optional configuration struct passed to rd_kafka_new*().
 *
 * The struct is populated ted through string properties
 * by calling rd_kafka_conf_set().
 *
 */
struct rd_kafka_conf_s {
        struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */

        /*
         * Generic configuration
         */
        int enabled_events;
        int max_msg_size;
        int msg_copy_max_size;
        int recv_max_msg_size;
        int max_inflight;
        int metadata_request_timeout_ms;
        int metadata_refresh_interval_ms;
        int metadata_refresh_fast_cnt;
        int metadata_refresh_fast_interval_ms;
        int metadata_refresh_sparse;
        int metadata_max_age_ms;
        int metadata_propagation_max_ms;
        int debug;
        int broker_addr_ttl;
        int broker_addr_family;
        int socket_timeout_ms;
        int socket_blocking_max_ms;
        int socket_sndbuf_size;
        int socket_rcvbuf_size;
        int socket_keepalive;
        int socket_nagle_disable;
        int socket_max_fails;
        char *client_id_str;
        char *brokerlist;
        int stats_interval_ms;
        int term_sig;
        int reconnect_backoff_ms;
        int reconnect_backoff_max_ms;
        int reconnect_jitter_ms;
        int socket_connection_setup_timeout_ms;
        int connections_max_idle_ms;
        int sparse_connections;
        int sparse_connect_intvl;
        int api_version_request;
        int api_version_request_timeout_ms;
        int api_version_fallback_ms;
        char *broker_version_fallback;
        rd_kafka_secproto_t security_protocol;
        rd_kafka_client_dns_lookup_t client_dns_lookup;

        struct {
#if WITH_SSL
                SSL_CTX *ctx;
#endif
                char *cipher_suites;
                char *curves_list;
                char *sigalgs_list;
                char *key_location;
                char *key_pem;
                rd_kafka_cert_t *key;
                char *key_password;
                char *cert_location;
                char *cert_pem;
                rd_kafka_cert_t *cert;
                char *ca_location;
                char *ca_pem;
                rd_kafka_cert_t *ca;
                /** CSV list of Windows certificate stores */
                char *ca_cert_stores;
                char *crl_location;
#if WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000
                ENGINE *engine;
#endif
                char *engine_location;
                char *engine_id;
                void *engine_callback_data;
                char *providers;
                rd_list_t loaded_providers; /**< (SSL_PROVIDER*) */
                char *keystore_location;
                char *keystore_password;
                int endpoint_identification;
                int enable_verify;
                int (*cert_verify_cb)(rd_kafka_t *rk,
                                      const char *broker_name,
                                      int32_t broker_id,
                                      int *x509_error,
                                      int depth,
                                      const char *buf,
                                      size_t size,
                                      char *errstr,
                                      size_t errstr_size,
                                      void *opaque);
        } ssl;

        struct {
                const struct rd_kafka_sasl_provider *provider;
                char *principal;
                char *mechanisms;
                char *service_name;
                char *kinit_cmd;
                char *keytab;
                int relogin_min_time;
                /** Protects .username and .password access after client
                 *  instance has been created (see sasl_set_credentials()). */
                mtx_t lock;
                char *username;
                char *password;
#if WITH_SASL_SCRAM
                /* SCRAM EVP-wrapped hash function
                 * (return value from EVP_shaX()) */
                const void /*EVP_MD*/ *scram_evp;
                /* SCRAM direct hash function (e.g., SHA256()) */
                unsigned char *(*scram_H)(const unsigned char *d,
                                          size_t n,
                                          unsigned char *md);
                /* Hash size */
                size_t scram_H_size;
#endif
                char *oauthbearer_config;
                int enable_oauthbearer_unsecure_jwt;
                int enable_callback_queue;
                struct {
                        rd_kafka_oauthbearer_method_t method;
                        char *token_endpoint_url;
                        char *client_id;
                        char *client_secret;
                        char *scope;
                        char *extensions_str;
                        /* SASL/OAUTHBEARER token refresh event callback */
                        void (*token_refresh_cb)(rd_kafka_t *rk,
                                                 const char *oauthbearer_config,
                                                 void *opaque);
                } oauthbearer;
        } sasl;

        char *plugin_paths;
#if WITH_PLUGINS
        rd_list_t plugins;
#endif

        /* Interceptors */
        struct {
                /* rd_kafka_interceptor_method_t lists */
                rd_list_t on_conf_set;            /* on_conf_set interceptors
                                                   * (not copied on conf_dup()) */
                rd_list_t on_conf_dup;            /* .. (not copied) */
                rd_list_t on_conf_destroy;        /* .. (not copied) */
                rd_list_t on_new;                 /* .. (copied) */
                rd_list_t on_destroy;             /* .. (copied) */
                rd_list_t on_send;                /* .. (copied) */
                rd_list_t on_acknowledgement;     /* .. (copied) */
                rd_list_t on_consume;             /* .. (copied) */
                rd_list_t on_commit;              /* .. (copied) */
                rd_list_t on_request_sent;        /* .. (copied) */
                rd_list_t on_response_received;   /* .. (copied) */
                rd_list_t on_thread_start;        /* .. (copied) */
                rd_list_t on_thread_exit;         /* .. (copied) */
                rd_list_t on_broker_state_change; /* .. (copied) */

                /* rd_strtup_t list */
                rd_list_t config; /* Configuration name=val's
                                   * handled by interceptors. */
        } interceptors;

        /* Client group configuration */
        int coord_query_intvl_ms;
        int max_poll_interval_ms;

        int builtin_features;
        /*
         * Consumer configuration
         */
        int check_crcs;
        int queued_min_msgs;
        int queued_max_msg_kbytes;
        int64_t queued_max_msg_bytes;
        int fetch_wait_max_ms;
        int fetch_msg_max_bytes;
        int fetch_max_bytes;
        int fetch_min_bytes;
        int fetch_queue_backoff_ms;
        int fetch_error_backoff_ms;
        char *group_id_str;
        char *group_instance_id;
        int allow_auto_create_topics;

        rd_kafka_pattern_list_t *topic_blacklist;
        struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
                                                   * for automatically
                                                   * subscribed topics. */
        int enable_auto_commit;
        int enable_auto_offset_store;
        int auto_commit_interval_ms;
        int group_session_timeout_ms;
        int group_heartbeat_intvl_ms;
        rd_kafkap_str_t *group_protocol_type;
        char *partition_assignment_strategy;
        rd_list_t partition_assignors;
        int enabled_assignor_cnt;

        void (*rebalance_cb)(rd_kafka_t *rk,
                             rd_kafka_resp_err_t err,
                             rd_kafka_topic_partition_list_t *partitions,
                             void *opaque);

        void (*offset_commit_cb)(rd_kafka_t *rk,
                                 rd_kafka_resp_err_t err,
                                 rd_kafka_topic_partition_list_t *offsets,
                                 void *opaque);

        rd_kafka_offset_method_t offset_store_method;

        rd_kafka_isolation_level_t isolation_level;

        int enable_partition_eof;

        rd_kafkap_str_t *client_rack;

        /*
         * Producer configuration
         */
        struct {
                /*
                 * Idempotence
                 */
                int idempotence;   /**< Enable Idempotent Producer */
                rd_bool_t gapless; /**< Raise fatal error if
                                    *   gapless guarantee can't be
                                    *   satisfied. */
                /*
                 * Transactions
                 */
                char *transactional_id;     /**< Transactional Id */
                int transaction_timeout_ms; /**< Transaction timeout */
        } eos;
        int queue_buffering_max_msgs;
        int queue_buffering_max_kbytes;
        double buffering_max_ms_dbl; /**< This is the configured value */
        rd_ts_t buffering_max_us;    /**< This is the value used in the code */
        int queue_backpressure_thres;
        int max_retries;
        int retry_backoff_ms;
        int retry_backoff_max_ms;
        int batch_num_messages;
        int batch_size;
        rd_kafka_compression_t compression_codec;
        int dr_err_only;
        int sticky_partition_linger_ms;

        /* Message delivery report callback.
         * Called once for each produced message, either on
         * successful and acknowledged delivery to the broker in which
         * case 'err' is 0, or if the message could not be delivered
         * in which case 'err' is non-zero (use rd_kafka_err2str()
         * to obtain a human-readable error reason).
         *
         * If the message was produced with neither RD_KAFKA_MSG_F_FREE
         * or RD_KAFKA_MSG_F_COPY set then 'payload' is the original
         * pointer provided to rd_kafka_produce().
         * rdkafka will not perform any further actions on 'payload'
         * at this point and the application may rd_free the payload data
         * at this point.
         *
         * 'opaque' is 'conf.opaque', while 'msg_opaque' is
         * the opaque pointer provided in the rd_kafka_produce() call.
         */
        void (*dr_cb)(rd_kafka_t *rk,
                      void *payload,
                      size_t len,
                      rd_kafka_resp_err_t err,
                      void *opaque,
                      void *msg_opaque);

        void (*dr_msg_cb)(rd_kafka_t *rk,
                          const rd_kafka_message_t *rkmessage,
                          void *opaque);

        /* Consume callback */
        void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque);

        /* Log callback */
        void (*log_cb)(const rd_kafka_t *rk,
                       int level,
                       const char *fac,
                       const char *buf);
        int log_level;
        int log_queue;
        int log_thread_name;
        int log_connection_close;

        /* PRNG seeding */
        int enable_random_seed;

        /* Error callback */
        void (*error_cb)(rd_kafka_t *rk,
                         int err,
                         const char *reason,
                         void *opaque);

        /* Throttle callback */
        void (*throttle_cb)(rd_kafka_t *rk,
                            const char *broker_name,
                            int32_t broker_id,
                            int throttle_time_ms,
                            void *opaque);

        /* Stats callback */
        int (*stats_cb)(rd_kafka_t *rk,
                        char *json,
                        size_t json_len,
                        void *opaque);

        /* Socket creation callback */
        int (*socket_cb)(int domain, int type, int protocol, void *opaque);

        /* Connect callback */
        int (*connect_cb)(int sockfd,
                          const struct sockaddr *addr,
                          int addrlen,
                          const char *id,
                          void *opaque);

        /* Close socket callback */
        int (*closesocket_cb)(int sockfd, void *opaque);

        /* File open callback */
        int (*open_cb)(const char *pathname,
                       int flags,
                       mode_t mode,
                       void *opaque);

        /* Address resolution callback */
        int (*resolve_cb)(const char *node,
                          const char *service,
                          const struct addrinfo *hints,
                          struct addrinfo **res,
                          void *opaque);

        /* Background queue event callback */
        void (*background_event_cb)(rd_kafka_t *rk,
                                    rd_kafka_event_t *rkev,
                                    void *opaque);


        /* Opaque passed to callbacks. */
        void *opaque;

        /* For use with value-less properties. */
        int dummy;


        /* Admin client defaults */
        struct {
                int request_timeout_ms; /* AdminOptions.request_timeout */
        } admin;


        /*
         * Test mocks
         */
        struct {
                int broker_cnt; /**< Number of mock brokers */
                int broker_rtt; /**< Broker RTT */
        } mock;

        /*
         * Unit test pluggable interfaces
         */
        struct {
                /**< Inject errors in ProduceResponse handler */
                rd_kafka_resp_err_t (*handle_ProduceResponse)(
                    rd_kafka_t *rk,
                    int32_t brokerid,
                    uint64_t msgid,
                    rd_kafka_resp_err_t err);
        } ut;

        char *sw_name;    /**< Software/client name */
        char *sw_version; /**< Software/client version */

        struct {
                /** Properties on (implicit pass-thru) default_topic_conf were
                 *  overwritten by passing an explicit default_topic_conf. */
                rd_bool_t default_topic_conf_overwritten;
        } warn;
};

int rd_kafka_socket_cb_linux(int domain, int type, int protocol, void *opaque);
int rd_kafka_socket_cb_generic(int domain,
                               int type,
                               int protocol,
                               void *opaque);
#ifndef _WIN32
int rd_kafka_open_cb_linux(const char *pathname,
                           int flags,
                           mode_t mode,
                           void *opaque);
#endif
int rd_kafka_open_cb_generic(const char *pathname,
                             int flags,
                             mode_t mode,
                             void *opaque);



struct rd_kafka_topic_conf_s {
        struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */

        int required_acks;
        int32_t request_timeout_ms;
        int message_timeout_ms;

        int32_t (*partitioner)(const rd_kafka_topic_t *rkt,
                               const void *keydata,
                               size_t keylen,
                               int32_t partition_cnt,
                               void *rkt_opaque,
                               void *msg_opaque);
        char *partitioner_str;

        rd_bool_t random_partitioner; /**< rd_true - random
                                       *  rd_false - sticky */

        int queuing_strategy; /* RD_KAFKA_QUEUE_FIFO|LIFO */
        int (*msg_order_cmp)(const void *a, const void *b);

        rd_kafka_compression_t compression_codec;
        rd_kafka_complevel_t compression_level;
        int produce_offset_report;

        int consume_callback_max_msgs;
        int auto_commit;
        int auto_commit_interval_ms;
        int auto_offset_reset;
        char *offset_store_path;
        int offset_store_sync_interval_ms;

        rd_kafka_offset_method_t offset_store_method;

        /* Application provided opaque pointer (this is rkt_opaque) */
        void *opaque;
};


char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp);

void rd_kafka_anyconf_destroy(int scope, void *conf);

rd_bool_t rd_kafka_conf_is_modified(const rd_kafka_conf_t *conf,
                                    const char *name);

void rd_kafka_desensitize_str(char *str);

void rd_kafka_conf_desensitize(rd_kafka_conf_t *conf);
void rd_kafka_topic_conf_desensitize(rd_kafka_topic_conf_t *tconf);

const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
                                   rd_kafka_conf_t *conf);
const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
                                         const rd_kafka_conf_t *conf,
                                         rd_kafka_topic_conf_t *tconf);


int rd_kafka_conf_warn(rd_kafka_t *rk);

void rd_kafka_anyconf_dump_dbg(rd_kafka_t *rk,
                               int scope,
                               const void *conf,
                               const char *description);

#include "rdkafka_confval.h"

int unittest_conf(void);

#endif /* _RDKAFKA_CONF_H_ */
