/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2019-2022, Magnus Edenhill
 *               2023, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

#include "rdkafka.h"

/**
 * @name Producer transaction tests
 *
 */


/**
 * @brief Produce \p cnt messages using the batch interface and wait for
 *        them to be delivered.
 *
 * @param rk        Producer instance.
 * @param topic     Name of the topic to produce to.
 * @param testid    Test id handed to test_prepare_msg() when generating
 *                  each message's key and value.
 * @param partition Partition passed to rd_kafka_produce_batch()
 *                  (callers in this file pass RD_KAFKA_PARTITION_UA or a
 *                  fixed partition).
 * @param msg_base  Message id of the first message; message \c i is
 *                  prepared with id \p msg_base + i.
 * @param cnt       Number of messages to produce.
 *
 * The test is failed if the topic object can't be created, if
 * rd_kafka_produce_batch() accepts fewer than \p cnt messages, or if any
 * message has its per-message \c err field set after the call.
 */
void do_produce_batch(rd_kafka_t *rk,
                      const char *topic,
                      uint64_t testid,
                      int32_t partition,
                      int msg_base,
                      int cnt) {
        rd_kafka_message_t *messages;
        rd_kafka_topic_t *rkt;
        int i;
        int ret;
        /* Outstanding-delivery counter: attached to every message below
         * and handed to test_wait_delivery() at the end.
         * NOTE(review): presumably decremented by the producer's delivery
         * report callback - confirm against test_dr_msg_cb. */
        int remains = cnt;

        TEST_SAY("Batch-producing %d messages to partition %" PRId32 "\n", cnt,
                 partition);

        /* rd_kafka_topic_new() may fail; catch that here with a clear
         * message rather than passing a NULL topic to the batch call. */
        rkt = rd_kafka_topic_new(rk, topic, NULL);
        TEST_ASSERT(rkt != NULL, "Failed to create topic object for %s: %s",
                    topic, rd_kafka_err2str(rd_kafka_last_error()));

        messages = rd_calloc(cnt, sizeof(*messages));
        for (i = 0; i < cnt; i++) {
                char key[128];
                char value[128];

                test_prepare_msg(testid, partition, msg_base + i, value,
                                 sizeof(value), key, sizeof(key));
                messages[i].key      = rd_strdup(key);
                messages[i].key_len  = strlen(key);
                messages[i].payload  = rd_strdup(value);
                messages[i].len      = strlen(value);
                messages[i]._private = &remains;
        }

        /* Produced with RD_KAFKA_MSG_F_COPY: our key and payload buffers
         * are freed right after the per-message error check below. */
        ret = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_COPY,
                                     messages, cnt);

        rd_kafka_topic_destroy(rkt);

        TEST_ASSERT(ret == cnt,
                    "Failed to batch-produce: %d/%d messages produced", ret,
                    cnt);

        for (i = 0; i < cnt; i++) {
                TEST_ASSERT(!messages[i].err, "Failed to produce message: %s",
                            rd_kafka_err2str(messages[i].err));
                rd_free(messages[i].key);
                rd_free(messages[i].payload);
        }
        rd_free(messages);

        /* Wait for deliveries */
        test_wait_delivery(rk, &remains);
}



/**
 * @brief Basic producer transaction testing without consumed input
 *        (only consumed output for verification).
 *        e.g., no consumer offsets to commit with transaction.
 *
 * Runs a fixed table of transactions (commit/abort, sync/async produce,
 * single-message/batch produce) against one transactional producer and
 * verifies with a consumer that committed messages are seen and
 * aborted messages are not.
 *
 * @param enable_compression If true the producer is configured with
 *                           compression.type=lz4. This variant is also
 *                           the one marked as "quick".
 */
static void do_test_basic_producer_txn(rd_bool_t enable_compression) {
        const char *topic       = test_mk_topic_name("0103_transactions", 1);
        const int partition_cnt = 4;
#define _TXNCNT 6
        /* Transaction scenarios, executed in order.
         * Trailing fields omitted from an initializer are zero-initialized. */
        struct {
                const char *desc;    /* Human readable description */
                uint64_t testid;     /* Generated per transaction in the loop */
                int msgcnt;          /* Number of messages to produce */
                rd_bool_t abort;     /* Abort (true) or commit (false) */
                rd_bool_t sync;      /* Produce with test_produce_msgs2()
                                      * rather than the _nowait variant */
                rd_bool_t batch;     /* Produce using do_produce_batch() */
                rd_bool_t batch_any; /* Batch: RD_KAFKA_PARTITION_UA rather
                                      * than a fixed partition */
        } txn[_TXNCNT] = {
            {"Commit transaction, sync producing", 0, 100, rd_false, rd_true},
            {"Commit transaction, async producing", 0, 1000, rd_false,
             rd_false},
            {"Commit transaction, sync batch producing to any partition", 0,
             100, rd_false, rd_true, rd_true, rd_true},
            {"Abort transaction, sync producing", 0, 500, rd_true, rd_true},
            {"Abort transaction, async producing", 0, 5000, rd_true, rd_false},
            {"Abort transaction, sync batch producing to one partition", 0, 500,
             rd_true, rd_true, rd_true, rd_false},

        };
        rd_kafka_t *p, *c;
        rd_kafka_conf_t *conf, *p_conf, *c_conf;
        int i;

        /* Mark one of run modes as quick so we don't run both when
         * in a hurry.*/
        SUB_TEST0(enable_compression /* quick */, "with%s compression",
                  enable_compression ? "" : "out");

        test_conf_init(&conf, NULL, 30);

        /* Create producer */
        p_conf = rd_kafka_conf_dup(conf);
        rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb);
        /* The topic name doubles as the transactional.id. */
        test_conf_set(p_conf, "transactional.id", topic);
        if (enable_compression)
                test_conf_set(p_conf, "compression.type", "lz4");
        p = test_create_handle(RD_KAFKA_PRODUCER, p_conf);

        // FIXME: add testing where the txn id is reused (and thus fails)

        /* Create topic */
        test_create_topic(p, topic, partition_cnt, 3);

        /* Create consumer */
        /* c_conf aliases conf (no dup): the base config object itself is
         * handed to the consumer. */
        c_conf = conf;
        test_conf_set(conf, "auto.offset.reset", "earliest");
        /* Make sure default isolation.level is transaction aware */
        TEST_ASSERT(
            !strcmp(test_conf_get(c_conf, "isolation.level"), "read_committed"),
            "expected isolation.level=read_committed, not %s",
            test_conf_get(c_conf, "isolation.level"));

        /* The topic name is also used as the consumer group id. */
        c = test_create_consumer(topic, NULL, c_conf, NULL);

        /* Wait for topic to propagate to avoid test flakiness */
        test_wait_topic_exists(c, topic, tmout_multip(5000));

        /* Subscribe to topic */
        test_consumer_subscribe(c, topic);

        /* Wait for assignment to make sure consumer is fetching messages
         * below, so we can use the poll_no_msgs() timeout to
         * determine that messages were indeed aborted. */
        test_consumer_wait_assignment(c, rd_true);

        /* Init transactions */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000));

        for (i = 0; i < _TXNCNT; i++) {
                /* Counter handed to the async produce call and later to
                 * test_wait_delivery(); unused for sync/batch scenarios. */
                int wait_msgcnt = 0;

                TEST_SAY(_C_BLU "txn[%d]: Begin transaction: %s\n" _C_CLR, i,
                         txn[i].desc);

                /* Begin a transaction */
                TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

                /* If the transaction is aborted it is okay if
                 * messages fail producing, since they'll be
                 * purged from queues. */
                test_curr->ignore_dr_err = txn[i].abort;

                /* Produce messages */
                txn[i].testid = test_id_generate();
                TEST_SAY(
                    "txn[%d]: Produce %d messages %ssynchronously "
                    "with testid %" PRIu64 "\n",
                    i, txn[i].msgcnt, txn[i].sync ? "" : "a", txn[i].testid);

                if (!txn[i].batch) {
                        /* Single-message produce: sync or async */
                        if (txn[i].sync)
                                test_produce_msgs2(p, topic, txn[i].testid,
                                                   RD_KAFKA_PARTITION_UA, 0,
                                                   txn[i].msgcnt, NULL, 0);
                        else
                                test_produce_msgs2_nowait(
                                    p, topic, txn[i].testid,
                                    RD_KAFKA_PARTITION_UA, 0, txn[i].msgcnt,
                                    NULL, 0, &wait_msgcnt);
                } else if (txn[i].batch_any) {
                        /* Batch: use any partition */
                        do_produce_batch(p, topic, txn[i].testid,
                                         RD_KAFKA_PARTITION_UA, 0,
                                         txn[i].msgcnt);
                } else {
                        /* Batch: specific partition */
                        do_produce_batch(p, topic, txn[i].testid,
                                         1 /* partition */, 0, txn[i].msgcnt);
                }


                /* Abort or commit transaction */
                TEST_SAY("txn[%d]: %s" _C_CLR " transaction\n", i,
                         txn[i].abort ? _C_RED "Abort" : _C_GRN "Commit");
                if (txn[i].abort) {
                        test_curr->ignore_dr_err = rd_true;
                        TEST_CALL_ERROR__(
                            rd_kafka_abort_transaction(p, 30 * 1000));
                } else {
                        test_curr->ignore_dr_err = rd_false;
                        TEST_CALL_ERROR__(
                            rd_kafka_commit_transaction(p, 30 * 1000));
                }

                if (!txn[i].sync)
                        /* Wait for delivery reports */
                        test_wait_delivery(p, &wait_msgcnt);

                /* Consume messages */
                /* Aborted: expect nothing with this testid within 3000 ms.
                 * Committed: expect exactly msgcnt messages. */
                if (txn[i].abort)
                        test_consumer_poll_no_msgs(txn[i].desc, c,
                                                   txn[i].testid, 3000);
                else
                        test_consumer_poll(txn[i].desc, c, txn[i].testid,
                                           partition_cnt, 0, txn[i].msgcnt,
                                           NULL);

                TEST_SAY(_C_GRN "txn[%d]: Finished successfully: %s\n" _C_CLR,
                         i, txn[i].desc);
        }

        rd_kafka_destroy(p);

        test_consumer_close(c);
        rd_kafka_destroy(c);

        SUB_TEST_PASS();
}


/**
 * @brief Poll consumer \p c until \p msgcnt error-free messages have been
 *        received and store them, in arrival order, in \p msgs.
 *
 * \p msgs must be pre-allocated with room for at least \p msgcnt entries.
 * Poll timeouts are retried, and messages carrying an error are logged,
 * destroyed and skipped, so the call only returns once \p msgcnt proper
 * messages have been collected. The caller owns the returned messages.
 */
static void
consume_messages(rd_kafka_t *c, rd_kafka_message_t **msgs, int msgcnt) {
        int received;

        for (received = 0; received < msgcnt;) {
                rd_kafka_message_t *rkm = rd_kafka_consumer_poll(c, 1000);

                /* The slot is only kept (by advancing the index) once the
                 * message has been found to be usable. */
                msgs[received] = rkm;

                if (rkm == NULL)
                        continue; /* Poll timed out: try again */

                if (rkm->err) {
                        TEST_SAY("%s consumer error: %s\n", rd_kafka_name(c),
                                 rd_kafka_message_errstr(rkm));
                        rd_kafka_message_destroy(rkm);
                        continue;
                }

                TEST_SAYL(3, "%s: consumed message %s [%d] @ %" PRId64 "\n",
                          rd_kafka_name(c), rd_kafka_topic_name(rkm->rkt),
                          rkm->partition, rkm->offset);

                received++;
        }
}

/**
 * @brief Destroy the first \p msgcnt messages of \p msgs,
 *        going from the last entry to the first.
 */
static void destroy_messages(rd_kafka_message_t **msgs, int msgcnt) {
        int idx;

        for (idx = msgcnt - 1; idx >= 0; idx--)
                rd_kafka_message_destroy(msgs[idx]);
}


/**
 * @brief Test a transactional consumer + transactional producer combo,
 *        mimicking a streams job.
 *
 * One input topic produced to by transactional producer 1,
 * consumed by transactional consumer 1, which forwards messages
 * to transactional producer 2 that writes messages to output topic,
 * which is consumed and verified by transactional consumer 2.
 *
 * Every 3rd transaction is aborted.
 *
 * Consumer 1 is closed and recreated after transactions #2 (committed)
 * and #3 (aborted); after the committed one the offsets returned by
 * rd_kafka_committed() are compared to the offsets that were sent to
 * the transaction.
 */
void do_test_consumer_producer_txn(void) {
        /* test_mk_topic_name() results are duplicated since both names
         * need to be held at the same time. */
        char *input_topic =
            rd_strdup(test_mk_topic_name("0103-transactions-input", 1));
        char *output_topic =
            rd_strdup(test_mk_topic_name("0103-transactions-output", 1));
        /* Topic names are reused as consumer group ids (and, below, as
         * transactional ids). */
        const char *c1_groupid = input_topic;
        const char *c2_groupid = output_topic;
        rd_kafka_t *p1, *p2, *c1, *c2;
        rd_kafka_conf_t *conf, *tmpconf;
        uint64_t testid;
#define _MSGCNT (10 * 30)
        const int txncnt = 10;
        /* Number of messages seeded to the input topic */
        const int msgcnt = _MSGCNT;
        int txn;
        /* Number of messages forwarded in committed transactions */
        int committed_msgcnt = 0;
        test_msgver_t expect_mv, actual_mv;

        SUB_TEST_QUICK("transactional test with %d transactions", txncnt);

        test_conf_init(&conf, NULL, 30);

        testid = test_id_generate();

        /*
         *
         * Producer 1
         *     |
         *     v
         * input topic
         *     |
         *     v
         * Consumer 1    }
         *     |         } transactional streams job
         *     v         }
         * Producer 2    }
         *     |
         *     v
         * output topic
         *     |
         *     v
         * Consumer 2
         */


        /* Create Producer 1 and seed input topic */
        tmpconf = rd_kafka_conf_dup(conf);
        test_conf_set(tmpconf, "transactional.id", input_topic);
        rd_kafka_conf_set_dr_msg_cb(tmpconf, test_dr_msg_cb);
        p1 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf);

        /* Create input and output topics */
        test_create_topic(p1, input_topic, 4, 3);
        test_create_topic(p1, output_topic, 4, 3);

        /* Seed input topic with messages */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30 * 1000));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p1));
        test_produce_msgs2(p1, input_topic, testid, RD_KAFKA_PARTITION_UA, 0,
                           msgcnt, NULL, 0);
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(p1, 30 * 1000));

        /* Producer 1 is only needed for seeding. */
        rd_kafka_destroy(p1);

        /* Create Consumer 1: reading msgs from input_topic (Producer 1) */
        tmpconf = rd_kafka_conf_dup(conf);
        test_conf_set(tmpconf, "isolation.level", "read_committed");
        test_conf_set(tmpconf, "auto.offset.reset", "earliest");
        /* Offsets are committed through Producer 2's transactions instead. */
        test_conf_set(tmpconf, "enable.auto.commit", "false");
        c1 = test_create_consumer(c1_groupid, NULL, tmpconf, NULL);
        test_consumer_subscribe(c1, input_topic);

        /* Create Producer 2 */
        tmpconf = rd_kafka_conf_dup(conf);
        test_conf_set(tmpconf, "transactional.id", output_topic);
        rd_kafka_conf_set_dr_msg_cb(tmpconf, test_dr_msg_cb);
        p2 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf);
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p2, 30 * 1000));

        /* Create Consumer 2: reading msgs from output_topic (Producer 2) */
        tmpconf = rd_kafka_conf_dup(conf);
        test_conf_set(tmpconf, "isolation.level", "read_committed");
        test_conf_set(tmpconf, "auto.offset.reset", "earliest");
        c2 = test_create_consumer(c2_groupid, NULL, tmpconf, NULL);
        test_consumer_subscribe(c2, output_topic);

        /* Keep track of what messages to expect on the output topic */
        test_msgver_init(&expect_mv, testid);

        for (txn = 0; txn < txncnt; txn++) {
                /* Messages per transaction cycle through 10, 20, 30. */
                int msgcnt2 = 10 * (1 + (txn % 3));
                /* Sized for the full seeded count, an upper bound for
                 * msgcnt2. */
                rd_kafka_message_t *msgs[_MSGCNT];
                int i;
                /* Transactions #0, #3, #6 and #9 are aborted. */
                rd_bool_t do_abort = !(txn % 3);
                /* Consumer 1 is recreated after aborted txn #3 and after
                 * committed txn #2. */
                rd_bool_t recreate_consumer =
                    (do_abort && txn == 3) || (!do_abort && txn == 2);
                rd_kafka_topic_partition_list_t *offsets,
                    *expected_offsets = NULL;
                rd_kafka_resp_err_t err;
                rd_kafka_consumer_group_metadata_t *c1_cgmetadata;
                /* Passed as per-message opaque to producev() and asserted
                 * to be zero once the transaction has ended.
                 * NOTE(review): presumably decremented by test_dr_msg_cb
                 * on each delivery report - confirm. */
                int remains = msgcnt2;

                TEST_SAY(_C_BLU
                         "Begin transaction #%d/%d "
                         "(msgcnt=%d, do_abort=%s, recreate_consumer=%s)\n",
                         txn, txncnt, msgcnt2, do_abort ? "true" : "false",
                         recreate_consumer ? "true" : "false");

                consume_messages(c1, msgs, msgcnt2);

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(p2));

                /* Forward each consumed input message to the output topic */
                for (i = 0; i < msgcnt2; i++) {
                        rd_kafka_message_t *msg = msgs[i];

                        if (!do_abort) {
                                /* The expected msgver based on the input topic
                                 * will be compared to the actual msgver based
                                 * on the output topic, so we need to
                                 * override the topic name to match
                                 * the actual msgver's output topic. */
                                test_msgver_add_msg0(
                                    __FUNCTION__, __LINE__, rd_kafka_name(p2),
                                    &expect_mv, msg, output_topic);
                                committed_msgcnt++;
                        }

                        err = rd_kafka_producev(
                            p2, RD_KAFKA_V_TOPIC(output_topic),
                            RD_KAFKA_V_KEY(msg->key, msg->key_len),
                            RD_KAFKA_V_VALUE(msg->payload, msg->len),
                            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                            RD_KAFKA_V_OPAQUE(&remains), RD_KAFKA_V_END);
                        TEST_ASSERT(!err, "produce failed: %s",
                                    rd_kafka_err2str(err));

                        rd_kafka_poll(p2, 0);
                }

                /* Key and payload were produced with MSG_F_COPY, so the
                 * consumed messages can be released now. */
                destroy_messages(msgs, msgcnt2);

                /* Send consumer 1's current position for its assigned
                 * partitions as part of producer 2's transaction. */
                err = rd_kafka_assignment(c1, &offsets);
                TEST_ASSERT(!err, "failed to get consumer assignment: %s",
                            rd_kafka_err2str(err));

                err = rd_kafka_position(c1, offsets);
                TEST_ASSERT(!err, "failed to get consumer position: %s",
                            rd_kafka_err2str(err));

                c1_cgmetadata = rd_kafka_consumer_group_metadata(c1);
                TEST_ASSERT(c1_cgmetadata != NULL,
                            "failed to get consumer group metadata");

                TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                    p2, offsets, c1_cgmetadata, -1));

                if (recreate_consumer && !do_abort) {
                        /* Remember the offsets sent to the transaction so
                         * they can be compared with committed() after the
                         * consumer has been recreated. */
                        expected_offsets =
                            rd_kafka_topic_partition_list_new(offsets->cnt);

                        /* Cannot use rd_kafka_topic_partition_list_copy
                         * as it needs to be destroyed before closing the
                         * consumer, because of the _private field holding
                         * a reference to the internal toppar */
                        for (i = 0; i < offsets->cnt; i++) {
                                rd_kafka_topic_partition_t *rktpar =
                                    &offsets->elems[i];
                                rd_kafka_topic_partition_t *rktpar_new;
                                rktpar_new = rd_kafka_topic_partition_list_add(
                                    expected_offsets, rktpar->topic,
                                    rktpar->partition);
                                rktpar_new->offset = rktpar->offset;
                                rd_kafka_topic_partition_set_leader_epoch(
                                    rktpar_new,
                                    rd_kafka_topic_partition_get_leader_epoch(
                                        rktpar));
                        }
                }

                rd_kafka_consumer_group_metadata_destroy(c1_cgmetadata);

                rd_kafka_topic_partition_list_destroy(offsets);


                /* End the transaction. Delivery errors are only ignored
                 * when aborting. */
                if (do_abort) {
                        test_curr->ignore_dr_err = rd_true;
                        TEST_CALL_ERROR__(
                            rd_kafka_abort_transaction(p2, 30 * 1000));
                } else {
                        test_curr->ignore_dr_err = rd_false;
                        TEST_CALL_ERROR__(
                            rd_kafka_commit_transaction(p2, 30 * 1000));
                }

                TEST_ASSERT(remains == 0,
                            "expected no remaining messages "
                            "in-flight/in-queue, got %d",
                            remains);


                if (recreate_consumer) {
                        /* Recreate the consumer to pick up
                         * on the committed offset. */
                        TEST_SAY("Recreating consumer 1\n");
                        rd_kafka_consumer_close(c1);
                        rd_kafka_destroy(c1);

                        tmpconf = rd_kafka_conf_dup(conf);
                        test_conf_set(tmpconf, "isolation.level",
                                      "read_committed");
                        test_conf_set(tmpconf, "auto.offset.reset", "earliest");
                        test_conf_set(tmpconf, "enable.auto.commit", "false");
                        c1 = test_create_consumer(c1_groupid, NULL, tmpconf,
                                                  NULL);
                        test_consumer_subscribe(c1, input_topic);


                        /* Only set for the committed (non-aborted)
                         * recreation, see above. */
                        if (expected_offsets) {
                                rd_kafka_topic_partition_list_t
                                    *committed_offsets =
                                        rd_kafka_topic_partition_list_copy(
                                            expected_offsets);
                                /* Set committed offsets and epochs to a
                                 * different value before requesting them. */
                                for (i = 0; i < committed_offsets->cnt; i++) {
                                        rd_kafka_topic_partition_t *rktpar =
                                            &committed_offsets->elems[i];
                                        rktpar->offset = -100;
                                        rd_kafka_topic_partition_set_leader_epoch(
                                            rktpar, -100);
                                }

                                TEST_CALL_ERR__(rd_kafka_committed(
                                    c1, committed_offsets, -1));

                                /* A non-zero comparison result means the
                                 * lists differ: print both and fail. */
                                if (test_partition_list_and_offsets_cmp(
                                        expected_offsets, committed_offsets)) {
                                        TEST_SAY("expected list:\n");
                                        test_print_partition_list(
                                            expected_offsets);
                                        TEST_SAY("committed() list:\n");
                                        test_print_partition_list(
                                            committed_offsets);
                                        TEST_FAIL(
                                            "committed offsets don't match");
                                }

                                rd_kafka_topic_partition_list_destroy(
                                    committed_offsets);

                                rd_kafka_topic_partition_list_destroy(
                                    expected_offsets);
                        }
                }
        }

        /* Every client above was created from a duplicate of conf, so the
         * base object is still ours to destroy. */
        rd_kafka_conf_destroy(conf);

        /* Consume the output topic and compare what was received with
         * what the committed transactions forwarded. */
        test_msgver_init(&actual_mv, testid);

        test_consumer_poll("Verify output topic", c2, testid, -1, 0,
                           committed_msgcnt, &actual_mv);

        test_msgver_verify_compare("Verify output topic", &actual_mv,
                                   &expect_mv, TEST_MSGVER_ALL);

        test_msgver_clear(&actual_mv);
        test_msgver_clear(&expect_mv);

        rd_kafka_consumer_close(c1);
        rd_kafka_consumer_close(c2);
        rd_kafka_destroy(c1);
        rd_kafka_destroy(c2);
        rd_kafka_destroy(p2);

        rd_free(input_topic);
        rd_free(output_topic);

        SUB_TEST_PASS();
}


/**
 * @brief Testing misuse of the transaction API.
 *
 * Covers, each with a freshly created producer:
 *  - transaction.timeout.ms out of range: init_transactions() must fail
 *    with a fatal ERR_INVALID_TRANSACTION_TIMEOUT.
 *  - init_transactions() called again after it has succeeded (both before
 *    and after begin_transaction()): must fail with ERR__STATE.
 *  - init_transactions() timing out on the first call: must return a
 *    retriable ERR__TIMED_OUT and succeed when called again.
 *  - init_transactions() called repeatedly with a 1 ms timeout until it
 *    succeeds, with begin_transaction() expected to fail with
 *    ERR__CONFLICT in between.
 */
static void do_test_misuse_txn(void) {
        const char *topic = test_mk_topic_name("0103-test_misuse_txn", 1);
        /* Upper bound on init_transactions() attempts in the
         * "hysterical amounts" scenario. Shared by the loop and the
         * assertion following it so that the two can't diverge. */
        const int max_init_attempts = 5000;
        rd_kafka_t *p;
        rd_kafka_conf_t *conf;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t fatal_err;
        char errstr[512];
        int i;

        /*
         * transaction.timeout.ms out of range (from broker's point of view)
         */
        SUB_TEST_QUICK();

        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", topic);
        test_conf_set(conf, "transaction.timeout.ms", "2147483647");

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        error = rd_kafka_init_transactions(p, 10 * 1000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                        RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                    "Expected error ERR_INVALID_TRANSACTION_TIMEOUT, "
                    "not %s: %s",
                    rd_kafka_error_name(error),
                    error ? rd_kafka_error_string(error) : "");
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected error to have is_fatal() set");
        rd_kafka_error_destroy(error);
        /* Check that a fatal error is raised */
        fatal_err = rd_kafka_fatal_error(p, errstr, sizeof(errstr));
        TEST_ASSERT(fatal_err == RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                    "Expected fatal error ERR_INVALID_TRANSACTION_TIMEOUT, "
                    "not %s: %s",
                    rd_kafka_err2name(fatal_err), fatal_err ? errstr : "");

        rd_kafka_destroy(p);


        /*
         * Multiple calls to init_transactions(): finish on first.
         */
        TEST_SAY("[ Test multiple init_transactions(): finish on first ]\n");
        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", topic);

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000));

        /* Already initialized: a second init must be rejected.. */
        error = rd_kafka_init_transactions(p, 1);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
                    "Expected ERR__STATE error, not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

        /* ..and likewise while a transaction is in progress. */
        error = rd_kafka_init_transactions(p, 3 * 1000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
                    "Expected ERR__STATE error, not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(p);


        /*
         * Multiple calls to init_transactions(): timeout on first.
         */
        TEST_SAY("[ Test multiple init_transactions(): timeout on first ]\n");
        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", topic);

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* A 1 ms timeout is expected to be too short for init to finish. */
        error = rd_kafka_init_transactions(p, 1);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_SAY("error: %s, %d\n", rd_kafka_error_string(error),
                 rd_kafka_error_is_retriable(error));
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected ERR__TIMED_OUT, not %s: %s",
                    rd_kafka_error_name(error), rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected error to be retriable");
        rd_kafka_error_destroy(error);

        /* Retrying with a generous timeout must succeed. */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000));

        rd_kafka_destroy(p);


        /*
         * Multiple calls to init_transactions(): hysterical amounts
         */
        TEST_SAY("[ Test multiple init_transactions(): hysterical amounts ]\n");
        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", topic);

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Call until init succeeds */
        for (i = 0; i < max_init_attempts; i++) {
                if (!(error = rd_kafka_init_transactions(p, 1)))
                        break;

                TEST_ASSERT(rd_kafka_error_is_retriable(error),
                            "Expected error to be retriable");
                rd_kafka_error_destroy(error);

                /* While init has not completed, begin must be refused. */
                error = rd_kafka_begin_transaction(p);
                TEST_ASSERT(error, "Expected begin_transactions() to fail");
                TEST_ASSERT(rd_kafka_error_code(error) ==
                                RD_KAFKA_RESP_ERR__CONFLICT,
                            "Expected begin_transactions() to fail "
                            "with CONFLICT, not %s",
                            rd_kafka_error_name(error));

                rd_kafka_error_destroy(error);
        }

        /* The loop only leaves i below the bound when it broke out on
         * success; i == max_init_attempts means every attempt failed.
         * (A non-strict comparison here would always hold.) */
        TEST_ASSERT(i < max_init_attempts,
                    "init_transactions() did not succeed after %d calls\n", i);

        TEST_SAY("init_transactions() succeeded after %d call(s)\n", i + 1);

        /* Make sure a subsequent init call fails. */
        error = rd_kafka_init_transactions(p, 5 * 1000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
                    "Expected init_transactions() to fail with STATE, not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        /* But begin.. should work now */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

        rd_kafka_destroy(p);

        SUB_TEST_PASS();
}


/**
 * @brief is_fatal_cb for fenced_txn test.
 *
 * Logs every error it is asked about.
 *
 * @returns 0 for ERR__FENCED, which is the error the test expects to see,
 *          and 1 for any other error.
 */
static int fenced_txn_is_fatal_cb(rd_kafka_t *rk,
                                  rd_kafka_resp_err_t err,
                                  const char *reason) {
        TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason);

        if (err != RD_KAFKA_RESP_ERR__FENCED)
                return 1;

        TEST_SAY("Saw the expected fatal error\n");
        return 0;
}


/**
 * @brief Check that transaction fencing is handled correctly.
 *
 * Two producers share one transactional.id. Producer 1 starts a
 * transaction and produces, then producer 2 initializes transactions,
 * which is expected to fence producer 1. Producer 1's commit must then
 * fail with a fatal, non-abortable, non-retriable ERR__FENCED, and no
 * messages may end up committed on the topic.
 *
 * @param produce_after_fence If true, producer 1 produces another round of
 *                            messages after producer 2 has been
 *                            initialized, before trying to commit.
 */
static void do_test_fenced_txn(rd_bool_t produce_after_fence) {
        const char *topic = test_mk_topic_name("0103_fenced_txn", 1);
        /* Messages per produce round */
        const int msgcnt = 10;
        rd_kafka_conf_t *base_conf;
        rd_kafka_t *p1, *p2;
        rd_kafka_error_t *error;
        uint64_t testid;

        SUB_TEST_QUICK("%sproduce after fence",
                       produce_after_fence ? "" : "do not ");

        /* Delivery errors are not tolerated until we deliberately produce
         * with the fenced producer further down. */
        test_curr->ignore_dr_err = rd_false;

        if (produce_after_fence)
                test_curr->is_fatal_cb = fenced_txn_is_fatal_cb;

        testid = test_id_generate();

        /* Both producers are created from copies of the same config and
         * thus share the same transactional.id (the topic name). */
        test_conf_init(&base_conf, NULL, 30);
        test_conf_set(base_conf, "transactional.id", topic);
        rd_kafka_conf_set_dr_msg_cb(base_conf, test_dr_msg_cb);

        p1 = test_create_handle(RD_KAFKA_PRODUCER,
                                rd_kafka_conf_dup(base_conf));
        p2 = test_create_handle(RD_KAFKA_PRODUCER,
                                rd_kafka_conf_dup(base_conf));
        rd_kafka_conf_destroy(base_conf);

        /* Producer 1: start a transaction and produce some messages */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30 * 1000));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p1));
        test_produce_msgs2(p1, topic, testid, RD_KAFKA_PARTITION_UA, 0, msgcnt,
                           NULL, 0);

        /* Producer 2: initializing transactions with the same
         * transactional.id should fence off producer 1. */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p2, 30 * 1000));

        if (produce_after_fence) {
                /* This will fail hard since the epoch was bumped. */
                TEST_SAY("Producing after producing fencing\n");
                test_curr->ignore_dr_err = rd_true;
                test_produce_msgs2(p1, topic, testid, RD_KAFKA_PARTITION_UA, 0,
                                   msgcnt, NULL, 0);
        }

        /* The fenced producer's commit must fail with a fatal ERR__FENCED
         * that is neither abortable nor retriable. */
        error = rd_kafka_commit_transaction(p1, 30 * 1000);

        TEST_ASSERT(error, "Expected commit to fail");
        TEST_ASSERT(rd_kafka_fatal_error(p1, NULL, 0),
                    "Expected a fatal error to have been raised");
        TEST_ASSERT(error, "Expected commit_transaction() to fail");
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected commit_transaction() to return a "
                    "fatal error");
        TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
                    "Expected commit_transaction() not to return an "
                    "abortable error");
        TEST_ASSERT(!rd_kafka_error_is_retriable(error),
                    "Expected commit_transaction() not to return a "
                    "retriable error");
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__FENCED,
                    "Expected commit_transaction() to return %s, "
                    "not %s: %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__FENCED),
                    rd_kafka_error_name(error), rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(p1);
        rd_kafka_destroy(p2);

        /* Make sure no messages were committed: expect zero messages
         * when consuming the topic. */
        test_consume_txn_msgs_easy(
            topic, topic, testid,
            test_get_partition_count(NULL, topic, 10 * 1000), 0, NULL);

        SUB_TEST_PASS();
}



/**
 * @brief Check that fatal idempotent producer errors are also fatal
 *        transactional errors when KIP-360 is not supported.
 *
 * Runs up to three transactions against partition 0 of a freshly created
 * topic. The records of the first transaction are deleted while the second
 * transaction is open, after which more messages are produced:
 *  - expect_fail (broker < 2.5): commit_transaction() must return an
 *    abortable error and the following abort_transaction() must return a
 *    fatal, non-retriable, non-abortable error.
 *  - otherwise (broker >= 2.5): commit_transaction() must succeed and a
 *    third transaction is produced and committed.
 *
 * Finally the topic is consumed from the beginning and the consumed
 * messages are compared against the expected message set.
 *
 * The sub-test is skipped on AK 2.4.
 */
static void do_test_fatal_idempo_error_without_kip360(void) {
        const char *topic       = test_mk_topic_name("0103_fatal_idempo", 1);
        const int32_t partition = 0;
        rd_kafka_conf_t *conf, *c_conf;
        rd_kafka_t *p, *c;
        rd_kafka_error_t *error;
        uint64_t testid;
        /* Number of messages to produce in transaction 1, 2 and 3. */
        const int msgcnt[3] = {6, 4, 1};
        rd_kafka_topic_partition_list_t *records;
        /* expect_mv: messages expected on the output topic,
         * actual_mv: messages actually consumed from it. */
        test_msgver_t expect_mv, actual_mv;
        /* This test triggers UNKNOWN_PRODUCER_ID on AK <2.4 and >2.4, but
         * not on AK 2.4.
         * On AK <2.5 (pre KIP-360) these errors are unrecoverable,
         * on AK >=2.5 (with KIP-360) we can recover.
         * Since 2.4 is not behaving as the other releases we skip it here. */
        rd_bool_t expect_fail = test_broker_version < TEST_BRKVER(2, 5, 0, 0);

        SUB_TEST_QUICK(
            "%s", expect_fail ? "expecting failure since broker is < 2.5"
                              : "not expecting failure since broker is >= 2.5");

        if (test_broker_version >= TEST_BRKVER(2, 4, 0, 0) &&
            test_broker_version < TEST_BRKVER(2, 5, 0, 0))
                SUB_TEST_SKIP("can't trigger UNKNOWN_PRODUCER_ID on AK 2.4");

        /* When a failure is expected, the fatal error and the delivery
         * errors it causes are part of the expected outcome.
         * NOTE(review): presumably test_error_is_not_fatal_cb keeps the
         * test framework from failing the test on these errors - confirm. */
        if (expect_fail)
                test_curr->is_fatal_cb = test_error_is_not_fatal_cb;
        test_curr->ignore_dr_err = expect_fail;

        testid = test_id_generate();

        /* Keep track of what messages to expect on the output topic */
        test_msgver_init(&expect_mv, testid);

        test_conf_init(&conf, NULL, 30);

        /* The topic name doubles as the transactional.id. */
        test_conf_set(conf, "transactional.id", topic);
        test_conf_set(conf, "batch.num.messages", "1");
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        test_create_topic(p, topic, 1, 3);


        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000));

        /*
         * 3 transactions:
         *  1. Produce some messages, commit.
         *  2. Produce some messages, then delete the messages from txn 1 and
         *     then produce some more messages: UNKNOWN_PRODUCER_ID should be
         *     raised as a fatal error.
         *  3. Start a new transaction, produce and commit some new messages.
         *     (this step is only performed when expect_fail is false).
         */

        /*
         * Transaction 1
         */
        TEST_SAY(_C_BLU "Transaction 1: %d msgs\n", msgcnt[0]);
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
        test_produce_msgs2(p, topic, testid, partition, 0, msgcnt[0], NULL, 0);
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));


        /*
         * Transaction 2
         */
        TEST_SAY(_C_BLU "Transaction 2: %d msgs\n", msgcnt[1]);
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

        /* Now delete the messages from txn1 */
        TEST_SAY("Deleting records < %s [%" PRId32 "] offset %d+1\n", topic,
                 partition, msgcnt[0]);
        records = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(records, topic, partition)->offset =
            msgcnt[0]; /* include the control message too */

        TEST_CALL_ERR__(test_DeleteRecords_simple(p, NULL, records, NULL));
        rd_kafka_topic_partition_list_destroy(records);

        /* Wait for deletes to propagate */
        rd_sleep(2);

        /* Only track delivered messages in expect_mv when the producer
         * is expected to keep working. */
        if (!expect_fail)
                test_curr->dr_mv = &expect_mv;

        /* Produce more messages; these should now fail when expect_fail
         * is set. */
        test_produce_msgs2(p, topic, testid, partition, 0, msgcnt[1], NULL, 0);

        error = rd_kafka_commit_transaction(p, -1);

        TEST_SAY_ERROR(error, "commit_transaction() returned: ");

        if (expect_fail) {
                /* The commit itself must fail with an abortable error. */
                TEST_ASSERT(error != NULL, "Expected transaction to fail");
                TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                            "Expected abortable error");
                rd_kafka_error_destroy(error);

                /* Now abort transaction, which should raise the fatal error
                 * since it is the abort that performs the PID reinitialization.
                 */
                error = rd_kafka_abort_transaction(p, -1);
                TEST_SAY_ERROR(error, "abort_transaction() returned: ");
                TEST_ASSERT(error != NULL, "Expected abort to fail");
                TEST_ASSERT(rd_kafka_error_is_fatal(error),
                            "Expecting fatal error");
                TEST_ASSERT(!rd_kafka_error_is_retriable(error),
                            "Did not expect retriable error");
                TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
                            "Did not expect abortable error");

                rd_kafka_error_destroy(error);

        } else {
                TEST_ASSERT(!error, "Did not expect commit to fail: %s",
                            rd_kafka_error_string(error));
        }


        if (!expect_fail) {
                /*
                 * Transaction 3
                 */
                TEST_SAY(_C_BLU "Transaction 3: %d msgs\n", msgcnt[2]);
                test_curr->dr_mv = &expect_mv;
                TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
                test_produce_msgs2(p, topic, testid, partition, 0, msgcnt[2],
                                   NULL, 0);
                TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
        }

        rd_kafka_destroy(p);

        /* Consume messages and compare them with expect_mv.
         * On AK<2.5 (expect_fail=true) we do not expect to see any messages
         * since the producer will have failed with a fatal error
         * (expect_mv is never hooked up to test_curr->dr_mv in that case).
         * On AK>=2.5 (expect_fail=false) we should see the messages tracked
         * in expect_mv.
         * NOTE(review): this comment used to say "only messages from txn 3",
         * but dr_mv is already set before the txn 2 produce above - confirm
         * whether txn 2 messages are expected here as well.
         */

        test_conf_init(&c_conf, NULL, 0);
        test_conf_set(c_conf, "enable.partition.eof", "true");
        c = test_create_consumer(topic, NULL, c_conf, NULL);
        test_consumer_assign_partition("consume", c, topic, partition,
                                       RD_KAFKA_OFFSET_BEGINNING);

        test_msgver_init(&actual_mv, testid);
        test_msgver_ignore_eof(&actual_mv);

        /* Poll until one partition EOF has been seen; the -1 presumably
         * means "no specific message count expected" - confirm. */
        test_consumer_poll("Verify output topic", c, testid, 1, 0, -1,
                           &actual_mv);

        test_msgver_verify_compare("Verify output topic", &actual_mv,
                                   &expect_mv, TEST_MSGVER_ALL);

        test_msgver_clear(&actual_mv);
        test_msgver_clear(&expect_mv);

        rd_kafka_destroy(c);

        SUB_TEST_PASS();
}


/**
 * @brief Check that empty transactions, with no messages produced, work
 *        as expected.
 *
 * A transaction is begun and then committed or aborted without producing
 * any transactional messages. Optionally the consumer's current position
 * is sent to the transaction first. Afterwards the consumer's committed
 * offset is checked.
 *
 * @param send_offsets If true, consume the pre-produced messages and send
 *                     the consumer's position to the transaction.
 * @param do_commit    If true the transaction is committed, else aborted.
 */
static void do_test_empty_txn(rd_bool_t send_offsets, rd_bool_t do_commit) {
        const char *topic = test_mk_topic_name("0103_empty_txn", 1);
        rd_kafka_conf_t *conf, *c_conf;
        rd_kafka_t *p, *c;
        uint64_t testid;
        /* Number of non-transactional messages produced up front. */
        const int msgcnt = 10;
        rd_kafka_topic_partition_list_t *committed;
        int64_t offset;

        SUB_TEST_QUICK("%ssend offsets, %s", send_offsets ? "" : "don't ",
                       do_commit ? "commit" : "abort");

        testid = test_id_generate();

        /* The consumer config is duplicated before any producer-only
         * properties are set on conf. */
        test_conf_init(&conf, NULL, 30);
        c_conf = rd_kafka_conf_dup(conf);

        test_conf_set(conf, "transactional.id", topic);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        test_create_topic(p, topic, 1, 3);

        /* Produce some non-txn messages for the consumer to read and commit */
        test_produce_msgs_easy(topic, testid, 0, msgcnt);

        /* Create consumer and subscribe to the topic */
        test_conf_set(c_conf, "auto.offset.reset", "earliest");
        test_conf_set(c_conf, "enable.auto.commit", "false");
        c = test_create_consumer(topic, NULL, c_conf, NULL);
        test_consumer_subscribe(c, topic);
        test_consumer_wait_assignment(c, rd_false);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

        /* send_offsets? Consume messages and send those offsets to the txn */
        if (send_offsets) {
                rd_kafka_topic_partition_list_t *offsets;
                rd_kafka_consumer_group_metadata_t *cgmetadata;

                test_consumer_poll("consume", c, testid, -1, 0, msgcnt, NULL);

                /* Use the current assignment, filled in with the consumer's
                 * current position, as the offsets to send. */
                TEST_CALL_ERR__(rd_kafka_assignment(c, &offsets));
                TEST_CALL_ERR__(rd_kafka_position(c, offsets));

                cgmetadata = rd_kafka_consumer_group_metadata(c);
                TEST_ASSERT(cgmetadata != NULL,
                            "failed to get consumer group metadata");

                TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                    p, offsets, cgmetadata, -1));

                rd_kafka_consumer_group_metadata_destroy(cgmetadata);

                rd_kafka_topic_partition_list_destroy(offsets);
        }


        /* End the (message-less) transaction. */
        if (do_commit)
                TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
        else
                TEST_CALL_ERROR__(rd_kafka_abort_transaction(p, -1));

        /* Wait before checking the committed offsets (Kafka < 2.5.0) */
        if (test_broker_version < TEST_BRKVER(2, 5, 0, 0))
                rd_usleep(tmout_multip(5000 * 1000), NULL);

        /* Get the committed offsets */
        TEST_CALL_ERR__(rd_kafka_assignment(c, &committed));
        TEST_CALL_ERR__(rd_kafka_committed(c, committed, 10 * 1000));

        TEST_ASSERT(committed->cnt == 1,
                    "expected one committed offset, not %d", committed->cnt);
        offset = committed->elems[0].offset;
        TEST_SAY("Committed offset is %" PRId64 "\n", offset);

        /* Offsets are only expected to be committed when they were sent to
         * the transaction and the transaction was committed; in every other
         * combination no committed offset (a negative value) is expected. */
        if (do_commit && send_offsets)
                TEST_ASSERT(offset >= msgcnt,
                            "expected committed offset >= %d, got %" PRId64,
                            msgcnt, offset);
        else
                TEST_ASSERT(offset < 0,
                            "expected no committed offset, got %" PRId64,
                            offset);

        rd_kafka_topic_partition_list_destroy(committed);

        rd_kafka_destroy(c);
        rd_kafka_destroy(p);

        SUB_TEST_PASS();
}


/**
 * @brief A control message should increase stored offset and
 *        that stored offset should have correct leader epoch
 *        and be included in commit.
 *        See #4384.
 *
 * Produces a single message in a transaction that is then aborted,
 * consumes the topic until partition EOF (no messages are expected to be
 * delivered), commits the consumer's offsets and finally verifies that
 * the committed offset for partition 0 is 2, i.e. past both the aborted
 * message and the abort control message.
 */
static void do_test_txn_abort_control_message_leader_epoch(void) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);

        rd_kafka_t *p, *c;
        rd_kafka_conf_t *p_conf, *c_conf;
        test_msgver_t mv;
        /* The only message produced is aborted, so none is expected. */
        int exp_msg_cnt = 0;
        uint64_t testid = test_id_generate();
        rd_kafka_topic_partition_list_t *offsets;
        int r;

        SUB_TEST_QUICK();

        /* The consumer config is duplicated before any producer-only
         * properties are set on p_conf. */
        test_conf_init(&p_conf, NULL, 30);
        c_conf = rd_kafka_conf_dup(p_conf);

        test_conf_set(p_conf, "transactional.id", topic);
        rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb);
        p = test_create_handle(RD_KAFKA_PRODUCER, p_conf);

        test_create_topic(p, topic, 1, 3);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

        /* Produce one message */
        test_produce_msgs2(p, topic, testid, RD_KAFKA_PARTITION_UA, 0, 1, NULL,
                           0);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(p, -1));

        /*
         * Create consumer.
         */
        test_conf_set(c_conf, "enable.auto.commit", "false");
        test_conf_set(c_conf, "group.id", topic);
        test_conf_set(c_conf, "enable.partition.eof", "true");
        test_conf_set(c_conf, "auto.offset.reset", "earliest");
        test_msgver_init(&mv, testid);
        c = test_create_consumer(topic, NULL, c_conf, NULL);


        test_consumer_subscribe(c, topic);
        /* Expect 0 messages and 1 EOF */
        r = test_consumer_poll("consume.nothing", c, testid,
                               /* exp_eof_cnt */ 1,
                               /* exp_msg_base */ 0, exp_msg_cnt, &mv);
        test_msgver_clear(&mv);

        TEST_ASSERT(r == exp_msg_cnt, "expected %d messages, got %d",
                    exp_msg_cnt, r);

        /* Commits offset 2 (1 aborted message + 1 control message) */
        TEST_CALL_ERR__(rd_kafka_commit(c, NULL, rd_false));

        /* Read back the committed offset for partition 0.
         * The result of the fetch is checked so that a failed request is
         * reported as such rather than as a confusing offset mismatch
         * in the assertions below. */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, 0);
        TEST_CALL_ERR__(rd_kafka_committed(c, offsets, -1));

        /* Committed offset must be 2 */
        TEST_ASSERT(offsets->cnt == 1, "expected 1 partition, got %d",
                    offsets->cnt);
        TEST_ASSERT(offsets->elems[0].offset == 2,
                    "expected offset 2, got %" PRId64,
                    offsets->elems[0].offset);

        /* All done */
        test_consumer_close(c);
        rd_kafka_topic_partition_list_destroy(offsets);
        rd_kafka_destroy(c);
        rd_kafka_destroy(p);

        SUB_TEST_PASS();
}

/**
 * @brief Query the watermark offsets of \p topic [\p partition] through
 *        client instance \p c and assert that the query succeeded.
 *
 * @param line Caller's source line, included in the assertion message.
 *
 * @returns the high watermark for the given partition.
 */
int64_t
query_hi_wmark0(int line, rd_kafka_t *c, const char *topic, int32_t partition) {
        int64_t low_wmark  = -1;
        int64_t high_wmark = -1;
        rd_kafka_resp_err_t err = rd_kafka_query_watermark_offsets(
            c, topic, partition, &low_wmark, &high_wmark,
            tmout_multip(5 * 1000));

        TEST_ASSERT(!err, "%d: query_watermark_offsets(%s) failed: %s", line,
                    topic, rd_kafka_err2str(err));

        /* Only the high watermark is of interest to the callers. */
        return high_wmark;
}
/* Convenience wrapper that supplies the caller's line number. */
#define query_hi_wmark(c, topic, part) query_hi_wmark0(__LINE__, c, topic, part)

/**
 * @brief Check that isolation.level works as expected for query_watermark..().
 *
 * Two consumers that differ only in isolation.level (read_committed and
 * read_uncommitted) query the high watermark of the same partition:
 *  - while a transaction with flushed messages is still open, the
 *    read_committed high watermark must be greater than 0 but lower than
 *    the read_uncommitted one,
 *  - after the transaction has been committed both must be equal.
 */
static void do_test_wmark_isolation_level(void) {
        const char *topic = test_mk_topic_name("0103_wmark_isol", 1);
        rd_kafka_conf_t *conf, *c_conf;
        rd_kafka_t *p, *c1, *c2;
        uint64_t testid;
        /* High watermarks as seen by the read_uncommitted (c2) and
         * read_committed (c1) consumers respectively. */
        int64_t hw_uncommitted, hw_committed;

        SUB_TEST_QUICK();

        testid = test_id_generate();

        /* The consumer config is duplicated before any producer-only
         * properties are set on conf. */
        test_conf_init(&conf, NULL, 30);
        c_conf = rd_kafka_conf_dup(conf);

        test_conf_set(conf, "transactional.id", topic);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        /* A copy of conf is used here since conf itself is needed again
         * when the producer is re-created further down. */
        p = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf));

        test_create_topic(p, topic, 1, 3);

        /* Produce some non-txn messages to avoid 0 as the committed hwmark */
        test_produce_msgs_easy(topic, testid, 0, 100);

        /* Create two consumers that differ only in isolation.level:
         * c1 is read_committed, c2 is read_uncommitted.
         * Neither of them subscribes; they are only used for
         * watermark queries. */
        test_conf_set(c_conf, "isolation.level", "read_committed");
        c1 = test_create_consumer(topic, NULL, rd_kafka_conf_dup(c_conf), NULL);
        test_conf_set(c_conf, "isolation.level", "read_uncommitted");
        c2 = test_create_consumer(topic, NULL, c_conf, NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

        /* Produce some txn messages */
        test_produce_msgs2(p, topic, testid, 0, 0, 100, NULL, 0);

        /* Flush the producer before querying the watermarks while the
         * transaction is still open. */
        test_flush(p, 10 * 1000);

        hw_committed   = query_hi_wmark(c1, topic, 0);
        hw_uncommitted = query_hi_wmark(c2, topic, 0);

        TEST_SAY("Pre-commit hwmarks: committed %" PRId64
                 ", uncommitted %" PRId64 "\n",
                 hw_committed, hw_uncommitted);

        TEST_ASSERT(hw_committed > 0 && hw_committed < hw_uncommitted,
                    "Committed hwmark %" PRId64
                    " should be lower than "
                    "uncommitted hwmark %" PRId64 " for %s [0]",
                    hw_committed, hw_uncommitted, topic);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));

        /* Re-create the producer and re-init transactions to make
         * sure the transaction is fully committed in the cluster. */
        rd_kafka_destroy(p);
        p = test_create_handle(RD_KAFKA_PRODUCER, conf);
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));
        rd_kafka_destroy(p);


        /* Now query wmarks again */
        hw_committed   = query_hi_wmark(c1, topic, 0);
        hw_uncommitted = query_hi_wmark(c2, topic, 0);

        TEST_SAY("Post-commit hwmarks: committed %" PRId64
                 ", uncommitted %" PRId64 "\n",
                 hw_committed, hw_uncommitted);

        TEST_ASSERT(hw_committed == hw_uncommitted,
                    "Committed hwmark %" PRId64
                    " should be equal to "
                    "uncommitted hwmark %" PRId64 " for %s [0]",
                    hw_committed, hw_uncommitted, topic);

        rd_kafka_destroy(c1);
        rd_kafka_destroy(c2);

        SUB_TEST_PASS();
}



int main_0103_transactions(int argc, char **argv) {

        do_test_misuse_txn();
        do_test_basic_producer_txn(rd_false /* without compression */);
        do_test_basic_producer_txn(rd_true /* with compression */);
        do_test_consumer_producer_txn();
        do_test_fenced_txn(rd_false /* no produce after fencing */);
        do_test_fenced_txn(rd_true /* produce after fencing */);
        do_test_fatal_idempo_error_without_kip360();
        do_test_empty_txn(rd_false /*don't send offsets*/, rd_true /*commit*/);
        do_test_empty_txn(rd_false /*don't send offsets*/, rd_false /*abort*/);
        do_test_empty_txn(rd_true /*send offsets*/, rd_true /*commit*/);
        do_test_empty_txn(rd_true /*send offsets*/, rd_false /*abort*/);
        do_test_wmark_isolation_level();
        do_test_txn_abort_control_message_leader_epoch();
        return 0;
}



/**
 * @brief Transaction tests that don't require a broker.
 *
 * Two cases are covered, each with its own producer instance created
 * with bootstrap.servers unset:
 *  1. No transactional.id configured: init_transactions() must fail
 *     with ERR__NOT_CONFIGURED.
 *  2. transactional.id configured: init_transactions() must fail with
 *     ERR__TIMED_OUT, and the call must take roughly \c timeout_ms.
 */
static void do_test_txn_local(void) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *p;
        rd_kafka_error_t *error;
        test_timing_t t_init;
        /* Timeout passed to init_transactions() in the second case. */
        int timeout_ms = 7 * 1000;

        SUB_TEST_QUICK();

        /*
         * No transactional.id, init_transactions() should fail.
         */
        test_conf_init(&conf, NULL, 0);
        test_conf_set(conf, "bootstrap.servers", NULL);

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        error = rd_kafka_init_transactions(p, 10);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(
            rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
            "Expected ERR__NOT_CONFIGURED, not %s", rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(p);


        /*
         * No brokers, init_transactions() should time out according
         * to the timeout.
         */
        test_conf_init(&conf, NULL, 0);
        test_conf_set(conf, "bootstrap.servers", NULL);
        test_conf_set(conf, "transactional.id", "test");
        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        TEST_SAY("Waiting for init_transactions() timeout %d ms\n", timeout_ms);

        /* Set the test timeout to timeout_ms plus a 2000 ms margin,
         * converted from milliseconds (presumably to seconds - confirm).
         * NOTE(review): the upper bound accepted by TIMING_ASSERT() below
         * (timeout_ms + 5000) is larger than this margin - confirm that
         * this is intended. */
        test_timeout_set((timeout_ms + 2000) / 1000);

        TIMING_START(&t_init, "init_transactions()");
        error = rd_kafka_init_transactions(p, timeout_ms);
        TIMING_STOP(&t_init);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected RD_KAFKA_RESP_ERR__TIMED_OUT, "
                    "not %s: %s",
                    rd_kafka_error_name(error), rd_kafka_error_string(error));

        TEST_SAY("init_transactions() failed as expected: %s\n",
                 rd_kafka_error_string(error));

        rd_kafka_error_destroy(error);

        /* The call must have taken between timeout_ms - 2000 and
         * timeout_ms + 5000. */
        TIMING_ASSERT(&t_init, timeout_ms - 2000, timeout_ms + 5000);

        rd_kafka_destroy(p);

        SUB_TEST_PASS();
}


/**
 * @brief Entry point for the transaction tests that don't require a broker.
 *
 * @returns 0 always.
 */
int main_0103_transactions_local(int argc, char **argv) {
        (void)argc;
        (void)argv;

        do_test_txn_local();
        return 0;
}
