/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_INT_H_
#define _RDKAFKA_INT_H_

#ifndef _WIN32
#define _GNU_SOURCE  /* for strndup() */
#endif

#ifdef _MSC_VER
typedef int mode_t;
#endif

#include <fcntl.h>


#include "rdsysqueue.h"

#include "rdkafka.h"
#include "rd.h"
#include "rdlog.h"
#include "rdtime.h"
#include "rdaddr.h"
#include "rdinterval.h"
#include "rdavg.h"
#include "rdlist.h"

#if WITH_SSL
#include <openssl/ssl.h>
#endif




/**
 * @brief Assert that \p cond is true. If it is not, rd_kafka_crash() is
 *        called with the current file, line and function, and with the
 *        stringified condition prefixed by "assert: " as the reason.
 *
 * @param rk   Handle passed through to rd_kafka_crash(). Some callers in
 *             this header pass NULL.
 * @param cond Expression to check; it is evaluated exactly once.
 *
 * @remark As defined here the check is always compiled in, there is no
 *         NDEBUG-style switch around it.
 */
#define rd_kafka_assert(rk, cond) do {                                  \
                if (unlikely(!(cond)))                                  \
                        rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
                                       (rk), "assert: " # cond);        \
        } while (0)


/**
 * @brief Crash handler, called by rd_kafka_assert() when its condition
 *        is false.
 *
 * @param file     Source file of the failed check (__FILE__).
 * @param line     Source line of the failed check (__LINE__).
 * @param function Name of the function containing the failed check.
 * @param rk       Client instance the failure relates to. Callers in this
 *                 header may pass NULL.
 * @param reason   Human readable reason, e.g., "assert: <condition>".
 *
 * @remark Marked RD_NORETURN, so presumably control never returns to the
 *         caller (the implementation is not part of this header).
 */
void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
                rd_kafka_t *rk, const char *reason);


/* Forward declarations.
 * These struct tags are declared as incomplete types ahead of the
 * internal headers included below. */
struct rd_kafka_s;
struct rd_kafka_topic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

/* Opaque here: struct rd_kafka_lwtopic_s is not defined in this part of
 * the file. */
typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t;


#include "rdkafka_op.h"
#include "rdkafka_queue.h"
#include "rdkafka_msg.h"
#include "rdkafka_proto.h"
#include "rdkafka_buf.h"
#include "rdkafka_pattern.h"
#include "rdkafka_conf.h"
#include "rdkafka_transport.h"
#include "rdkafka_timer.h"
#include "rdkafka_assignor.h"
#include "rdkafka_metadata.h"
#include "rdkafka_mock.h"
#include "rdkafka_partition.h"
#include "rdkafka_assignment.h"
#include "rdkafka_coord.h"
#include "rdkafka_mock.h"

/**
 * Protocol level sanity limits: upper bounds for the number of brokers,
 * topics and partitions.
 *
 * NOTE(review): presumably used to reject implausible counts when parsing
 * protocol responses - confirm against the users of these macros.
 */
#define RD_KAFKAP_BROKERS_MAX     10000
#define RD_KAFKAP_TOPICS_MAX      1000000
#define RD_KAFKAP_PARTITIONS_MAX  100000


/**
 * @returns true if \p OFF is a logical offset, i.e., a negative value,
 *          as opposed to an absolute offset (>= 0).
 */
#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF)  ((OFF) < 0)




/**
 * @enum rd_kafka_idemp_state_t
 * @brief Idempotent Producer state.
 *
 * @remark rd_kafka_idemp_state2str() indexes its name table directly with
 *         these values, so the enumerator order and that table must be
 *         kept in sync.
 */
typedef enum {
        RD_KAFKA_IDEMP_STATE_INIT,      /**< Initial state */
        RD_KAFKA_IDEMP_STATE_TERM,      /**< Instance is terminating */
        RD_KAFKA_IDEMP_STATE_FATAL_ERROR, /**< A fatal error has been raised */
        RD_KAFKA_IDEMP_STATE_REQ_PID,   /**< Request new PID */
        RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT, /**< Waiting for coordinator to
                                              *   become available. */
        RD_KAFKA_IDEMP_STATE_WAIT_PID,  /**< PID requested, waiting for reply */
        RD_KAFKA_IDEMP_STATE_ASSIGNED,  /**< New PID assigned */
        RD_KAFKA_IDEMP_STATE_DRAIN_RESET, /**< Wait for outstanding
                                           *   ProduceRequests to finish
                                           *   before resetting and
                                           *   re-requesting a new PID. */
        RD_KAFKA_IDEMP_STATE_DRAIN_BUMP, /**< Wait for outstanding
                                          *   ProduceRequests to finish
                                          *   before bumping the current
                                          *   epoch. */
} rd_kafka_idemp_state_t;

/**
 * @brief Translate an Idempotent Producer state to a human readable name.
 *
 * @param state State to translate.
 *
 * @returns the idemp_state_t string representation (a static string,
 *          must not be freed), or "Unknown" if \p state is not
 *          one of the known states.
 */
static RD_UNUSED const char *
rd_kafka_idemp_state2str (rd_kafka_idemp_state_t state) {
        /* Indexed by rd_kafka_idemp_state_t: must follow the enum order. */
        static const char *names[] = {
                "Init",             /* .._STATE_INIT */
                "Terminate",        /* .._STATE_TERM */
                "FatalError",       /* .._STATE_FATAL_ERROR */
                "RequestPID",       /* .._STATE_REQ_PID */
                "WaitTransport",    /* .._STATE_WAIT_TRANSPORT */
                "WaitPID",          /* .._STATE_WAIT_PID */
                "Assigned",         /* .._STATE_ASSIGNED */
                "DrainReset",       /* .._STATE_DRAIN_RESET */
                "DrainBump"         /* .._STATE_DRAIN_BUMP */
        };

        /* Never index outside the table: an invalid (e.g., corrupted or
         * not yet named) state would otherwise read out of bounds.
         * The unsigned cast makes negative values fail the check too. */
        if ((unsigned int)state >= sizeof(names) / sizeof(names[0]))
                return "Unknown";

        return names[state];
}




/**
 * @enum rd_kafka_txn_state_t
 * @brief Transactional Producer state.
 *
 * @remark rd_kafka_txn_state2str() indexes its name table directly with
 *         these values, so the enumerator order and that table must be
 *         kept in sync.
 */
typedef enum {
        /** Initial state */
        RD_KAFKA_TXN_STATE_INIT,
        /** Awaiting PID to be acquired by rdkafka_idempotence.c */
        RD_KAFKA_TXN_STATE_WAIT_PID,
        /** PID acquired, but application has not made a successful
         *  init_transactions() call. */
        RD_KAFKA_TXN_STATE_READY_NOT_ACKED,
        /** PID acquired, no active transaction. */
        RD_KAFKA_TXN_STATE_READY,
        /** begin_transaction() has been called. */
        RD_KAFKA_TXN_STATE_IN_TRANSACTION,
        /** commit_transaction() has been called. */
        RD_KAFKA_TXN_STATE_BEGIN_COMMIT,
        /** commit_transaction() has been called and all outstanding
         *  messages, partitions, and offsets have been sent. */
        RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION,
        /** Transaction successfully committed but application has not made
         *  a successful commit_transaction() call yet. */
        RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED,
        /** abort_transaction() has been called. */
        RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION,
        /** Transaction successfully aborted but application has not made
         *  a successful abort_transaction() call yet. */
        RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED,
        /** An abortable error has occurred. */
        RD_KAFKA_TXN_STATE_ABORTABLE_ERROR,
        /** A fatal error has occurred. */
        RD_KAFKA_TXN_STATE_FATAL_ERROR
} rd_kafka_txn_state_t;


/**
 * @brief Translate a Transactional Producer state to a human readable
 *        name.
 *
 * @param state State to translate.
 *
 * @returns the txn_state_t string representation (a static string,
 *          must not be freed), or "Unknown" if \p state is not
 *          one of the known states.
 */
static RD_UNUSED const char *
rd_kafka_txn_state2str (rd_kafka_txn_state_t state) {
        /* Indexed by rd_kafka_txn_state_t: must follow the enum order. */
        static const char *names[] = {
                "Init",                  /* .._STATE_INIT */
                "WaitPID",               /* .._STATE_WAIT_PID */
                "ReadyNotAcked",         /* .._STATE_READY_NOT_ACKED */
                "Ready",                 /* .._STATE_READY */
                "InTransaction",         /* .._STATE_IN_TRANSACTION */
                "BeginCommit",           /* .._STATE_BEGIN_COMMIT */
                "CommittingTransaction", /* .._STATE_COMMITTING_TRANSACTION */
                "CommitNotAcked",        /* .._STATE_COMMIT_NOT_ACKED */
                "AbortingTransaction",   /* .._STATE_ABORTING_TRANSACTION */
                "AbortedNotAcked",       /* .._STATE_ABORT_NOT_ACKED */
                "AbortableError",        /* .._STATE_ABORTABLE_ERROR */
                "FatalError"             /* .._STATE_FATAL_ERROR */
        };

        /* Never index outside the table: an invalid (e.g., corrupted or
         * not yet named) state would otherwise read out of bounds.
         * The unsigned cast makes negative values fail the check too. */
        if ((unsigned int)state >= sizeof(names) / sizeof(names[0]))
                return "Unknown";

        return names[state];
}





/**
 * @brief Kafka handle, internal representation of the application's
 *        rd_kafka_t.
 *
 * Groups the per-instance state: queues, broker and topic lists,
 * configuration, termination flags, metadata, idempotence/transaction
 * state (rk_eos), consumer state, current message accounting
 * (rk_curr_msgs), timers, background thread, log suppression intervals,
 * SASL handle and test mocks.
 *
 * The RD_KAFKA_DESTROY_F_.. and RD_KAFKA_TXN_CURR_API_F_.. macros are
 * defined inline next to the fields that hold them.
 */
struct rd_kafka_s {
	rd_kafka_q_t *rk_rep;   /* kafka -> application reply queue */
	rd_kafka_q_t *rk_ops;   /* any -> rdkafka main thread ops */

	TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
        rd_list_t                  rk_broker_by_id; /* Fast id lookups. */
	rd_atomic32_t              rk_broker_cnt;
        /** Number of brokers in state >= UP */
        rd_atomic32_t              rk_broker_up_cnt;
        /** Number of logical brokers in state >= UP, this is a sub-set
         *  of rk_broker_up_cnt. */
        rd_atomic32_t              rk_logical_broker_up_cnt;
        /** Number of brokers that are down, only includes brokers
         *  that have had at least one connection attempt. */
	rd_atomic32_t              rk_broker_down_cnt;
        /** Logical brokers currently without an address.
         *  Used for calculating ERR__ALL_BROKERS_DOWN. */
        rd_atomic32_t              rk_broker_addrless_cnt;

        mtx_t                      rk_internal_rkb_lock;
	rd_kafka_broker_t         *rk_internal_rkb;

	/* Broadcasting of broker state changes to wake up
	 * functions waiting for a state change. */
	cnd_t                      rk_broker_state_change_cnd;
	mtx_t                      rk_broker_state_change_lock;
	int                        rk_broker_state_change_version;
        /* List of (rd_kafka_enq_once_t*) objects waiting for broker
         * state changes. Protected by rk_broker_state_change_lock. */
        rd_list_t rk_broker_state_change_waiters; /**< (rd_kafka_enq_once_t*) */

	TAILQ_HEAD(, rd_kafka_topic_s)  rk_topics;
	int              rk_topic_cnt;

        struct rd_kafka_cgrp_s *rk_cgrp;

        rd_kafka_conf_t  rk_conf;
        rd_kafka_q_t    *rk_logq;          /* Log queue if `log.queue` set */
        char             rk_name[128];
	rd_kafkap_str_t *rk_client_id;
        rd_kafkap_str_t *rk_group_id;    /* Consumer group id */

	rd_atomic32_t    rk_terminate;   /**< Set to RD_KAFKA_DESTROY_F_..
                                          *   flags when the instance
                                          *   is being destroyed.
                                          *   The value set is the
                                          *   destroy flags from
                                          *   rd_kafka_destroy*() and
                                          *   the two internal flags shown
                                          *   below.
                                          *
                                          * Order:
                                          * 1. user_flags | .._F_DESTROY_CALLED
                                          *    is set in rd_kafka_destroy*().
                                          * 2. consumer_close() is called
                                          *    for consumers.
                                          * 3. .._F_TERMINATE is set to
                                          *    signal all background threads
                                          *    to terminate.
                                          */

#define RD_KAFKA_DESTROY_F_TERMINATE 0x1 /**< Internal flag to make sure
                                          *   rk_terminate is set to non-zero
                                          *   value even if user passed
                                          *   no destroy flags. */
#define RD_KAFKA_DESTROY_F_DESTROY_CALLED 0x2 /**< Application has called
                                               *  ..destroy*() and we've
                                               * begun the termination
                                               * process.
                                               * This flag is needed to keep
                                               * rk_terminate from being
                                               * 0 when destroy_flags()
                                               * is called with flags=0
                                               * and _F_TERMINATE has not
                                               * yet been set. */
#define RD_KAFKA_DESTROY_F_IMMEDIATE 0x4     /**< Immediate non-blocking
                                              *   destruction without waiting
                                              *   for all resources
                                              *   to be cleaned up.
                                              *   WARNING: Memory and resource
                                              *            leaks possible.
                                              *   This flag automatically sets
                                              *   .._NO_CONSUMER_CLOSE. */


	rwlock_t         rk_lock;
	rd_kafka_type_t  rk_type;
	struct timeval   rk_tv_state_change;

        rd_atomic64_t    rk_ts_last_poll;   /**< Timestamp of last application
                                             *   consumer_poll() call
                                             *   (or equivalent).
                                             *   Used to enforce
                                             *   max.poll.interval.ms.
                                             *   Only relevant for consumer. */
        /* First fatal error. */
        struct {
                rd_atomic32_t err; /**< rd_kafka_resp_err_t */
                char *errstr;      /**< Protected by rk_lock */
                int cnt;           /**< Number of errors raised, only
                                    *   the first one is stored. */
        } rk_fatal;

	rd_atomic32_t    rk_last_throttle;  /* Last throttle_time_ms value
	                                     * from broker. */

        /* Locks: rd_kafka_*lock() */
        rd_ts_t          rk_ts_metadata;    /* Timestamp of most recent
                                             * metadata. */

	struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
	rd_ts_t          rk_ts_full_metadata;       /* Timestamp of last
	                                             * full metadata. */
        struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

        char            *rk_clusterid;      /* ClusterId from metadata */
        int32_t          rk_controllerid;   /* ControllerId from metadata */

        /** Producer: Delivery report mode */
        enum {
                RD_KAFKA_DR_MODE_NONE,  /**< No delivery reports */
                RD_KAFKA_DR_MODE_CB,    /**< Delivery reports through callback */
                RD_KAFKA_DR_MODE_EVENT, /**< Delivery reports through event API*/
        } rk_drmode;

        /* Simple consumer count:
         *  >0: Running in legacy / Simple Consumer mode,
         *   0: No consumers running
         *  <0: Running in High level consumer mode */
        rd_atomic32_t    rk_simple_cnt;

        /**
         * Exactly Once Semantics and Idempotent Producer
         *
         * @locks rk_lock
         */
        struct {
                /*
                 * Idempotence
                 */
                rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer
                                                     *   state */
                rd_ts_t ts_idemp_state;/**< Last state change */
                rd_kafka_pid_t pid;    /**< Current Producer ID and Epoch */
                int epoch_cnt;         /**< Number of times pid/epoch changed */
                rd_atomic32_t inflight_toppar_cnt; /**< Current number of
                                                    *   toppars with inflight
                                                    *   requests. */
                rd_kafka_timer_t pid_tmr; /**< PID FSM timer */

                /*
                 * Transactions
                 *
                 * All field access is from the rdkafka main thread,
                 * unless a specific lock is mentioned in the doc string.
                 *
                 */
                rd_atomic32_t txn_may_enq;      /**< Transaction state allows
                                                 *   application to enqueue
                                                 *   (produce) messages. */

                rd_kafkap_str_t *transactional_id; /**< transactional.id */
                rd_kafka_txn_state_t txn_state; /**< Transactional state.
                                                 *   @locks rk_lock */
                rd_ts_t ts_txn_state;           /**< Last state change.
                                                 *   @locks rk_lock */
                rd_kafka_broker_t *txn_coord;   /**< Transaction coordinator,
                                                 *   this is a logical broker.*/
                rd_kafka_broker_t *txn_curr_coord; /**< Current actual coord
                                                    *   broker.
                                                    *   This is only used to
                                                    *   check if the coord
                                                    *   changes. */
                rd_kafka_broker_monitor_t txn_coord_mon; /**< Monitor for
                                                          *   coordinator to
                                                          *   take action when
                                                          *   the broker state
                                                          *   changes. */
                rd_bool_t txn_requires_epoch_bump; /**< Coordinator epoch bump
                                                    *   required to recover from
                                                    *   idempotent producer
                                                    *   fatal error. */

                /** Blocking transactional API application call
                 *  currently being handled, its state, reply queue and how
                 *  to handle timeout.
                 *  Only one transactional API call is allowed at any time.
                 *  Protected by the rk_lock. */
                struct {
                        char name[64];        /**< API name, e.g.,
                                               *   SendOffsetsToTransaction */
                        rd_kafka_timer_t tmr; /**< Timeout timer, the timeout
                                               * is specified by the app. */

                        int flags;            /**< Flags */
#define RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT 0x1 /**< Set state to abortable
                                                      *   error on timeout,
                                                      *   i.e., fail the txn,
                                                      *   and set txn_requires_abort
                                                      *   on the returned error.
                                                      */
#define RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT 0x2 /**< Set retriable flag
                                                          *   on the error
                                                          *   on timeout. */
#define RD_KAFKA_TXN_CURR_API_F_FOR_REUSE 0x4        /**< Do not reset the
                                                      *   current API when it
                                                      *   completes successfully.
                                                      *   Instead keep it alive
                                                      *   and allow reuse with
                                                      *   .._F_REUSE, blocking
                                                      *   any non-F_REUSE
                                                      *   curr API calls. */
#define RD_KAFKA_TXN_CURR_API_F_REUSE     0x8        /**< Reuse/continue with
                                                      *   current API state.
                                                      *   This is used for
                                                      *   multi-stage APIs,
                                                      *   such as txn commit. */
                } txn_curr_api;

                /** Copy (and reference) of the original init_transactions(),
                 *  but out-lives the timeout of the curr API.
                 *  This is used as the reply queue for when the
                 *  black box idempotent producer has acquired the
                 *  initial PID (or fails to do so).
                 *  Since that acquisition may take longer than the
                 *  init_transactions() API timeout this extra reference
                 *  needs to be kept around.
                 *  If the originating init_transactions() call has timed
                 *  out and returned this queue reference simply points
                 *  to a disabled queue that will discard any ops enqueued.
                 *
                 *  @locks rk_lock
                 */
                rd_kafka_q_t *txn_init_rkq;

                int txn_req_cnt;                /**< Number of transaction
                                                 *   requests sent.
                                                 *   This is incremented when a
                                                 *   AddPartitionsToTxn or
                                                 *   AddOffsetsToTxn request
                                                 *   has been sent for the
                                                 *   current transaction,
                                                 *   to keep track of
                                                 *   whether the broker is
                                                 *   aware of the current
                                                 *   transaction and thus
                                                 *   requires an EndTxn request
                                                 *   on abort or not. */

                /** Timer to trigger registration of pending partitions */
                rd_kafka_timer_t         txn_register_parts_tmr;

                /** Lock for txn_pending_rktps and txn_waitresp_rktps */
                mtx_t                    txn_pending_lock;

                /** Partitions pending being added to transaction. */
                rd_kafka_toppar_tqhead_t txn_pending_rktps;

                /** Partitions in-flight added to transaction. */
                rd_kafka_toppar_tqhead_t txn_waitresp_rktps;

                /** Partitions added and registered to transaction. */
                rd_kafka_toppar_tqhead_t txn_rktps;

                /** Number of messages that failed delivery.
                 *  If this number is >0 on transaction_commit then an
                 *  abortable transaction error will be raised.
                 *  Is reset to zero on each begin_transaction(). */
                rd_atomic64_t txn_dr_fails;

                /** Current transaction error. */
                rd_kafka_resp_err_t txn_err;

                /** Current transaction error string, if any. */
                char               *txn_errstr;

                /** Last InitProducerIdRequest error. */
                rd_kafka_resp_err_t txn_init_err;

                /** Waiting for transaction coordinator query response */
                rd_bool_t           txn_wait_coord;

                /** Transaction coordinator query timer */
                rd_kafka_timer_t    txn_coord_tmr;
        } rk_eos;


        /**
         * Consumer state
         *
         * @locality rdkafka main thread
         * @locks_required none
         */
        struct {
                /** Application consumer queue for messages, events and errors.
                 *  (typically points to rkcg_q) */
                rd_kafka_q_t *q;
                /** Current assigned partitions through assign() et.al. */
                rd_kafka_assignment_t assignment;
                /** Waiting for this number of commits to finish. */
                int wait_commit_cnt;
        } rk_consumer;

        /**
         * Coordinator cache.
         *
         * @locks none
         * @locality rdkafka main thread
         */
        rd_kafka_coord_cache_t   rk_coord_cache; /**< Coordinator cache */

        TAILQ_HEAD(, rd_kafka_coord_req_s) rk_coord_reqs; /**< Coordinator
                                                           *   requests */


	/* Current message accounting, see rd_kafka_curr_msgs_*() */
	struct {
		mtx_t lock;       /* Protects access to this struct */
		cnd_t cnd;        /* For waking up blocking injectors */
		unsigned int cnt; /* Current message count */
		size_t size;      /* Current message size sum */
	        unsigned int max_cnt; /* Max limit */
		size_t max_size; /* Max limit */
	} rk_curr_msgs;

        rd_kafka_timers_t rk_timers;
	thrd_t rk_thread;

        int rk_initialized;       /**< Will be > 0 when the rd_kafka_t
                                   *   instance has been fully initialized. */

        int   rk_init_wait_cnt;   /**< Number of background threads that
                                   *   need to finish initialization. */
        cnd_t rk_init_cnd;        /**< Cond-var used to wait for main thread
                                   *   to finish its initialization
                                   *   before rd_kafka_new() returns. */
        mtx_t rk_init_lock;       /**< Lock for rk_init_wait_cnt and
                                   *   rk_init_cnd */

        rd_ts_t rk_ts_created;    /**< Timestamp (monotonic clock) of
                                   *   rd_kafka_t creation. */

        /**
         * Background thread and queue,
         * enabled by setting `background_event_cb()`.
         */
        struct {
                rd_kafka_q_t *q;  /**< Queue served by background thread. */
                thrd_t thread;    /**< Background thread. */
                int calling;      /**< Indicates whether the event callback
                                   *   is being called, reset back to 0
                                   *   when the callback returns.
                                   *   This can be used for troubleshooting
                                   *   purposes. */
        } rk_background;


        /*
         * Logs, events or actions to rate limit / suppress
         */
        struct {
                /** Log: No brokers support Idempotent Producer */
                rd_interval_t no_idemp_brokers;

                /** Sparse connections: randomly select broker
                 *  to bring up. This interval should allow
                 *  for a previous connection to be established,
                 *  which varies between different environments:
                 *  Use 10 < reconnect.backoff.jitter.ms / 2 < 1000.
                 */
                rd_interval_t sparse_connect_random;
                /** Lock for sparse_connect_random */
                mtx_t         sparse_connect_lock;

                /** Broker metadata refresh interval:
                 *  this is rate-limiting the number of topic-less
                 *  broker/cluster metadata refreshes when there are no
                 *  topics to refresh.
                 *  Will be refreshed every topic.metadata.refresh.interval.ms
                 *  but no more often than every 10s.
                 *  No locks: only accessed by rdkafka main thread. */
                rd_interval_t broker_metadata_refresh;

                /** Suppression for allow.auto.create.topics=false not being
                 *  supported by the broker. */
                rd_interval_t allow_auto_create_topics;
        } rk_suppress;

        struct {
                void *handle; /**< Provider-specific handle struct pointer.
                               *   Typically assigned in provider's .init() */
        } rk_sasl;

        /* Test mocks */
        struct {
                rd_kafka_mock_cluster_t *cluster; /**< Mock cluster, created
                                                   *   by test.mock.num.brokers
                                                   */
                rd_atomic32_t cluster_cnt;        /**< Total number of mock
                                                   *   clusters, created either
                                                   *   through
                                                   *   test.mock.num.brokers
                                                   *   or mock_cluster_new().
                                                   */

        } rk_mock;
};

/**
 * Convenience wrappers for acquiring and releasing the read/write lock
 * rk_lock of the \p rk instance:
 *  - rd_kafka_wrlock() / rd_kafka_wrunlock(): write lock.
 *  - rd_kafka_rdlock() / rd_kafka_rdunlock(): read lock.
 */
#define rd_kafka_wrlock(rk)    rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_rdlock(rk)    rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk)    rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk)    rwlock_wrunlock(&(rk)->rk_lock)


/**
 * @brief Add \p cnt messages of total size \p size bytes to the
 *        internal bookkeeping of current message counts.
 *        If the total message count or size after the add would exceed the
 *        configured limits \c queue.buffering.max.messages and
 *        \c queue.buffering.max.kbytes then, depending on the value of
 *        \p block, the function either blocks until enough space is
 *        available (\p block is non-zero), or immediately returns
 *        RD_KAFKA_RESP_ERR__QUEUE_FULL (\p block is 0).
 *
 * @param rk     Client instance. Only producers are accounted for,
 *               for any other instance type this is a no-op.
 * @param cnt    Number of messages to add.
 * @param size   Total size in bytes of the messages to add.
 * @param block  Block until there is room, rather than failing.
 * @param rdlock If non-NULL and \p block is set and blocking is to ensue,
 *               then release this read-lock for the duration of the
 *               blocking and re-acquire it (as a read-lock) afterwards.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the messages were accounted for
 *          (or \p rk is not a producer), or
 *          RD_KAFKA_RESP_ERR__QUEUE_FULL if a limit would be exceeded
 *          and \p block is 0.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
                        int block, rwlock_t *rdlock) {

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        mtx_lock(&rk->rk_curr_msgs.lock);

        /* Widen the operands BEFORE adding: summing in unsigned int
         * (or in a 32-bit size_t) could wrap around and make an
         * over-limit total look like it fits.
         * The loop re-evaluates the limits after each wake-up since
         * other threads may have claimed the freed space. */
        while (unlikely((unsigned long long)rk->rk_curr_msgs.cnt + cnt >
                        (unsigned long long)rk->rk_curr_msgs.max_cnt ||
                        (unsigned long long)rk->rk_curr_msgs.size + size >
                        (unsigned long long)rk->rk_curr_msgs.max_size)) {
                if (!block) {
                        mtx_unlock(&rk->rk_curr_msgs.lock);
                        return RD_KAFKA_RESP_ERR__QUEUE_FULL;
                }

                /* Don't hold the caller's read-lock while sleeping. */
                if (rdlock)
                        rwlock_rdunlock(rdlock);

                cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);

                if (rdlock)
                        rwlock_rdlock(rdlock);

        }

        rk->rk_curr_msgs.cnt  += cnt;
        rk->rk_curr_msgs.size += size;
        mtx_unlock(&rk->rk_curr_msgs.lock);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief Remove \p cnt messages of total size \p size bytes from the
 *        current message bookkeeping, and wake up (broadcast on the
 *        condvar) any blocked threads if the removal empties the
 *        accounting or brings it back below one of the limits.
 *
 * @remark No-op for non-producer instances.
 */
static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
        unsigned int cnt_before, cnt_after;
        size_t size_before, size_after;
        int wakeup;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return;

        mtx_lock(&rk->rk_curr_msgs.lock);
        rd_kafka_assert(NULL,
                        rk->rk_curr_msgs.cnt >= cnt &&
                        rk->rk_curr_msgs.size >= size);

        cnt_before  = rk->rk_curr_msgs.cnt;
        size_before = rk->rk_curr_msgs.size;
        cnt_after   = cnt_before - cnt;
        size_after  = size_before - size;

        /* Waiters only need to be woken when the count drops to zero or
         * when the count or size crosses from at-or-above its limit
         * to below it. */
        wakeup = cnt_after == 0 ||
                (cnt_before >= rk->rk_curr_msgs.max_cnt &&
                 cnt_after < rk->rk_curr_msgs.max_cnt) ||
                (size_before >= rk->rk_curr_msgs.max_size &&
                 size_after < rk->rk_curr_msgs.max_size);

        rk->rk_curr_msgs.cnt  = cnt_after;
        rk->rk_curr_msgs.size = size_after;

        if (unlikely(wakeup))
                cnd_broadcast(&rk->rk_curr_msgs.cnd);

        mtx_unlock(&rk->rk_curr_msgs.lock);
}

/**
 * @brief Get the current message count and total message size.
 *
 * @param rk    Client instance.
 * @param cntp  Is set to the current message count.
 * @param sizep Is set to the current message size sum.
 *
 * @remark Both values are reported as 0 for non-producer instances.
 */
static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
        unsigned int curr_cnt = 0;
        size_t curr_size      = 0;

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                /* Read both values under the lock so that they
                 * belong to the same point in time. */
                mtx_lock(&rk->rk_curr_msgs.lock);
                curr_cnt  = rk->rk_curr_msgs.cnt;
                curr_size = rk->rk_curr_msgs.size;
                mtx_unlock(&rk->rk_curr_msgs.lock);
        }

        *cntp  = curr_cnt;
        *sizep = curr_size;
}

/**
 * @returns the current message count, or 0 if \p rk is not a producer.
 */
static RD_INLINE RD_UNUSED int
rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
        int curr_cnt = 0;

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                mtx_lock(&rk->rk_curr_msgs.lock);
                curr_cnt = rk->rk_curr_msgs.cnt;
                mtx_unlock(&rk->rk_curr_msgs.lock);
        }

        return curr_cnt;
}

/**
 * @brief Wait for curr_msgs to reach 0, for at most \p timeout_ms
 *        (converted to an absolute deadline by
 *        rd_timeout_init_timespec()).
 *
 * @param rk         Client instance.
 * @param timeout_ms Maximum time to wait.
 * @param curr_msgsp Is set to the number of remaining messages at
 *                   the time of return.
 *
 * @returns rd_true if zero is reached, or rd_false on timeout.
 *          The remaining messages are returned in \p *curr_msgsp
 */
static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_curr_msgs_wait_zero (rd_kafka_t *rk, int timeout_ms,
                              unsigned int *curr_msgsp) {
        unsigned int cnt;
        struct timespec tspec;

        rd_timeout_init_timespec(&tspec, timeout_ms);

        mtx_lock(&rk->rk_curr_msgs.lock);
        while ((cnt = rk->rk_curr_msgs.cnt) > 0) {
                if (cnd_timedwait_abs(&rk->rk_curr_msgs.cnd,
                                      &rk->rk_curr_msgs.lock,
                                      &tspec) == thrd_timedout) {
                        /* The count sampled before the wait may be stale:
                         * messages can have been removed while waiting.
                         * The lock is held again at this point, so report
                         * the up-to-date count rather than the old one. */
                        cnt = rk->rk_curr_msgs.cnt;
                        break;
                }
        }
        mtx_unlock(&rk->rk_curr_msgs.lock);

        *curr_msgsp = cnt;
        return cnt == 0;
}

void rd_kafka_destroy_final (rd_kafka_t *rk);

void rd_kafka_global_init (void);

/**
 * @returns true if \p rk handle is terminating.
 *          The value is the masked RD_KAFKA_DESTROY_F_TERMINATE bit of
 *          rk_terminate, i.e. non-zero rather than a normalised 1.
 *
 * @remark If consumer_close() is called from destroy*() it will be
 *         called prior to _F_TERMINATE being set and will thus not
 *         be able to use rd_kafka_terminating() to know it is shutting down.
 *         That code should instead just check that rk_terminate is non-zero
 *         (the _F_DESTROY_CALLED flag will be set).
 */
#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate) & \
                                  RD_KAFKA_DESTROY_F_TERMINATE)

/**
 * @returns the destroy flags set matching \p flags, which might be
 *          a subset of the flags.
 */
#define rd_kafka_destroy_flags_check(rk,flags) \
        (rd_atomic32_get(&(rk)->rk_terminate) & (flags))

/**
 * @returns true if no consumer callbacks, or standard consumer_close
 *          behaviour, should be triggered.
 *
 * @remark Evaluates to the masked RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE bit
 *         as produced by rd_kafka_destroy_flags_check(), i.e. non-zero
 *         when the flag is set.
 */
#define rd_kafka_destroy_flags_no_consumer_close(rk) \
        rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)

/**
 * @returns true if the atomic rk_simple_cnt counter of \p rk is
 *          greater than zero.
 */
#define rd_kafka_is_simple_consumer(rk) \
        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
/* NOTE(review): presumably increments rk_simple_cnt; the definition is
 * not part of this header, confirm there. */
int rd_kafka_simple_consumer_add (rd_kafka_t *rk);


/**
 * @returns true if idempotency is enabled (producer only).
 */
#define rd_kafka_is_idempotent(rk) ((rk)->rk_conf.eos.idempotence)

/**
 * @returns true if the producer is transactional (producer only).
 */
#define rd_kafka_is_transactional(rk)                   \
        ((rk)->rk_conf.eos.transactional_id != NULL)


#define RD_KAFKA_PURGE_F_ABORT_TXN 0x100  /**< Internal flag used when
                                           *   aborting transaction */
/* Mask of purge flag bits: the internal ABORT_TXN bit (0x100) plus the
 * three low bits (0x7).
 * NOTE(review): the low bits are presumably the public RD_KAFKA_PURGE_F_..
 * flags from rdkafka.h - confirm against that header. */
#define RD_KAFKA_PURGE_F_MASK 0x107
const char *rd_kafka_purge_flags2str (int flags);


#include "rdkafka_topic.h"
#include "rdkafka_partition.h"














/**
 * Debug contexts
 *
 * One bit per context. The rd_kafka_dbg(), rd_kafka_dbg0() and rd_rkb_dbg()
 * macros paste their \c ctx argument onto the RD_KAFKA_DBG_ prefix and test
 * the resulting bit against the configuration's \c debug bitmask before
 * logging.
 */
#define RD_KAFKA_DBG_GENERIC        0x1
#define RD_KAFKA_DBG_BROKER         0x2
#define RD_KAFKA_DBG_TOPIC          0x4
#define RD_KAFKA_DBG_METADATA       0x8
#define RD_KAFKA_DBG_FEATURE        0x10
#define RD_KAFKA_DBG_QUEUE          0x20
#define RD_KAFKA_DBG_MSG            0x40
#define RD_KAFKA_DBG_PROTOCOL       0x80
#define RD_KAFKA_DBG_CGRP           0x100
#define RD_KAFKA_DBG_SECURITY       0x200
#define RD_KAFKA_DBG_FETCH          0x400
#define RD_KAFKA_DBG_INTERCEPTOR    0x800
#define RD_KAFKA_DBG_PLUGIN         0x1000
#define RD_KAFKA_DBG_CONSUMER       0x2000
#define RD_KAFKA_DBG_ADMIN          0x4000
#define RD_KAFKA_DBG_EOS            0x8000
#define RD_KAFKA_DBG_MOCK           0x10000
#define RD_KAFKA_DBG_ASSIGNOR       0x20000
#define RD_KAFKA_DBG_CONF           0x40000
/* The low 20 bits set: covers every context above (highest is 0x40000). */
#define RD_KAFKA_DBG_ALL            0xfffff
/* No context: passed by the non-debug logging macros. */
#define RD_KAFKA_DBG_NONE           0x0


void rd_kafka_log0(const rd_kafka_conf_t *conf,
                   const rd_kafka_t *rk, const char *extra, int level,
                   int ctx,
                   const char *fac, const char *fmt, ...) RD_FORMAT(printf,
                                                                    7, 8);

/**
 * @brief Log a message for client instance \p rk at syslog \p level with
 *        facility string \p fac, using no debug context
 *        (RD_KAFKA_DBG_NONE).
 *
 * The variadic arguments are the printf-style format string followed by
 * its arguments, and are forwarded to rd_kafka_log0().
 *
 * @remark \p rk is parenthesised in the expansion so that expression
 *         arguments (e.g. a cast or a conditional) bind correctly to `->`.
 */
#define rd_kafka_log(rk,level,fac,...)                 \
        rd_kafka_log0(&(rk)->rk_conf, rk, NULL, level, \
                RD_KAFKA_DBG_NONE, fac, __VA_ARGS__)

/**
 * @brief Emit a LOG_DEBUG message for \p rk if debug context \p ctx is
 *        enabled in rk_conf.debug.
 *
 * \p ctx is the bare context suffix (e.g. \c BROKER); it is token-pasted
 * onto \c RD_KAFKA_DBG_. The variadic arguments are the printf-style format
 * string followed by its arguments.
 *
 * @remark \p rk is parenthesised everywhere in the expansion so that
 *         expression arguments bind correctly to `->`.
 */
#define rd_kafka_dbg(rk,ctx,fac,...) do {                                   \
                if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_ ## ctx))) \
                        rd_kafka_log0(&(rk)->rk_conf,rk,NULL,               \
                                      LOG_DEBUG,(RD_KAFKA_DBG_ ## ctx),     \
                                      fac,__VA_ARGS__);                     \
        } while (0)

/**
 * @brief dbg() variant that only needs the configuration object \p conf
 *        rather than an rk, for early logging.
 *
 * Logs at LOG_DEBUG through rd_kafka_log0() with a NULL rk when the
 * \c RD_KAFKA_DBG_<ctx> bit is set in \c conf->debug.
 */
#define rd_kafka_dbg0(conf,ctx,fac,...) do {                            \
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx)))   \
                        rd_kafka_log0(conf,NULL,NULL,                   \
                                      LOG_DEBUG,(RD_KAFKA_DBG_ ## ctx), \
                                      fac,__VA_ARGS__);                 \
        } while (0)

/**
 * @brief Log a message on behalf of broker \p rkb, passing a copy of the
 *        broker's log name as the \c extra argument of rd_kafka_log0().
 *
 * The name is copied into a stack buffer while holding rkb_logname_lock
 * and the lock is released again before rd_kafka_log0() is called.
 *
 * NOTE: The local copy of _logname is needed due to rkb_logname_lock
 *       lock-ordering when logging another broker's name in the message.
 *
 * @remark \p rkb is evaluated more than once; do not pass an expression
 *         with side effects.
 */
#define rd_rkb_log0(rkb,level,ctx,fac,...) do {                             \
                char _logname[RD_KAFKA_NODENAME_SIZE];                      \
                mtx_lock(&(rkb)->rkb_logname_lock);                         \
                rd_strlcpy(_logname, (rkb)->rkb_logname, sizeof(_logname)); \
                mtx_unlock(&(rkb)->rkb_logname_lock);                       \
                rd_kafka_log0(&(rkb)->rkb_rk->rk_conf,                      \
                              (rkb)->rkb_rk, _logname,                      \
                              level, ctx, fac, __VA_ARGS__);                \
        } while (0)

/**
 * @brief Log a message for broker \p rkb at \p level with no debug context
 *        (RD_KAFKA_DBG_NONE). Thin wrapper around rd_rkb_log0().
 */
#define rd_rkb_log(rkb,level,fac,...) \
        rd_rkb_log0(rkb,level,RD_KAFKA_DBG_NONE,fac, __VA_ARGS__)

/**
 * @brief Emit a LOG_DEBUG message for broker \p rkb if debug context
 *        \p ctx is enabled in the owning client's rk_conf.debug.
 *
 * \p ctx is the bare context suffix; it is token-pasted onto
 * \c RD_KAFKA_DBG_. The debug check happens before rd_rkb_log0() is
 * expanded, so the broker log name is only copied when the context is
 * enabled.
 */
#define rd_rkb_dbg(rkb,ctx,fac,...) do {                       \
        if (unlikely((rkb)->rkb_rk->rk_conf.debug &            \
                 (RD_KAFKA_DBG_ ## ctx))) {                    \
            rd_rkb_log0(rkb, LOG_DEBUG,(RD_KAFKA_DBG_ ## ctx), \
                    fac, __VA_ARGS__);                         \
                }                                              \
        } while (0)



extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;

/**
 * @brief Record \p err in the thread-local rd_kafka_last_error_code and,
 *        when \p errnox is non-zero, also set errno to \p errnox.
 *
 * @param err     Error code to store.
 * @param errnox  errno value to set, or 0 to leave errno untouched.
 *
 * @returns \p err, unchanged.
 */
static RD_UNUSED RD_INLINE
rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
                                             int errnox) {
        /* MSVC:
         * rd_set_errno() is the correct way to set errno on Windows,
         * but it is still pointless due to different errnos
         * in different runtimes:
         * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
         * errno is thus highly deprecated, and buggy, on Windows
         * when using librdkafka as a dynamically loaded DLL. */
        if (errnox != 0) {
                rd_set_errno(errnox);
        }

        rd_kafka_last_error_code = err;

        return err;
}


/**
 * Set a fatal error on \p rk with a printf-style formatted reason.
 *
 * NOTE(review): the implementation is not part of this header; the exact
 * meaning of \p do_lock and of the return value should be confirmed at the
 * definition.
 */
int rd_kafka_set_fatal_error0 (rd_kafka_t *rk, rd_dolock_t do_lock,
                               rd_kafka_resp_err_t err,
                               const char *fmt, ...) RD_FORMAT(printf, 4, 5);
/* Convenience wrapper that always passes RD_DO_LOCK.
 * At least one argument must follow \p fmt: __VA_ARGS__ is expanded after
 * an unconditional comma, so an empty argument list leaves a trailing
 * comma in the call. */
#define rd_kafka_set_fatal_error(rk,err,fmt,...)                        \
        rd_kafka_set_fatal_error0(rk, RD_DO_LOCK, err, fmt, __VA_ARGS__)

/**
 * @returns the value stored in rk_fatal.err for client types that may
 *          raise fatal errors, else RD_KAFKA_RESP_ERR_NO_ERROR.
 *
 * @remark As an optimization the atomic read, which is costly on some
 *         platforms, is skipped for configurations that cannot currently
 *         raise fatal errors: only the idempotent producer and static
 *         consumers (group.instance.id) do.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_fatal_error_code (rd_kafka_t *rk) {
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                /* Producer: only relevant with idempotence enabled. */
                if (!rk->rk_conf.eos.idempotence)
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
        } else if (rk->rk_type == RD_KAFKA_CONSUMER) {
                /* Consumer: only relevant with group.instance.id set. */
                if (!rk->rk_conf.group_instance_id)
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
        } else {
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        return rd_atomic32_get(&rk->rk_fatal.err);
}


extern rd_atomic32_t rd_kafka_thread_cnt_curr;
extern char RD_TLS rd_kafka_thread_name[64];

void rd_kafka_set_thread_name (const char *fmt, ...) RD_FORMAT(printf, 1, 2);
void rd_kafka_set_thread_sysname (const char *fmt, ...) RD_FORMAT(printf, 1, 2);

int rd_kafka_path_is_dir (const char *path);
rd_bool_t rd_kafka_dir_is_empty (const char *path);

rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                  rd_kafka_q_cb_type_t cb_type, void *opaque);

rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_topic_t *rkt);


/**
 * @returns the number of milliseconds the maximum poll interval
 *          was exceeded, or 0 if not exceeded.
 *          The result is clamped to INT32_MAX.
 *
 * @remark Only relevant for high-level consumer; always 0 for other
 *         client types.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED int
rd_kafka_max_poll_exceeded (rd_kafka_t *rk) {
        rd_ts_t last_poll;
        int64_t exceeded;

        if (rk->rk_type != RD_KAFKA_CONSUMER)
                return 0;

        last_poll = rd_atomic64_get(&rk->rk_ts_last_poll);

        /* Application is blocked in librdkafka function, see
         * rd_kafka_app_poll_blocking(). */
        if (last_poll == INT64_MAX)
                return 0;

        /* Do the arithmetic in 64 bits: narrowing the elapsed time to
         * int before subtracting could truncate a very long gap into a
         * small or negative value and wrongly report "not exceeded". */
        exceeded = (int64_t)((rd_clock() - last_poll) / 1000ll) -
                (int64_t)rk->rk_conf.max_poll_interval_ms;

        if (unlikely(exceeded > 0)) {
                /* Clamp so the value fits the int return type. */
                if (exceeded > (int64_t)INT32_MAX)
                        return INT32_MAX;
                return (int)exceeded;
        }

        return 0;
}

/**
 * @brief Call on entry to a blocking polling function to indicate
 *        that the application is blocked waiting for librdkafka
 *        and that max.poll.interval.ms should not be enforced.
 *
 *        Call app_polled() upon return from the function calling
 *        this function to register the application's last time of poll.
 *
 * @remark Only relevant for high-level consumer; a no-op for other
 *         client types.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED void
rd_kafka_app_poll_blocking (rd_kafka_t *rk) {
        if (rk->rk_type != RD_KAFKA_CONSUMER)
                return;

        /* INT64_MAX marks "application is currently blocked in poll". */
        rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
}

/**
 * @brief Register the current time, as returned by rd_clock(), as the
 *        application's last poll time.
 *
 * @remark Only relevant for high-level consumer; a no-op for other
 *         client types.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED void
rd_kafka_app_polled (rd_kafka_t *rk) {
        if (rk->rk_type != RD_KAFKA_CONSUMER)
                return;

        rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
}


/**
 * rdkafka_background.c
 */
int rd_kafka_background_thread_main (void *arg);

#endif /* _RDKAFKA_INT_H_ */
