/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2020-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

#include "../src/rdkafka_proto.h"


/**
 * @name Verify that producer and consumer resumes operation after
 *       a topic has been deleted and recreated.
 */

/**
 * The message value to produce, one of:
 *   "before"  - before topic deletion
 *   "during"  - during topic deletion
 *   "after"   - after topic has been re-created
 *   "end"     - stop producing
 */
static mtx_t value_mtx;
static char *value;

static const int msg_rate = 10; /**< Messages produced per second */

static struct test *this_test; /**< Exposes current test struct (in TLS) to
                                *   producer thread. */


/**
 * @brief Treat all error_cb as non-test-fatal.
 */
static int
is_error_fatal(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
        return rd_false;
}

/**
 * @brief Producing thread
 */
static int run_producer(void *arg) {
        const char *topic    = arg;
        rd_kafka_t *producer = test_create_producer();
        int ret              = 0;

        test_curr = this_test;

        /* Don't check message status */
        test_curr->exp_dr_status = (rd_kafka_msg_status_t)-1;

        while (1) {
                rd_kafka_resp_err_t err;

                mtx_lock(&value_mtx);
                if (!strcmp(value, "end")) {
                        mtx_unlock(&value_mtx);
                        break;
                } else if (strcmp(value, "before")) {
                        /* Ignore Delivery report errors after topic
                         * has been deleted and eventually re-created,
                         * we rely on the consumer to verify that
                         * messages are produced. */
                        test_curr->ignore_dr_err = rd_true;
                }

                err = rd_kafka_producev(
                    producer, RD_KAFKA_V_TOPIC(topic),
                    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                    RD_KAFKA_V_VALUE(value, strlen(value)), RD_KAFKA_V_END);

                if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
                    err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
                        TEST_SAY("Produce failed (expectedly): %s\n",
                                 rd_kafka_err2name(err));
                else
                        TEST_ASSERT(!err, "producev() failed: %s",
                                    rd_kafka_err2name(err));

                mtx_unlock(&value_mtx);

                rd_usleep(1000000 / msg_rate, NULL);

                rd_kafka_poll(producer, 0);
        }

        if (rd_kafka_flush(producer, 5000)) {
                TEST_WARN("Failed to flush all message(s), %d remain\n",
                          rd_kafka_outq_len(producer));
                /* Purge the messages to see which partition they were for */
                rd_kafka_purge(producer, RD_KAFKA_PURGE_F_QUEUE |
                                             RD_KAFKA_PURGE_F_INFLIGHT);
                rd_kafka_flush(producer, 5000);
                TEST_SAY("%d message(s) in queue after purge\n",
                         rd_kafka_outq_len(producer));

                ret = 1; /* Fail test from main thread */
        }

        rd_kafka_destroy(producer);

        return ret;
}


/**
 * @brief Expect at least \p cnt messages with value matching \p exp_value,
 *        else fail the current test.
 */
static void
expect_messages(rd_kafka_t *consumer, int cnt, const char *exp_value) {
        int match_cnt = 0, other_cnt = 0, err_cnt = 0;
        size_t exp_len = strlen(exp_value);

        TEST_SAY("Expecting >= %d messages with value \"%s\"...\n", cnt,
                 exp_value);

        while (match_cnt < cnt) {
                rd_kafka_message_t *rkmessage;

                rkmessage = rd_kafka_consumer_poll(consumer, 1000);
                if (!rkmessage)
                        continue;

                if (rkmessage->err) {
                        TEST_SAY("Consume error: %s\n",
                                 rd_kafka_message_errstr(rkmessage));
                        err_cnt++;
                } else if (rkmessage->len == exp_len &&
                           !memcmp(rkmessage->payload, exp_value, exp_len)) {
                        match_cnt++;
                } else {
                        TEST_SAYL(3,
                                  "Received \"%.*s\", expected \"%s\": "
                                  "ignored\n",
                                  (int)rkmessage->len,
                                  (const char *)rkmessage->payload, exp_value);
                        other_cnt++;
                }

                rd_kafka_message_destroy(rkmessage);
        }

        TEST_SAY(
            "Consumed %d messages matching \"%s\", "
            "ignored %d others, saw %d error(s)\n",
            match_cnt, exp_value, other_cnt, err_cnt);
}


/**
 * @brief Test topic create + delete + create with first topic having
 *        \p part_cnt_1 partitions and second topic having \p part_cnt_2 .
 */
static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) {
        rd_kafka_t *consumer;
        thrd_t producer_thread;
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
        int ret           = 0;

        TEST_SAY(_C_MAG
                 "[ Test topic create(%d parts)+delete+create(%d parts) ]\n",
                 part_cnt_1, part_cnt_2);

        consumer = test_create_consumer(topic, NULL, NULL, NULL);

        /* Create topic */
        test_create_topic(consumer, topic, part_cnt_1, 3);

        /* Start consumer */
        test_consumer_subscribe(consumer, topic);
        test_consumer_wait_assignment(consumer, rd_true);

        mtx_lock(&value_mtx);
        value = "before";
        mtx_unlock(&value_mtx);

        /* Create producer thread */
        if (thrd_create(&producer_thread, run_producer, (void *)topic) !=
            thrd_success)
                TEST_FAIL("thrd_create failed");

        /* Consume messages for 5s */
        expect_messages(consumer, msg_rate * 5, value);

        /* Delete topic */
        mtx_lock(&value_mtx);
        value = "during";
        mtx_unlock(&value_mtx);

        test_delete_topic(consumer, topic);
        rd_sleep(5);

        /* Re-create topic */
        test_create_topic(consumer, topic, part_cnt_2, 3);

        mtx_lock(&value_mtx);
        value = "after";
        mtx_unlock(&value_mtx);

        /* Consume for 5 more seconds, should see new messages */
        expect_messages(consumer, msg_rate * 5, value);

        rd_kafka_destroy(consumer);

        /* Wait for producer to exit */
        mtx_lock(&value_mtx);
        value = "end";
        mtx_unlock(&value_mtx);

        if (thrd_join(producer_thread, &ret) != thrd_success || ret != 0)
                TEST_FAIL("Producer failed: see previous errors");

        TEST_SAY(_C_GRN
                 "[ Test topic create(%d parts)+delete+create(%d parts): "
                 "PASS ]\n",
                 part_cnt_1, part_cnt_2);
}


int main_0107_topic_recreate(int argc, char **argv) {
        this_test = test_curr; /* Need to expose current test struct (in TLS)
                                * to producer thread. */

        this_test->is_fatal_cb = is_error_fatal;

        mtx_init(&value_mtx, mtx_plain);

        test_conf_init(NULL, NULL, 60);

        do_test_create_delete_create(10, 3);
        do_test_create_delete_create(3, 6);

        return 0;
}
