/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2022, Magnus Edenhill
 *               2023, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Apache Kafka consumer & producer performance tester
 * using the Kafka driver from librdkafka
 * (https://github.com/confluentinc/librdkafka)
 */

#ifdef _MSC_VER
#define _CRT_SECURE_NO_WARNINGS /* Silence nonsense on MSVC */
#endif

#include "../src/rd.h"

#define _GNU_SOURCE /* for strndup() */
#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <errno.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/* Do not include these defines from your program, they will not be
 * provided by librdkafka. */
#include "rd.h"
#include "rdtime.h"

#ifdef _WIN32
#include "../win32/wingetopt.h"
#include "../win32/wintime.h"
#endif


static volatile sig_atomic_t run = 1;
static int forever               = 1;
static rd_ts_t dispintvl         = 1000;
static int do_seq                = 0;
static int exit_after            = 0;
static int exit_eof              = 0;
static FILE *stats_fp;
static int dr_disp_div;
static int verbosity        = 1;
static int latency_mode     = 0;
static FILE *latency_fp     = NULL;
static int msgcnt           = -1;
static int incremental_mode = 0;
static int partition_cnt    = 0;
static int eof_cnt          = 0;
static int with_dr          = 1;
static int read_hdrs        = 0;


static void stop(int sig) {
        if (!run)
                exit(0);
        run = 0;
}

static long int msgs_wait_cnt         = 0;
static long int msgs_wait_produce_cnt = 0;
static rd_ts_t t_end;
static rd_kafka_t *global_rk;

struct avg {
        int64_t val;
        int cnt;
        uint64_t ts_start;
};

static struct {
        rd_ts_t t_start;
        rd_ts_t t_end;
        rd_ts_t t_end_send;
        uint64_t msgs;
        uint64_t msgs_last;
        uint64_t msgs_dr_ok;
        uint64_t msgs_dr_err;
        uint64_t bytes_dr_ok;
        uint64_t bytes;
        uint64_t bytes_last;
        uint64_t tx;
        uint64_t tx_err;
        uint64_t avg_rtt;
        uint64_t offset;
        rd_ts_t t_fetch_latency;
        rd_ts_t t_last;
        rd_ts_t t_enobufs_last;
        rd_ts_t t_total;
        rd_ts_t latency_last;
        rd_ts_t latency_lo;
        rd_ts_t latency_hi;
        rd_ts_t latency_sum;
        int latency_cnt;
        int64_t last_offset;
} cnt;


uint64_t wall_clock(void) {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        return ((uint64_t)tv.tv_sec * 1000000LLU) + ((uint64_t)tv.tv_usec);
}

static void err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
        if (err == RD_KAFKA_RESP_ERR__FATAL) {
                char errstr[512];
                err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
                printf("%% FATAL ERROR CALLBACK: %s: %s: %s\n",
                       rd_kafka_name(rk), rd_kafka_err2str(err), errstr);
        } else {
                printf("%% ERROR CALLBACK: %s: %s: %s\n", rd_kafka_name(rk),
                       rd_kafka_err2str(err), reason);
        }
}

static void throttle_cb(rd_kafka_t *rk,
                        const char *broker_name,
                        int32_t broker_id,
                        int throttle_time_ms,
                        void *opaque) {
        printf("%% THROTTLED %dms by %s (%" PRId32 ")\n", throttle_time_ms,
               broker_name, broker_id);
}

static void offset_commit_cb(rd_kafka_t *rk,
                             rd_kafka_resp_err_t err,
                             rd_kafka_topic_partition_list_t *offsets,
                             void *opaque) {
        int i;

        if (err || verbosity >= 2)
                printf("%% Offset commit of %d partition(s): %s\n",
                       offsets->cnt, rd_kafka_err2str(err));

        for (i = 0; i < offsets->cnt; i++) {
                rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
                if (rktpar->err || verbosity >= 2)
                        printf("%%  %s [%" PRId32 "] @ %" PRId64 ": %s\n",
                               rktpar->topic, rktpar->partition, rktpar->offset,
                               rd_kafka_err2str(err));
        }
}

/**
 * @brief Add latency measurement
 */
static void latency_add(int64_t ts, const char *who) {
        if (ts > cnt.latency_hi)
                cnt.latency_hi = ts;
        if (!cnt.latency_lo || ts < cnt.latency_lo)
                cnt.latency_lo = ts;
        cnt.latency_last = ts;
        cnt.latency_cnt++;
        cnt.latency_sum += ts;
        if (latency_fp)
                fprintf(latency_fp, "%" PRIu64 "\n", ts);
}


static void msg_delivered(rd_kafka_t *rk,
                          const rd_kafka_message_t *rkmessage,
                          void *opaque) {
        static rd_ts_t last;
        rd_ts_t now = rd_clock();
        static int msgs;

        msgs++;

        msgs_wait_cnt--;

        if (rkmessage->err)
                cnt.msgs_dr_err++;
        else {
                cnt.msgs_dr_ok++;
                cnt.bytes_dr_ok += rkmessage->len;
        }

        if (latency_mode) {
                /* Extract latency */
                int64_t source_ts;
                if (sscanf(rkmessage->payload, "LATENCY:%" SCNd64,
                           &source_ts) == 1)
                        latency_add(wall_clock() - source_ts, "producer");
        }


        if ((rkmessage->err && (cnt.msgs_dr_err < 50 ||
                                !(cnt.msgs_dr_err % (dispintvl / 1000)))) ||
            !last || msgs_wait_cnt < 5 || !(msgs_wait_cnt % dr_disp_div) ||
            (now - last) >= dispintvl * 1000 || verbosity >= 3) {
                if (rkmessage->err && verbosity >= 2)
                        printf("%% Message delivery failed (broker %" PRId32
                               "): "
                               "%s [%" PRId32
                               "]: "
                               "%s (%li remain)\n",
                               rd_kafka_message_broker_id(rkmessage),
                               rd_kafka_topic_name(rkmessage->rkt),
                               rkmessage->partition,
                               rd_kafka_err2str(rkmessage->err), msgs_wait_cnt);
                else if (verbosity > 2)
                        printf("%% Message delivered (offset %" PRId64
                               ", broker %" PRId32
                               "): "
                               "%li remain\n",
                               rkmessage->offset,
                               rd_kafka_message_broker_id(rkmessage),
                               msgs_wait_cnt);
                if (verbosity >= 3 && do_seq)
                        printf(" --> \"%.*s\"\n", (int)rkmessage->len,
                               (const char *)rkmessage->payload);
                last = now;
        }

        cnt.last_offset = rkmessage->offset;

        if (msgs_wait_produce_cnt == 0 && msgs_wait_cnt == 0 && !forever) {
                if (verbosity >= 2 && cnt.msgs > 0) {
                        double error_percent =
                            (double)(cnt.msgs - cnt.msgs_dr_ok) / cnt.msgs *
                            100;
                        printf(
                            "%% Messages delivered with failure "
                            "percentage of %.5f%%\n",
                            error_percent);
                }
                t_end = rd_clock();
                run   = 0;
        }

        if (exit_after && exit_after <= msgs) {
                printf("%% Hard exit after %i messages, as requested\n",
                       exit_after);
                exit(0);
        }
}


static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {

        if (rkmessage->err) {
                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        cnt.offset = rkmessage->offset;

                        if (verbosity >= 1)
                                printf(
                                    "%% Consumer reached end of "
                                    "%s [%" PRId32
                                    "] "
                                    "message queue at offset %" PRId64 "\n",
                                    rd_kafka_topic_name(rkmessage->rkt),
                                    rkmessage->partition, rkmessage->offset);

                        if (exit_eof && ++eof_cnt == partition_cnt)
                                run = 0;

                        return;
                }

                printf("%% Consume error for topic \"%s\" [%" PRId32
                       "] "
                       "offset %" PRId64 ": %s\n",
                       rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt)
                                      : "",
                       rkmessage->partition, rkmessage->offset,
                       rd_kafka_message_errstr(rkmessage));

                if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
                        run = 0;

                cnt.msgs_dr_err++;
                return;
        }

        /* Start measuring from first message received */
        if (!cnt.t_start)
                cnt.t_start = cnt.t_last = rd_clock();

        cnt.offset = rkmessage->offset;
        cnt.msgs++;
        cnt.bytes += rkmessage->len;

        if (verbosity >= 3 || (verbosity >= 2 && !(cnt.msgs % 1000000)))
                printf("@%" PRId64 ": %.*s: %.*s\n", rkmessage->offset,
                       (int)rkmessage->key_len, (char *)rkmessage->key,
                       (int)rkmessage->len, (char *)rkmessage->payload);


        if (latency_mode) {
                int64_t remote_ts, ts;

                if (rkmessage->len > 8 &&
                    !memcmp(rkmessage->payload, "LATENCY:", 8) &&
                    sscanf(rkmessage->payload, "LATENCY:%" SCNd64,
                           &remote_ts) == 1) {
                        ts = wall_clock() - remote_ts;
                        if (ts > 0 && ts < (1000000 * 60 * 5)) {
                                latency_add(ts, "consumer");
                        } else {
                                if (verbosity >= 1)
                                        printf(
                                            "Received latency timestamp is too "
                                            "far off: %" PRId64
                                            "us (message offset %" PRId64
                                            "): ignored\n",
                                            ts, rkmessage->offset);
                        }
                } else if (verbosity > 1)
                        printf("not a LATENCY payload: %.*s\n",
                               (int)rkmessage->len, (char *)rkmessage->payload);
        }

        if (read_hdrs) {
                rd_kafka_headers_t *hdrs;
                /* Force parsing of headers but don't do anything with them. */
                rd_kafka_message_headers(rkmessage, &hdrs);
        }

        if (msgcnt != -1 && (int)cnt.msgs >= msgcnt)
                run = 0;
}


static void rebalance_cb(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque) {
        rd_kafka_error_t *error     = NULL;
        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;

        if (exit_eof && !strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
                fprintf(stderr,
                        "%% This example has not been modified to "
                        "support -e (exit on EOF) when "
                        "partition.assignment.strategy "
                        "is set to an incremental/cooperative strategy: "
                        "-e will not behave as expected\n");

        switch (err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                fprintf(stderr,
                        "%% Group rebalanced (%s): "
                        "%d new partition(s) assigned\n",
                        rd_kafka_rebalance_protocol(rk), partitions->cnt);

                if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
                        error = rd_kafka_incremental_assign(rk, partitions);
                } else {
                        ret_err = rd_kafka_assign(rk, partitions);
                        eof_cnt = 0;
                }

                partition_cnt += partitions->cnt;
                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                fprintf(stderr,
                        "%% Group rebalanced (%s): %d partition(s) revoked\n",
                        rd_kafka_rebalance_protocol(rk), partitions->cnt);

                if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
                        error = rd_kafka_incremental_unassign(rk, partitions);
                        partition_cnt -= partitions->cnt;
                } else {
                        ret_err       = rd_kafka_assign(rk, NULL);
                        partition_cnt = 0;
                }

                eof_cnt = 0; /* FIXME: Not correct for incremental case */
                break;

        default:
                break;
        }

        if (error) {
                fprintf(stderr, "%% incremental assign failure: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        } else if (ret_err) {
                fprintf(stderr, "%% assign failure: %s\n",
                        rd_kafka_err2str(ret_err));
        }
}


/**
 * Find and extract single value from a two-level search.
 * First find 'field1', then find 'field2' and extract its value.
 * Returns 0 on miss else the value.
 */
static uint64_t json_parse_fields(const char *json,
                                  const char **end,
                                  const char *field1,
                                  const char *field2) {
        const char *t = json;
        const char *t2;
        int len1 = (int)strlen(field1);
        int len2 = (int)strlen(field2);

        while ((t2 = strstr(t, field1))) {
                uint64_t v;

                t = t2;
                t += len1;

                /* Find field */
                if (!(t2 = strstr(t, field2)))
                        continue;
                t2 += len2;

                while (isspace((int)*t2))
                        t2++;

                v = strtoull(t2, (char **)&t, 10);
                if (t2 == t)
                        continue;

                *end = t;
                return v;
        }

        *end = t + strlen(t);
        return 0;
}

/**
 * Parse various values from rdkafka stats
 */
static void json_parse_stats(const char *json) {
        const char *t;
#define MAX_AVGS 100 /* max number of brokers to scan for rtt */
        uint64_t avg_rtt[MAX_AVGS + 1];
        int avg_rtt_i = 0;

        /* Store totals at end of array */
        avg_rtt[MAX_AVGS] = 0;

        /* Extract all broker RTTs */
        t = json;
        while (avg_rtt_i < MAX_AVGS && *t) {
                avg_rtt[avg_rtt_i] =
                    json_parse_fields(t, &t, "\"rtt\":", "\"avg\":");

                /* Skip low RTT values, means no messages are passing */
                if (avg_rtt[avg_rtt_i] < 100 /*0.1ms*/)
                        continue;


                avg_rtt[MAX_AVGS] += avg_rtt[avg_rtt_i];
                avg_rtt_i++;
        }

        if (avg_rtt_i > 0)
                avg_rtt[MAX_AVGS] /= avg_rtt_i;

        cnt.avg_rtt = avg_rtt[MAX_AVGS];
}


static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {

        /* Extract values for our own stats */
        json_parse_stats(json);

        if (stats_fp)
                fprintf(stats_fp, "%s\n", json);
        return 0;
}

#define _OTYPE_TAB     0x1 /* tabular format */
#define _OTYPE_SUMMARY 0x2 /* summary format */
#define _OTYPE_FORCE   0x4 /* force output regardless of interval timing */
static void
print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) {
        rd_ts_t now = rd_clock();
        rd_ts_t t_total;
        static int rows_written = 0;
        int print_header;
        double latency_avg = 0.0f;
        char extra[512];
        int extra_of = 0;
        *extra       = '\0';

        if (!(otype & _OTYPE_FORCE) &&
            (((otype & _OTYPE_SUMMARY) && verbosity == 0) ||
             cnt.t_last + dispintvl > now))
                return;

        print_header = !rows_written || (verbosity > 0 && !(rows_written % 20));

        if (cnt.t_end_send)
                t_total = cnt.t_end_send - cnt.t_start;
        else if (cnt.t_end)
                t_total = cnt.t_end - cnt.t_start;
        else if (cnt.t_start)
                t_total = now - cnt.t_start;
        else
                t_total = 1;

        if (latency_mode && cnt.latency_cnt)
                latency_avg = (double)cnt.latency_sum / (double)cnt.latency_cnt;

        if (mode == 'P') {

                if (otype & _OTYPE_TAB) {
#define ROW_START()                                                            \
        do {                                                                   \
        } while (0)
#define COL_HDR(NAME)       printf("| %10.10s ", (NAME))
#define COL_PR64(NAME, VAL) printf("| %10" PRIu64 " ", (VAL))
#define COL_PRF(NAME, VAL)  printf("| %10.2f ", (VAL))
#define ROW_END()                                                              \
        do {                                                                   \
                printf("\n");                                                  \
                rows_written++;                                                \
        } while (0)

                        if (print_header) {
                                /* First time, print header */
                                ROW_START();
                                COL_HDR("elapsed");
                                COL_HDR("msgs");
                                COL_HDR("bytes");
                                COL_HDR("rtt");
                                COL_HDR("dr");
                                COL_HDR("dr_m/s");
                                COL_HDR("dr_MB/s");
                                COL_HDR("dr_err");
                                COL_HDR("tx_err");
                                COL_HDR("outq");
                                COL_HDR("offset");
                                if (latency_mode) {
                                        COL_HDR("lat_curr");
                                        COL_HDR("lat_avg");
                                        COL_HDR("lat_lo");
                                        COL_HDR("lat_hi");
                                }

                                ROW_END();
                        }

                        ROW_START();
                        COL_PR64("elapsed", t_total / 1000);
                        COL_PR64("msgs", cnt.msgs);
                        COL_PR64("bytes", cnt.bytes);
                        COL_PR64("rtt", cnt.avg_rtt / 1000);
                        COL_PR64("dr", cnt.msgs_dr_ok);
                        COL_PR64("dr_m/s",
                                 ((cnt.msgs_dr_ok * 1000000) / t_total));
                        COL_PRF("dr_MB/s",
                                (float)((cnt.bytes_dr_ok) / (float)t_total));
                        COL_PR64("dr_err", cnt.msgs_dr_err);
                        COL_PR64("tx_err", cnt.tx_err);
                        COL_PR64("outq",
                                 rk ? (uint64_t)rd_kafka_outq_len(rk) : 0);
                        COL_PR64("offset", (uint64_t)cnt.last_offset);
                        if (latency_mode) {
                                COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
                                COL_PRF("lat_avg", latency_avg / 1000.0f);
                                COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
                                COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
                        }
                        ROW_END();
                }

                if (otype & _OTYPE_SUMMARY) {
                        printf("%% %" PRIu64
                               " messages produced "
                               "(%" PRIu64
                               " bytes), "
                               "%" PRIu64
                               " delivered "
                               "(offset %" PRId64 ", %" PRIu64
                               " failed) "
                               "in %" PRIu64 "ms: %" PRIu64
                               " msgs/s and "
                               "%.02f MB/s, "
                               "%" PRIu64
                               " produce failures, %i in queue, "
                               "%s compression\n",
                               cnt.msgs, cnt.bytes, cnt.msgs_dr_ok,
                               cnt.last_offset, cnt.msgs_dr_err, t_total / 1000,
                               ((cnt.msgs_dr_ok * 1000000) / t_total),
                               (float)((cnt.bytes_dr_ok) / (float)t_total),
                               cnt.tx_err, rk ? rd_kafka_outq_len(rk) : 0,
                               compression);
                }

        } else {

                if (otype & _OTYPE_TAB) {
                        if (print_header) {
                                /* First time, print header */
                                ROW_START();
                                COL_HDR("elapsed");
                                COL_HDR("msgs");
                                COL_HDR("bytes");
                                COL_HDR("rtt");
                                COL_HDR("m/s");
                                COL_HDR("MB/s");
                                COL_HDR("rx_err");
                                COL_HDR("offset");
                                if (latency_mode) {
                                        COL_HDR("lat_curr");
                                        COL_HDR("lat_avg");
                                        COL_HDR("lat_lo");
                                        COL_HDR("lat_hi");
                                }
                                ROW_END();
                        }

                        ROW_START();
                        COL_PR64("elapsed", t_total / 1000);
                        COL_PR64("msgs", cnt.msgs);
                        COL_PR64("bytes", cnt.bytes);
                        COL_PR64("rtt", cnt.avg_rtt / 1000);
                        COL_PR64("m/s", ((cnt.msgs * 1000000) / t_total));
                        COL_PRF("MB/s", (float)((cnt.bytes) / (float)t_total));
                        COL_PR64("rx_err", cnt.msgs_dr_err);
                        COL_PR64("offset", cnt.offset);
                        if (latency_mode) {
                                COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
                                COL_PRF("lat_avg", latency_avg / 1000.0f);
                                COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
                                COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
                        }
                        ROW_END();
                }

                if (otype & _OTYPE_SUMMARY) {
                        if (latency_avg >= 1.0f)
                                extra_of += rd_snprintf(
                                    extra + extra_of, sizeof(extra) - extra_of,
                                    ", latency "
                                    "curr/avg/lo/hi "
                                    "%.2f/%.2f/%.2f/%.2fms",
                                    cnt.latency_last / 1000.0f,
                                    latency_avg / 1000.0f,
                                    cnt.latency_lo / 1000.0f,
                                    cnt.latency_hi / 1000.0f);
                        printf("%% %" PRIu64 " messages (%" PRIu64
                               " bytes) "
                               "consumed in %" PRIu64 "ms: %" PRIu64
                               " msgs/s "
                               "(%.02f MB/s)"
                               "%s\n",
                               cnt.msgs, cnt.bytes, t_total / 1000,
                               ((cnt.msgs * 1000000) / t_total),
                               (float)((cnt.bytes) / (float)t_total), extra);
                }

                if (incremental_mode && now > cnt.t_last) {
                        uint64_t i_msgs  = cnt.msgs - cnt.msgs_last;
                        uint64_t i_bytes = cnt.bytes - cnt.bytes_last;
                        uint64_t i_time  = cnt.t_last ? now - cnt.t_last : 0;

                        printf("%% INTERVAL: %" PRIu64
                               " messages "
                               "(%" PRIu64
                               " bytes) "
                               "consumed in %" PRIu64 "ms: %" PRIu64
                               " msgs/s "
                               "(%.02f MB/s)"
                               "%s\n",
                               i_msgs, i_bytes, i_time / 1000,
                               ((i_msgs * 1000000) / i_time),
                               (float)((i_bytes) / (float)i_time), extra);
                }
        }

        cnt.t_last     = now;
        cnt.msgs_last  = cnt.msgs;
        cnt.bytes_last = cnt.bytes;
}


static void sig_usr1(int sig) {
        rd_kafka_dump(stdout, global_rk);
}


/**
 * @brief Read config from file
 * @returns -1 on error, else 0.
 */
static int read_conf_file(rd_kafka_conf_t *conf, const char *path) {
        FILE *fp;
        char buf[512];
        int line = 0;
        char errstr[512];

        if (!(fp = fopen(path, "r"))) {
                fprintf(stderr, "%% Failed to open %s: %s\n", path,
                        strerror(errno));
                return -1;
        }

        while (fgets(buf, sizeof(buf), fp)) {
                char *s = buf;
                char *t;
                rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN;

                line++;

                while (isspace((int)*s))
                        s++;

                if (!*s || *s == '#')
                        continue;

                if ((t = strchr(buf, '\n')))
                        *t = '\0';

                t = strchr(buf, '=');
                if (!t || t == s || !*(t + 1)) {
                        fprintf(stderr, "%% %s:%d: expected key=value\n", path,
                                line);
                        fclose(fp);
                        return -1;
                }

                *(t++) = '\0';

                /* Try global config */
                r = rd_kafka_conf_set(conf, s, t, errstr, sizeof(errstr));

                if (r == RD_KAFKA_CONF_OK)
                        continue;

                fprintf(stderr, "%% %s:%d: %s=%s: %s\n", path, line, s, t,
                        errstr);
                fclose(fp);
                return -1;
        }

        fclose(fp);

        return 0;
}


static rd_kafka_resp_err_t do_produce(rd_kafka_t *rk,
                                      rd_kafka_topic_t *rkt,
                                      int32_t partition,
                                      int msgflags,
                                      void *payload,
                                      size_t size,
                                      const void *key,
                                      size_t key_size,
                                      const rd_kafka_headers_t *hdrs) {

        /* Send/Produce message. */
        if (hdrs) {
                rd_kafka_headers_t *hdrs_copy;
                rd_kafka_resp_err_t err;

                hdrs_copy = rd_kafka_headers_copy(hdrs);

                err = rd_kafka_producev(
                    rk, RD_KAFKA_V_RKT(rkt), RD_KAFKA_V_PARTITION(partition),
                    RD_KAFKA_V_MSGFLAGS(msgflags),
                    RD_KAFKA_V_VALUE(payload, size),
                    RD_KAFKA_V_KEY(key, key_size),
                    RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_END);

                if (err)
                        rd_kafka_headers_destroy(hdrs_copy);

                return err;

        } else {
                if (rd_kafka_produce(rkt, partition, msgflags, payload, size,
                                     key, key_size, NULL) == -1)
                        return rd_kafka_last_error();
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
 * @brief Sleep for \p sleep_us microseconds.
 */
static void do_sleep(int sleep_us) {
        if (sleep_us > 100) {
#ifdef _WIN32
                Sleep(sleep_us / 1000);
#else
                usleep(sleep_us);
#endif
        } else {
                rd_ts_t next = rd_clock() + (rd_ts_t)sleep_us;
                while (next > rd_clock())
                        ;
        }
}


int main(int argc, char **argv) {
        char *brokers   = NULL;
        char mode       = 'C';
        char *topic     = NULL;
        const char *key = NULL;
        int *partitions = NULL;
        int opt;
        int sendflags     = 0;
        char *msgpattern  = "librdkafka_performance testing!";
        int msgsize       = -1;
        const char *debug = NULL;
        int do_conf_dump  = 0;
        rd_ts_t now;
        char errstr[512];
        uint64_t seq = 0;
        int seed     = (int)time(NULL);
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        rd_kafka_conf_t *conf;
        rd_kafka_queue_t *rkqu  = NULL;
        const char *compression = "no";
        int64_t start_offset    = 0;
        int batch_size          = 0;
        int idle                = 0;
        const char *stats_cmd   = NULL;
        char *stats_intvlstr    = NULL;
        char tmp[128];
        char *tmp2;
        int otype = _OTYPE_SUMMARY;
        double dtmp;
        int rate_sleep = 0;
        rd_kafka_topic_partition_list_t *topics;
        int exitcode             = 0;
        rd_kafka_headers_t *hdrs = NULL;
        rd_kafka_resp_err_t err;

        /* Kafka configuration */
        conf = rd_kafka_conf_new();
        rd_kafka_conf_set_error_cb(conf, err_cb);
        rd_kafka_conf_set_throttle_cb(conf, throttle_cb);
        rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb);

#ifdef SIGIO
        /* Quick termination */
        rd_snprintf(tmp, sizeof(tmp), "%i", SIGIO);
        rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
#endif

        /* Producer config */
        rd_kafka_conf_set(conf, "linger.ms", "1000", NULL, 0);
        rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0);
        rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0);

        /* Consumer config */
        /* Tell rdkafka to (try to) maintain 1M messages
         * in its internal receive buffers. This is to avoid
         * application -> rdkafka -> broker  per-message ping-pong
         * latency.
         * The larger the local queue, the higher the performance.
         * Try other values with: ... -X queued.min.messages=1000
         */
        rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
        rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
        rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);

        topics = rd_kafka_topic_partition_list_new(1);

        while ((opt = getopt(argc, argv,
                             "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
                             "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) {
                switch (opt) {
                case 'G':
                        if (rd_kafka_conf_set(conf, "group.id", optarg, errstr,
                                              sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                        /* FALLTHRU */
                case 'P':
                case 'C':
                        mode = opt;
                        break;
                case 't':
                        rd_kafka_topic_partition_list_add(
                            topics, optarg, RD_KAFKA_PARTITION_UA);
                        break;
                case 'p':
                        partition_cnt++;
                        partitions = realloc(partitions, sizeof(*partitions) *
                                                             partition_cnt);
                        partitions[partition_cnt - 1] = atoi(optarg);
                        break;

                case 'b':
                        brokers = optarg;
                        break;
                case 's':
                        msgsize = atoi(optarg);
                        break;
                case 'k':
                        key = optarg;
                        break;
                case 'c':
                        msgcnt = atoi(optarg);
                        break;
                case 'D':
                        sendflags |= RD_KAFKA_MSG_F_FREE;
                        break;
                case 'i':
                        dispintvl = atoi(optarg);
                        break;
                case 'm':
                        msgpattern = optarg;
                        break;
                case 'S':
                        seq    = strtoull(optarg, NULL, 10);
                        do_seq = 1;
                        break;
                case 'x':
                        exit_after = atoi(optarg);
                        break;
                case 'R':
                        seed = atoi(optarg);
                        break;
                case 'a':
                        if (rd_kafka_conf_set(conf, "acks", optarg, errstr,
                                              sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                        break;
                case 'B':
                        batch_size = atoi(optarg);
                        break;
                case 'z':
                        if (rd_kafka_conf_set(conf, "compression.codec", optarg,
                                              errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                        compression = optarg;
                        break;
                case 'o':
                        if (!strcmp(optarg, "end"))
                                start_offset = RD_KAFKA_OFFSET_END;
                        else if (!strcmp(optarg, "beginning"))
                                start_offset = RD_KAFKA_OFFSET_BEGINNING;
                        else if (!strcmp(optarg, "stored"))
                                start_offset = RD_KAFKA_OFFSET_STORED;
                        else {
                                start_offset = strtoll(optarg, NULL, 10);

                                if (start_offset < 0)
                                        start_offset =
                                            RD_KAFKA_OFFSET_TAIL(-start_offset);
                        }

                        break;
                case 'e':
                        exit_eof = 1;
                        break;
                case 'd':
                        debug = optarg;
                        break;
                case 'H':
                        if (!strcmp(optarg, "parse"))
                                read_hdrs = 1;
                        else {
                                char *name, *val;
                                size_t name_sz = -1;

                                name = optarg;
                                val  = strchr(name, '=');
                                if (val) {
                                        name_sz = (size_t)(val - name);
                                        val++; /* past the '=' */
                                }

                                if (!hdrs)
                                        hdrs = rd_kafka_headers_new(8);

                                err = rd_kafka_header_add(hdrs, name, name_sz,
                                                          val, -1);
                                if (err) {
                                        fprintf(
                                            stderr,
                                            "%% Failed to add header %s: %s\n",
                                            name, rd_kafka_err2str(err));
                                        exit(1);
                                }
                        }
                        break;
                case 'X': {
                        char *name, *val;
                        rd_kafka_conf_res_t res;

                        if (!strcmp(optarg, "list") ||
                            !strcmp(optarg, "help")) {
                                rd_kafka_conf_properties_show(stdout);
                                exit(0);
                        }

                        if (!strcmp(optarg, "dump")) {
                                do_conf_dump = 1;
                                continue;
                        }

                        name = optarg;
                        if (!(val = strchr(name, '='))) {
                                fprintf(stderr,
                                        "%% Expected "
                                        "-X property=value, not %s\n",
                                        name);
                                exit(1);
                        }

                        *val = '\0';
                        val++;

                        if (!strcmp(name, "file")) {
                                if (read_conf_file(conf, val) == -1)
                                        exit(1);
                                break;
                        }

                        res = rd_kafka_conf_set(conf, name, val, errstr,
                                                sizeof(errstr));

                        if (res != RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                } break;

                case 'T':
                        stats_intvlstr = optarg;
                        break;
                case 'Y':
                        stats_cmd = optarg;
                        break;

                case 'q':
                        verbosity--;
                        break;

                case 'v':
                        verbosity++;
                        break;

                case 'I':
                        idle = 1;
                        break;

                case 'u':
                        otype = _OTYPE_TAB;
                        verbosity--; /* remove some fluff */
                        break;

                case 'r':
                        dtmp = strtod(optarg, &tmp2);
                        if (tmp2 == optarg ||
                            (dtmp >= -0.001 && dtmp <= 0.001)) {
                                fprintf(stderr, "%% Invalid rate: %s\n",
                                        optarg);
                                exit(1);
                        }

                        rate_sleep = (int)(1000000.0 / dtmp);
                        break;

                case 'l':
                        latency_mode = 1;
                        break;

                case 'A':
                        if (!(latency_fp = fopen(optarg, "w"))) {
                                fprintf(stderr, "%% Cant open %s: %s\n", optarg,
                                        strerror(errno));
                                exit(1);
                        }
                        break;

                case 'M':
                        incremental_mode = 1;
                        break;

                case 'N':
                        with_dr = 0;
                        break;

                default:
                        fprintf(stderr, "Unknown option: %c\n", opt);
                        goto usage;
                }
        }

        if (topics->cnt == 0 || optind != argc) {
                if (optind < argc)
                        fprintf(stderr, "Unknown argument: %s\n", argv[optind]);
        usage:
                fprintf(
                    stderr,
                    "Usage: %s [-C|-P] -t <topic> "
                    "[-p <partition>] [-b <broker,broker..>] [options..]\n"
                    "\n"
                    "librdkafka version %s (0x%08x)\n"
                    "\n"
                    " Options:\n"
                    "  -C | -P |    Consumer or Producer mode\n"
                    "  -G <groupid> High-level Kafka Consumer mode\n"
                    "  -t <topic>   Topic to consume / produce\n"
                    "  -p <num>     Partition (defaults to random). "
                    "Multiple partitions are allowed in -C consumer mode.\n"
                    "  -M           Print consumer interval stats\n"
                    "  -b <brokers> Broker address list (host[:port],..)\n"
                    "  -s <size>    Message size (producer)\n"
                    "  -k <key>     Message key (producer)\n"
                    "  -H <name[=value]> Add header to message (producer)\n"
                    "  -H parse     Read message headers (consumer)\n"
                    "  -c <cnt>     Messages to transmit/receive\n"
                    "  -x <cnt>     Hard exit after transmitting <cnt> "
                    "messages (producer)\n"
                    "  -D           Copy/Duplicate data buffer (producer)\n"
                    "  -i <ms>      Display interval\n"
                    "  -m <msg>     Message payload pattern\n"
                    "  -S <start>   Send a sequence number starting at "
                    "<start> as payload\n"
                    "  -R <seed>    Random seed value (defaults to time)\n"
                    "  -a <acks>    Required acks (producer): "
                    "-1, 0, 1, >1\n"
                    "  -B <size>    Consume batch size (# of msgs)\n"
                    "  -z <codec>   Enable compression:\n"
                    "               none|gzip|snappy\n"
                    "  -o <offset>  Start offset (consumer)\n"
                    "               beginning, end, NNNNN or -NNNNN\n"
                    "  -d [facs..]  Enable debugging contexts:\n"
                    "               %s\n"
                    "  -X <prop=name> Set arbitrary librdkafka "
                    "configuration property\n"
                    "  -X file=<path> Read config from file.\n"
                    "  -X list      Show full list of supported properties.\n"
                    "  -X dump      Show configuration\n"
                    "  -T <intvl>   Enable statistics from librdkafka at "
                    "specified interval (ms)\n"
                    "  -Y <command> Pipe statistics to <command>\n"
                    "  -I           Idle: dont produce any messages\n"
                    "  -q           Decrease verbosity\n"
                    "  -v           Increase verbosity (default 1)\n"
                    "  -u           Output stats in table format\n"
                    "  -r <rate>    Producer msg/s limit\n"
                    "  -l           Latency measurement.\n"
                    "               Needs two matching instances, one\n"
                    "               consumer and one producer, both\n"
                    "               running with the -l switch.\n"
                    "  -l           Producer: per-message latency stats\n"
                    "  -A <file>    Write per-message latency stats to "
                    "<file>. Requires -l\n"
                    "  -O           Report produced offset (producer)\n"
                    "  -N           No delivery reports (producer)\n"
                    "\n"
                    " In Consumer mode:\n"
                    "  consumes messages and prints thruput\n"
                    "  If -B <..> is supplied the batch consumer\n"
                    "  mode is used, else the callback mode is used.\n"
                    "\n"
                    " In Producer mode:\n"
                    "  writes messages of size -s <..> and prints thruput\n"
                    "\n",
                    argv[0], rd_kafka_version_str(), rd_kafka_version(),
                    RD_KAFKA_DEBUG_CONTEXTS);
                exit(1);
        }


        dispintvl *= 1000; /* us */

        if (verbosity > 1)
                printf("%% Using random seed %i, verbosity level %i\n", seed,
                       verbosity);
        srand(seed);
        signal(SIGINT, stop);
#ifdef SIGUSR1
        signal(SIGUSR1, sig_usr1);
#endif


        if (debug && rd_kafka_conf_set(conf, "debug", debug, errstr,
                                       sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                printf("%% Debug configuration failed: %s: %s\n", errstr,
                       debug);
                exit(1);
        }

        /* Always enable stats (for RTT extraction), and if user supplied
         * the -T <intvl> option we let her take part of the stats aswell. */
        rd_kafka_conf_set_stats_cb(conf, stats_cb);

        if (!stats_intvlstr) {
                /* if no user-desired stats, adjust stats interval
                 * to the display interval. */
                rd_snprintf(tmp, sizeof(tmp), "%" PRId64, dispintvl / 1000);
        }

        if (rd_kafka_conf_set(conf, "statistics.interval.ms",
                              stats_intvlstr ? stats_intvlstr : tmp, errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
        }

        if (do_conf_dump) {
                const char **arr;
                size_t cnt;
                int pass;

                for (pass = 0; pass < 2; pass++) {
                        int i;

                        if (pass == 0) {
                                arr = rd_kafka_conf_dump(conf, &cnt);
                                printf("# Global config\n");
                        } else {
                                rd_kafka_topic_conf_t *topic_conf =
                                    rd_kafka_conf_get_default_topic_conf(conf);

                                if (topic_conf) {
                                        printf("# Topic config\n");
                                        arr = rd_kafka_topic_conf_dump(
                                            topic_conf, &cnt);
                                } else {
                                        arr = NULL;
                                }
                        }

                        if (!arr)
                                continue;

                        for (i = 0; i < (int)cnt; i += 2)
                                printf("%s = %s\n", arr[i], arr[i + 1]);

                        printf("\n");

                        rd_kafka_conf_dump_free(arr, cnt);
                }

                exit(0);
        }

        if (latency_mode)
                do_seq = 0;

        if (stats_intvlstr) {
                /* User enabled stats (-T) */

#ifndef _WIN32
                if (stats_cmd) {
                        if (!(stats_fp = popen(stats_cmd,
#ifdef __linux__
                                               "we"
#else
                                               "w"
#endif
                                               ))) {
                                fprintf(stderr,
                                        "%% Failed to start stats command: "
                                        "%s: %s",
                                        stats_cmd, strerror(errno));
                                exit(1);
                        }
                } else
#endif
                        stats_fp = stdout;
        }

        if (msgcnt != -1)
                forever = 0;

        if (msgsize == -1)
                msgsize = (int)strlen(msgpattern);

        topic = topics->elems[0].topic;

        if (mode == 'C' || mode == 'G')
                rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL,
                                  0);

        if (read_hdrs && mode == 'P') {
                fprintf(stderr, "%% producer can not read headers\n");
                exit(1);
        }

        if (hdrs && mode != 'P') {
                fprintf(stderr, "%% consumer can not add headers\n");
                exit(1);
        }

        /* Set bootstrap servers */
        if (brokers &&
            rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
        }

        if (mode == 'P') {
                /*
                 * Producer
                 */
                char *sbuf;
                char *pbuf;
                int outq;
                int keylen  = key ? (int)strlen(key) : 0;
                off_t rof   = 0;
                size_t plen = strlen(msgpattern);
                int partition =
                    partitions ? partitions[0] : RD_KAFKA_PARTITION_UA;

                if (latency_mode) {
                        int minlen = (int)(strlen("LATENCY:") +
                                           strlen("18446744073709551615 ") + 1);
                        msgsize    = RD_MAX(minlen, msgsize);
                        sendflags |= RD_KAFKA_MSG_F_COPY;
                } else if (do_seq) {
                        int minlen = (int)strlen("18446744073709551615 ") + 1;
                        if (msgsize < minlen)
                                msgsize = minlen;

                        /* Force duplication of payload */
                        sendflags |= RD_KAFKA_MSG_F_FREE;
                }

                sbuf = malloc(msgsize);

                /* Copy payload content to new buffer */
                while (rof < msgsize) {
                        size_t xlen = RD_MIN((size_t)msgsize - rof, plen);
                        memcpy(sbuf + rof, msgpattern, xlen);
                        rof += (off_t)xlen;
                }

                if (msgcnt == -1)
                        printf("%% Sending messages of size %i bytes\n",
                               msgsize);
                else
                        printf("%% Sending %i messages of size %i bytes\n",
                               msgcnt, msgsize);

                if (with_dr)
                        rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);

                /* Create Kafka handle */
                if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
                                        sizeof(errstr)))) {
                        fprintf(stderr,
                                "%% Failed to create Kafka producer: %s\n",
                                errstr);
                        exit(1);
                }

                global_rk = rk;

                /* Explicitly create topic to avoid per-msg lookups. */
                rkt = rd_kafka_topic_new(rk, topic, NULL);


                if (rate_sleep && verbosity >= 2)
                        fprintf(stderr,
                                "%% Inter message rate limiter sleep %ius\n",
                                rate_sleep);

                dr_disp_div = msgcnt / 50;
                if (dr_disp_div == 0)
                        dr_disp_div = 10;

                cnt.t_start = cnt.t_last = rd_clock();

                msgs_wait_produce_cnt = msgcnt;

                while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) {
                        /* Send/Produce message. */

                        if (idle) {
                                rd_kafka_poll(rk, 1000);
                                continue;
                        }

                        if (latency_mode) {
                                rd_snprintf(sbuf, msgsize - 1,
                                            "LATENCY:%" PRIu64, wall_clock());
                        } else if (do_seq) {
                                rd_snprintf(sbuf, msgsize - 1, "%" PRIu64 ": ",
                                            seq);
                                seq++;
                        }

                        if (sendflags & RD_KAFKA_MSG_F_FREE) {
                                /* Duplicate memory */
                                pbuf = malloc(msgsize);
                                memcpy(pbuf, sbuf, msgsize);
                        } else
                                pbuf = sbuf;

                        if (msgsize == 0)
                                pbuf = NULL;

                        cnt.tx++;
                        while (run && (err = do_produce(
                                           rk, rkt, partition, sendflags, pbuf,
                                           msgsize, key, keylen, hdrs))) {
                                if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
                                        printf(
                                            "%% No such partition: "
                                            "%" PRId32 "\n",
                                            partition);
                                else if (verbosity >= 3 ||
                                         (err !=
                                              RD_KAFKA_RESP_ERR__QUEUE_FULL &&
                                          verbosity >= 1))
                                        printf(
                                            "%% produce error: %s%s\n",
                                            rd_kafka_err2str(err),
                                            err == RD_KAFKA_RESP_ERR__QUEUE_FULL
                                                ? " (backpressure)"
                                                : "");

                                cnt.tx_err++;
                                if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                                        run = 0;
                                        break;
                                }
                                now = rd_clock();
                                if (verbosity >= 2 &&
                                    cnt.t_enobufs_last + dispintvl <= now) {
                                        printf(
                                            "%% Backpressure %i "
                                            "(tx %" PRIu64
                                            ", "
                                            "txerr %" PRIu64 ")\n",
                                            rd_kafka_outq_len(rk), cnt.tx,
                                            cnt.tx_err);
                                        cnt.t_enobufs_last = now;
                                }

                                /* Poll to handle delivery reports */
                                rd_kafka_poll(rk, 10);

                                print_stats(rk, mode, otype, compression);
                        }

                        msgs_wait_cnt++;
                        if (msgs_wait_produce_cnt != -1)
                                msgs_wait_produce_cnt--;
                        cnt.msgs++;
                        cnt.bytes += msgsize;

                        /* Must poll to handle delivery reports */
                        if (rate_sleep) {
                                rd_ts_t next = rd_clock() + (rd_ts_t)rate_sleep;
                                do {
                                        rd_kafka_poll(
                                            rk,
                                            (int)RD_MAX(0, (next - rd_clock()) /
                                                               1000));
                                } while (next > rd_clock());
                        } else if (cnt.msgs % 1000 == 0) {
                                rd_kafka_poll(rk, 0);
                        }

                        print_stats(rk, mode, otype, compression);
                }

                forever = 0;
                if (verbosity >= 2)
                        printf(
                            "%% All messages produced, "
                            "now waiting for %li deliveries\n",
                            msgs_wait_cnt);

                /* Wait for messages to be delivered */
                while (run && rd_kafka_poll(rk, 1000) != -1)
                        print_stats(rk, mode, otype, compression);


                outq = rd_kafka_outq_len(rk);
                if (verbosity >= 2)
                        printf("%% %i messages in outq\n", outq);
                cnt.msgs -= outq;
                cnt.t_end = t_end;

                if (cnt.tx_err > 0)
                        printf("%% %" PRIu64 " backpressures for %" PRIu64
                               " produce calls: %.3f%% backpressure rate\n",
                               cnt.tx_err, cnt.tx,
                               ((double)cnt.tx_err / (double)cnt.tx) * 100.0);

                /* Destroy topic */
                rd_kafka_topic_destroy(rkt);

                /* Destroy the handle */
                rd_kafka_destroy(rk);
                global_rk = rk = NULL;

                free(sbuf);

                exitcode = cnt.msgs == cnt.msgs_dr_ok ? 0 : 1;

        } else if (mode == 'C') {
                /*
                 * Consumer
                 */

                rd_kafka_message_t **rkmessages = NULL;
                size_t i                        = 0;

                /* Create Kafka handle */
                if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
                                        sizeof(errstr)))) {
                        fprintf(stderr,
                                "%% Failed to create Kafka consumer: %s\n",
                                errstr);
                        exit(1);
                }

                global_rk = rk;

                /* Create topic to consume from */
                rkt = rd_kafka_topic_new(rk, topic, NULL);

                /* Batch consumer */
                if (batch_size)
                        rkmessages = malloc(sizeof(*rkmessages) * batch_size);

                /* Start consuming */
                rkqu = rd_kafka_queue_new(rk);
                for (i = 0; i < (size_t)partition_cnt; ++i) {
                        const int r = rd_kafka_consume_start_queue(
                            rkt, partitions[i], start_offset, rkqu);

                        if (r == -1) {
                                fprintf(
                                    stderr, "%% Error creating queue: %s\n",
                                    rd_kafka_err2str(rd_kafka_last_error()));
                                exit(1);
                        }
                }

                while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
                        /* Consume messages.
                         * A message may either be a real message, or
                         * an error signaling (if rkmessage->err is set).
                         */
                        uint64_t fetch_latency;
                        ssize_t r;

                        fetch_latency = rd_clock();

                        if (batch_size) {
                                int partition = partitions
                                                    ? partitions[0]
                                                    : RD_KAFKA_PARTITION_UA;

                                /* Batch fetch mode */
                                r = rd_kafka_consume_batch(rkt, partition, 1000,
                                                           rkmessages,
                                                           batch_size);
                                if (r != -1) {
                                        for (i = 0; (ssize_t)i < r; i++) {
                                                msg_consume(rkmessages[i],
                                                            NULL);
                                                rd_kafka_message_destroy(
                                                    rkmessages[i]);
                                        }
                                }
                        } else {
                                /* Queue mode */
                                r = rd_kafka_consume_callback_queue(
                                    rkqu, 1000, msg_consume, NULL);
                        }

                        cnt.t_fetch_latency += rd_clock() - fetch_latency;
                        if (r == -1)
                                fprintf(
                                    stderr, "%% Error: %s\n",
                                    rd_kafka_err2str(rd_kafka_last_error()));
                        else if (r > 0 && rate_sleep) {
                                /* Simulate processing time
                                 * if `-r <rate>` was set. */
                                do_sleep(rate_sleep);
                        }


                        print_stats(rk, mode, otype, compression);

                        /* Poll to handle stats callbacks */
                        rd_kafka_poll(rk, 0);
                }
                cnt.t_end = rd_clock();

                /* Stop consuming */
                for (i = 0; i < (size_t)partition_cnt; ++i) {
                        int r = rd_kafka_consume_stop(rkt, (int32_t)i);
                        if (r == -1) {
                                fprintf(
                                    stderr, "%% Error in consume_stop: %s\n",
                                    rd_kafka_err2str(rd_kafka_last_error()));
                        }
                }
                rd_kafka_queue_destroy(rkqu);

                /* Destroy topic */
                rd_kafka_topic_destroy(rkt);

                if (batch_size)
                        free(rkmessages);

                /* Destroy the handle */
                rd_kafka_destroy(rk);

                global_rk = rk = NULL;

        } else if (mode == 'G') {
                /*
                 * High-level balanced Consumer
                 */
                rd_kafka_message_t **rkmessages = NULL;

                rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

                /* Create Kafka handle */
                if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
                                        sizeof(errstr)))) {
                        fprintf(stderr,
                                "%% Failed to create Kafka consumer: %s\n",
                                errstr);
                        exit(1);
                }

                /* Forward all events to consumer queue */
                rd_kafka_poll_set_consumer(rk);

                global_rk = rk;

                err = rd_kafka_subscribe(rk, topics);
                if (err) {
                        fprintf(stderr, "%% Subscribe failed: %s\n",
                                rd_kafka_err2str(err));
                        exit(1);
                }
                fprintf(stderr, "%% Waiting for group rebalance..\n");

                if (batch_size) {
                        rkmessages = malloc(sizeof(*rkmessages) * batch_size);
                } else {
                        rkmessages = malloc(sizeof(*rkmessages));
                }

                rkqu = rd_kafka_queue_get_consumer(rk);

                while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
                        /* Consume messages.
                         * A message may either be a real message, or
                         * an event (if rkmessage->err is set).
                         */
                        uint64_t fetch_latency;
                        ssize_t r;

                        fetch_latency = rd_clock();

                        if (batch_size) {
                                /* Batch fetch mode */
                                ssize_t i = 0;
                                r         = rd_kafka_consume_batch_queue(
                                    rkqu, 1000, rkmessages, batch_size);
                                if (r != -1) {
                                        for (i = 0; i < r; i++) {
                                                msg_consume(rkmessages[i],
                                                            NULL);
                                                rd_kafka_message_destroy(
                                                    rkmessages[i]);
                                        }
                                }

                                if (r == -1)
                                        fprintf(stderr, "%% Error: %s\n",
                                                rd_kafka_err2str(
                                                    rd_kafka_last_error()));
                                else if (r > 0 && rate_sleep) {
                                        /* Simulate processing time
                                         * if `-r <rate>` was set. */
                                        do_sleep(rate_sleep);
                                }

                        } else {
                                rkmessages[0] =
                                    rd_kafka_consumer_poll(rk, 1000);
                                if (rkmessages[0]) {
                                        msg_consume(rkmessages[0], NULL);
                                        rd_kafka_message_destroy(rkmessages[0]);

                                        /* Simulate processing time
                                         * if `-r <rate>` was set. */
                                        if (rate_sleep)
                                                do_sleep(rate_sleep);
                                }
                        }

                        cnt.t_fetch_latency += rd_clock() - fetch_latency;

                        print_stats(rk, mode, otype, compression);
                }
                cnt.t_end = rd_clock();

                err = rd_kafka_consumer_close(rk);
                if (err)
                        fprintf(stderr, "%% Failed to close consumer: %s\n",
                                rd_kafka_err2str(err));

                free(rkmessages);
                rd_kafka_queue_destroy(rkqu);
                rd_kafka_destroy(rk);
        }

        if (hdrs)
                rd_kafka_headers_destroy(hdrs);

        print_stats(NULL, mode, otype | _OTYPE_FORCE, compression);

        if (cnt.t_fetch_latency && cnt.msgs)
                printf("%% Average application fetch latency: %" PRIu64 "us\n",
                       cnt.t_fetch_latency / cnt.msgs);

        if (latency_fp)
                fclose(latency_fp);

        if (stats_fp) {
#ifndef _WIN32
                pclose(stats_fp);
#endif
                stats_fp = NULL;
        }

        if (partitions)
                free(partitions);

        rd_kafka_topic_partition_list_destroy(topics);

        /* Let background threads clean up and terminate cleanly. */
        rd_kafka_wait_destroyed(2000);

        return exitcode;
}
