/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#define _GNU_SOURCE
#include <errno.h>
#include <string.h>
#include <stdarg.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/stat.h>
#if !_WIN32
#include <sys/types.h>
#include <dirent.h>
#endif

#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_broker.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_offset.h"
#include "rdkafka_transport.h"
#include "rdkafka_cgrp.h"
#include "rdkafka_assignor.h"
#include "rdkafka_request.h"
#include "rdkafka_event.h"
#include "rdkafka_sasl.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_sasl_oauthbearer.h"
#if WITH_SSL
#include "rdkafka_ssl.h"
#endif

#include "rdtime.h"
#include "crc32c.h"
#include "rdunittest.h"

#ifdef _WIN32
#include <sys/types.h>
#include <sys/timeb.h>
#endif



static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
static once_flag rd_kafka_global_srand_once = ONCE_FLAG_INIT;

/**
 * @brief Global counter+lock for all active librdkafka instances
 */
mtx_t rd_kafka_global_lock;
int rd_kafka_global_cnt;


/**
 * Last API error code, per thread.
 * Shared among all rd_kafka_t instances.
 */
rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;


/**
 * Current number of threads created by rdkafka.
 * This is used in regression tests.
 */
rd_atomic32_t rd_kafka_thread_cnt_curr;
int rd_kafka_thread_cnt (void) {
	return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
}

/**
 * Current thread's log name (TLS)
 */
char RD_TLS rd_kafka_thread_name[64] = "app";

void rd_kafka_set_thread_name (const char *fmt, ...) {
        va_list ap;

        va_start(ap, fmt);
        rd_vsnprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name),
                     fmt, ap);
        va_end(ap);
}

/**
 * @brief Current thread's system name (TLS)
 *
 * Note the name must be 15 characters or less, because it is passed to
 * pthread_setname_np on Linux which imposes this limit.
 */
static char RD_TLS rd_kafka_thread_sysname[16] = "app";

void rd_kafka_set_thread_sysname (const char *fmt, ...) {
        va_list ap;

        va_start(ap, fmt);
        rd_vsnprintf(rd_kafka_thread_sysname, sizeof(rd_kafka_thread_sysname),
                     fmt, ap);
        va_end(ap);

        thrd_setname(rd_kafka_thread_sysname);
}

static void rd_kafka_global_init0 (void) {
	mtx_init(&rd_kafka_global_lock, mtx_plain);
#if ENABLE_DEVEL
	rd_atomic32_init(&rd_kafka_op_cnt, 0);
#endif
        crc32c_global_init();
#if WITH_SSL
        /* The configuration interface might need to use
         * OpenSSL to parse keys, prior to any rd_kafka_t
         * object has been created. */
        rd_kafka_ssl_init();
#endif
}

/**
 * @brief Initialize once per process
 */
void rd_kafka_global_init (void) {
        call_once(&rd_kafka_global_init_once, rd_kafka_global_init0);
}


/**
 * @brief Seed the PRNG with current_time.milliseconds
 */
static void rd_kafka_global_srand (void) {
	struct timeval tv;

	rd_gettimeofday(&tv, NULL);

        srand((unsigned int)(tv.tv_usec / 1000));
}


/**
 * @returns the current number of active librdkafka instances
 */
static int rd_kafka_global_cnt_get (void) {
	int r;
	mtx_lock(&rd_kafka_global_lock);
	r = rd_kafka_global_cnt;
	mtx_unlock(&rd_kafka_global_lock);
	return r;
}


/**
 * @brief Increase counter for active librdkafka instances.
 * If this is the first instance the global constructors will be called, if any.
 */
static void rd_kafka_global_cnt_incr (void) {
	mtx_lock(&rd_kafka_global_lock);
	rd_kafka_global_cnt++;
	if (rd_kafka_global_cnt == 1) {
		rd_kafka_transport_init();
#if WITH_SSL
                rd_kafka_ssl_init();
#endif
                rd_kafka_sasl_global_init();
	}
	mtx_unlock(&rd_kafka_global_lock);
}

/**
 * @brief Decrease counter for active librdkafka instances.
 * If this counter reaches 0 the global destructors will be called, if any.
 */
static void rd_kafka_global_cnt_decr (void) {
	mtx_lock(&rd_kafka_global_lock);
	rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
	rd_kafka_global_cnt--;
	if (rd_kafka_global_cnt == 0) {
                rd_kafka_sasl_global_term();
#if WITH_SSL
                rd_kafka_ssl_term();
#endif
	}
	mtx_unlock(&rd_kafka_global_lock);
}


/**
 * Wait for all rd_kafka_t objects to be destroyed.
 * Returns 0 if all kafka objects are now destroyed, or -1 if the
 * timeout was reached.
 */
int rd_kafka_wait_destroyed (int timeout_ms) {
	rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);

	while (rd_kafka_thread_cnt() > 0 ||
	       rd_kafka_global_cnt_get() > 0) {
		if (rd_clock() >= timeout) {
			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
						ETIMEDOUT);
			return -1;
		}
		rd_usleep(25000, NULL); /* 25ms */
	}

	return 0;
}

static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
                              const rd_kafka_t *rk, int level, int ctx,
                              const char *fac, const char *buf) {
        if (level > conf->log_level)
                return;
        else if (rk && conf->log_queue) {
                rd_kafka_op_t *rko;

                if (!rk->rk_logq)
                        return; /* Terminating */

                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
                rko->rko_u.log.level = level;
                rd_strlcpy(rko->rko_u.log.fac, fac, sizeof(rko->rko_u.log.fac));
                rko->rko_u.log.str = rd_strdup(buf);
                rko->rko_u.log.ctx = ctx;
                rd_kafka_q_enq(rk->rk_logq, rko);

        } else if (conf->log_cb) {
                conf->log_cb(rk, level, fac, buf);
        }
}

/**
 * @brief Logger
 *
 * @remark conf must be set, but rk may be NULL
 */
void rd_kafka_log0 (const rd_kafka_conf_t *conf,
                    const rd_kafka_t *rk,
                    const char *extra, int level, int ctx,
                    const char *fac, const char *fmt, ...) {
	char buf[2048];
	va_list ap;
	unsigned int elen = 0;
        unsigned int of = 0;

	if (level > conf->log_level)
		return;

	if (conf->log_thread_name) {
		elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
				   rd_kafka_thread_name);
		if (unlikely(elen >= sizeof(buf)))
			elen = sizeof(buf);
		of = elen;
	}

	if (extra) {
		elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
		if (unlikely(elen >= sizeof(buf)-of))
			elen = sizeof(buf)-of;
                of += elen;
	}

	va_start(ap, fmt);
	rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
	va_end(ap);

        rd_kafka_log_buf(conf, rk, level, ctx, fac, buf);
}

rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token (rd_kafka_t *rk,
                                const char *token_value,
                                int64_t md_lifetime_ms,
                                const char *md_principal_name,
                                const char **extensions, size_t extension_size,
                                char *errstr, size_t errstr_size) {
#if WITH_SASL_OAUTHBEARER
        return rd_kafka_oauthbearer_set_token0(
                rk, token_value,
                md_lifetime_ms, md_principal_name, extensions, extension_size,
                errstr, errstr_size);
#else
        rd_snprintf(errstr, errstr_size,
                    "librdkafka not built with SASL OAUTHBEARER support");
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
}

rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token_failure (rd_kafka_t *rk, const char *errstr) {
#if WITH_SASL_OAUTHBEARER
        return rd_kafka_oauthbearer_set_token_failure0(rk, errstr);
#else
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
}

void rd_kafka_log_print(const rd_kafka_t *rk, int level,
	const char *fac, const char *buf) {
	int secs, msecs;
	struct timeval tv;
	rd_gettimeofday(&tv, NULL);
	secs = (int)tv.tv_sec;
	msecs = (int)(tv.tv_usec / 1000);
	fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
		level, secs, msecs,
		fac, rk ? rk->rk_name : "", buf);
}

void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
			  const char *fac, const char *buf) {
#if WITH_SYSLOG
	static int initialized = 0;

	if (!initialized)
		openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);

	syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
#else
        rd_assert(!*"syslog support not enabled in this build");
#endif
}

void rd_kafka_set_logger (rd_kafka_t *rk,
			  void (*func) (const rd_kafka_t *rk, int level,
					const char *fac, const char *buf)) {
#if !WITH_SYSLOG
        if (func == rd_kafka_log_syslog)
                rd_assert(!*"syslog support not enabled in this build");
#endif
	rk->rk_conf.log_cb = func;
}

void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
	rk->rk_conf.log_level = level;
}






static const char *rd_kafka_type2str (rd_kafka_type_t type) {
	static const char *types[] = {
		[RD_KAFKA_PRODUCER] = "producer",
		[RD_KAFKA_CONSUMER] = "consumer",
	};
	return types[type];
}

#define _ERR_DESC(ENUM,DESC) \
	[ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, &(# ENUM)[18]/*pfx*/, DESC }

static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
	_ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
		  "Local: Bad message format"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
		  "Local: Invalid compressed data"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
		  "Local: Broker handle destroyed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
		  "Local: Communication failure with broker"), //FIXME: too specific
	_ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
		  "Local: Broker transport failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
		  "Local: Critical system resource failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
		  "Local: Host resolution failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
		  "Local: Message timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
		  "Broker: No more messages"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
		  "Local: Unknown partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__FS,
		  "Local: File or filesystem error"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
		  "Local: Unknown topic"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
		  "Local: All broker connections are down"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
		  "Local: Invalid argument or configuration"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
		  "Local: Timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
		  "Local: Queue full"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
		  "Local: ISR count insufficient"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
		  "Local: Broker node update"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
		  "Local: SSL error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
		  "Local: Waiting for coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
		  "Local: Unknown group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
		  "Local: Operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
		  "Local: Previous operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
		  "Local: Existing subscription"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
		  "Local: Assign partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
		  "Local: Revoke partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
		  "Local: Conflicting use"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
		  "Local: Erroneous state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
		  "Local: Unknown protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
		  "Local: Not implemented"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
		  "Local: Authentication failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
		  "Local: No offset stored"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
		  "Local: Outdated"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
		  "Local: Timed out in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                  "Local: Required feature not supported by broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
                  "Local: Awaiting cache update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
                  "Local: Operation interrupted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
                  "Local: Key serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
                  "Local: Value serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
                  "Local: Key deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
                  "Local: Value deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL,
                  "Local: Partial response"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY,
                  "Local: Read-only object"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOENT,
                  "Local: No such entry"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNDERFLOW,
                  "Local: Read underflow"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_TYPE,
                  "Local: Invalid type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__RETRY,
                  "Local: Retry operation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_QUEUE,
                  "Local: Purged in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT,
                  "Local: Purged in flight"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FATAL,
                  "Local: Fatal error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INCONSISTENT,
                  "Local: Inconsistent state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
                  "Local: Gap-less ordering would not be guaranteed "
                  "if proceeding"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
                  "Local: Maximum application poll interval "
                  "(max.poll.interval.ms) exceeded"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_BROKER,
                  "Local: Unknown broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
                  "Local: Functionality not configured"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FENCED,
                  "Local: This instance has been fenced by a newer instance"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__APPLICATION,
                  "Local: Application generated error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST,
                  "Local: Group partition assignment lost"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOOP,
                  "Local: No operation performed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET,
                  "Local: No offset to automatically reset to"),

	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
		  "Unknown broker error"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
		  "Success"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
		  "Broker: Offset out of range"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
		  "Broker: Invalid message"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
		  "Broker: Unknown topic or partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
		  "Broker: Invalid message size"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
		  "Broker: Leader not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
		  "Broker: Not leader for partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
		  "Broker: Request timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
		  "Broker: Broker not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
		  "Broker: Replica not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
		  "Broker: Message size too large"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
		  "Broker: StaleControllerEpochCode"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
		  "Broker: Offset metadata string too large"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
		  "Broker: Broker disconnected before response received"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
		  "Broker: Coordinator load in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
		  "Broker: Coordinator not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
		  "Broker: Not coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
		  "Broker: Invalid topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
		  "Broker: Message batch larger than configured server "
		  "segment size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
		  "Broker: Not enough in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
		  "Broker: Message(s) written to insufficient number of "
		  "in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
		  "Broker: Invalid required acks value"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
		  "Broker: Specified group generation id is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
		  "Broker: Inconsistent group protocol"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
		  "Broker: Invalid group.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
		  "Broker: Unknown member"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
		  "Broker: Invalid session timeout"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
		  "Broker: Group rebalance in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
		  "Broker: Commit offset data size is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
		  "Broker: Topic authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
		  "Broker: Group authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
		  "Broker: Cluster authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
		  "Broker: Invalid timestamp"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
		  "Broker: Unsupported SASL mechanism"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
		  "Broker: Request not valid in current SASL state"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
		  "Broker: API version not supported"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
		  "Broker: Topic already exists"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
		  "Broker: Invalid number of partitions"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
		  "Broker: Invalid replication factor"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
		  "Broker: Invalid replica assignment"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
		  "Broker: Configuration is invalid"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
		  "Broker: Not controller for cluster"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
		  "Broker: Invalid request"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
		  "Broker: Message format on broker does not support request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
                  "Broker: Policy violation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
                  "Broker: Broker received an out of order sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
                  "Broker: Broker received a duplicate sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
                  "Broker: Producer attempted an operation with an old epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                  "Broker: Producer attempted a transactional operation in "
                  "an invalid state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
                  "Broker: Producer attempted to use a producer id which is "
                  "not currently assigned to its transactional id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                  "Broker: Transaction timeout is larger than the maximum "
                  "value allowed by the broker's max.transaction.timeout.ms"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                  "Broker: Producer attempted to update a transaction while "
                  "another concurrent operation on the same transaction was "
                  "ongoing"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
                  "Broker: Indicates that the transaction coordinator sending "
                  "a WriteTxnMarker is no longer the current coordinator for "
                  "a given producer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                  "Broker: Transactional Id authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
                  "Broker: Security features are disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
                  "Broker: Operation not attempted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
                  "Broker: Disk error when trying to access log file on disk"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND,
                  "Broker: The user-specified log directory is not found "
                  "in the broker config"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED,
                  "Broker: SASL Authentication failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
                  "Broker: Unknown Producer Id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS,
                  "Broker: Partition reassignment is in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED,
                  "Broker: Delegation Token feature is not enabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND,
                  "Broker: Delegation Token is not found on server"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH,
                  "Broker: Specified Principal is not valid Owner/Renewer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED,
                  "Broker: Delegation Token requests are not allowed on "
                  "this connection"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED,
                  "Broker: Delegation Token authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED,
                  "Broker: Delegation Token is expired"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE,
                  "Broker: Supplied principalType is not supported"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP,
                  "Broker: The group is not empty"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND,
                  "Broker: The group id does not exist"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND,
                  "Broker: The fetch session ID was not found"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH,
                  "Broker: The fetch session epoch is invalid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND,
                  "Broker: No matching listener"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED,
                  "Broker: Topic deletion is disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,
                  "Broker: Leader epoch is older than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,
                  "Broker: Leader epoch is newer than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE,
                  "Broker: Unsupported compression type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH,
                  "Broker: Broker epoch has changed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,
                  "Broker: Leader high watermark is not caught up"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
                  "Broker: Group member needs a valid member ID"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE,
                  "Broker: Preferred leader was not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED,
                  "Broker: Consumer group has reached maximum size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
                  "Broker: Static consumer fenced by other consumer with same "
                  "group.instance.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE,
                  "Broker: Eligible partition leaders are not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED,
                  "Broker: Leader election not needed for topic partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS,
                  "Broker: No partition reassignment is in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC,
                  "Broker: Deleting offsets of a topic while the consumer "
                  "group is subscribed to it"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_RECORD,
                  "Broker: Broker failed to validate record"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                  "Broker: There are unstable offsets that need to be cleared"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED,
                  "Broker: Throttling quota has been exceeded"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PRODUCER_FENCED,
                  "Broker: There is a newer producer with the same "
                  "transactionalId which fences the current one"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND,
                  "Broker: Request illegally referred to resource that "
                  "does not exist"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE,
                  "Broker: Request illegally referred to the same resource "
                  "twice"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL,
                  "Broker: Requested credential would not meet criteria for "
                  "acceptability"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET,
                  "Broker: Indicates that the either the sender or recipient "
                  "of a voter-only request is not one of the expected voters"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION,
                  "Broker: Invalid update version"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED,
                  "Broker: Unable to update finalized features due to "
                  "server error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE,
                  "Broker: Request principal deserialization failed during "
                  "forwarding"),

	_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
};


void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
			     size_t *cntp) {
	*errdescs = rd_kafka_err_descs;
	*cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
}


const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
	static RD_TLS char ret[32];
	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

	if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
		     err >= RD_KAFKA_RESP_ERR_END_ALL ||
		     !rd_kafka_err_descs[idx].desc)) {
		rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
		return ret;
	}

	return rd_kafka_err_descs[idx].desc;
}


const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
	static RD_TLS char ret[32];
	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

	if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
		     err >= RD_KAFKA_RESP_ERR_END_ALL ||
		     !rd_kafka_err_descs[idx].desc)) {
		rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
		return ret;
	}

	return rd_kafka_err_descs[idx].name;
}


rd_kafka_resp_err_t rd_kafka_last_error (void) {
	return rd_kafka_last_error_code;
}


rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
	switch (errnox)
	{
	case EINVAL:
		return RD_KAFKA_RESP_ERR__INVALID_ARG;

        case EBUSY:
                return RD_KAFKA_RESP_ERR__CONFLICT;

	case ENOENT:
		return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;

	case ESRCH:
		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

	case ETIMEDOUT:
		return RD_KAFKA_RESP_ERR__TIMED_OUT;

	case EMSGSIZE:
		return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;

	case ENOBUFS:
		return RD_KAFKA_RESP_ERR__QUEUE_FULL;

        case ECANCELED:
                return RD_KAFKA_RESP_ERR__FATAL;

	default:
		return RD_KAFKA_RESP_ERR__FAIL;
	}
}


rd_kafka_resp_err_t rd_kafka_fatal_error (rd_kafka_t *rk,
                                          char *errstr, size_t errstr_size) {
        rd_kafka_resp_err_t err;

        if (unlikely((err = rd_atomic32_get(&rk->rk_fatal.err)))) {
                rd_kafka_rdlock(rk);
                rd_snprintf(errstr, errstr_size, "%s", rk->rk_fatal.errstr);
                rd_kafka_rdunlock(rk);
        }

        return err;
}


/**
 * @brief Set's the fatal error for this instance.
 *
 * @param do_lock RD_DO_LOCK: rd_kafka_wrlock() will be acquired and released,
 *                RD_DONT_LOCK: caller must hold rd_kafka_wrlock().
 *
 * @returns 1 if the error was set, or 0 if a previous fatal error
 *          has already been set on this instance.
 *
 * @locality any
 * @locks none
 */
int rd_kafka_set_fatal_error0 (rd_kafka_t *rk, rd_dolock_t do_lock,
                               rd_kafka_resp_err_t err,
                               const char *fmt, ...) {
        va_list ap;
        char buf[512];

        if (do_lock)
                rd_kafka_wrlock(rk);
        rk->rk_fatal.cnt++;
        if (rd_atomic32_get(&rk->rk_fatal.err)) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                rd_kafka_dbg(rk, GENERIC, "FATAL",
                             "Suppressing subsequent fatal error: %s",
                             rd_kafka_err2name(err));
                return 0;
        }

        rd_atomic32_set(&rk->rk_fatal.err, err);

        va_start(ap, fmt);
        rd_vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);
        rk->rk_fatal.errstr = rd_strdup(buf);

        if (do_lock)
                rd_kafka_wrunlock(rk);

        /* If there is an error callback or event handler we
         * also log the fatal error as it happens.
         * If there is no error callback the error event
         * will be automatically logged, and this check here
         * prevents us from duplicate logs. */
        if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)
                rd_kafka_log(rk, LOG_EMERG, "FATAL",
                             "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);
        else
                rd_kafka_dbg(rk, ALL, "FATAL",
                             "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);

        /* Indicate to the application that a fatal error was raised,
         * the app should use rd_kafka_fatal_error() to extract the
         * fatal error code itself.
         * For the high-level consumer we propagate the error as a
         * consumer error so it is returned from consumer_poll(),
         * while for all other client types (the producer) we propagate to
         * the standard error handler (typically error_cb). */
        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
                rd_kafka_consumer_err(rk->rk_cgrp->rkcg_q, RD_KAFKA_NODEID_UA,
                                      RD_KAFKA_RESP_ERR__FATAL, 0, NULL, NULL,
                                      RD_KAFKA_OFFSET_INVALID,
                                      "Fatal error: %s: %s",
                                      rd_kafka_err2str(err),
                                      rk->rk_fatal.errstr);
        else
                rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__FATAL,
                                "Fatal error: %s: %s",
                                rd_kafka_err2str(err), rk->rk_fatal.errstr);


        /* Tell rdkafka main thread to purge producer queues, but not
         * in-flight since we'll want proper delivery status for transmitted
         * requests.
         * Need NON_BLOCKING to avoid dead-lock if user is
         * calling purge() at the same time, which could be
         * waiting for this broker thread to handle its
         * OP_PURGE request. */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PURGE);
                rko->rko_u.purge.flags = RD_KAFKA_PURGE_F_QUEUE|
                        RD_KAFKA_PURGE_F_NON_BLOCKING;
                rd_kafka_q_enq(rk->rk_ops, rko);
        }

        return 1;
}


rd_kafka_resp_err_t
rd_kafka_test_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                           const char *reason) {
        if (!rd_kafka_set_fatal_error(rk, err, "test_fatal_error: %s", reason))
                return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
        else
                return RD_KAFKA_RESP_ERR_NO_ERROR;
}



/**
 * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
 *
 * @locality application thread
 */
void rd_kafka_destroy_final (rd_kafka_t *rk) {

        rd_kafka_assert(rk, rd_kafka_terminating(rk));

        /* Synchronize state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        /* Terminate SASL provider */
        if (rk->rk_conf.sasl.provider)
                rd_kafka_sasl_term(rk);

        rd_kafka_timers_destroy(&rk->rk_timers);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");

        /* Destroy cgrp */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Destroying cgrp");
                /* Reset queue forwarding (rep -> cgrp) */
                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
        }

        rd_kafka_assignors_term(rk);

        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                rd_kafka_assignment_destroy(rk);
                if (rk->rk_consumer.q)
                        rd_kafka_q_destroy(rk->rk_consumer.q);
        }

	/* Purge op-queues */
	rd_kafka_q_destroy_owner(rk->rk_rep);
	rd_kafka_q_destroy_owner(rk->rk_ops);

#if WITH_SSL
        if (rk->rk_conf.ssl.ctx) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
                rd_kafka_ssl_ctx_term(rk);
        }
#endif

        /* It is not safe to log after this point. */
        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Termination done: freeing resources");

        if (rk->rk_logq) {
                rd_kafka_q_destroy_owner(rk->rk_logq);
                rk->rk_logq = NULL;
        }

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
		cnd_destroy(&rk->rk_curr_msgs.cnd);
		mtx_destroy(&rk->rk_curr_msgs.lock);
	}

        if (rk->rk_fatal.errstr) {
                rd_free(rk->rk_fatal.errstr);
                rk->rk_fatal.errstr = NULL;
        }

	cnd_destroy(&rk->rk_broker_state_change_cnd);
	mtx_destroy(&rk->rk_broker_state_change_lock);

        mtx_destroy(&rk->rk_suppress.sparse_connect_lock);

        cnd_destroy(&rk->rk_init_cnd);
        mtx_destroy(&rk->rk_init_lock);

	if (rk->rk_full_metadata)
		rd_kafka_metadata_destroy(rk->rk_full_metadata);
        rd_kafkap_str_destroy(rk->rk_client_id);
        rd_kafkap_str_destroy(rk->rk_group_id);
        rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
	rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
        rd_list_destroy(&rk->rk_broker_by_id);

	rwlock_destroy(&rk->rk_lock);

	rd_free(rk);
	rd_kafka_global_cnt_decr();
}


static void rd_kafka_destroy_app (rd_kafka_t *rk, int flags) {
        thrd_t thrd;
#ifndef _WIN32
	int term_sig = rk->rk_conf.term_sig;
#endif
        int res;
        char flags_str[256];
        static const char *rd_kafka_destroy_flags_names[] = {
                "Terminate",
                "DestroyCalled",
                "Immediate",
                "NoConsumerClose",
                NULL
        };

        /* Fatal errors and _F_IMMEDIATE also sets .._NO_CONSUMER_CLOSE */
        if (flags & RD_KAFKA_DESTROY_F_IMMEDIATE ||
            rd_kafka_fatal_error_code(rk))
                flags |= RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE;

        rd_flags2str(flags_str, sizeof(flags_str),
                     rd_kafka_destroy_flags_names, flags);
        rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance "
                     "(destroy flags %s (0x%x))",
                     flags ? flags_str : "none", flags);

        /* If producer still has messages in queue the application
         * is terminating the producer without first calling flush() or purge()
         * which is a common new user mistake, so hint the user of proper
         * shutdown semantics. */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                unsigned int tot_cnt;
                size_t tot_size;

                rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

                if (tot_cnt > 0)
                        rd_kafka_log(rk, LOG_WARNING, "TERMINATE",
                                     "Producer terminating with %u message%s "
                                     "(%"PRIusz" byte%s) still in "
                                     "queue or transit: "
                                     "use flush() to wait for "
                                     "outstanding message delivery",
                                     tot_cnt, tot_cnt > 1 ? "s" : "",
                                     tot_size, tot_size > 1 ? "s" : "");
        }

        /* Make sure destroy is not called from a librdkafka thread
         * since this will most likely cause a deadlock.
         * FIXME: include broker threads (for log_cb) */
        if (thrd_is_current(rk->rk_thread) ||
            thrd_is_current(rk->rk_background.thread)) {
                rd_kafka_log(rk, LOG_EMERG, "BGQUEUE",
                             "Application bug: "
                             "rd_kafka_destroy() called from "
                             "librdkafka owned thread");
                rd_kafka_assert(NULL,
                                !*"Application bug: "
                                "calling rd_kafka_destroy() from "
                                "librdkafka owned thread is prohibited");
        }

        /* Before signaling for general termination, set the destroy
         * flags to hint cgrp how to shut down. */
        rd_atomic32_set(&rk->rk_terminate,
                        flags|RD_KAFKA_DESTROY_F_DESTROY_CALLED);

        /* The legacy/simple consumer lacks an API to close down the consumer*/
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Terminating consumer group handler");
                rd_kafka_consumer_close(rk);
        }

        /* With the consumer closed, terminate the rest of librdkafka. */
        rd_atomic32_set(&rk->rk_terminate, flags|RD_KAFKA_DESTROY_F_TERMINATE);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
        rd_kafka_wrlock(rk);
        thrd = rk->rk_thread;
        rd_kafka_timers_interrupt(&rk->rk_timers);
        rd_kafka_wrunlock(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Sending TERMINATE to internal main thread");
        /* Send op to trigger queue/io wake-up.
         * The op itself is (likely) ignored by the receiver. */
        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
        /* Interrupt main kafka thread to speed up termination. */
	if (term_sig) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Sending thread kill signal %d", term_sig);
                pthread_kill(thrd, term_sig);
        }
#endif

        if (rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_IMMEDIATE))
                return; /* FIXME: thread resource leak */

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Joining internal main thread");

        if (thrd_join(thrd, &res) != thrd_success)
                rd_kafka_log(rk, LOG_ERR, "DESTROY",
                             "Failed to join internal main thread: %s "
                             "(was process forked?)",
                             rd_strerror(errno));

        rd_kafka_destroy_final(rk);
}


/* NOTE: Must only be called by application.
 *       librdkafka itself must use rd_kafka_destroy0(). */
void rd_kafka_destroy (rd_kafka_t *rk) {
        rd_kafka_destroy_app(rk, 0);
}

void rd_kafka_destroy_flags (rd_kafka_t *rk, int flags) {
        rd_kafka_destroy_app(rk, flags);
}


/**
 * Main destructor for rd_kafka_t
 *
 * Locality: rdkafka main thread or application thread during rd_kafka_new()
 */
static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
	rd_kafka_topic_t *rkt, *rkt_tmp;
	rd_kafka_broker_t *rkb, *rkb_tmp;
        rd_list_t wait_thrds;
        thrd_t *thrd;
        int i;

        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");

        /* Trigger any state-change waiters (which should check the
         * terminate flag whenever they wake up). */
        rd_kafka_brokers_broadcast_state_change(rk);

        if (rk->rk_background.thread) {
                int res;
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the receiver. */
                rd_kafka_q_enq(rk->rk_background.q,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rd_kafka_dbg(rk, ALL, "DESTROY",
                             "Waiting for background queue thread "
                             "to terminate");
                thrd_join(rk->rk_background.thread, &res);
                rd_kafka_q_destroy_owner(rk->rk_background.q);
        }

        /* Call on_destroy() interceptors */
        rd_kafka_interceptors_on_destroy(rk);

	/* Brokers pick up on rk_terminate automatically. */

        /* List of (broker) threads to join to synchronize termination */
        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);

	rd_kafka_wrlock(rk);

        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
	/* Decommission all topics */
	TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
		rd_kafka_wrunlock(rk);
		rd_kafka_topic_partitions_remove(rkt);
		rd_kafka_wrlock(rk);
	}

        /* Decommission brokers.
         * Broker thread holds a refcount and detects when broker refcounts
         * reaches 1 and then decommissions itself. */
        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
                /* Add broker's thread to wait_thrds list for later joining */
                thrd = rd_malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
                rd_kafka_wrunlock(rk);

                rd_kafka_dbg(rk, BROKER, "DESTROY",
                             "Sending TERMINATE to %s",
                             rd_kafka_broker_name(rkb));
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the broker thread. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
                /* Interrupt IO threads to speed up termination. */
                if (rk->rk_conf.term_sig)
			pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
#endif

                rd_kafka_broker_destroy(rkb);

                rd_kafka_wrlock(rk);
        }

        if (rk->rk_clusterid) {
                rd_free(rk->rk_clusterid);
                rk->rk_clusterid = NULL;
        }

        /* Destroy coord requests */
        rd_kafka_coord_reqs_term(rk);

        /* Destroy the coordinator cache */
        rd_kafka_coord_cache_destroy(&rk->rk_coord_cache);

        /* Purge metadata cache.
         * #3279:
         * We mustn't call cache_destroy() here since there might be outstanding
         * broker rkos that hold references to the metadata cache lock,
         * and these brokers are destroyed below. So to avoid a circular
         * dependency refcnt deadlock we first purge the cache here
         * and destroy it after the brokers are destroyed. */
        rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/);

        rd_kafka_wrunlock(rk);

        mtx_lock(&rk->rk_broker_state_change_lock);
        /* Purge broker state change waiters */
        rd_list_destroy(&rk->rk_broker_state_change_waiters);
        mtx_unlock(&rk->rk_broker_state_change_lock);

        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                if (rk->rk_consumer.q)
                        rd_kafka_q_disable(rk->rk_consumer.q);
        }

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Purging reply queue");

	/* Purge op-queue */
        rd_kafka_q_disable(rk->rk_rep);
	rd_kafka_q_purge(rk->rk_rep);

	/* Loose our special reference to the internal broker. */
        mtx_lock(&rk->rk_internal_rkb_lock);
	if ((rkb = rk->rk_internal_rkb)) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Decommissioning internal broker");

                /* Send op to trigger queue wake-up. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rk->rk_internal_rkb = NULL;
                thrd = rd_malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
        }
        mtx_unlock(&rk->rk_internal_rkb_lock);
	if (rkb)
		rd_kafka_broker_destroy(rkb);


        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));

        /* Join broker threads */
        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
                int res;
                if (thrd_join(*thrd, &res) != thrd_success)
                        ;
                rd_free(thrd);
        }

        rd_list_destroy(&wait_thrds);

        /* Destroy mock cluster */
        if (rk->rk_mock.cluster)
                rd_kafka_mock_cluster_destroy(rk->rk_mock.cluster);

        if (rd_atomic32_get(&rk->rk_mock.cluster_cnt) > 0) {
                rd_kafka_log(rk, LOG_EMERG, "MOCK",
                             "%d mock cluster(s) still active: "
                             "must be explicitly destroyed with "
                             "rd_kafka_mock_cluster_destroy() prior to "
                             "terminating the rd_kafka_t instance",
                             (int)rd_atomic32_get(&rk->rk_mock.cluster_cnt));
                rd_assert(!*"All mock clusters must be destroyed prior to "
                          "rd_kafka_t destroy");
        }

        /* Destroy metadata cache */
        rd_kafka_wrlock(rk);
        rd_kafka_metadata_cache_destroy(rk);
        rd_kafka_wrunlock(rk);
}

/**
 * @brief Buffer state for stats emitter
 */
struct _stats_emit {
        char   *buf;      /* Pointer to allocated buffer */
        size_t  size;     /* Current allocated size of buf */
        size_t  of;       /* Current write-offset in buf */
};


/* Stats buffer printf. Requires a (struct _stats_emit *)st variable in the
 * current scope. */
#define _st_printf(...) do {                                            \
                ssize_t _r;                                             \
                ssize_t _rem = st->size - st->of;                       \
                _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__);    \
                if (_r >= _rem) {                                       \
                        st->size *= 2;                                  \
                        _rem = st->size - st->of;                       \
                        st->buf = rd_realloc(st->buf, st->size);        \
                        _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__); \
                }                                                       \
                st->of += _r;                                           \
        } while (0)

struct _stats_total {
        int64_t tx;          /**< broker.tx */
        int64_t tx_bytes;    /**< broker.tx_bytes */
        int64_t rx;          /**< broker.rx */
        int64_t rx_bytes;    /**< broker.rx_bytes */
        int64_t txmsgs;      /**< partition.txmsgs */
        int64_t txmsg_bytes; /**< partition.txbytes */
        int64_t rxmsgs;      /**< partition.rxmsgs */
        int64_t rxmsg_bytes; /**< partition.rxbytes */
};



/**
 * @brief Rollover and emit an average window.
 */
static RD_INLINE void rd_kafka_stats_emit_avg (struct _stats_emit *st,
                                               const char *name,
                                               rd_avg_t *src_avg) {
        rd_avg_t avg;

        rd_avg_rollover(&avg, src_avg);
        _st_printf(
                "\"%s\": {"
                " \"min\":%"PRId64","
                " \"max\":%"PRId64","
                " \"avg\":%"PRId64","
                " \"sum\":%"PRId64","
                " \"stddev\": %"PRId64","
                " \"p50\": %"PRId64","
                " \"p75\": %"PRId64","
                " \"p90\": %"PRId64","
                " \"p95\": %"PRId64","
                " \"p99\": %"PRId64","
                " \"p99_99\": %"PRId64","
                " \"outofrange\": %"PRId64","
                " \"hdrsize\": %"PRId32","
                " \"cnt\":%i "
                "}, ",
                name,
                avg.ra_v.minv,
                avg.ra_v.maxv,
                avg.ra_v.avg,
                avg.ra_v.sum,
                (int64_t)avg.ra_hist.stddev,
                avg.ra_hist.p50,
                avg.ra_hist.p75,
                avg.ra_hist.p90,
                avg.ra_hist.p95,
                avg.ra_hist.p99,
                avg.ra_hist.p99_99,
                avg.ra_hist.oor,
                avg.ra_hist.hdrsize,
                avg.ra_v.cnt);
        rd_avg_destroy(&avg);
}

/**
 * Emit stats for toppar
 */
static RD_INLINE void rd_kafka_stats_emit_toppar (struct _stats_emit *st,
                                                  struct _stats_total *total,
                                                  rd_kafka_toppar_t *rktp,
                                                  int first) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        int64_t end_offset;
        int64_t consumer_lag = -1;
        int64_t consumer_lag_stored = -1;
        struct offset_stats offs;
        int32_t broker_id = -1;

        rd_kafka_toppar_lock(rktp);

        if (rktp->rktp_broker) {
                rd_kafka_broker_lock(rktp->rktp_broker);
                broker_id = rktp->rktp_broker->rkb_nodeid;
                rd_kafka_broker_unlock(rktp->rktp_broker);
        }

        /* Grab a copy of the latest finalized offset stats */
        offs = rktp->rktp_offsets_fin;

        end_offset = (rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED)
                ? rktp->rktp_ls_offset
                : rktp->rktp_hi_offset;

        /* Calculate consumer_lag by using the highest offset
         * of stored_offset (the last message passed to application + 1, or
         * if enable.auto.offset.store=false the last message manually stored),
         * or the committed_offset (the last message committed by this or
         * another consumer).
         * Using stored_offset allows consumer_lag to be up to date even if
         * offsets are not (yet) committed.
         */
        if (end_offset != RD_KAFKA_OFFSET_INVALID) {
                if (rktp->rktp_stored_offset >= 0 &&
                    rktp->rktp_stored_offset <= end_offset)
                        consumer_lag_stored =
                                end_offset - rktp->rktp_stored_offset;
                if (rktp->rktp_committed_offset >= 0 &&
                    rktp->rktp_committed_offset <= end_offset)
                        consumer_lag = end_offset - rktp->rktp_committed_offset;
        }

	_st_printf("%s\"%"PRId32"\": { "
		   "\"partition\":%"PRId32", "
		   "\"broker\":%"PRId32", "
		   "\"leader\":%"PRId32", "
		   "\"desired\":%s, "
		   "\"unknown\":%s, "
		   "\"msgq_cnt\":%i, "
		   "\"msgq_bytes\":%"PRIusz", "
		   "\"xmit_msgq_cnt\":%i, "
		   "\"xmit_msgq_bytes\":%"PRIusz", "
		   "\"fetchq_cnt\":%i, "
		   "\"fetchq_size\":%"PRIu64", "
		   "\"fetch_state\":\"%s\", "
		   "\"query_offset\":%"PRId64", "
		   "\"next_offset\":%"PRId64", "
		   "\"app_offset\":%"PRId64", "
		   "\"stored_offset\":%"PRId64", "
		   "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
		   "\"committed_offset\":%"PRId64", "
		   "\"eof_offset\":%"PRId64", "
		   "\"lo_offset\":%"PRId64", "
		   "\"hi_offset\":%"PRId64", "
                   "\"ls_offset\":%"PRId64", "
                   "\"consumer_lag\":%"PRId64", "
                   "\"consumer_lag_stored\":%"PRId64", "
		   "\"txmsgs\":%"PRIu64", "
		   "\"txbytes\":%"PRIu64", "
                   "\"rxmsgs\":%"PRIu64", "
                   "\"rxbytes\":%"PRIu64", "
                   "\"msgs\": %"PRIu64", "
                   "\"rx_ver_drops\": %"PRIu64", "
                   "\"msgs_inflight\": %"PRId32", "
                   "\"next_ack_seq\": %"PRId32", "
                   "\"next_err_seq\": %"PRId32", "
                   "\"acked_msgid\": %"PRIu64
                   "} ",
		   first ? "" : ", ",
		   rktp->rktp_partition,
		   rktp->rktp_partition,
                   broker_id,
                   rktp->rktp_leader_id,
		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
                   rd_kafka_msgq_len(&rktp->rktp_msgq),
		   rd_kafka_msgq_size(&rktp->rktp_msgq),
                   /* FIXME: xmit_msgq is local to the broker thread. */
                   0,
                   (size_t)0,
		   rd_kafka_q_len(rktp->rktp_fetchq),
		   rd_kafka_q_size(rktp->rktp_fetchq),
		   rd_kafka_fetch_states[rktp->rktp_fetch_state],
		   rktp->rktp_query_offset,
                   offs.fetch_offset,
		   rktp->rktp_app_offset,
		   rktp->rktp_stored_offset,
		   rktp->rktp_committed_offset, /* FIXME: issue #80 */
		   rktp->rktp_committed_offset,
                   offs.eof_offset,
		   rktp->rktp_lo_offset,
		   rktp->rktp_hi_offset,
                   rktp->rktp_ls_offset,
                   consumer_lag,
                   consumer_lag_stored,
                   rd_atomic64_get(&rktp->rktp_c.tx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes),
                   rd_atomic64_get(&rktp->rktp_c.rx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes),
                   rk->rk_type == RD_KAFKA_PRODUCER ?
                   rd_atomic64_get(&rktp->rktp_c.producer_enq_msgs) :
                   rd_atomic64_get(&rktp->rktp_c.rx_msgs), /* legacy, same as rx_msgs */
                   rd_atomic64_get(&rktp->rktp_c.rx_ver_drops),
                   rd_atomic32_get(&rktp->rktp_msgs_inflight),
                   rktp->rktp_eos.next_ack_seq,
                   rktp->rktp_eos.next_err_seq,
                   rktp->rktp_eos.acked_msgid);

        if (total) {
                total->txmsgs      += rd_atomic64_get(&rktp->rktp_c.tx_msgs);
                total->txmsg_bytes += rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes);
                total->rxmsgs      += rd_atomic64_get(&rktp->rktp_c.rx_msgs);
                total->rxmsg_bytes += rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes);
        }

        rd_kafka_toppar_unlock(rktp);
}

/**
 * @brief Emit broker request type stats
 */
static void rd_kafka_stats_emit_broker_reqs (struct _stats_emit *st,
                                             rd_kafka_broker_t *rkb) {
        /* Filter out request types that will never be sent by the client. */
        static const rd_bool_t filter[4][RD_KAFKAP__NUM] = {
                [RD_KAFKA_PRODUCER] = {
                        [RD_KAFKAP_Fetch] = rd_true,
                        [RD_KAFKAP_OffsetCommit] = rd_true,
                        [RD_KAFKAP_OffsetFetch] = rd_true,
                        [RD_KAFKAP_JoinGroup] = rd_true,
                        [RD_KAFKAP_Heartbeat] = rd_true,
                        [RD_KAFKAP_LeaveGroup] = rd_true,
                        [RD_KAFKAP_SyncGroup] = rd_true
                },
                [RD_KAFKA_CONSUMER] = {
                        [RD_KAFKAP_Produce] = rd_true,
                        [RD_KAFKAP_InitProducerId] = rd_true,
                        /* Transactional producer */
                        [RD_KAFKAP_AddPartitionsToTxn] = rd_true,
                        [RD_KAFKAP_AddOffsetsToTxn] = rd_true,
                        [RD_KAFKAP_EndTxn] = rd_true,
                        [RD_KAFKAP_TxnOffsetCommit] = rd_true,
                },
                [2/*any client type*/] = {
                        [RD_KAFKAP_UpdateMetadata] = rd_true,
                        [RD_KAFKAP_ControlledShutdown] = rd_true,
                        [RD_KAFKAP_LeaderAndIsr] = rd_true,
                        [RD_KAFKAP_StopReplica] = rd_true,
                        [RD_KAFKAP_OffsetForLeaderEpoch] = rd_true,

                        [RD_KAFKAP_WriteTxnMarkers] = rd_true,

                        [RD_KAFKAP_AlterReplicaLogDirs] = rd_true,
                        [RD_KAFKAP_DescribeLogDirs] = rd_true,

                        [RD_KAFKAP_SaslAuthenticate] = rd_false,

                        [RD_KAFKAP_CreateDelegationToken] = rd_true,
                        [RD_KAFKAP_RenewDelegationToken] = rd_true,
                        [RD_KAFKAP_ExpireDelegationToken] = rd_true,
                        [RD_KAFKAP_DescribeDelegationToken] = rd_true,
                        [RD_KAFKAP_IncrementalAlterConfigs] = rd_true,
                        [RD_KAFKAP_ElectLeaders] = rd_true,
                        [RD_KAFKAP_AlterPartitionReassignments] = rd_true,
                        [RD_KAFKAP_ListPartitionReassignments] = rd_true,
                        [RD_KAFKAP_AlterUserScramCredentials] = rd_true,
                        [RD_KAFKAP_Vote] = rd_true,
                        [RD_KAFKAP_BeginQuorumEpoch] = rd_true,
                        [RD_KAFKAP_EndQuorumEpoch] = rd_true,
                        [RD_KAFKAP_DescribeQuorum] = rd_true,
                        [RD_KAFKAP_AlterIsr] = rd_true,
                        [RD_KAFKAP_UpdateFeatures] = rd_true,
                        [RD_KAFKAP_Envelope] = rd_true,
                },
                [3/*hide-unless-non-zero*/] = {
                        /* Hide Admin requests unless they've been used */
                        [RD_KAFKAP_CreateTopics] =  rd_true,
                        [RD_KAFKAP_DeleteTopics] =  rd_true,
                        [RD_KAFKAP_DeleteRecords] =  rd_true,
                        [RD_KAFKAP_CreatePartitions] =  rd_true,
                        [RD_KAFKAP_DescribeAcls] = rd_true,
                        [RD_KAFKAP_CreateAcls] = rd_true,
                        [RD_KAFKAP_DeleteAcls] = rd_true,
                        [RD_KAFKAP_DescribeConfigs] = rd_true,
                        [RD_KAFKAP_AlterConfigs] = rd_true,
                        [RD_KAFKAP_DeleteGroups] = rd_true,
                        [RD_KAFKAP_ListGroups] = rd_true,
                        [RD_KAFKAP_DescribeGroups] = rd_true
                }
        };
        int i;
        int cnt = 0;

        _st_printf("\"req\": { ");
        for (i = 0 ; i < RD_KAFKAP__NUM ; i++) {
                int64_t v;

                if (filter[rkb->rkb_rk->rk_type][i] || filter[2][i])
                        continue;

                v = rd_atomic64_get(&rkb->rkb_c.reqtype[i]);
                if (!v && filter[3][i])
                        continue; /* Filter out zero values */

                _st_printf("%s\"%s\": %"PRId64,
                           cnt > 0 ? ", " : "",
                           rd_kafka_ApiKey2str(i), v);

                cnt++;
        }
        _st_printf(" }, ");
}


/**
 * Emit all statistics
 */
static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
	rd_kafka_broker_t *rkb;
	rd_kafka_topic_t *rkt;
	rd_ts_t now;
	rd_kafka_op_t *rko;
	unsigned int tot_cnt;
	size_t tot_size;
        rd_kafka_resp_err_t err;
        struct _stats_emit stx = { .size = 1024*10 };
        struct _stats_emit *st = &stx;
        struct _stats_total total = {0};

        st->buf = rd_malloc(st->size);


	rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
	rd_kafka_rdlock(rk);

	now = rd_clock();
	_st_printf("{ "
                   "\"name\": \"%s\", "
                   "\"client_id\": \"%s\", "
                   "\"type\": \"%s\", "
		   "\"ts\":%"PRId64", "
		   "\"time\":%lli, "
                   "\"age\":%"PRId64", "
		   "\"replyq\":%i, "
                   "\"msg_cnt\":%u, "
		   "\"msg_size\":%"PRIusz", "
                   "\"msg_max\":%u, "
		   "\"msg_size_max\":%"PRIusz", "
                   "\"simple_cnt\":%i, "
                   "\"metadata_cache_cnt\":%i, "
		   "\"brokers\":{ "/*open brokers*/,
                   rk->rk_name,
                   rk->rk_conf.client_id_str,
                   rd_kafka_type2str(rk->rk_type),
		   now,
		   (signed long long)time(NULL),
                   now - rk->rk_ts_created,
		   rd_kafka_q_len(rk->rk_rep),
		   tot_cnt, tot_size,
		   rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
                   rd_atomic32_get(&rk->rk_simple_cnt),
                   rk->rk_metadata_cache.rkmc_cnt);


	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
		rd_kafka_toppar_t *rktp;
                rd_ts_t txidle = -1, rxidle = -1;

		rd_kafka_broker_lock(rkb);

                if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP) {
                        /* Calculate tx and rx idle time in usecs */
                        txidle = rd_atomic64_get(&rkb->rkb_c.ts_send);
                        rxidle = rd_atomic64_get(&rkb->rkb_c.ts_recv);

                        if (txidle)
                                txidle = RD_MAX(now - txidle, 0);
                        else
                                txidle = -1;

                        if (rxidle)
                                rxidle = RD_MAX(now - rxidle, 0);
                        else
                                rxidle = -1;
                }

		_st_printf("%s\"%s\": { "/*open broker*/
			   "\"name\":\"%s\", "
			   "\"nodeid\":%"PRId32", "
                           "\"nodename\":\"%s\", "
                           "\"source\":\"%s\", "
			   "\"state\":\"%s\", "
                           "\"stateage\":%"PRId64", "
			   "\"outbuf_cnt\":%i, "
			   "\"outbuf_msg_cnt\":%i, "
			   "\"waitresp_cnt\":%i, "
			   "\"waitresp_msg_cnt\":%i, "
			   "\"tx\":%"PRIu64", "
			   "\"txbytes\":%"PRIu64", "
			   "\"txerrs\":%"PRIu64", "
			   "\"txretries\":%"PRIu64", "
                           "\"txidle\":%"PRIu64", "
			   "\"req_timeouts\":%"PRIu64", "
			   "\"rx\":%"PRIu64", "
			   "\"rxbytes\":%"PRIu64", "
			   "\"rxerrs\":%"PRIu64", "
                           "\"rxcorriderrs\":%"PRIu64", "
                           "\"rxpartial\":%"PRIu64", "
                           "\"rxidle\":%"PRIu64", "
                           "\"zbuf_grow\":%"PRIu64", "
                           "\"buf_grow\":%"PRIu64", "
                           "\"wakeups\":%"PRIu64", "
                           "\"connects\":%"PRId32", "
                           "\"disconnects\":%"PRId32", ",
			   rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
			   rkb->rkb_name,
			   rkb->rkb_name,
			   rkb->rkb_nodeid,
                           rkb->rkb_nodename,
                           rd_kafka_confsource2str(rkb->rkb_source),
			   rd_kafka_broker_state_names[rkb->rkb_state],
                           rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
			   rd_atomic64_get(&rkb->rkb_c.tx),
			   rd_atomic64_get(&rkb->rkb_c.tx_bytes),
			   rd_atomic64_get(&rkb->rkb_c.tx_err),
			   rd_atomic64_get(&rkb->rkb_c.tx_retries),
                           txidle,
			   rd_atomic64_get(&rkb->rkb_c.req_timeouts),
			   rd_atomic64_get(&rkb->rkb_c.rx),
			   rd_atomic64_get(&rkb->rkb_c.rx_bytes),
			   rd_atomic64_get(&rkb->rkb_c.rx_err),
			   rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
			   rd_atomic64_get(&rkb->rkb_c.rx_partial),
                           rxidle,
                           rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
                           rd_atomic64_get(&rkb->rkb_c.buf_grow),
                           rd_atomic64_get(&rkb->rkb_c.wakeups),
                           rd_atomic32_get(&rkb->rkb_c.connects),
                           rd_atomic32_get(&rkb->rkb_c.disconnects));

                total.tx       += rd_atomic64_get(&rkb->rkb_c.tx);
                total.tx_bytes += rd_atomic64_get(&rkb->rkb_c.tx_bytes);
                total.rx       += rd_atomic64_get(&rkb->rkb_c.rx);
                total.rx_bytes += rd_atomic64_get(&rkb->rkb_c.rx_bytes);

                rd_kafka_stats_emit_avg(st, "int_latency",
                                        &rkb->rkb_avg_int_latency);
                rd_kafka_stats_emit_avg(st, "outbuf_latency",
                                        &rkb->rkb_avg_outbuf_latency);
                rd_kafka_stats_emit_avg(st, "rtt", &rkb->rkb_avg_rtt);
                rd_kafka_stats_emit_avg(st, "throttle", &rkb->rkb_avg_throttle);

                rd_kafka_stats_emit_broker_reqs(st, rkb);

                _st_printf("\"toppars\":{ "/*open toppars*/);

		TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
			_st_printf("%s\"%.*s-%"PRId32"\": { "
				   "\"topic\":\"%.*s\", "
				   "\"partition\":%"PRId32"} ",
				   rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
				   rktp->rktp_partition);
		}

		rd_kafka_broker_unlock(rkb);

		_st_printf("} "/*close toppars*/
			   "} "/*close broker*/);
	}


	_st_printf("}, " /* close "brokers" array */
		   "\"topics\":{ ");

	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                rd_kafka_toppar_t *rktp;
		int i, j;

		rd_kafka_topic_rdlock(rkt);
		_st_printf("%s\"%.*s\": { "
			   "\"topic\":\"%.*s\", "
                           "\"age\":%"PRId64", "
			   "\"metadata_age\":%"PRId64", ",
			   rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
                           (now - rkt->rkt_ts_create)/1000,
			   rkt->rkt_ts_metadata ?
			   (now - rkt->rkt_ts_metadata)/1000 : 0);

                rd_kafka_stats_emit_avg(st, "batchsize",
                                        &rkt->rkt_avg_batchsize);
                rd_kafka_stats_emit_avg(st, "batchcnt",
                                        &rkt->rkt_avg_batchcnt);

                _st_printf("\"partitions\":{ " /*open partitions*/);

                for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
                        rd_kafka_stats_emit_toppar(st, &total, rkt->rkt_p[i],
                                                   i == 0);

                RD_LIST_FOREACH(rktp, &rkt->rkt_desp, j)
                        rd_kafka_stats_emit_toppar(st, &total, rktp, i+j == 0);

                i += j;

                if (rkt->rkt_ua)
                        rd_kafka_stats_emit_toppar(st, NULL, rkt->rkt_ua,
                                                   i++ == 0);

		rd_kafka_topic_rdunlock(rkt);

		_st_printf("} "/*close partitions*/
			   "} "/*close topic*/);

	}
	_st_printf("} "/*close topics*/);

        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                _st_printf(", \"cgrp\": { "
                           "\"state\": \"%s\", "
                           "\"stateage\": %"PRId64", "
                           "\"join_state\": \"%s\", "
                           "\"rebalance_age\": %"PRId64", "
                           "\"rebalance_cnt\": %d, "
                           "\"rebalance_reason\": \"%s\", "
                           "\"assignment_size\": %d }",
                           rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                           rkcg->rkcg_ts_statechange ?
                           (now - rkcg->rkcg_ts_statechange) / 1000 : 0,
                           rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                           rkcg->rkcg_c.ts_rebalance ?
                           (now - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
                           rkcg->rkcg_c.rebalance_cnt,
                           rkcg->rkcg_c.rebalance_reason,
                           rkcg->rkcg_c.assignment_size);
        }

        if (rd_kafka_is_idempotent(rk)) {
                _st_printf(", \"eos\": { "
                           "\"idemp_state\": \"%s\", "
                           "\"idemp_stateage\": %"PRId64", "
                           "\"txn_state\": \"%s\", "
                           "\"txn_stateage\": %"PRId64", "
                           "\"txn_may_enq\": %s, "
                           "\"producer_id\": %"PRId64", "
                           "\"producer_epoch\": %hd, "
                           "\"epoch_cnt\": %d "
                           "}",
                           rd_kafka_idemp_state2str(rk->rk_eos.idemp_state),
                           (now - rk->rk_eos.ts_idemp_state) / 1000,
                           rd_kafka_txn_state2str(rk->rk_eos.txn_state),
                           (now - rk->rk_eos.ts_txn_state) / 1000,
                           rd_atomic32_get(&rk->rk_eos.txn_may_enq) ?
                           "true":"false",
                           rk->rk_eos.pid.id,
                           rk->rk_eos.pid.epoch,
                           rk->rk_eos.epoch_cnt);
        }

        if ((err = rd_atomic32_get(&rk->rk_fatal.err)))
                _st_printf(", \"fatal\": { "
                           "\"error\": \"%s\", "
                           "\"reason\": \"%s\", "
                           "\"cnt\": %d "
                           "}",
                           rd_kafka_err2str(err),
                           rk->rk_fatal.errstr,
                           rk->rk_fatal.cnt);

	rd_kafka_rdunlock(rk);

        /* Total counters */
        _st_printf(", "
                   "\"tx\":%"PRId64", "
                   "\"tx_bytes\":%"PRId64", "
                   "\"rx\":%"PRId64", "
                   "\"rx_bytes\":%"PRId64", "
                   "\"txmsgs\":%"PRId64", "
                   "\"txmsg_bytes\":%"PRId64", "
                   "\"rxmsgs\":%"PRId64", "
                   "\"rxmsg_bytes\":%"PRId64,
                   total.tx,
                   total.tx_bytes,
                   total.rx,
                   total.rx_bytes,
                   total.txmsgs,
                   total.txmsg_bytes,
                   total.rxmsgs,
                   total.rxmsg_bytes);

        _st_printf("}"/*close object*/);


	/* Enqueue op for application */
	rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
	rko->rko_u.stats.json = st->buf;
	rko->rko_u.stats.json_len = st->of;
	rd_kafka_q_enq(rk->rk_rep, rko);
}


/**
 * @brief 1 second generic timer.
 *
 * @locality rdkafka main thread
 * @locks none
 */
static void rd_kafka_1s_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;

        /* Scan topic state, message timeouts, etc. */
        rd_kafka_topic_scan_all(rk, rd_clock());

        /* Sparse connections:
         * try to maintain at least one connection to the cluster. */
        if (rk->rk_conf.sparse_connections &&
            rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
                rd_kafka_connect_any(rk, "no cluster connection");

        rd_kafka_coord_cache_expire(&rk->rk_coord_cache);
}

static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
	rd_kafka_stats_emit_all(rk);
}


/**
 * @brief Periodic metadata refresh callback
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
        rd_kafka_resp_err_t err;

        /* High-level consumer:
         * We need to query both locally known topics and subscribed topics
         * so that we can detect locally known topics changing partition
         * count or disappearing, as well as detect previously non-existent
         * subscribed topics now being available in the cluster. */
        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
                err = rd_kafka_metadata_refresh_consumer_topics(
                        rk, NULL,
                        "periodic topic and broker list refresh");
        else
                err = rd_kafka_metadata_refresh_known_topics(
                        rk, NULL, rd_true/*force*/,
                        "periodic topic and broker list refresh");


        if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC &&
            rd_interval(&rk->rk_suppress.broker_metadata_refresh,
                        10*1000*1000 /*10s*/, 0) > 0) {
                /* If there are no (locally referenced) topics
                 * to query, refresh the broker list.
                 * This avoids getting idle-disconnected for clients
                 * that have not yet referenced a topic and makes
                 * sure such a client has an up to date broker list. */
                rd_kafka_metadata_refresh_brokers(
                        rk, NULL, "periodic broker list refresh");
        }
}



/**
 * @brief Wait for background threads to initialize.
 *
 * @returns the number of background threads still not initialized.
 *
 * @locality app thread calling rd_kafka_new()
 * @locks none
 */
static int rd_kafka_init_wait (rd_kafka_t *rk, int timeout_ms) {
        struct timespec tspec;
        int ret;

        rd_timeout_init_timespec(&tspec, timeout_ms);

        mtx_lock(&rk->rk_init_lock);
        while (rk->rk_init_wait_cnt > 0 &&
               cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock,
                                 &tspec) == thrd_success)
                ;
        ret = rk->rk_init_wait_cnt;
        mtx_unlock(&rk->rk_init_lock);

        return ret;
}


/**
 * Main loop for Kafka handler thread.
 */
static int rd_kafka_thread_main (void *arg) {
        rd_kafka_t *rk = arg;
	rd_kafka_timer_t tmr_1s = RD_ZERO_INIT;
	rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
	rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;

        rd_kafka_set_thread_name("main");
        rd_kafka_set_thread_sysname("rdk:main");

        rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_MAIN);

	(void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);

	/* Acquire lock (which was held by thread creator during creation)
	 * to synchronise state. */
	rd_kafka_wrlock(rk);
	rd_kafka_wrunlock(rk);

        /* 1 second timer for topic scan and connection checking. */
        rd_kafka_timer_start(&rk->rk_timers, &tmr_1s, 1000000,
                             rd_kafka_1s_tmr_cb, NULL);
        if (rk->rk_conf.stats_interval_ms)
                rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
                                     rk->rk_conf.stats_interval_ms * 1000ll,
                                     rd_kafka_stats_emit_tmr_cb, NULL);
        if (rk->rk_conf.metadata_refresh_interval_ms > 0)
                rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
                                     rk->rk_conf.metadata_refresh_interval_ms *
                                     1000ll,
                                     rd_kafka_metadata_refresh_cb, NULL);

        if (rk->rk_cgrp)
                rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);

        if (rd_kafka_is_idempotent(rk))
                rd_kafka_idemp_init(rk);

        mtx_lock(&rk->rk_init_lock);
        rk->rk_init_wait_cnt--;
        cnd_broadcast(&rk->rk_init_cnd);
        mtx_unlock(&rk->rk_init_lock);

	while (likely(!rd_kafka_terminating(rk) ||
		      rd_kafka_q_len(rk->rk_ops) ||
                      (rk->rk_cgrp && (rk->rk_cgrp->rkcg_state != RD_KAFKA_CGRP_STATE_TERM)))) {
                rd_ts_t sleeptime = rd_kafka_timers_next(
                        &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
                rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
                                 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
		if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
			rd_kafka_cgrp_serve(rk->rk_cgrp);
		rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
	}

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Internal main thread terminating");

        if (rd_kafka_is_idempotent(rk))
                rd_kafka_idemp_term(rk);

	rd_kafka_q_disable(rk->rk_ops);
	rd_kafka_q_purge(rk->rk_ops);

        rd_kafka_timer_stop(&rk->rk_timers, &tmr_1s, 1);
        if (rk->rk_conf.stats_interval_ms)
                rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
        rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);

        /* Synchronise state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_MAIN);

        rd_kafka_destroy_internal(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Internal main thread termination done");

	rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);

	return 0;
}


static void rd_kafka_term_sig_handler (int sig) {
	/* nop */
}


rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
			  char *errstr, size_t errstr_size) {
	rd_kafka_t *rk;
	static rd_atomic32_t rkid;
        rd_kafka_conf_t *conf;
        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        int ret_errno = 0;
        const char *conf_err;
#ifndef _WIN32
        sigset_t newset, oldset;
#endif
        char builtin_features[128];
        size_t bflen;

        rd_kafka_global_init();

        /* rd_kafka_new() takes ownership of the provided \p app_conf
         * object if rd_kafka_new() succeeds.
         * Since \p app_conf is optional we allocate a default configuration
         * object here if \p app_conf is NULL.
         * The configuration object itself is struct-copied later
         * leaving the default *conf pointer to be ready for freeing.
         * In case new() fails and app_conf was specified we will clear out
         * rk_conf to avoid double-freeing from destroy_internal() and the
         * user's eventual call to rd_kafka_conf_destroy().
         * This is all a bit tricky but that's the nature of
         * legacy interfaces. */
        if (!app_conf)
                conf = rd_kafka_conf_new();
        else
                conf = app_conf;

        /* Verify and finalize configuration */
        if ((conf_err = rd_kafka_conf_finalize(type, conf))) {
                /* Incompatible configuration settings */
                rd_snprintf(errstr, errstr_size, "%s", conf_err);
                if (!app_conf)
                        rd_kafka_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return NULL;
        }


	rd_kafka_global_cnt_incr();

	/*
	 * Set up the handle.
	 */
	rk = rd_calloc(1, sizeof(*rk));

	rk->rk_type = type;
        rk->rk_ts_created = rd_clock();

        /* Struct-copy the config object. */
	rk->rk_conf = *conf;
        if (!app_conf)
                rd_free(conf); /* Free the base config struct only,
                                * not its fields since they were copied to
                                * rk_conf just above. Those fields are
                                * freed from rd_kafka_destroy_internal()
                                * as the rk itself is destroyed. */

        /* Seed PRNG, don't bother about HAVE_RAND_R, since it is pretty cheap. */
        if (rk->rk_conf.enable_random_seed)
                call_once(&rd_kafka_global_srand_once, rd_kafka_global_srand);

        /* Call on_new() interceptors */
        rd_kafka_interceptors_on_new(rk, &rk->rk_conf);

	rwlock_init(&rk->rk_lock);
        mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);

	cnd_init(&rk->rk_broker_state_change_cnd);
	mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
        rd_list_init(&rk->rk_broker_state_change_waiters, 8,
                     rd_kafka_enq_once_trigger_destroy);

        cnd_init(&rk->rk_init_cnd);
        mtx_init(&rk->rk_init_lock, mtx_plain);

        rd_interval_init(&rk->rk_suppress.no_idemp_brokers);
        rd_interval_init(&rk->rk_suppress.broker_metadata_refresh);
        rd_interval_init(&rk->rk_suppress.sparse_connect_random);
        mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);

        rd_atomic64_init(&rk->rk_ts_last_poll, rk->rk_ts_created);

	rk->rk_rep = rd_kafka_q_new(rk);
	rk->rk_ops = rd_kafka_q_new(rk);
        rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
        rk->rk_ops->rkq_opaque = rk;

        if (rk->rk_conf.log_queue) {
                rk->rk_logq = rd_kafka_q_new(rk);
                rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
                rk->rk_logq->rkq_opaque = rk;
        }

	TAILQ_INIT(&rk->rk_brokers);
	TAILQ_INIT(&rk->rk_topics);
        rd_kafka_timers_init(&rk->rk_timers, rk, rk->rk_ops);
        rd_kafka_metadata_cache_init(rk);
        rd_kafka_coord_cache_init(&rk->rk_coord_cache,
                                  rk->rk_conf.metadata_max_age_ms);
        rd_kafka_coord_reqs_init(rk);

	if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
                rk->rk_drmode = RD_KAFKA_DR_MODE_CB;
        else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR)
                rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT;
        else
                rk->rk_drmode = RD_KAFKA_DR_MODE_NONE;
        if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE)
		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;

	if (rk->rk_conf.rebalance_cb)
		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
	if (rk->rk_conf.offset_commit_cb)
		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
        if (rk->rk_conf.error_cb)
                rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;
#if WITH_SASL_OAUTHBEARER
        if (rk->rk_conf.sasl.enable_oauthbearer_unsecure_jwt &&
            !rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
                rd_kafka_conf_set_oauthbearer_token_refresh_cb(
                        &rk->rk_conf,
                        rd_kafka_oauthbearer_unsecured_token);

        if (rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
                rk->rk_conf.enabled_events |=
                        RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH;
#endif

        rk->rk_controllerid = -1;

        /* Admin client defaults */
        rk->rk_conf.admin.request_timeout_ms = rk->rk_conf.socket_timeout_ms;

	if (rk->rk_conf.debug)
                rk->rk_conf.log_level = LOG_DEBUG;

	rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
                    rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
                    rd_atomic32_add(&rkid, 1));

	/* Construct clientid kafka string */
	rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);

        /* Convert group.id to kafka string (may be NULL) */
        rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);

        /* Config fixups */
        rk->rk_conf.queued_max_msg_bytes =
                (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;

	/* Enable api.version.request=true if fallback.broker.version
	 * indicates a supporting broker. */
	if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
		rk->rk_conf.api_version_request = 1;

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
                cnd_init(&rk->rk_curr_msgs.cnd);
                rk->rk_curr_msgs.max_cnt =
                        rk->rk_conf.queue_buffering_max_msgs;
                if ((unsigned long long)rk->rk_conf.
                    queue_buffering_max_kbytes * 1024 >
                    (unsigned long long)SIZE_MAX) {
                        rk->rk_curr_msgs.max_size = SIZE_MAX;
                        rd_kafka_log(rk, LOG_WARNING, "QUEUESIZE",
                                     "queue.buffering.max.kbytes adjusted "
                                     "to system SIZE_MAX limit %"PRIusz" bytes",
                                     rk->rk_curr_msgs.max_size);
                } else {
                        rk->rk_curr_msgs.max_size =
                                (size_t)rk->rk_conf.
                                queue_buffering_max_kbytes * 1024;
                }
        }

        if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
                ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                ret_errno = EINVAL;
                goto fail;
        }

        /* Create Mock cluster */
        rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
        if (rk->rk_conf.mock.broker_cnt > 0) {
                rk->rk_mock.cluster = rd_kafka_mock_cluster_new(
                        rk, rk->rk_conf.mock.broker_cnt);

                if (!rk->rk_mock.cluster) {
                        rd_snprintf(errstr, errstr_size,
                                    "Failed to create mock cluster, see logs");
                        ret_err = RD_KAFKA_RESP_ERR__FAIL;
                        ret_errno = EINVAL;
                        goto fail;
                }

                rd_kafka_log(rk, LOG_NOTICE, "MOCK", "Mock cluster enabled: "
                             "original bootstrap.servers and security.protocol "
                             "ignored and replaced");

                /* Overwrite bootstrap.servers and connection settings */
                if (rd_kafka_conf_set(&rk->rk_conf, "bootstrap.servers",
                                      rd_kafka_mock_cluster_bootstraps(
                                              rk->rk_mock.cluster),
                                      NULL, 0) != RD_KAFKA_CONF_OK)
                        rd_assert(!"failed to replace mock bootstrap.servers");

                if (rd_kafka_conf_set(&rk->rk_conf, "security.protocol",
                                      "plaintext", NULL, 0) != RD_KAFKA_CONF_OK)
                        rd_assert(!"failed to reset mock security.protocol");

                rk->rk_conf.security_protocol = RD_KAFKA_PROTO_PLAINTEXT;
        }


        if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
            rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
                /* Select SASL provider */
                if (rd_kafka_sasl_select_provider(rk,
                                                  errstr, errstr_size) == -1) {
                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        ret_errno = EINVAL;
                        goto fail;
                }

                /* Initialize SASL provider */
                if (rd_kafka_sasl_init(rk, errstr, errstr_size) == -1) {
                        rk->rk_conf.sasl.provider = NULL;
                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        ret_errno = EINVAL;
                        goto fail;
                }
        }

#if WITH_SSL
        if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
            rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
                /* Create SSL context */
                if (rd_kafka_ssl_ctx_init(rk, errstr, errstr_size) == -1) {
                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        ret_errno = EINVAL;
                        goto fail;
                }
        }
#endif

        if (type == RD_KAFKA_CONSUMER) {
                rd_kafka_assignment_init(rk);

                if (RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) {
                        /* Create consumer group handle */
                        rk->rk_cgrp = rd_kafka_cgrp_new(rk,
                                                        rk->rk_group_id,
                                                        rk->rk_client_id);
                        rk->rk_consumer.q =
                                rd_kafka_q_keep(rk->rk_cgrp->rkcg_q);
                } else {
                        /* Legacy consumer */
                        rk->rk_consumer.q = rd_kafka_q_keep(rk->rk_rep);
                }

        } else if (type == RD_KAFKA_PRODUCER) {
                rk->rk_eos.transactional_id =
                        rd_kafkap_str_new(rk->rk_conf.eos.transactional_id, -1);
        }

#ifndef _WIN32
        /* Block all signals in newly created threads.
         * To avoid race condition we block all signals in the calling
         * thread, which the new thread will inherit its sigmask from,
         * and then restore the original sigmask of the calling thread when
         * we're done creating the thread. */
        sigemptyset(&oldset);
        sigfillset(&newset);
	if (rk->rk_conf.term_sig) {
		struct sigaction sa_term = {
			.sa_handler = rd_kafka_term_sig_handler
		};
		sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
	}
        pthread_sigmask(SIG_SETMASK, &newset, &oldset);
#endif

        mtx_lock(&rk->rk_init_lock);

        /* Create background thread and queue if background_event_cb()
         * has been configured.
         * Do this before creating the main thread since after
         * the main thread is created it is no longer trivial to error
         * out from rd_kafka_new(). */
        if (rk->rk_conf.background_event_cb) {
                /* Hold off background thread until thrd_create() is done. */
                rd_kafka_wrlock(rk);

                rk->rk_background.q = rd_kafka_q_new(rk);

                rk->rk_init_wait_cnt++;

                if ((thrd_create(&rk->rk_background.thread,
                                 rd_kafka_background_thread_main, rk)) !=
                    thrd_success) {
                        rk->rk_init_wait_cnt--;
                        ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
                        ret_errno = errno;
                        if (errstr)
                                rd_snprintf(errstr, errstr_size,
                                            "Failed to create background "
                                            "thread: %s (%i)",
                                            rd_strerror(errno), errno);
                        rd_kafka_wrunlock(rk);
                        mtx_unlock(&rk->rk_init_lock);

#ifndef _WIN32
                        /* Restore sigmask of caller */
                        pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif
                        goto fail;
                }

                rd_kafka_wrunlock(rk);
        }



	/* Lock handle here to synchronise state, i.e., hold off
	 * the thread until we've finalized the handle. */
	rd_kafka_wrlock(rk);

	/* Create handler thread */
        rk->rk_init_wait_cnt++;
	if ((thrd_create(&rk->rk_thread,
			 rd_kafka_thread_main, rk)) != thrd_success) {
                rk->rk_init_wait_cnt--;
                ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
                ret_errno = errno;
		if (errstr)
			rd_snprintf(errstr, errstr_size,
				    "Failed to create thread: %s (%i)",
				    rd_strerror(errno), errno);
		rd_kafka_wrunlock(rk);
                mtx_unlock(&rk->rk_init_lock);
#ifndef _WIN32
                /* Restore sigmask of caller */
                pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif
                goto fail;
        }

        rd_kafka_wrunlock(rk);
        mtx_unlock(&rk->rk_init_lock);

        /*
         * @warning `goto fail` is prohibited past this point
         */

        mtx_lock(&rk->rk_internal_rkb_lock);
	rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
						  RD_KAFKA_PROTO_PLAINTEXT,
						  "", 0, RD_KAFKA_NODEID_UA);
        mtx_unlock(&rk->rk_internal_rkb_lock);

	/* Add initial list of brokers from configuration */
	if (rk->rk_conf.brokerlist) {
		if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
			rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
					"No brokers configured");
	}

#ifndef _WIN32
	/* Restore sigmask of caller */
	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif

        /* Wait for background threads to fully initialize so that
         * the client instance is fully functional at the time it is
         * returned from the constructor. */
        if (rd_kafka_init_wait(rk, 60*1000) != 0) {
                /* This should never happen unless there is a bug
                 * or the OS is not scheduling the background threads.
                 * Either case there is no point in handling this gracefully
                 * in the current state since the thread joins are likely
                 * to hang as well. */
                mtx_lock(&rk->rk_init_lock);
                rd_kafka_log(rk, LOG_CRIT, "INIT",
                             "Failed to initialize %s: "
                             "%d background thread(s) did not initialize "
                             "within 60 seconds",
                             rk->rk_name, rk->rk_init_wait_cnt);
                if (errstr)
                        rd_snprintf(errstr, errstr_size,
                                    "Timed out waiting for "
                                    "%d background thread(s) to initialize",
                                    rk->rk_init_wait_cnt);
                mtx_unlock(&rk->rk_init_lock);

                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
                                        EDEADLK);
                return NULL;
        }

        rk->rk_initialized = 1;

        bflen = sizeof(builtin_features);
        if (rd_kafka_conf_get(&rk->rk_conf, "builtin.features",
                              builtin_features, &bflen) !=
            RD_KAFKA_CONF_OK)
                rd_snprintf(builtin_features, sizeof(builtin_features), "?");
        rd_kafka_dbg(rk, ALL, "INIT",
                     "librdkafka v%s (0x%x) %s initialized "
                     "(builtin.features %s, %s, debug 0x%x)",
                     rd_kafka_version_str(), rd_kafka_version(),
                     rk->rk_name,
                     builtin_features, BUILT_WITH,
                     rk->rk_conf.debug);

        /* Log warnings for deprecated configuration */
        rd_kafka_conf_warn(rk);

        /* Debug dump configuration */
        if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) {
                rd_kafka_anyconf_dump_dbg(rk, _RK_GLOBAL,
                                       &rk->rk_conf,
                                       "Client configuration");
                if (rk->rk_conf.topic_conf)
                        rd_kafka_anyconf_dump_dbg(
                                rk, _RK_TOPIC,
                                rk->rk_conf.topic_conf,
                                "Default topic configuration");
        }

        /* Free user supplied conf's base pointer on success,
         * but not the actual allocated fields since the struct
         * will have been copied in its entirety above. */
        if (app_conf)
                rd_free(app_conf);
        rd_kafka_set_last_error(0, 0);

        return rk;

fail:
        /*
         * Error out and clean up
         */

        /*
         * Tell background thread to terminate and wait for it to return.
         */
        rd_atomic32_set(&rk->rk_terminate, RD_KAFKA_DESTROY_F_TERMINATE);

        /* Terminate SASL provider */
        if (rk->rk_conf.sasl.provider)
                rd_kafka_sasl_term(rk);

        if (rk->rk_background.thread) {
                int res;
                thrd_join(rk->rk_background.thread, &res);
                rd_kafka_q_destroy_owner(rk->rk_background.q);
        }

        /* If on_new() interceptors have been called we also need
         * to allow interceptor clean-up by calling on_destroy() */
        rd_kafka_interceptors_on_destroy(rk);

        /* If rk_conf is a struct-copy of the application configuration
         * we need to avoid rk_conf fields from being freed from
         * rd_kafka_destroy_internal() since they belong to app_conf.
         * However, there are some internal fields, such as interceptors,
         * that belong to rk_conf and thus needs to be cleaned up.
         * Legacy APIs, sigh.. */
        if (app_conf) {
                rd_kafka_assignors_term(rk);
                rd_kafka_interceptors_destroy(&rk->rk_conf);
                memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
        }

        rd_kafka_destroy_internal(rk);
        rd_kafka_destroy_final(rk);

        rd_kafka_set_last_error(ret_err, ret_errno);

        return NULL;
}




/**
 * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
 * friends) since it does not have an API for stopping the cgrp we will need to
 * sort that out automatically in the background when all consumption
 * has stopped.
 *
 * Returns 0 if a  High level consumer is already instantiated
 * which means a Simple consumer cannot co-operate with it, else 1.
 *
 * A rd_kafka_t handle can never migrate from simple to high-level, or
 * vice versa, so we dont need a ..consumer_del().
 */
int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
        if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
                return 0;

        return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
}




/**
 * rktp fetch is split up in these parts:
 *   * application side:
 *   * broker side (handled by current leader broker thread for rktp):
 *          - the fetch state, initial offset, etc.
 *          - fetching messages, updating fetched offset, etc.
 *          - offset commits
 *
 * Communication between the two are:
 *    app side -> rdkafka main side: rktp_ops
 *    broker thread -> app side: rktp_fetchq
 *
 * There is no shared state between these threads, instead
 * state is communicated through the two op queues, and state synchronization
 * is performed by version barriers.
 *
 */

static RD_UNUSED
int rd_kafka_consume_start0 (rd_kafka_topic_t *rkt, int32_t partition,
                             int64_t offset, rd_kafka_q_t *rkq) {
	rd_kafka_toppar_t *rktp;

	if (partition < 0) {
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return -1;
	}

        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return -1;
        }

	rd_kafka_topic_wrlock(rkt);
	rktp = rd_kafka_toppar_desired_add(rkt, partition);
	rd_kafka_topic_wrunlock(rkt);

        /* Verify offset */
	if (offset == RD_KAFKA_OFFSET_BEGINNING ||
	    offset == RD_KAFKA_OFFSET_END ||
            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
                /* logical offsets */

	} else if (offset == RD_KAFKA_OFFSET_STORED) {
		/* offset manager */

                if (rkt->rkt_conf.offset_store_method ==
                    RD_KAFKA_OFFSET_METHOD_BROKER &&
                    RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
                        /* Broker based offsets require a group id. */
                        rd_kafka_toppar_destroy(rktp);
			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
						EINVAL);
                        return -1;
                }

	} else if (offset < 0) {
		rd_kafka_toppar_destroy(rktp);
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
					EINVAL);
		return -1;

        }

        rd_kafka_toppar_op_fetch_start(rktp, offset, rkq, RD_KAFKA_NO_REPLYQ);

        rd_kafka_toppar_destroy(rktp);

	rd_kafka_set_last_error(0, 0);
	return 0;
}




int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
			    int64_t offset) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
                     "Start consuming partition %"PRId32,partition);
 	return rd_kafka_consume_start0(rkt, partition, offset, NULL);
}

int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
				  int64_t offset, rd_kafka_queue_t *rkqu) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);

 	return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
}




static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
        rd_kafka_q_t *tmpq = NULL;
        rd_kafka_resp_err_t err;

        rd_kafka_topic_wrlock(rktp->rktp_rkt);
        rd_kafka_toppar_lock(rktp);
	rd_kafka_toppar_desired_del(rktp);
        rd_kafka_toppar_unlock(rktp);
	rd_kafka_topic_wrunlock(rktp->rktp_rkt);

        tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);

        rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));

        /* Synchronisation: Wait for stop reply from broker thread */
        err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
        rd_kafka_q_destroy_owner(tmpq);

	rd_kafka_set_last_error(err, err ? EINVAL : 0);

	return err ? -1 : 0;
}


int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
	rd_kafka_toppar_t *rktp;
        int r;

	if (partition == RD_KAFKA_PARTITION_UA) {
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
		return -1;
	}

	rd_kafka_topic_wrlock(rkt);
	if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
	    !(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
		rd_kafka_topic_wrunlock(rkt);
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return -1;
	}
        rd_kafka_topic_wrunlock(rkt);

        r = rd_kafka_consume_stop0(rktp);
	/* set_last_error() called by stop0() */

        rd_kafka_toppar_destroy(rktp);

        return r;
}



rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
                                   int32_t partition,
                                   int64_t offset,
                                   int timeout_ms) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
	rd_kafka_toppar_t *rktp;
        rd_kafka_q_t *tmpq = NULL;
        rd_kafka_resp_err_t err;
        rd_kafka_replyq_t replyq = RD_KAFKA_NO_REPLYQ;

        /* FIXME: simple consumer check */

	if (partition == RD_KAFKA_PARTITION_UA)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

	rd_kafka_topic_rdlock(rkt);
	if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
	    !(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
		rd_kafka_topic_rdunlock(rkt);
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
	}
	rd_kafka_topic_rdunlock(rkt);

        if (timeout_ms) {
                tmpq = rd_kafka_q_new(rkt->rkt_rk);
                replyq = RD_KAFKA_REPLYQ(tmpq, 0);
        }

        if ((err = rd_kafka_toppar_op_seek(rktp, offset, replyq))) {
                if (tmpq)
                        rd_kafka_q_destroy_owner(tmpq);
                rd_kafka_toppar_destroy(rktp);
                return err;
        }

	rd_kafka_toppar_destroy(rktp);

        if (tmpq) {
                err = rd_kafka_q_wait_result(tmpq, timeout_ms);
                rd_kafka_q_destroy_owner(tmpq);
                return err;
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


rd_kafka_error_t *
rd_kafka_seek_partitions (rd_kafka_t *rk,
                          rd_kafka_topic_partition_list_t *partitions,
                          int timeout_ms) {
        rd_kafka_q_t *tmpq = NULL;
        rd_kafka_topic_partition_t *rktpar;
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
        int cnt = 0;

        if (rk->rk_type != RD_KAFKA_CONSUMER)
                return rd_kafka_error_new(
                        RD_KAFKA_RESP_ERR__INVALID_ARG,
                        "Must only be used on consumer instance");

        if (!partitions || partitions->cnt == 0)
                return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                          "partitions must be specified");

        if (timeout_ms)
                tmpq = rd_kafka_q_new(rk);

        RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
                rd_kafka_toppar_t *rktp;
                rd_kafka_resp_err_t err;

                rktp = rd_kafka_toppar_get2(rk,
                                            rktpar->topic,
                                            rktpar->partition,
                                            rd_false/*no-ua-on-miss*/,
                                            rd_false/*no-create-on-miss*/);
                if (!rktp) {
                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        continue;
                }

                err = rd_kafka_toppar_op_seek(rktp, rktpar->offset,
                                              RD_KAFKA_REPLYQ(tmpq, 0));
                if (err) {
                        rktpar->err = err;
                } else {
                        rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
                        cnt++;
                }

                rd_kafka_toppar_destroy(rktp); /* refcnt from toppar_get2() */
        }

        if (!timeout_ms)
                return NULL;


        while (cnt > 0) {
                rd_kafka_op_t *rko;

                rko = rd_kafka_q_pop(tmpq, rd_timeout_remains(abs_timeout), 0);
                if (!rko) {
                        rd_kafka_q_destroy_owner(tmpq);

                        return rd_kafka_error_new(
                                RD_KAFKA_RESP_ERR__TIMED_OUT,
                                "Timed out waiting for %d remaining partition "
                                "seek(s) to finish", cnt);
                }

                if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
                        rd_kafka_q_destroy_owner(tmpq);
                        rd_kafka_op_destroy(rko);

                        return rd_kafka_error_new(RD_KAFKA_RESP_ERR__DESTROY,
                                                  "Instance is terminating");
                }

                rd_assert(rko->rko_rktp);

                rktpar = rd_kafka_topic_partition_list_find(
                        partitions,
                        rko->rko_rktp->rktp_rkt->rkt_topic->str,
                        rko->rko_rktp->rktp_partition);
                rd_assert(rktpar);

                rktpar->err = rko->rko_err;

                rd_kafka_op_destroy(rko);

                cnt--;
        }

        rd_kafka_q_destroy_owner(tmpq);

        return NULL;
}



static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
					int timeout_ms,
					rd_kafka_message_t **rkmessages,
					size_t rkmessages_size) {
	/* Populate application's rkmessages array. */
	return rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
					   rkmessages, rkmessages_size);
}


ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
				int timeout_ms,
				rd_kafka_message_t **rkmessages,
				size_t rkmessages_size) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
	rd_kafka_toppar_t *rktp;
	ssize_t cnt;

	/* Get toppar */
	rd_kafka_topic_rdlock(rkt);
	rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
	if (unlikely(!rktp))
		rktp = rd_kafka_toppar_desired_get(rkt, partition);
	rd_kafka_topic_rdunlock(rkt);

	if (unlikely(!rktp)) {
		/* No such toppar known */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return -1;
	}

	/* Populate application's rkmessages array. */
	cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
					  rkmessages, rkmessages_size);

	rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */

	rd_kafka_set_last_error(0, 0);

	return cnt;
}

ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
				      int timeout_ms,
				      rd_kafka_message_t **rkmessages,
				      size_t rkmessages_size) {
	/* Populate application's rkmessages array. */
	return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms,
				       rkmessages, rkmessages_size);
}


struct consume_ctx {
	void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
	void *opaque;
};


/**
 * Trampoline for application's consume_cb()
 */
static rd_kafka_op_res_t
rd_kafka_consume_cb (rd_kafka_t *rk,
                     rd_kafka_q_t *rkq,
                     rd_kafka_op_t *rko,
                     rd_kafka_q_cb_type_t cb_type, void *opaque) {
	struct consume_ctx *ctx = opaque;
	rd_kafka_message_t *rkmessage;

        if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
            rko->rko_type == RD_KAFKA_OP_BARRIER) {
                rd_kafka_op_destroy(rko);
                return RD_KAFKA_OP_RES_HANDLED;
        }

	rkmessage = rd_kafka_message_get(rko);

	rd_kafka_op_offset_store(rk, rko);

        ctx->consume_cb(rkmessage, ctx->opaque);

        rd_kafka_op_destroy(rko);

        return RD_KAFKA_OP_RES_HANDLED;
}



static rd_kafka_op_res_t
rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
                            void (*consume_cb) (rd_kafka_message_t
                                                *rkmessage,
                                                void *opaque),
                            void *opaque) {
        struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
        rd_kafka_op_res_t res;

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rkq->rkq_rk);

        res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
                               RD_KAFKA_Q_CB_RETURN,
                               rd_kafka_consume_cb, &ctx);

        rd_kafka_app_polled(rkq->rkq_rk);

        return res;
}


int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
			       int timeout_ms,
			       void (*consume_cb) (rd_kafka_message_t
						   *rkmessage,
						   void *opaque),
			       void *opaque) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
	rd_kafka_toppar_t *rktp;
	int r;

	/* Get toppar */
	rd_kafka_topic_rdlock(rkt);
	rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
	if (unlikely(!rktp))
		rktp = rd_kafka_toppar_desired_get(rkt, partition);
	rd_kafka_topic_rdunlock(rkt);

	if (unlikely(!rktp)) {
		/* No such toppar known */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return -1;
	}

	r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms,
                                       rkt->rkt_conf.consume_callback_max_msgs,
				       consume_cb, opaque);

	rd_kafka_toppar_destroy(rktp);

	rd_kafka_set_last_error(0, 0);

	return r;
}



int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
				     int timeout_ms,
				     void (*consume_cb) (rd_kafka_message_t
							 *rkmessage,
							 void *opaque),
				     void *opaque) {
	return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0,
					  consume_cb, opaque);
}


/**
 * Serve queue 'rkq' and return one message.
 * By serving the queue it will also call any registered callbacks
 * registered for matching events, this includes consumer_cb()
 * in which case no message will be returned.
 */
static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
                                              rd_kafka_q_t *rkq,
					      int timeout_ms) {
	rd_kafka_op_t *rko;
	rd_kafka_message_t *rkmessage = NULL;
	rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

	rd_kafka_yield_thread = 0;
        while ((rko = rd_kafka_q_pop(rkq,
                                     rd_timeout_remains_us(abs_timeout), 0))) {
                rd_kafka_op_res_t res;

                res = rd_kafka_poll_cb(rk, rkq, rko,
                                       RD_KAFKA_Q_CB_RETURN, NULL);

                if (res == RD_KAFKA_OP_RES_PASS)
                        break;

                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                             rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop dispatching the queue and return. */
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
                                                EINTR);
                        rd_kafka_app_polled(rk);
                        return NULL;
                }

                /* Message was handled by callback. */
                continue;
        }

	if (!rko) {
		/* Timeout reached with no op returned. */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
					ETIMEDOUT);
                rd_kafka_app_polled(rk);
		return NULL;
	}

        rd_kafka_assert(rk,
                        rko->rko_type == RD_KAFKA_OP_FETCH ||
                        rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);

	/* Get rkmessage from rko */
	rkmessage = rd_kafka_message_get(rko);

	/* Store offset */
	rd_kafka_op_offset_store(rk, rko);

	rd_kafka_set_last_error(0, 0);

        rd_kafka_app_polled(rk);

	return rkmessage;
}

rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
                                      int32_t partition,
				      int timeout_ms) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
	rd_kafka_toppar_t *rktp;
	rd_kafka_message_t *rkmessage;

	rd_kafka_topic_rdlock(rkt);
	rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
	if (unlikely(!rktp))
		rktp = rd_kafka_toppar_desired_get(rkt, partition);
	rd_kafka_topic_rdunlock(rkt);

	if (unlikely(!rktp)) {
		/* No such toppar known */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return NULL;
	}

	rkmessage = rd_kafka_consume0(rkt->rkt_rk,
                                      rktp->rktp_fetchq, timeout_ms);

	rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */

	return rkmessage;
}


rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
					    int timeout_ms) {
	return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
}




rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
        rd_kafka_cgrp_t *rkcg;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

        rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}




rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
                                            int timeout_ms) {
        rd_kafka_cgrp_t *rkcg;

        if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
                rd_kafka_message_t *rkmessage = rd_kafka_message_new();
                rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
                return rkmessage;
        }

        return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
}


rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
        rd_kafka_cgrp_t *rkcg;
        rd_kafka_op_t *rko;
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
	rd_kafka_q_t *rkq;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

        /* If a fatal error has been raised and this is an
         * explicit consumer_close() from the application we return
         * a fatal error. Otherwise let the "silent" no_consumer_close
         * logic be performed to clean up properly. */
        if (rd_kafka_fatal_error_code(rk) &&
            !rd_kafka_destroy_flags_no_consumer_close(rk))
                return RD_KAFKA_RESP_ERR__FATAL;

        rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Closing consumer");

	/* Redirect cgrp queue to our temporary queue to make sure
	 * all posted ops (e.g., rebalance callbacks) are served by
	 * this function. */
	rkq = rd_kafka_q_new(rk);
	rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);

        rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */

        /* Disable the queue if termination is immediate or the user
         * does not want the blocking consumer_close() behaviour, this will
         * cause any ops posted for this queue (such as rebalance) to
         * be destroyed.
         */
        if (rd_kafka_destroy_flags_no_consumer_close(rk)) {
                rd_kafka_dbg(rk, CONSUMER, "CLOSE",
                             "Disabling and purging temporary queue to quench "
                             "close events");
                rd_kafka_q_disable(rkq);
                /* Purge ops already enqueued */
                rd_kafka_q_purge(rkq);
        } else {
                rd_kafka_dbg(rk, CONSUMER, "CLOSE",
                             "Waiting for close events");
                while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
                        rd_kafka_op_res_t res;
                        if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
                            RD_KAFKA_OP_TERMINATE) {
                                err = rko->rko_err;
                                rd_kafka_op_destroy(rko);
                                break;
                        }
                        res = rd_kafka_poll_cb(rk, rkq, rko,
                                               RD_KAFKA_Q_CB_RETURN, NULL);
                        if (res == RD_KAFKA_OP_RES_PASS)
                                rd_kafka_op_destroy(rko);
                        /* Ignore YIELD, we need to finish */
                }
        }

        rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);

        rd_kafka_q_destroy_owner(rkq);

        rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Consumer closed");

        return err;
}



rd_kafka_resp_err_t
rd_kafka_committed (rd_kafka_t *rk,
		    rd_kafka_topic_partition_list_t *partitions,
		    int timeout_ms) {
        rd_kafka_q_t *rkq;
        rd_kafka_resp_err_t err;
        rd_kafka_cgrp_t *rkcg;
	rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        if (!partitions)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

	/* Set default offsets. */
	rd_kafka_topic_partition_list_reset_offsets(partitions,
                                                    RD_KAFKA_OFFSET_INVALID);

	rkq = rd_kafka_q_new(rk);

        do {
                rd_kafka_op_t *rko;
		int state_version = rd_kafka_brokers_get_state_version(rk);

                rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
		rd_kafka_op_set_replyq(rko, rkq, NULL);

                /* Issue #827
                 * Copy partition list to avoid use-after-free if we time out
                 * here, the app frees the list, and then cgrp starts
                 * processing the op. */
		rko->rko_u.offset_fetch.partitions =
                        rd_kafka_topic_partition_list_copy(partitions);
                rko->rko_u.offset_fetch.require_stable =
                        rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
		rko->rko_u.offset_fetch.do_free = 1;

                if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
                        err = RD_KAFKA_RESP_ERR__DESTROY;
                        break;
                }

                rko = rd_kafka_q_pop(rkq,
                                     rd_timeout_remains_us(abs_timeout), 0);
                if (rko) {
                        if (!(err = rko->rko_err))
                                rd_kafka_topic_partition_list_update(
                                        partitions,
                                        rko->rko_u.offset_fetch.partitions);
                        else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
				    err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
				   !rd_kafka_brokers_wait_state_change(
					   rk, state_version,
					   rd_timeout_remains(abs_timeout)))
				err = RD_KAFKA_RESP_ERR__TIMED_OUT;

                        rd_kafka_op_destroy(rko);
                } else
                        err = RD_KAFKA_RESP_ERR__TIMED_OUT;
        } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
		 err == RD_KAFKA_RESP_ERR__WAIT_COORD);

        rd_kafka_q_destroy_owner(rkq);

        return err;
}



rd_kafka_resp_err_t
rd_kafka_position (rd_kafka_t *rk,
		   rd_kafka_topic_partition_list_t *partitions) {
 	int i;

	for (i = 0 ; i < partitions->cnt ; i++) {
		rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
		rd_kafka_toppar_t *rktp;

		if (!(rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
						    rktpar->partition, 0, 1))) {
			rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
			rktpar->offset = RD_KAFKA_OFFSET_INVALID;
			continue;
		}

		rd_kafka_toppar_lock(rktp);
		rktpar->offset = rktp->rktp_app_offset;
		rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
		rd_kafka_toppar_unlock(rktp);
		rd_kafka_toppar_destroy(rktp);
	}

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}



struct _query_wmark_offsets_state {
	rd_kafka_resp_err_t err;
	const char *topic;
	int32_t partition;
	int64_t offsets[2];
	int     offidx;  /* next offset to set from response */
	rd_ts_t ts_end;
	int     state_version;  /* Broker state version */
};

static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
						  rd_kafka_broker_t *rkb,
						  rd_kafka_resp_err_t err,
						  rd_kafka_buf_t *rkbuf,
						  rd_kafka_buf_t *request,
						  void *opaque) {
	struct _query_wmark_offsets_state *state;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when query_watermark..()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        offsets = rd_kafka_topic_partition_list_new(1);
        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, offsets);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                rd_kafka_topic_partition_list_destroy(offsets);
                return; /* Retrying */
        }

	/* Retry if no broker connection is available yet. */
	if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
	     err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
	    rkb &&
	    rd_kafka_brokers_wait_state_change(
		    rkb->rkb_rk, state->state_version,
		    rd_timeout_remains(state->ts_end))) {
		/* Retry */
		state->state_version = rd_kafka_brokers_get_state_version(rk);
		request->rkbuf_retries = 0;
		if (rd_kafka_buf_retry(rkb, request)) {
                        rd_kafka_topic_partition_list_destroy(offsets);
                        return; /* Retry in progress */
                }
		/* FALLTHRU */
	}

        /* Partition not seen in response. */
        if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
                                                          state->topic,
                                                          state->partition)))
                err = RD_KAFKA_RESP_ERR__BAD_MSG;
        else if (rktpar->err)
                err = rktpar->err;
        else
                state->offsets[state->offidx] = rktpar->offset;

        state->offidx++;

        if (err || state->offidx == 2) /* Error or Done */
                state->err = err;

        rd_kafka_topic_partition_list_destroy(offsets);
}


rd_kafka_resp_err_t
rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
                                  int32_t partition,
                                  int64_t *low, int64_t *high, int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _query_wmark_offsets_state state;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_topic_partition_t *rktpar;
        struct rd_kafka_partition_leader *leader;
        rd_list_t leaders;
        rd_kafka_resp_err_t err;

        partitions = rd_kafka_topic_partition_list_new(1);
        rktpar = rd_kafka_topic_partition_list_add(partitions,
                                                   topic, partition);

        rd_list_init(&leaders, partitions->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
                                                          &leaders, timeout_ms);
        if (err) {
                         rd_list_destroy(&leaders);
                         rd_kafka_topic_partition_list_destroy(partitions);
                         return err;
        }

        leader = rd_list_elem(&leaders, 0);

        rkq = rd_kafka_q_new(rk);

        /* Due to KAFKA-1588 we need to send a request for each wanted offset,
         * in this case one for the low watermark and one for the high. */
        state.topic = topic;
        state.partition = partition;
        state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING;
        state.offsets[1] = RD_KAFKA_OFFSET_END;
        state.offidx = 0;
        state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
        state.ts_end = ts_end;
        state.state_version = rd_kafka_brokers_get_state_version(rk);


        rktpar->offset =  RD_KAFKA_OFFSET_BEGINNING;
        rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
                               RD_KAFKA_REPLYQ(rkq, 0),
                               rd_kafka_query_wmark_offsets_resp_cb,
                               &state);

        rktpar->offset =  RD_KAFKA_OFFSET_END;
        rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
                               RD_KAFKA_REPLYQ(rkq, 0),
                               rd_kafka_query_wmark_offsets_resp_cb,
                               &state);

        rd_kafka_topic_partition_list_destroy(partitions);
        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
                                rd_kafka_poll_cb, NULL) !=
               RD_KAFKA_OP_RES_YIELD)
                ;

        rd_kafka_q_destroy_owner(rkq);

        if (state.err)
                return state.err;
        else if (state.offidx != 2)
                return RD_KAFKA_RESP_ERR__FAIL;

        /* We are not certain about the returned order. */
        if (state.offsets[0] < state.offsets[1]) {
                *low = state.offsets[0];
                *high  = state.offsets[1];
        } else {
                *low = state.offsets[1];
                *high = state.offsets[0];
        }

        /* If partition is empty only one offset (the last) will be returned. */
        if (*low < 0 && *high >= 0)
                *low = *high;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


rd_kafka_resp_err_t
rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
				int32_t partition,
				int64_t *low, int64_t *high) {
	rd_kafka_toppar_t *rktp;

	rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
	if (!rktp)
		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

	rd_kafka_toppar_lock(rktp);
	*low = rktp->rktp_lo_offset;
	*high = rktp->rktp_hi_offset;
	rd_kafka_toppar_unlock(rktp);

	rd_kafka_toppar_destroy(rktp);

	return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief get_offsets_for_times() state
 */
struct _get_offsets_for_times {
        rd_kafka_topic_partition_list_t *results;
        rd_kafka_resp_err_t err;
        int wait_reply;
        int state_version;
        rd_ts_t ts_end;
};

/**
 * @brief Handle OffsetRequest responses
 */
static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
                                                  rd_kafka_broker_t *rkb,
                                                  rd_kafka_resp_err_t err,
                                                  rd_kafka_buf_t *rkbuf,
                                                  rd_kafka_buf_t *request,
                                                  void *opaque) {
        struct _get_offsets_for_times *state;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when offsets_for_times()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request,
                                     state->results);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
                return; /* Retrying */

        /* Retry if no broker connection is available yet. */
        if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
             err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
            rkb &&
            rd_kafka_brokers_wait_state_change(
                    rkb->rkb_rk, state->state_version,
                    rd_timeout_remains(state->ts_end))) {
                /* Retry */
                state->state_version = rd_kafka_brokers_get_state_version(rk);
                request->rkbuf_retries = 0;
                if (rd_kafka_buf_retry(rkb, request))
                        return; /* Retry in progress */
                /* FALLTHRU */
        }

        if (err && !state->err)
                state->err = err;

        state->wait_reply--;
}


rd_kafka_resp_err_t
rd_kafka_offsets_for_times (rd_kafka_t *rk,
                            rd_kafka_topic_partition_list_t *offsets,
                            int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _get_offsets_for_times state = RD_ZERO_INIT;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_list_t leaders;
        int i;
        rd_kafka_resp_err_t err;
        struct rd_kafka_partition_leader *leader;
        int tmout;

        if (offsets->cnt == 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        rd_list_init(&leaders, offsets->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
                                                          timeout_ms);
        if (err) {
                rd_list_destroy(&leaders);
                return err;
        }


        rkq = rd_kafka_q_new(rk);

        state.wait_reply = 0;
        state.results = rd_kafka_topic_partition_list_new(offsets->cnt);

        /* For each leader send a request for its partitions */
        RD_LIST_FOREACH(leader, &leaders, i) {
                state.wait_reply++;
                rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1,
                                       RD_KAFKA_REPLYQ(rkq, 0),
                                       rd_kafka_get_offsets_for_times_resp_cb,
                                       &state);
        }

        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.wait_reply > 0 &&
               !rd_timeout_expired((tmout = rd_timeout_remains(ts_end))))
                rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
                                 rd_kafka_poll_cb, NULL);

        rd_kafka_q_destroy_owner(rkq);

        if (state.wait_reply > 0 && !state.err)
                state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;

        /* Then update the queried partitions. */
        if (!state.err)
                rd_kafka_topic_partition_list_update(offsets, state.results);

        rd_kafka_topic_partition_list_destroy(state.results);

        return state.err;
}


/**
 * @brief rd_kafka_poll() (and similar) op callback handler.
 *        Will either call registered callback depending on cb_type and op type
 *        or return op to application, if applicable (e.g., fetch message).
 *
 * @returns RD_KAFKA_OP_RES_HANDLED if op was handled, else one of the
 *          other res types (such as OP_RES_PASS).
 *
 * @locality any thread that serves op queues
 */
rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                  rd_kafka_q_cb_type_t cb_type, void *opaque) {
	rd_kafka_msg_t *rkm;
        rd_kafka_op_res_t res = RD_KAFKA_OP_RES_HANDLED;

        /* Special handling for events based on cb_type */
        if (cb_type == RD_KAFKA_Q_CB_EVENT &&
            rd_kafka_event_setup(rk, rko)) {
                /* Return-as-event requested. */
                return RD_KAFKA_OP_RES_PASS; /* Return as event */
        }

        switch ((int)rko->rko_type)
        {
        case RD_KAFKA_OP_FETCH:
                if (!rk->rk_conf.consume_cb ||
                    cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
                else {
                        struct consume_ctx ctx = {
                                .consume_cb = rk->rk_conf.consume_cb,
                                .opaque = rk->rk_conf.opaque };

                        return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
                }
                break;

        case RD_KAFKA_OP_REBALANCE:
                if (rk->rk_conf.rebalance_cb)
                        rk->rk_conf.rebalance_cb(
                                rk, rko->rko_err,
                                rko->rko_u.rebalance.partitions,
                                rk->rk_conf.opaque);
                else {
                        /** If EVENT_REBALANCE is enabled but rebalance_cb
                         *  isn't, we need to perform a dummy assign for the
                         *  application. This might happen during termination
                         *  with consumer_close() */
                        rd_kafka_dbg(rk, CGRP, "UNASSIGN",
                                     "Forcing unassign of %d partition(s)",
                                     rko->rko_u.rebalance.partitions ?
                                     rko->rko_u.rebalance.partitions->cnt : 0);
                        rd_kafka_assign(rk, NULL);
                }
                break;

        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
		if (!rko->rko_u.offset_commit.cb)
			return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
		rko->rko_u.offset_commit.cb(
                        rk, rko->rko_err,
			rko->rko_u.offset_commit.partitions,
			rko->rko_u.offset_commit.opaque);
                break;

        case RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY:
                /* Reply from toppar FETCH_STOP */
                rd_kafka_assignment_partition_stopped(rk, rko->rko_rktp);
                break;

        case RD_KAFKA_OP_CONSUMER_ERR:
                /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
                 *   Consumer errors are returned to the application
                 *   as rkmessages, not error callbacks.
                 *
                 * rd_kafka_poll() (_Q_CB_GLOBAL):
                 *   convert to ERR op (fallthru)
                 */
                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
                        /* return as message_t to application */
                        return RD_KAFKA_OP_RES_PASS;
                }
		/* FALLTHRU */

	case RD_KAFKA_OP_ERR:
		if (rk->rk_conf.error_cb)
			rk->rk_conf.error_cb(rk, rko->rko_err,
					     rko->rko_u.err.errstr,
                                             rk->rk_conf.opaque);
                else
                        rd_kafka_log(rk, LOG_ERR, "ERROR",
                                     "%s: %s",
                                     rk->rk_name,
                                     rko->rko_u.err.errstr);
                break;

	case RD_KAFKA_OP_DR:
		/* Delivery report:
		 * call application DR callback for each message. */
		while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
                        rd_kafka_message_t *rkmessage;

			TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
				     rkm, rkm_link);

                        rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);

                        if (likely(rk->rk_conf.dr_msg_cb != NULL)) {
                                rk->rk_conf.dr_msg_cb(rk, rkmessage,
                                                      rk->rk_conf.opaque);

                        } else if (rk->rk_conf.dr_cb) {
                                rk->rk_conf.dr_cb(rk,
                                                  rkmessage->payload,
                                                  rkmessage->len,
                                                  rkmessage->err,
                                                  rk->rk_conf.opaque,
                                                  rkmessage->_private);
                        } else if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
                                rd_kafka_log(rk, LOG_WARNING, "DRDROP",
                                             "Dropped delivery report for "
                                             "message to "
                                             "%s [%"PRId32"] (%s) with "
                                             "opaque %p: flush() or poll() "
                                             "should not be called when "
                                             "EVENT_DR is enabled",
                                             rd_kafka_topic_name(rkmessage->
                                                                 rkt),
                                             rkmessage->partition,
                                             rd_kafka_err2name(rkmessage->err),
                                             rkmessage->_private);
                        } else {
                                rd_assert(!*"BUG: neither a delivery report "
                                          "callback or EVENT_DR flag set");
                        }

                        rd_kafka_msg_destroy(rk, rkm);

                        if (unlikely(rd_kafka_yield_thread)) {
                                /* Callback called yield(),
                                 * re-enqueue the op (if there are any
                                 * remaining messages). */
                                if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
                                                 rkmq_msgs))
                                        rd_kafka_q_reenq(rkq, rko);
                                else
                                        rd_kafka_op_destroy(rko);
                                return RD_KAFKA_OP_RES_YIELD;
                        }
		}

		rd_kafka_msgq_init(&rko->rko_u.dr.msgq);

		break;

	case RD_KAFKA_OP_THROTTLE:
		if (rk->rk_conf.throttle_cb)
			rk->rk_conf.throttle_cb(rk, rko->rko_u.throttle.nodename,
						rko->rko_u.throttle.nodeid,
						rko->rko_u.throttle.
						throttle_time,
						rk->rk_conf.opaque);
		break;

	case RD_KAFKA_OP_STATS:
		/* Statistics */
		if (rk->rk_conf.stats_cb &&
		    rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json,
                                         rko->rko_u.stats.json_len,
					 rk->rk_conf.opaque) == 1)
			rko->rko_u.stats.json = NULL; /* Application wanted json ptr */
		break;

        case RD_KAFKA_OP_LOG:
                if (likely(rk->rk_conf.log_cb &&
                           rk->rk_conf.log_level >= rko->rko_u.log.level))
                        rk->rk_conf.log_cb(rk,
                                           rko->rko_u.log.level,
                                           rko->rko_u.log.fac,
                                           rko->rko_u.log.str);
                break;

        case RD_KAFKA_OP_TERMINATE:
                /* nop: just a wake-up */
                break;

        case RD_KAFKA_OP_CREATETOPICS:
        case RD_KAFKA_OP_DELETETOPICS:
        case RD_KAFKA_OP_CREATEPARTITIONS:
        case RD_KAFKA_OP_ALTERCONFIGS:
        case RD_KAFKA_OP_DESCRIBECONFIGS:
        case RD_KAFKA_OP_DELETERECORDS:
        case RD_KAFKA_OP_DELETEGROUPS:
        case RD_KAFKA_OP_ADMIN_FANOUT:
                /* Calls op_destroy() from worker callback,
                 * when the time comes. */
                res = rd_kafka_op_call(rk, rkq, rko);
                break;

        case RD_KAFKA_OP_ADMIN_RESULT:
                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Don't handle here */

                /* Op is silently destroyed below */
                break;

        case RD_KAFKA_OP_TXN:
                /* Must only be handled by rdkafka main thread */
                rd_assert(thrd_is_current(rk->rk_thread));
                res = rd_kafka_op_call(rk, rkq, rko);
                break;

        case RD_KAFKA_OP_BARRIER:
                break;

        case RD_KAFKA_OP_PURGE:
                rd_kafka_purge(rk, rko->rko_u.purge.flags);
                break;

        default:
                rd_kafka_assert(rk, !*"cant handle op type");
                break;
        }

        if (res == RD_KAFKA_OP_RES_HANDLED)
                rd_kafka_op_destroy(rko);

        return res;
}

int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {
        int r;

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

        r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
                             RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

        rd_kafka_app_polled(rk);

        return r;
}


rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) {
        rd_kafka_op_t *rko;

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

        rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
                                   RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);

        rd_kafka_app_polled(rkqu->rkqu_rk);

        if (!rko)
                return NULL;

        return rko;
}

int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms) {
        int r;

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

        r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
                             RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

        rd_kafka_app_polled(rkqu->rkqu_rk);

        return r;
}



static void rd_kafka_toppar_dump (FILE *fp, const char *indent,
				  rd_kafka_toppar_t *rktp) {

	fprintf(fp, "%s%.*s [%"PRId32"] broker %s, "
                "leader_id %s\n",
		indent,
		RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
		rktp->rktp_partition,
		rktp->rktp_broker ?
		rktp->rktp_broker->rkb_name : "none",
                rktp->rktp_leader ?
                rktp->rktp_leader->rkb_name : "none");
	fprintf(fp,
		"%s refcnt %i\n"
		"%s msgq:      %i messages\n"
		"%s xmit_msgq: %i messages\n"
		"%s total:     %"PRIu64" messages, %"PRIu64" bytes\n",
		indent, rd_refcnt_get(&rktp->rktp_refcnt),
		indent, rktp->rktp_msgq.rkmq_msg_cnt,
		indent, rktp->rktp_xmit_msgq.rkmq_msg_cnt,
                indent, rd_atomic64_get(&rktp->rktp_c.tx_msgs),
                rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes));
}

static void rd_kafka_broker_dump (FILE *fp, rd_kafka_broker_t *rkb, int locks) {
	rd_kafka_toppar_t *rktp;

        if (locks)
                rd_kafka_broker_lock(rkb);
        fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %"PRId32
                " in state %s (for %.3fs)\n",
                rkb, rkb->rkb_name, rkb->rkb_nodeid,
                rd_kafka_broker_state_names[rkb->rkb_state],
                rkb->rkb_ts_state ?
                (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f :
                0.0f);
        fprintf(fp, "  refcnt %i\n", rd_refcnt_get(&rkb->rkb_refcnt));
        fprintf(fp, "  outbuf_cnt: %i waitresp_cnt: %i\n",
                rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
                rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt));
        fprintf(fp,
                "  %"PRIu64 " messages sent, %"PRIu64" bytes, "
                "%"PRIu64" errors, %"PRIu64" timeouts\n"
                "  %"PRIu64 " messages received, %"PRIu64" bytes, "
                "%"PRIu64" errors\n"
                "  %"PRIu64 " messageset transmissions were retried\n",
                rd_atomic64_get(&rkb->rkb_c.tx), rd_atomic64_get(&rkb->rkb_c.tx_bytes),
                rd_atomic64_get(&rkb->rkb_c.tx_err), rd_atomic64_get(&rkb->rkb_c.req_timeouts),
                rd_atomic64_get(&rkb->rkb_c.rx), rd_atomic64_get(&rkb->rkb_c.rx_bytes),
                rd_atomic64_get(&rkb->rkb_c.rx_err),
                rd_atomic64_get(&rkb->rkb_c.tx_retries));

        fprintf(fp, "  %i toppars:\n", rkb->rkb_toppar_cnt);
        TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
                rd_kafka_toppar_dump(fp, "   ", rktp);
        if (locks) {
                rd_kafka_broker_unlock(rkb);
        }
}


static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) {
	rd_kafka_broker_t *rkb;
	rd_kafka_topic_t *rkt;
        rd_kafka_toppar_t *rktp;
        int i;
	unsigned int tot_cnt;
	size_t tot_size;

	rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

	if (locks)
                rd_kafka_rdlock(rk);
#if ENABLE_DEVEL
        fprintf(fp, "rd_kafka_op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
#endif
	fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);

	fprintf(fp, " producer.msg_cnt %u (%"PRIusz" bytes)\n",
		tot_cnt, tot_size);
	fprintf(fp, " rk_rep reply queue: %i ops\n",
		rd_kafka_q_len(rk->rk_rep));

	fprintf(fp, " brokers:\n");
        if (locks)
                mtx_lock(&rk->rk_internal_rkb_lock);
        if (rk->rk_internal_rkb)
                rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks);
        if (locks)
                mtx_unlock(&rk->rk_internal_rkb_lock);

	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_dump(fp, rkb, locks);
	}

        fprintf(fp, " cgrp:\n");
        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                fprintf(fp, "  %.*s in state %s, flags 0x%x\n",
                        RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                        rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                        rkcg->rkcg_flags);
                fprintf(fp, "   coord_id %"PRId32", broker %s\n",
                        rkcg->rkcg_coord_id,
                        rkcg->rkcg_curr_coord ?
                        rd_kafka_broker_name(rkcg->rkcg_curr_coord):"(none)");

                fprintf(fp, "  toppars:\n");
                RD_LIST_FOREACH(rktp, &rkcg->rkcg_toppars, i) {
                        fprintf(fp, "   %.*s [%"PRId32"] in state %s\n",
                                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                rktp->rktp_partition,
                                rd_kafka_fetch_states[rktp->rktp_fetch_state]);
                }
        }

	fprintf(fp, " topics:\n");
	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
		fprintf(fp, "  %.*s with %"PRId32" partitions, state %s, "
                        "refcnt %i\n",
			RD_KAFKAP_STR_PR(rkt->rkt_topic),
			rkt->rkt_partition_cnt,
                        rd_kafka_topic_state_names[rkt->rkt_state],
                        rd_refcnt_get(&rkt->rkt_refcnt));
		if (rkt->rkt_ua)
			rd_kafka_toppar_dump(fp, "   ", rkt->rkt_ua);
                if (rd_list_empty(&rkt->rkt_desp)) {
                        fprintf(fp, "   desired partitions:");
                        RD_LIST_FOREACH(rktp, &rkt->rkt_desp,  i)
                                fprintf(fp, " %"PRId32, rktp->rktp_partition);
                        fprintf(fp, "\n");
                }
	}

        fprintf(fp, "\n");
        rd_kafka_metadata_cache_dump(fp, rk);

        if (locks)
                rd_kafka_rdunlock(rk);
}

void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
        if (rk)
                rd_kafka_dump0(fp, rk, 1/*locks*/);
}



const char *rd_kafka_name (const rd_kafka_t *rk) {
	return rk->rk_name;
}

rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk) {
        return rk->rk_type;
}


char *rd_kafka_memberid (const rd_kafka_t *rk) {
	rd_kafka_op_t *rko;
	rd_kafka_cgrp_t *rkcg;
	char *memberid;

	if (!(rkcg = rd_kafka_cgrp_get(rk)))
		return NULL;

	rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME);
	if (!rko)
		return NULL;
	memberid = rko->rko_u.name.str;
	rko->rko_u.name.str = NULL;
	rd_kafka_op_destroy(rko);

	return memberid;
}


char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms) {
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* ClusterId is returned in Metadata >=V2 responses and
         * cached on the rk. If no cached value is available
         * it means no metadata has been received yet, or we're
         * using a lower protocol version
         * (e.g., lack of api.version.request=true). */

        while (1) {
                int remains_ms;

                rd_kafka_rdlock(rk);

                if (rk->rk_clusterid) {
                        /* Cached clusterid available. */
                        char *ret = rd_strdup(rk->rk_clusterid);
                        rd_kafka_rdunlock(rk);
                        return ret;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no clusterid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return NULL;
                }

                rd_kafka_rdunlock(rk);

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return NULL;

                rd_kafka_metadata_cache_wait_change(rk, remains_ms);
        }

        return NULL;
}


int32_t rd_kafka_controllerid (rd_kafka_t *rk, int timeout_ms) {
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* ControllerId is returned in Metadata >=V1 responses and
         * cached on the rk. If no cached value is available
         * it means no metadata has been received yet, or we're
         * using a lower protocol version
         * (e.g., lack of api.version.request=true). */

        while (1) {
                int remains_ms;
                int version;

                version = rd_kafka_brokers_get_state_version(rk);

                rd_kafka_rdlock(rk);

                if (rk->rk_controllerid != -1) {
                        /* Cached controllerid available. */
                        rd_kafka_rdunlock(rk);
                        return rk->rk_controllerid;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no clusterid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return -1;
                }

                rd_kafka_rdunlock(rk);

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return -1;

                rd_kafka_brokers_wait_state_change(rk, version, remains_ms);
        }

        return -1;
}


void *rd_kafka_opaque (const rd_kafka_t *rk) {
        return rk->rk_conf.opaque;
}


int rd_kafka_outq_len (rd_kafka_t *rk) {
        return rd_kafka_curr_msgs_cnt(rk) + rd_kafka_q_len(rk->rk_rep) +
                (rk->rk_background.q ? rd_kafka_q_len(rk->rk_background.q) : 0);
}


rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) {
        unsigned int msg_cnt = 0;

	if (rk->rk_type != RD_KAFKA_PRODUCER)
		return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        rd_kafka_yield_thread = 0;

        if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
                /* Application wants delivery reports as events rather
                 * than callbacks, we must thus not serve this queue
                 * with rd_kafka_poll() since that would trigger non-existent
                 * delivery report callbacks, which would result
                 * in the delivery reports being dropped.
                 * Instead we rely on the application to serve the event
                 * queue in another thread, so all we do here is wait
                 * for the current message count to reach zero. */
                rd_kafka_curr_msgs_wait_zero(rk, timeout_ms, &msg_cnt);

                return msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT :
                        RD_KAFKA_RESP_ERR_NO_ERROR;

        } else {
                /* Standard poll interface.
                 *
                 * First poll call is non-blocking for the case
                 * where timeout_ms==RD_POLL_NOWAIT to make sure poll is
                 * called at least once. */
                rd_ts_t ts_end = rd_timeout_init(timeout_ms);
                int tmout = RD_POLL_NOWAIT;
                int qlen = 0;

                do {
                        rd_kafka_poll(rk, tmout);
                        qlen = rd_kafka_q_len(rk->rk_rep);
                        msg_cnt = rd_kafka_curr_msgs_cnt(rk);
                } while (qlen + msg_cnt > 0 &&
                         !rd_kafka_yield_thread &&
                         (tmout = rd_timeout_remains_limit(ts_end, 10)) !=
                         RD_POLL_NOWAIT);

                return qlen + msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT :
                        RD_KAFKA_RESP_ERR_NO_ERROR;
        }
}

/**
 * @brief Purge the partition message queue (according to \p purge_flags) for
 *        all toppars.
 *
 * This is a necessity to avoid the race condition when a purge() is scheduled
 * shortly in-between an rktp has been created but before it has been
 * joined to a broker handler thread.
 *
 * The rktp_xmit_msgq is handled by the broker-thread purge.
 *
 * @returns the number of messages purged.
 *
 * @locks_required rd_kafka_*lock()
 * @locks_acquired rd_kafka_topic_rdlock()
 */
static int
rd_kafka_purge_toppars (rd_kafka_t *rk, int purge_flags) {
        rd_kafka_topic_t *rkt;
        int cnt = 0;

        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                rd_kafka_toppar_t *rktp;
                int i;

                rd_kafka_topic_rdlock(rkt);
                for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
                        cnt += rd_kafka_toppar_purge_queues(
                                rkt->rkt_p[i], purge_flags, rd_false/*!xmit*/);

                RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
                        cnt += rd_kafka_toppar_purge_queues(
                                rktp, purge_flags, rd_false/*!xmit*/);

                if (rkt->rkt_ua)
                        cnt += rd_kafka_toppar_purge_queues(
                                rkt->rkt_ua, purge_flags, rd_false/*!xmit*/);
                rd_kafka_topic_rdunlock(rkt);
        }

        return cnt;
}


rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags) {
        rd_kafka_broker_t *rkb;
        rd_kafka_q_t *tmpq = NULL;
        int waitcnt = 0;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        /* Check that future flags are not passed */
        if ((purge_flags & ~RD_KAFKA_PURGE_F_MASK) != 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        /* Nothing to purge */
        if (!purge_flags)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Set up a reply queue to wait for broker thread signalling
         * completion, unless non-blocking. */
        if (!(purge_flags & RD_KAFKA_PURGE_F_NON_BLOCKING))
                tmpq = rd_kafka_q_new(rk);

        rd_kafka_rdlock(rk);

        /* Purge msgq for all toppars. */
        rd_kafka_purge_toppars(rk, purge_flags);

        /* Send purge request to all broker threads */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_purge_queues(rkb, purge_flags,
                                             RD_KAFKA_REPLYQ(tmpq, 0));
                waitcnt++;
        }

        rd_kafka_rdunlock(rk);


        if (tmpq) {
                /* Wait for responses */
                while (waitcnt-- > 0)
                        rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);

                rd_kafka_q_destroy_owner(tmpq);
        }

        /* Purge messages for the UA(-1) partitions (which are not
         * handled by a broker thread) */
        if (purge_flags & RD_KAFKA_PURGE_F_QUEUE)
                rd_kafka_purge_ua_toppar_queues(rk);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}




/**
 * @returns a csv string of purge flags in thread-local storage
 */
const char *rd_kafka_purge_flags2str (int flags) {
        static const char *names[] = {
                "queue",
                "inflight",
                "non-blocking",
                NULL
        };
        static RD_TLS char ret[64];

        return rd_flags2str(ret, sizeof(ret), names, flags);
}


int rd_kafka_version (void) {
	return RD_KAFKA_VERSION;
}

const char *rd_kafka_version_str (void) {
	static RD_TLS char ret[128];
	size_t of = 0, r;

	if (*ret)
		return ret;

#ifdef LIBRDKAFKA_GIT_VERSION
	if (*LIBRDKAFKA_GIT_VERSION) {
		of = rd_snprintf(ret, sizeof(ret), "%s",
				 *LIBRDKAFKA_GIT_VERSION == 'v' ?
                                 LIBRDKAFKA_GIT_VERSION+1 :
                                 LIBRDKAFKA_GIT_VERSION);
		if (of > sizeof(ret))
			of = sizeof(ret);
	}
#endif

#define _my_sprintf(...) do {						\
		r = rd_snprintf(ret+of, sizeof(ret)-of, __VA_ARGS__);	\
		if (r > sizeof(ret)-of)					\
			r = sizeof(ret)-of;				\
		of += r;						\
	} while(0)

	if (of == 0) {
		int ver = rd_kafka_version();
		int prel = (ver & 0xff);
		_my_sprintf("%i.%i.%i",
			    (ver >> 24) & 0xff,
			    (ver >> 16) & 0xff,
			    (ver >> 8) & 0xff);
		if (prel != 0xff) {
			/* pre-builds below 200 are just running numbers,
			 * above 200 are RC numbers. */
			if (prel <= 200)
				_my_sprintf("-pre%d", prel);
			else
				_my_sprintf("-RC%d", prel - 200);
		}
	}

#if ENABLE_DEVEL
	_my_sprintf("-devel");
#endif

#if WITHOUT_OPTIMIZATION
	_my_sprintf("-O0");
#endif

	return ret;
}


/**
 * Assert trampoline to print some debugging information on crash.
 */
void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
                rd_kafka_t *rk, const char *reason) {
        fprintf(stderr, "*** %s:%i:%s: %s ***\n",
                file, line, function, reason);
        if (rk)
                rd_kafka_dump0(stderr, rk, 0/*no locks*/);
        abort();
}





struct list_groups_state {
        rd_kafka_q_t *q;
        rd_kafka_resp_err_t err;
        int wait_cnt;
        const char *desired_group;
        struct rd_kafka_group_list *grplist;
        int grplist_size;
};

static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_t *rk,
					     rd_kafka_broker_t *rkb,
                                             rd_kafka_resp_err_t err,
                                             rd_kafka_buf_t *reply,
                                             rd_kafka_buf_t *request,
                                             void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int cnt;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope due to list_groups()
                 * timing out and returning. */
                return;
        }

        state = opaque;
        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i32(reply, &cnt);

        while (cnt-- > 0) {
                int16_t ErrorCode;
                rd_kafkap_str_t Group, GroupState, ProtoType, Proto;
                int MemberCnt;
                struct rd_kafka_group_info *gi;

                if (state->grplist->group_cnt == state->grplist_size) {
                        /* Grow group array */
                        state->grplist_size *= 2;
                        state->grplist->groups =
                                rd_realloc(state->grplist->groups,
                                           state->grplist_size *
                                           sizeof(*state->grplist->groups));
                }

                gi = &state->grplist->groups[state->grplist->group_cnt++];
                memset(gi, 0, sizeof(*gi));

                rd_kafka_buf_read_i16(reply, &ErrorCode);
                rd_kafka_buf_read_str(reply, &Group);
                rd_kafka_buf_read_str(reply, &GroupState);
                rd_kafka_buf_read_str(reply, &ProtoType);
                rd_kafka_buf_read_str(reply, &Proto);
                rd_kafka_buf_read_i32(reply, &MemberCnt);

                if (MemberCnt > 100000) {
                        err = RD_KAFKA_RESP_ERR__BAD_MSG;
                        goto err;
                }

                rd_kafka_broker_lock(rkb);
                gi->broker.id = rkb->rkb_nodeid;
                gi->broker.host = rd_strdup(rkb->rkb_origname);
                gi->broker.port = rkb->rkb_port;
                rd_kafka_broker_unlock(rkb);

                gi->err = ErrorCode;
                gi->group = RD_KAFKAP_STR_DUP(&Group);
                gi->state = RD_KAFKAP_STR_DUP(&GroupState);
                gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType);
                gi->protocol = RD_KAFKAP_STR_DUP(&Proto);

                if (MemberCnt > 0)
                        gi->members =
                                rd_malloc(MemberCnt * sizeof(*gi->members));

                while (MemberCnt-- > 0) {
                        rd_kafkap_str_t MemberId, ClientId, ClientHost;
                        rd_kafkap_bytes_t Meta, Assignment;
                        struct rd_kafka_group_member_info *mi;

                        mi = &gi->members[gi->member_cnt++];
                        memset(mi, 0, sizeof(*mi));

                        rd_kafka_buf_read_str(reply, &MemberId);
                        rd_kafka_buf_read_str(reply, &ClientId);
                        rd_kafka_buf_read_str(reply, &ClientHost);
                        rd_kafka_buf_read_bytes(reply, &Meta);
                        rd_kafka_buf_read_bytes(reply, &Assignment);

                        mi->member_id = RD_KAFKAP_STR_DUP(&MemberId);
                        mi->client_id = RD_KAFKAP_STR_DUP(&ClientId);
                        mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost);

                        if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) {
                                mi->member_metadata_size = 0;
                                mi->member_metadata = NULL;
                        } else {
                                mi->member_metadata_size =
                                        RD_KAFKAP_BYTES_LEN(&Meta);
                                mi->member_metadata =
                                        rd_memdup(Meta.data,
                                                  mi->member_metadata_size);
                        }

                        if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) {
                                mi->member_assignment_size = 0;
                                mi->member_assignment = NULL;
                        } else {
                                mi->member_assignment_size =
                                        RD_KAFKAP_BYTES_LEN(&Assignment);
                                mi->member_assignment =
                                        rd_memdup(Assignment.data,
                                                  mi->member_assignment_size);
                        }
                }
        }

err:
        state->err = err;
        return;

 err_parse:
        state->err = reply->rkbuf_err;
}

static void rd_kafka_ListGroups_resp_cb (rd_kafka_t *rk,
					 rd_kafka_broker_t *rkb,
                                         rd_kafka_resp_err_t err,
                                         rd_kafka_buf_t *reply,
                                         rd_kafka_buf_t *request,
                                         void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode;
        char **grps = NULL;
        int cnt, grpcnt, i = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' is no longer in scope because
                 * list_groups() timed out and returned to the caller.
                 * We must not touch anything here but simply return. */
                return;
        }

        state = opaque;

        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i16(reply, &ErrorCode);
        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        rd_kafka_buf_read_i32(reply, &cnt);

        if (state->desired_group)
                grpcnt = 1;
        else
                grpcnt = cnt;

        if (cnt == 0 || grpcnt == 0)
                return;

        grps = rd_malloc(sizeof(*grps) * grpcnt);

        while (cnt-- > 0) {
                rd_kafkap_str_t grp, proto;

                rd_kafka_buf_read_str(reply, &grp);
                rd_kafka_buf_read_str(reply, &proto);

                if (state->desired_group &&
                    rd_kafkap_str_cmp_str(&grp, state->desired_group))
                        continue;

                grps[i++] = RD_KAFKAP_STR_DUP(&grp);

                if (i == grpcnt)
                        break;
        }

        if (i > 0) {
                state->wait_cnt++;
                rd_kafka_DescribeGroupsRequest(rkb,
                                               (const char **)grps, i,
                                               RD_KAFKA_REPLYQ(state->q, 0),
                                               rd_kafka_DescribeGroups_resp_cb,
                                               state);

                while (i-- > 0)
                        rd_free(grps[i]);
        }


        rd_free(grps);

err:
        state->err = err;
        return;

 err_parse:
        if (grps)
                rd_free(grps);
        state->err = reply->rkbuf_err;
}

rd_kafka_resp_err_t
rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
                      const struct rd_kafka_group_list **grplistp,
                      int timeout_ms) {
        rd_kafka_broker_t *rkb;
        int rkb_cnt = 0;
        struct list_groups_state state = RD_ZERO_INIT;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
	int state_version = rd_kafka_brokers_get_state_version(rk);

        /* Wait until metadata has been fetched from cluster so
         * that we have a full broker list.
	 * This state only happens during initial client setup, after that
	 * there'll always be a cached metadata copy. */
        rd_kafka_rdlock(rk);
        while (!rk->rk_ts_metadata) {
                rd_kafka_rdunlock(rk);

		if (!rd_kafka_brokers_wait_state_change(
			    rk, state_version, rd_timeout_remains(ts_end)))
                        return RD_KAFKA_RESP_ERR__TIMED_OUT;

                rd_kafka_rdlock(rk);
        }

        state.q = rd_kafka_q_new(rk);
        state.desired_group = group;
        state.grplist = rd_calloc(1, sizeof(*state.grplist));
        state.grplist_size = group ? 1 : 32;

        state.grplist->groups = rd_malloc(state.grplist_size *
                                          sizeof(*state.grplist->groups));

        /* Query each broker for its list of groups */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_lock(rkb);
                if (rkb->rkb_nodeid == -1 || RD_KAFKA_BROKER_IS_LOGICAL(rkb)) {
                        rd_kafka_broker_unlock(rkb);
                        continue;
                }
                rd_kafka_broker_unlock(rkb);

                state.wait_cnt++;
                rkb_cnt++;
                rd_kafka_ListGroupsRequest(rkb,
                                           RD_KAFKA_REPLYQ(state.q, 0),
                                           rd_kafka_ListGroups_resp_cb,
                                           &state);
        }
        rd_kafka_rdunlock(rk);

        if (rkb_cnt == 0) {
                state.err = RD_KAFKA_RESP_ERR__TRANSPORT;

        } else {
                int remains;

                while (state.wait_cnt > 0 &&
                       !rd_timeout_expired((remains =
                                            rd_timeout_remains(ts_end)))) {
                        rd_kafka_q_serve(state.q, remains, 0,
                                         RD_KAFKA_Q_CB_CALLBACK,
                                         rd_kafka_poll_cb, NULL);
                        /* Ignore yields */
                }
        }

        rd_kafka_q_destroy_owner(state.q);

        if (state.wait_cnt > 0 && !state.err) {
                if (state.grplist->group_cnt == 0)
                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
                else {
                        *grplistp = state.grplist;
                        return RD_KAFKA_RESP_ERR__PARTIAL;
                }
        }

        if (state.err)
                rd_kafka_group_list_destroy(state.grplist);
        else
                *grplistp = state.grplist;

        return state.err;
}


void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist0) {
        struct rd_kafka_group_list *grplist =
                (struct rd_kafka_group_list *)grplist0;

        while (grplist->group_cnt-- > 0) {
                struct rd_kafka_group_info *gi;
                gi = &grplist->groups[grplist->group_cnt];

                if (gi->broker.host)
                        rd_free(gi->broker.host);
                if (gi->group)
                        rd_free(gi->group);
                if (gi->state)
                        rd_free(gi->state);
                if (gi->protocol_type)
                        rd_free(gi->protocol_type);
                if (gi->protocol)
                        rd_free(gi->protocol);

                while (gi->member_cnt-- > 0) {
                        struct rd_kafka_group_member_info *mi;
                        mi = &gi->members[gi->member_cnt];

                        if (mi->member_id)
                                rd_free(mi->member_id);
                        if (mi->client_id)
                                rd_free(mi->client_id);
                        if (mi->client_host)
                                rd_free(mi->client_host);
                        if (mi->member_metadata)
                                rd_free(mi->member_metadata);
                        if (mi->member_assignment)
                                rd_free(mi->member_assignment);
                }

                if (gi->members)
                        rd_free(gi->members);
        }

        if (grplist->groups)
                rd_free(grplist->groups);

        rd_free(grplist);
}



const char *rd_kafka_get_debug_contexts(void) {
	return RD_KAFKA_DEBUG_CONTEXTS;
}


int rd_kafka_path_is_dir (const char *path) {
#ifdef _WIN32
	struct _stat st;
	return (_stat(path, &st) == 0 && st.st_mode & S_IFDIR);
#else
	struct stat st;
	return (stat(path, &st) == 0 && S_ISDIR(st.st_mode));
#endif
}


/**
 * @returns true if directory is empty or can't be accessed, else false.
 */
rd_bool_t rd_kafka_dir_is_empty (const char *path) {
#if _WIN32
        /* FIXME: Unsupported */
        return rd_true;
#else
        DIR *dir;
        struct dirent *d;

        dir = opendir(path);
        if (!dir)
                return rd_true;

        while ((d = readdir(dir))) {

                if (!strcmp(d->d_name, ".") ||
                    !strcmp(d->d_name, ".."))
                        continue;

                if (d->d_type == DT_REG || d->d_type == DT_LNK ||
                    d->d_type == DT_DIR) {
                        closedir(dir);
                        return rd_false;
                }
        }

        closedir(dir);
        return rd_true;
#endif
}


void *rd_kafka_mem_malloc (rd_kafka_t *rk, size_t size) {
        return rd_malloc(size);
}

void *rd_kafka_mem_calloc(rd_kafka_t *rk, size_t num, size_t size) {
        return rd_calloc(num, size);
}

void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr) {
        rd_free(ptr);
}


int rd_kafka_errno (void) {
        return errno;
}

int rd_kafka_unittest (void) {
        return rd_unittest();
}
