/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Apache Kafka consumer & producer example programs
 * using the Kafka driver from librdkafka
 * (https://github.com/confluentinc/librdkafka)
 */

#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>
#include <getopt.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */


/* Loop-control flag: the produce/consume/metadata loops in main() keep
 * running while this is non-zero. Cleared by the SIGINT handler and by
 * msg_consume() on conditions that end consumption. */
static volatile sig_atomic_t run = 1;
/* Kafka handle, created in main() with rd_kafka_new(). */
static rd_kafka_t *rk;
/* Set by -e: stop the consumer when a partition EOF event is received. */
static int exit_eof = 0;
/* Set by -q (and forced in main() depending on isatty()): suppresses the
 * informational output. */
static int quiet    = 0;
/* How msg_consume() prints keys and payloads; -A selects OUTPUT_RAW. */
static enum {
        OUTPUT_HEXDUMP,
        OUTPUT_RAW,
} output = OUTPUT_HEXDUMP;

/**
 * SIGINT handler: ask the main loops to terminate.
 *
 * Restricted to async-signal-safe operations: a store to a
 * volatile sig_atomic_t and close(2).
 *
 * @param sig Signal number (unused).
 */
static void stop(int sig) {
        (void)sig;

        run = 0;

        /* Abort the producer's fgets(): with the descriptor closed the
         * pending/next read fails instead of waiting for more input.
         * fclose(stdin) is not async-signal-safe and must not be called
         * from a signal handler; close() is. */
        close(STDIN_FILENO);
}


/**
 * Write a hex + character dump of a memory region to \p fp.
 *
 * Every output line covers up to 16 bytes and consists of the offset of
 * the first byte, the bytes as two-digit lowercase hex values and the
 * same bytes as characters, with non-printable bytes shown as '.'.
 *
 * @param fp   Stream to write to.
 * @param name Optional label; when non-NULL a
 *             "<name> hexdump (<len> bytes):" header line is written first.
 * @param ptr  Start of the region; not dereferenced when \p len is 0.
 * @param len  Number of bytes to dump.
 */
static void hexdump(FILE *fp, const char *name, const void *ptr, size_t len) {
        enum { BYTES_PER_LINE = 16 };
        /* Treat the data as unsigned bytes: plain char may be signed, and
         * passing a negative value to isprint() is undefined behavior. */
        const unsigned char *p = (const unsigned char *)ptr;
        size_t of;

        if (name)
                fprintf(fp, "%s hexdump (%zu bytes):\n", name, len);

        for (of = 0; of < len; of += BYTES_PER_LINE) {
                char hexen[BYTES_PER_LINE * 3 + 1];
                char charen[BYTES_PER_LINE + 1];
                size_t hof = 0;
                size_t cof = 0;
                size_t i;
                /* End of this line's slice, computed without overflowing. */
                size_t end = (len - of < (size_t)BYTES_PER_LINE)
                                 ? len
                                 : of + (size_t)BYTES_PER_LINE;

                for (i = of; i < end; i++) {
                        hof += (size_t)snprintf(hexen + hof,
                                                sizeof(hexen) - hof, "%02x ",
                                                (unsigned int)p[i]);
                        charen[cof++] = isprint(p[i]) ? (char)p[i] : '.';
                }
                charen[cof] = '\0';

                fprintf(fp, "%08zx: %-48s %-16s\n", of, hexen, charen);
        }
}

/**
 * Kafka logger callback (optional).
 *
 * Writes one line to stderr of the form
 * "<sec>.<msec> RDKAFKA-<level>-<fac>: <handle name>: <message>".
 *
 * @param rk    Handle the log line originates from; NULL is tolerated.
 * @param level Log level, printed as an integer.
 * @param fac   Facility string.
 * @param buf   Log message text.
 */
static void
logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
        struct timeval tv;
        /* Passing a NULL pointer for %s is undefined behavior, so fall
         * back to a literal when there is no handle to name. */
        const char *name = rk ? rd_kafka_name(rk) : "(null)";

        gettimeofday(&tv, NULL);

        /* The casts match the %u conversions used in the format. */
        fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
                (unsigned int)tv.tv_sec, (unsigned int)(tv.tv_usec / 1000),
                level, fac, name, buf);
}


/**
 * Message delivery report callback using the richer rd_kafka_message_t object.
 *
 * A failed delivery is always reported on stderr. A successful delivery is
 * reported (size, offset, partition, broker id and payload) unless the
 * program runs in quiet mode.
 *
 * @param rk        Producer handle (unused).
 * @param rkmessage Delivered or failed message.
 * @param opaque    Unused.
 */
static void msg_delivered(rd_kafka_t *rk,
                          const rd_kafka_message_t *rkmessage,
                          void *opaque) {
        (void)rk;
        (void)opaque;

        if (rkmessage->err) {
                fprintf(stderr,
                        "%% Message delivery failed (broker %" PRId32 "): %s\n",
                        rd_kafka_message_broker_id(rkmessage),
                        rd_kafka_err2str(rkmessage->err));
                return;
        }

        if (quiet)
                return;

        /* len is a size_t and therefore printed with %zu; the payload is
         * not NUL-terminated, hence the explicit %.*s precision. */
        fprintf(stderr,
                "%% Message delivered (%zu bytes, offset %" PRId64
                ", "
                "partition %" PRId32 ", broker %" PRId32 "): %.*s\n",
                rkmessage->len, rkmessage->offset, rkmessage->partition,
                rd_kafka_message_broker_id(rkmessage), (int)rkmessage->len,
                (const char *)rkmessage->payload);
}


/**
 * Print one consumed message, or report the error/event it carries.
 *
 * - Partition EOF: reported on stderr; clears the run flag when exit_eof
 *   is set.
 * - Any other error: reported on stderr; unknown topic/partition errors
 *   also clear the run flag.
 * - Proper message: unless quiet, offset, size, broker, timestamp and
 *   headers are printed to stdout. The key (if any) and the payload are
 *   then always printed, as a hexdump or raw depending on the selected
 *   output format.
 *
 * @param rkmessage Message object to print.
 * @param opaque    Unused.
 */
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
        (void)opaque;

        if (rkmessage->err) {
                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        fprintf(stderr,
                                "%% Consumer reached end of %s [%" PRId32
                                "] "
                                "message queue at offset %" PRId64 "\n",
                                rd_kafka_topic_name(rkmessage->rkt),
                                rkmessage->partition, rkmessage->offset);

                        if (exit_eof)
                                run = 0;

                        return;
                }

                fprintf(stderr,
                        "%% Consume error for topic \"%s\" [%" PRId32
                        "] "
                        "offset %" PRId64 ": %s\n",
                        rd_kafka_topic_name(rkmessage->rkt),
                        rkmessage->partition, rkmessage->offset,
                        rd_kafka_message_errstr(rkmessage));

                /* Nothing more can be consumed from a topic or partition
                 * that does not exist. */
                if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
                        run = 0;
                return;
        }

        if (!quiet) {
                rd_kafka_timestamp_type_t tstype;
                int64_t timestamp;
                rd_kafka_headers_t *hdrs;

                fprintf(stdout,
                        "%% Message (offset %" PRId64
                        ", %zu bytes, "
                        "broker %" PRId32 "):\n",
                        rkmessage->offset, rkmessage->len,
                        rd_kafka_message_broker_id(rkmessage));

                timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
                if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
                        const char *tsname = "?";
                        int age            = 0;

                        if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME)
                                tsname = "create time";
                        else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
                                tsname = "log append time";

                        /* The timestamp is divided by 1000 to get seconds.
                         * Subtract in 64 bits and narrow only the
                         * difference, so neither operand is truncated to
                         * int first. */
                        if (timestamp)
                                age = (int)((int64_t)time(NULL) -
                                            timestamp / 1000);

                        fprintf(stdout,
                                "%% Message timestamp: %s %" PRId64
                                " (%ds ago)\n",
                                tsname, timestamp, age);
                }

                if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
                        size_t idx = 0;
                        const char *name;
                        const void *val;
                        size_t size;

                        fprintf(stdout, "%% Headers:");

                        /* idx has already been incremented inside the loop
                         * body, so the first header is seen with idx == 1. */
                        while (!rd_kafka_header_get_all(hdrs, idx++, &name,
                                                        &val, &size)) {
                                fprintf(stdout, "%s%s=", idx == 1 ? " " : ", ",
                                        name);
                                if (val)
                                        fprintf(stdout, "\"%.*s\"", (int)size,
                                                (const char *)val);
                                else
                                        fprintf(stdout, "NULL");
                        }
                        fprintf(stdout, "\n");
                }
        }

        if (rkmessage->key_len) {
                if (output == OUTPUT_HEXDUMP)
                        hexdump(stdout, "Message Key", rkmessage->key,
                                rkmessage->key_len);
                else
                        printf("Key: %.*s\n", (int)rkmessage->key_len,
                               (const char *)rkmessage->key);
        }

        if (output == OUTPUT_HEXDUMP)
                hexdump(stdout, "Message Payload", rkmessage->payload,
                        rkmessage->len);
        else
                printf("%.*s\n", (int)rkmessage->len,
                       (const char *)rkmessage->payload);
}


static void metadata_print(const char *topic,
                           const struct rd_kafka_metadata *metadata) {
        int i, j, k;
        int32_t controllerid;

        printf("Metadata for %s (from broker %" PRId32 ": %s):\n",
               topic ?: "all topics", metadata->orig_broker_id,
               metadata->orig_broker_name);

        controllerid = rd_kafka_controllerid(rk, 0);


        /* Iterate brokers */
        printf(" %i brokers:\n", metadata->broker_cnt);
        for (i = 0; i < metadata->broker_cnt; i++)
                printf("  broker %" PRId32 " at %s:%i%s\n",
                       metadata->brokers[i].id, metadata->brokers[i].host,
                       metadata->brokers[i].port,
                       controllerid == metadata->brokers[i].id ? " (controller)"
                                                               : "");

        /* Iterate topics */
        printf(" %i topics:\n", metadata->topic_cnt);
        for (i = 0; i < metadata->topic_cnt; i++) {
                const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
                printf("  topic \"%s\" with %i partitions:", t->topic,
                       t->partition_cnt);
                if (t->err) {
                        printf(" %s", rd_kafka_err2str(t->err));
                        if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
                                printf(" (try again)");
                }
                printf("\n");

                /* Iterate topic's partitions */
                for (j = 0; j < t->partition_cnt; j++) {
                        const struct rd_kafka_metadata_partition *p;
                        p = &t->partitions[j];
                        printf("    partition %" PRId32
                               ", "
                               "leader %" PRId32 ", replicas: ",
                               p->id, p->leader);

                        /* Iterate partition's replicas */
                        for (k = 0; k < p->replica_cnt; k++)
                                printf("%s%" PRId32, k > 0 ? "," : "",
                                       p->replicas[k]);

                        /* Iterate partition's ISRs */
                        printf(", isrs: ");
                        for (k = 0; k < p->isr_cnt; k++)
                                printf("%s%" PRId32, k > 0 ? "," : "",
                                       p->isrs[k]);
                        if (p->err)
                                printf(", %s\n", rd_kafka_err2str(p->err));
                        else
                                printf("\n");
                }
        }
}


/**
 * SIGUSR1 handler: dump the Kafka handle to stdout with rd_kafka_dump()
 * (debugging aid).
 *
 * main() installs this handler before the handle is created, so a signal
 * that arrives early must not pass a NULL handle on.
 *
 * NOTE(review): rd_kafka_dump() writes through stdio and is presumably not
 * async-signal-safe; acceptable for a debugging aid, confirm if reused.
 *
 * @param sig Signal number (unused).
 */
static void sig_usr1(int sig) {
        (void)sig;

        if (rk)
                rd_kafka_dump(stdout, rk);
}

/**
 * Program entry point.
 *
 * Parses the command line into a global and a topic configuration and then
 * runs one of three modes:
 *  - 'P': read lines from stdin and produce each line as one message,
 *  - 'C': consume messages from a single partition and print them
 *         (the default mode),
 *  - 'L': fetch cluster metadata once and print it.
 *
 * @param argc Argument count.
 * @param argv Argument vector, see the usage text below for the options.
 *
 * @returns 0 at the end of producer and consumer mode. Every other path
 *          terminates through exit(): 1 for usage, configuration and setup
 *          errors, and in metadata mode 2 if the request failed, else 0.
 */
int main(int argc, char **argv) {
        rd_kafka_topic_t *rkt;
        char *brokers = "localhost:9092";
        char mode     = 'C';
        char *topic   = NULL;
        int partition = RD_KAFKA_PARTITION_UA;
        int opt;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *topic_conf;
        char errstr[512];
        int64_t start_offset = 0;
        int do_conf_dump     = 0;
        char tmp[16];
        int64_t seek_offset      = 0;
        int64_t tmp_offset       = 0;
        int get_wmarks           = 0;
        rd_kafka_headers_t *hdrs = NULL;
        rd_kafka_resp_err_t err;
        int wait_left = 5; /* Remaining 1s waits for background cleanup */

        /* Kafka configuration */
        conf = rd_kafka_conf_new();

        /* Set logger */
        rd_kafka_conf_set_log_cb(conf, logger);

        /* Quick termination */
        snprintf(tmp, sizeof(tmp), "%i", SIGIO);
        rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

        /* Topic configuration */
        topic_conf = rd_kafka_topic_conf_new();

        while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:H:")) != -1) {
                switch (opt) {
                case 'P':
                case 'C':
                case 'L':
                        mode = opt;
                        break;
                case 't':
                        topic = optarg;
                        break;
                case 'p':
                        partition = atoi(optarg);
                        break;
                case 'b':
                        brokers = optarg;
                        break;
                case 'z':
                        if (rd_kafka_conf_set(conf, "compression.codec", optarg,
                                              errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                        break;
                case 'o':
                case 's':
                        /* -o (start offset) and -s (seek offset) share the
                         * same symbolic/numeric offset syntax. */
                        if (!strcmp(optarg, "end"))
                                tmp_offset = RD_KAFKA_OFFSET_END;
                        else if (!strcmp(optarg, "beginning"))
                                tmp_offset = RD_KAFKA_OFFSET_BEGINNING;
                        else if (!strcmp(optarg, "stored"))
                                tmp_offset = RD_KAFKA_OFFSET_STORED;
                        else if (!strcmp(optarg, "wmark"))
                                get_wmarks = 1;
                        else {
                                tmp_offset = strtoll(optarg, NULL, 10);

                                /* A negative number is passed through
                                 * RD_KAFKA_OFFSET_TAIL() with its sign
                                 * removed. */
                                if (tmp_offset < 0)
                                        tmp_offset =
                                            RD_KAFKA_OFFSET_TAIL(-tmp_offset);
                        }

                        if (opt == 'o')
                                start_offset = tmp_offset;
                        else if (opt == 's')
                                seek_offset = tmp_offset;
                        break;
                case 'e':
                        exit_eof = 1;
                        break;
                case 'd':
                        if (rd_kafka_conf_set(conf, "debug", optarg, errstr,
                                              sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr,
                                        "%% Debug configuration failed: "
                                        "%s: %s\n",
                                        errstr, optarg);
                                exit(1);
                        }
                        break;
                case 'q':
                        quiet = 1;
                        break;
                case 'A':
                        output = OUTPUT_RAW;
                        break;
                case 'H': {
                        char *name, *val;
                        /* -1 is passed on as the name length when the
                         * argument has no '='. Signed on purpose, so the
                         * sentinel is not carried in an unsigned type. */
                        ssize_t name_sz = -1;

                        name = optarg;
                        val  = strchr(name, '=');
                        if (val) {
                                name_sz = (ssize_t)(val - name);
                                val++; /* past the '=' */
                        }

                        if (!hdrs)
                                hdrs = rd_kafka_headers_new(8);

                        err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
                        if (err) {
                                fprintf(stderr,
                                        "%% Failed to add header %s: %s\n",
                                        name, rd_kafka_err2str(err));
                                exit(1);
                        }
                } break;

                case 'X': {
                        char *name, *val;
                        rd_kafka_conf_res_t res;

                        if (!strcmp(optarg, "list") ||
                            !strcmp(optarg, "help")) {
                                rd_kafka_conf_properties_show(stdout);
                                exit(0);
                        }

                        if (!strcmp(optarg, "dump")) {
                                do_conf_dump = 1;
                                continue;
                        }

                        name = optarg;
                        if (!(val = strchr(name, '='))) {
                                char dest[512];
                                size_t dest_size = sizeof(dest);
                                /* Return current value for property. */

                                res = RD_KAFKA_CONF_UNKNOWN;
                                if (!strncmp(name, "topic.", strlen("topic.")))
                                        res = rd_kafka_topic_conf_get(
                                            topic_conf, name + strlen("topic."),
                                            dest, &dest_size);
                                if (res == RD_KAFKA_CONF_UNKNOWN)
                                        res = rd_kafka_conf_get(
                                            conf, name, dest, &dest_size);

                                if (res == RD_KAFKA_CONF_OK) {
                                        printf("%s = %s\n", name, dest);
                                        exit(0);
                                } else {
                                        fprintf(stderr, "%% %s property\n",
                                                res == RD_KAFKA_CONF_UNKNOWN
                                                    ? "Unknown"
                                                    : "Invalid");
                                        exit(1);
                                }
                        }

                        /* Split "name=value" in place. */
                        *val = '\0';
                        val++;

                        res = RD_KAFKA_CONF_UNKNOWN;
                        /* Try "topic." prefixed properties on topic
                         * conf first, and then fall through to global if
                         * it didn't match a topic configuration property. */
                        if (!strncmp(name, "topic.", strlen("topic.")))
                                res = rd_kafka_topic_conf_set(
                                    topic_conf, name + strlen("topic."), val,
                                    errstr, sizeof(errstr));

                        if (res == RD_KAFKA_CONF_UNKNOWN)
                                res = rd_kafka_conf_set(conf, name, val, errstr,
                                                        sizeof(errstr));

                        if (res != RD_KAFKA_CONF_OK) {
                                fprintf(stderr, "%% %s\n", errstr);
                                exit(1);
                        }
                } break;

                default:
                        goto usage;
                }
        }


        if (do_conf_dump) {
                const char **arr;
                size_t cnt;
                int pass;

                /* Pass 0 dumps the global config, pass 1 the topic config. */
                for (pass = 0; pass < 2; pass++) {
                        int i;

                        if (pass == 0) {
                                arr = rd_kafka_conf_dump(conf, &cnt);
                                printf("# Global config\n");
                        } else {
                                printf("# Topic config\n");
                                arr =
                                    rd_kafka_topic_conf_dump(topic_conf, &cnt);
                        }

                        /* The array holds name,value pairs. */
                        for (i = 0; i < (int)cnt; i += 2)
                                printf("%s = %s\n", arr[i], arr[i + 1]);

                        printf("\n");

                        rd_kafka_conf_dump_free(arr, cnt);
                }

                exit(0);
        }


        if (optind != argc || (mode != 'L' && !topic)) {
        usage:
                fprintf(stderr,
                        "Usage: %s -C|-P|-L -t <topic> "
                        "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
                        "\n"
                        "librdkafka version %s (0x%08x)\n"
                        "\n"
                        " Options:\n"
                        "  -C | -P         Consumer or Producer mode\n"
                        "  -L              Metadata list mode\n"
                        "  -t <topic>      Topic to fetch / produce\n"
                        "  -p <num>        Partition (random partitioner)\n"
                        "  -b <brokers>    Broker address (localhost:9092)\n"
                        "  -z <codec>      Enable compression:\n"
                        "                  none|gzip|snappy|lz4|zstd\n"
                        "  -o <offset>     Start offset (consumer):\n"
                        "                  beginning, end, NNNNN or -NNNNN\n"
                        "                  wmark returns the current hi&lo "
                        "watermarks.\n"
                        "  -e              Exit consumer when last message\n"
                        "                  in partition has been received.\n"
                        "  -d [facs..]     Enable debugging contexts:\n"
                        "                  %s\n"
                        "  -q              Be quiet\n"
                        "  -A              Raw payload output (consumer)\n"
                        "  -H <name[=value]> Add header to message (producer)\n"
                        "  -X <prop=name>  Set arbitrary librdkafka "
                        "configuration property\n"
                        "                  Properties prefixed with \"topic.\" "
                        "will be set on topic object.\n"
                        "  -X list         Show full list of supported "
                        "properties.\n"
                        "  -X dump         Show configuration\n"
                        "  -X <prop>       Get single property value\n"
                        "\n"
                        " In Consumer mode:\n"
                        "  writes fetched messages to stdout\n"
                        " In Producer mode:\n"
                        "  reads messages from stdin and sends to broker\n"
                        " In List mode:\n"
                        "  queries broker for metadata information, "
                        "topic is optional.\n"
                        "\n"
                        "\n"
                        "\n",
                        argv[0], rd_kafka_version_str(), rd_kafka_version(),
                        RD_KAFKA_DEBUG_CONTEXTS);
                exit(1);
        }

        if ((mode == 'C' && !isatty(STDIN_FILENO)) ||
            (mode != 'C' && !isatty(STDOUT_FILENO)))
                quiet = 1;


        signal(SIGINT, stop);
        signal(SIGUSR1, sig_usr1);

        /* Set bootstrap servers */
        if (brokers &&
            rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
        }

        if (mode == 'P') {
                /*
                 * Producer
                 */
                char buf[2048];

                /* Set up a message delivery report callback.
                 * It will be called once for each message, either on successful
                 * delivery to broker, or upon failure to deliver to broker. */
                rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);

                /* Create Kafka handle */
                if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
                                        sizeof(errstr)))) {
                        fprintf(stderr,
                                "%% Failed to create new producer: %s\n",
                                errstr);
                        exit(1);
                }

                /* Create topic */
                rkt        = rd_kafka_topic_new(rk, topic, topic_conf);
                topic_conf = NULL; /* Now owned by topic */
                if (!rkt) {
                        fprintf(stderr,
                                "%% Failed to create topic object: %s\n",
                                rd_kafka_err2str(rd_kafka_last_error()));
                        rd_kafka_destroy(rk);
                        exit(1);
                }

                if (!quiet)
                        fprintf(stderr,
                                "%% Type stuff and hit enter to send\n");

                while (run && fgets(buf, sizeof(buf), stdin)) {
                        size_t len = strlen(buf);

                        /* Strip the trailing newline. len can be 0 when the
                         * input line starts with a NUL byte, in which case
                         * buf[len - 1] would read outside the buffer. */
                        if (len > 0 && buf[len - 1] == '\n')
                                buf[--len] = '\0';

                        err = RD_KAFKA_RESP_ERR_NO_ERROR;

                        /* Send/Produce message. */
                        if (hdrs) {
                                rd_kafka_headers_t *hdrs_copy;

                                /* Each message gets its own copy of the
                                 * headers given on the command line. */
                                hdrs_copy = rd_kafka_headers_copy(hdrs);

                                err = rd_kafka_producev(
                                    rk, RD_KAFKA_V_RKT(rkt),
                                    RD_KAFKA_V_PARTITION(partition),
                                    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                    RD_KAFKA_V_VALUE(buf, len),
                                    RD_KAFKA_V_HEADERS(hdrs_copy),
                                    RD_KAFKA_V_END);

                                /* On failure the copy is still ours to
                                 * release. */
                                if (err)
                                        rd_kafka_headers_destroy(hdrs_copy);

                        } else {
                                if (rd_kafka_produce(
                                        rkt, partition, RD_KAFKA_MSG_F_COPY,
                                        /* Payload and length */
                                        buf, len,
                                        /* Optional key and its length */
                                        NULL, 0,
                                        /* Message opaque, provided in
                                         * delivery report callback as
                                         * msg_opaque. */
                                        NULL) == -1) {
                                        err = rd_kafka_last_error();
                                }
                        }

                        if (err) {
                                fprintf(stderr,
                                        "%% Failed to produce to topic %s "
                                        "partition %i: %s\n",
                                        rd_kafka_topic_name(rkt), partition,
                                        rd_kafka_err2str(err));

                                /* Poll to handle delivery reports */
                                rd_kafka_poll(rk, 0);
                                continue;
                        }

                        if (!quiet)
                                fprintf(stderr,
                                        "%% Sent %zu bytes to topic "
                                        "%s partition %i\n",
                                        len, rd_kafka_topic_name(rkt),
                                        partition);

                        /* Poll to handle delivery reports */
                        rd_kafka_poll(rk, 0);
                }

                /* Poll to handle delivery reports */
                rd_kafka_poll(rk, 0);

                /* Wait for messages to be delivered */
                while (run && rd_kafka_outq_len(rk) > 0)
                        rd_kafka_poll(rk, 100);

                /* Destroy topic */
                rd_kafka_topic_destroy(rkt);

                /* Destroy the handle */
                rd_kafka_destroy(rk);

        } else if (mode == 'C') {
                /*
                 * Consumer
                 */

                rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL,
                                  0);

                /* Create Kafka handle */
                if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
                                        sizeof(errstr)))) {
                        fprintf(stderr,
                                "%% Failed to create new consumer: %s\n",
                                errstr);
                        exit(1);
                }

                if (get_wmarks) {
                        int64_t lo, hi;

                        /* Only query for hi&lo partition watermarks */

                        if ((err = rd_kafka_query_watermark_offsets(
                                 rk, topic, partition, &lo, &hi, 5000))) {
                                fprintf(stderr,
                                        "%% query_watermark_offsets() "
                                        "failed: %s\n",
                                        rd_kafka_err2str(err));
                                exit(1);
                        }

                        printf(
                            "%s [%d]: low - high offsets: "
                            "%" PRId64 " - %" PRId64 "\n",
                            topic, partition, lo, hi);

                        rd_kafka_destroy(rk);
                        exit(0);
                }


                /* Create topic */
                rkt        = rd_kafka_topic_new(rk, topic, topic_conf);
                topic_conf = NULL; /* Now owned by topic */
                if (!rkt) {
                        fprintf(stderr,
                                "%% Failed to create topic object: %s\n",
                                rd_kafka_err2str(rd_kafka_last_error()));
                        rd_kafka_destroy(rk);
                        exit(1);
                }

                /* Start consuming */
                if (rd_kafka_consume_start(rkt, partition, start_offset) ==
                    -1) {
                        err = rd_kafka_last_error();
                        fprintf(stderr, "%% Failed to start consuming: %s\n",
                                rd_kafka_err2str(err));
                        if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
                                fprintf(stderr,
                                        "%% Broker based offset storage "
                                        "requires a group.id, "
                                        "add: -X group.id=yourGroup\n");
                        exit(1);
                }

                while (run) {
                        rd_kafka_message_t *rkmessage;

                        /* Poll for errors, etc. */
                        rd_kafka_poll(rk, 0);

                        /* Consume single message.
                         * See rdkafka_performance.c for high speed
                         * consuming of messages. */
                        rkmessage = rd_kafka_consume(rkt, partition, 1000);
                        if (!rkmessage) /* timeout */
                                continue;

                        msg_consume(rkmessage, NULL);

                        /* Return message to rdkafka */
                        rd_kafka_message_destroy(rkmessage);

                        /* A seek requested with -s is performed once, after
                         * the first message has been received. */
                        if (seek_offset) {
                                err = rd_kafka_seek(rkt, partition, seek_offset,
                                                    2000);
                                if (err)
                                        printf("Seek failed: %s\n",
                                               rd_kafka_err2str(err));
                                else
                                        printf("Seeked to %" PRId64 "\n",
                                               seek_offset);
                                seek_offset = 0;
                        }
                }

                /* Stop consuming */
                rd_kafka_consume_stop(rkt, partition);

                while (rd_kafka_outq_len(rk) > 0)
                        rd_kafka_poll(rk, 10);

                /* Destroy topic */
                rd_kafka_topic_destroy(rkt);

                /* Destroy handle */
                rd_kafka_destroy(rk);

        } else if (mode == 'L') {
                err = RD_KAFKA_RESP_ERR_NO_ERROR;

                /* Create Kafka handle */
                if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
                                        sizeof(errstr)))) {
                        fprintf(stderr,
                                "%% Failed to create new producer: %s\n",
                                errstr);
                        exit(1);
                }

                /* Create topic */
                if (topic) {
                        rkt        = rd_kafka_topic_new(rk, topic, topic_conf);
                        topic_conf = NULL; /* Now owned by topic */
                        /* Without this check a failed topic creation would
                         * silently turn into an all-topics request below. */
                        if (!rkt) {
                                fprintf(
                                    stderr,
                                    "%% Failed to create topic object: %s\n",
                                    rd_kafka_err2str(rd_kafka_last_error()));
                                rd_kafka_destroy(rk);
                                exit(1);
                        }
                } else
                        rkt = NULL;

                /* Runs at most once: both outcomes clear the run flag. */
                while (run) {
                        const struct rd_kafka_metadata *metadata;

                        /* Fetch metadata */
                        err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt, &metadata,
                                                5000);
                        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
                                fprintf(stderr,
                                        "%% Failed to acquire metadata: %s\n",
                                        rd_kafka_err2str(err));
                                run = 0;
                                break;
                        }

                        metadata_print(topic, metadata);

                        rd_kafka_metadata_destroy(metadata);
                        run = 0;
                }

                /* Destroy topic */
                if (rkt)
                        rd_kafka_topic_destroy(rkt);

                /* Destroy the handle */
                rd_kafka_destroy(rk);

                if (topic_conf)
                        rd_kafka_topic_conf_destroy(topic_conf);


                /* Exit right away, don't wait for background cleanup, we
                 * haven't done anything important anyway. */
                exit(err ? 2 : 0);
        }

        if (hdrs)
                rd_kafka_headers_destroy(hdrs);

        if (topic_conf)
                rd_kafka_topic_conf_destroy(topic_conf);

        /* Let background threads clean up and terminate cleanly.
         * A dedicated counter is used so that a wait that succeeds on the
         * last attempt is not mistaken for a timeout. */
        while (wait_left > 0 && rd_kafka_wait_destroyed(1000) == -1) {
                wait_left--;
                printf("Waiting for librdkafka to decommission\n");
        }

        /* Only dump state when every wait timed out.
         * NOTE(review): rk has already been passed to rd_kafka_destroy()
         * at this point - confirm that dumping it here is still valid. */
        if (wait_left == 0)
                rd_kafka_dump(stdout, rk);

        return 0;
}
