/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2016-2022, Magnus Edenhill
 *               2023, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */


/**
 * KafkaConsumer: regex topic subscriptions
 */



struct expect {
        char *name;         /* sub-test name */
        const char *sub[4]; /* subscriptions */
        const char *exp[4]; /* expected topics */
        int exp_err;        /* expected error from subscribe() */
        int stat[4];        /* per exp status */
        int fails;
        enum { _EXP_NONE,
               _EXP_FAIL,
               _EXP_OK,
               _EXP_ASSIGN,
               _EXP_REVOKE,
               _EXP_ASSIGNED,
               _EXP_REVOKED,
        } result;
};

static struct expect *exp_curr;

static uint64_t testid;

static void expect_match(struct expect *exp,
                         const rd_kafka_topic_partition_list_t *parts) {
        int i;
        int e     = 0;
        int fails = 0;

        memset(exp->stat, 0, sizeof(exp->stat));

        for (i = 0; i < parts->cnt; i++) {
                int found = 0;
                e         = 0;
                while (exp->exp[e]) {
                        if (!strcmp(parts->elems[i].topic, exp->exp[e])) {
                                exp->stat[e]++;
                                found++;
                        }
                        e++;
                }

                if (!found) {
                        TEST_WARN("%s: got unexpected topic match: %s\n",
                                  exp->name, parts->elems[i].topic);
                        fails++;
                }
        }


        e = 0;
        while (exp->exp[e]) {
                if (!exp->stat[e]) {
                        TEST_WARN(
                            "%s: expected topic not "
                            "found in assignment: %s\n",
                            exp->name, exp->exp[e]);
                        fails++;
                } else {
                        TEST_SAY("%s: expected topic %s seen in assignment\n",
                                 exp->name, exp->exp[e]);
                }
                e++;
        }

        exp->fails += fails;
        if (fails) {
                TEST_WARN("%s: see %d previous failures\n", exp->name, fails);
                exp->result = _EXP_FAIL;
        } else {
                TEST_SAY(_C_MAG "[ %s: assignment matched ]\n", exp->name);
                exp->result = _EXP_OK;
        }
}

static void rebalance_cb(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *parts,
                         void *opaque) {
        struct expect *exp = exp_curr;

        TEST_ASSERT(exp_curr, "exp_curr not set");

        TEST_SAY("rebalance_cb: %s with %d partition(s)\n",
                 rd_kafka_err2str(err), parts->cnt);
        test_print_partition_list(parts);

        switch (err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                /* Check that provided partitions match our expectations */
                if (exp->result != _EXP_ASSIGN) {
                        TEST_WARN(
                            "%s: rebalance called while expecting %d: "
                            "too many or undesired assignment(s?\n",
                            exp->name, exp->result);
                }
                expect_match(exp, parts);
                test_consumer_assign("rebalance", rk, parts);
                exp->result = _EXP_ASSIGNED;
                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                if (exp->result != _EXP_REVOKE) {
                        TEST_WARN(
                            "%s: rebalance called while expecting %d: "
                            "too many or undesired assignment(s?\n",
                            exp->name, exp->result);
                }

                test_consumer_unassign("rebalance", rk);
                exp->result = _EXP_REVOKED;
                break;

        default:
                TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err));
        }
}


/**
 * @brief Poll the consumer once.
 */
static void consumer_poll_once(rd_kafka_t *rk) {
        rd_kafka_message_t *rkmessage;

        rkmessage = rd_kafka_consumer_poll(rk, 1000);
        if (!rkmessage)
                return;

        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                TEST_SAY("%s [%" PRId32
                         "] reached EOF at "
                         "offset %" PRId64 "\n",
                         rd_kafka_topic_name(rkmessage->rkt),
                         rkmessage->partition, rkmessage->offset);

        } else if (rkmessage->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
                /* Test segfault associated with this call is solved */
                int32_t leader_epoch = rd_kafka_message_leader_epoch(rkmessage);
                TEST_ASSERT(leader_epoch == -1,
                            "rd_kafka_message_leader_epoch should be -1"
                            ", got %" PRId32,
                            leader_epoch);

                if (strstr(rd_kafka_topic_name(rkmessage->rkt), "NONEXIST"))
                        TEST_SAY("%s: %s: error is expected for this topic\n",
                                 rd_kafka_topic_name(rkmessage->rkt),
                                 rd_kafka_message_errstr(rkmessage));
                else
                        TEST_FAIL(
                            "%s [%" PRId32 "] error (offset %" PRId64 "): %s",
                            rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt)
                                           : "(no-topic)",
                            rkmessage->partition, rkmessage->offset,
                            rd_kafka_message_errstr(rkmessage));
        }

        rd_kafka_message_destroy(rkmessage);
}



static int test_subscribe(rd_kafka_t *rk, struct expect *exp) {
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *tlist;
        int i;
        test_timing_t t_sub, t_assign, t_unsub;

        exp_curr = exp;

        test_timeout_set((test_session_timeout_ms / 1000) * 3);

        tlist = rd_kafka_topic_partition_list_new(4);
        TEST_SAY(_C_MAG "[ %s: begin ]\n", exp->name);
        i = 0;
        TEST_SAY("Topic subscription:\n");
        while (exp->sub[i]) {
                TEST_SAY("%s:  %s\n", exp->name, exp->sub[i]);
                rd_kafka_topic_partition_list_add(tlist, exp->sub[i],
                                                  RD_KAFKA_PARTITION_UA);
                i++;
        }

        /* Subscribe */
        TIMING_START(&t_sub, "subscribe");
        err = rd_kafka_subscribe(rk, tlist);
        TIMING_STOP(&t_sub);
        TEST_ASSERT(err == exp->exp_err, "subscribe() failed: %s (expected %s)",
                    rd_kafka_err2str(err), rd_kafka_err2str(exp->exp_err));

        if (exp->exp[0]) {
                /* Wait for assignment, actual messages are ignored. */
                exp->result = _EXP_ASSIGN;
                TEST_SAY("%s: waiting for assignment\n", exp->name);
                TIMING_START(&t_assign, "assignment");
                while (exp->result == _EXP_ASSIGN)
                        consumer_poll_once(rk);
                TIMING_STOP(&t_assign);
                TEST_ASSERT(exp->result == _EXP_ASSIGNED,
                            "got %d instead of assignment", exp->result);

        } else {
                /* Not expecting any assignment */
                int64_t ts_end = test_clock() + 5000;
                exp->result    = _EXP_NONE; /* Not expecting a rebalance */
                while (exp->result == _EXP_NONE && test_clock() < ts_end)
                        consumer_poll_once(rk);
                TEST_ASSERT(exp->result == _EXP_NONE);
        }

        /* Unsubscribe */
        TIMING_START(&t_unsub, "unsubscribe");
        err = rd_kafka_unsubscribe(rk);
        TIMING_STOP(&t_unsub);
        TEST_ASSERT(!err, "unsubscribe() failed: %s", rd_kafka_err2str(err));

        rd_kafka_topic_partition_list_destroy(tlist);

        if (exp->exp[0]) {
                /* Wait for revoke, actual messages are ignored. */
                TEST_SAY("%s: waiting for revoke\n", exp->name);
                exp->result = _EXP_REVOKE;
                TIMING_START(&t_assign, "revoke");
                while (exp->result != _EXP_REVOKED)
                        consumer_poll_once(rk);
                TIMING_STOP(&t_assign);
                TEST_ASSERT(exp->result == _EXP_REVOKED,
                            "got %d instead of revoke", exp->result);
        } else {
                /* Not expecting any revoke */
                int64_t ts_end = test_clock() + 5000;
                exp->result    = _EXP_NONE; /* Not expecting a rebalance */
                while (exp->result == _EXP_NONE && test_clock() < ts_end)
                        consumer_poll_once(rk);
                TEST_ASSERT(exp->result == _EXP_NONE);
        }

        TEST_SAY(_C_MAG "[ %s: done with %d failures ]\n", exp->name,
                 exp->fails);

        return exp->fails;
}


static int do_test(const char *assignor) {
        static char topics[3][128];
        static char nonexist_topic[128];
        const int topic_cnt = 3;
        rd_kafka_t *rk;
        const int msgcnt = 10;
        int i;
        char groupid[64];
        int fails = 0;
        rd_kafka_conf_t *conf;

        if (!test_check_builtin("regex")) {
                TEST_SKIP("regex support not built in\n");
                return 0;
        }

        testid = test_id_generate();
        test_str_id_generate(groupid, sizeof(groupid));

        rd_snprintf(topics[0], sizeof(topics[0]), "%s_%s",
                    test_mk_topic_name("regex_subscribe_TOPIC_0001_UNO", 0),
                    groupid);
        rd_snprintf(topics[1], sizeof(topics[1]), "%s_%s",
                    test_mk_topic_name("regex_subscribe_topic_0002_dup", 0),
                    groupid);
        rd_snprintf(topics[2], sizeof(topics[2]), "%s_%s",
                    test_mk_topic_name("regex_subscribe_TOOTHPIC_0003_3", 0),
                    groupid);

        /* To avoid auto topic creation to kick in we use
         * an invalid topic name. */
        rd_snprintf(
            nonexist_topic, sizeof(nonexist_topic), "%s_%s",
            test_mk_topic_name("regex_subscribe_NONEXISTENT_0004_IV#!", 0),
            groupid);

        /* Produce messages to topics to ensure creation. */
        for (i = 0; i < topic_cnt; i++)
                test_produce_msgs_easy(topics[i], testid, RD_KAFKA_PARTITION_UA,
                                       msgcnt);

        test_conf_init(&conf, NULL, 20);
        test_conf_set(conf, "partition.assignment.strategy", assignor);
        /* Speed up propagation of new topics */
        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
        test_conf_set(conf, "allow.auto.create.topics", "true");

        /* Create a single consumer to handle all subscriptions.
         * Has the nice side affect of testing multiple subscriptions. */
        rk = test_create_consumer(groupid, rebalance_cb, conf, NULL);

        /*
         * Test cases
         */
        {
                struct expect expect = {.name = rd_strdup(tsprintf(
                                            "%s: no regexps (0&1)", assignor)),
                                        .sub  = {topics[0], topics[1], NULL},
                                        .exp  = {topics[0], topics[1], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
        }

        {
                struct expect expect = {.name =
                                            rd_strdup(tsprintf("%s: no regexps "
                                                               "(no matches)",
                                                               assignor)),
                                        .sub = {nonexist_topic, NULL},
                                        .exp = {NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(tsprintf("%s: regex all", assignor)),
                    .sub  = {rd_strdup(tsprintf("^.*_%s", groupid)), NULL},
                    .exp  = {topics[0], topics[1], topics[2], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
                rd_free((void *)expect.sub[0]);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(tsprintf("%s: regex 0&1", assignor)),
                    .sub  = {rd_strdup(tsprintf(
                                "^.*[tToOpPiIcC]_0+[12]_[^_]+_%s", groupid)),
                            NULL},
                    .exp  = {topics[0], topics[1], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
                rd_free((void *)expect.sub[0]);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(tsprintf("%s: regex 2", assignor)),
                    .sub  = {rd_strdup(
                                tsprintf("^.*TOOTHPIC_000._._%s", groupid)),
                            NULL},
                    .exp  = {topics[2], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
                rd_free((void *)expect.sub[0]);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(tsprintf("%s: regex 2 and "
                                               "nonexistent(not seen)",
                                               assignor)),
                    .sub  = {rd_strdup(tsprintf("^.*_000[34]_..?_%s", groupid)),
                            NULL},
                    .exp  = {topics[2], NULL}};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
                rd_free((void *)expect.sub[0]);
        }

        {
                struct expect expect = {
                    .name = rd_strdup(
                        tsprintf("%s: broken regex (no matches)", assignor)),
                    .sub     = {"^.*[0", NULL},
                    .exp     = {NULL},
                    .exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG};

                fails += test_subscribe(rk, &expect);
                rd_free(expect.name);
        }


        test_consumer_close(rk);

        rd_kafka_destroy(rk);

        if (fails)
                TEST_FAIL("See %d previous failures", fails);

        return 0;
}


int main_0033_regex_subscribe(int argc, char **argv) {
        do_test("range");
        do_test("roundrobin");
        return 0;
}


/**
 * @brief Subscription API tests that dont require a broker
 */
int main_0033_regex_subscribe_local(int argc, char **argv) {
        rd_kafka_topic_partition_list_t *valids, *invalids, *none, *empty,
            *alot;
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_resp_err_t err;
        char errstr[256];
        int i;

        valids   = rd_kafka_topic_partition_list_new(0);
        invalids = rd_kafka_topic_partition_list_new(100);
        none     = rd_kafka_topic_partition_list_new(1000);
        empty    = rd_kafka_topic_partition_list_new(5);
        alot     = rd_kafka_topic_partition_list_new(1);

        rd_kafka_topic_partition_list_add(valids, "not_a_regex", 0);
        rd_kafka_topic_partition_list_add(valids, "^My[vV]alid..regex+", 0);
        rd_kafka_topic_partition_list_add(valids, "^another_one$", 55);

        rd_kafka_topic_partition_list_add(invalids, "not_a_regex", 0);
        rd_kafka_topic_partition_list_add(invalids, "^My[vV]alid..regex+", 0);
        rd_kafka_topic_partition_list_add(invalids, "^a[b", 99);

        rd_kafka_topic_partition_list_add(empty, "not_a_regex", 0);
        rd_kafka_topic_partition_list_add(empty, "", 0);
        rd_kafka_topic_partition_list_add(empty, "^ok", 0);

        for (i = 0; i < 10000; i++) {
                char topic[32];
                rd_snprintf(topic, sizeof(topic), "^Va[lLid]_regex_%d$", i);
                rd_kafka_topic_partition_list_add(alot, topic, i);
        }

        conf = rd_kafka_conf_new();
        test_conf_set(conf, "group.id", "group");
        test_conf_set(conf, "client.id", test_curr->name);

        rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        if (!rk)
                TEST_FAIL("Failed to create consumer: %s", errstr);

        err = rd_kafka_subscribe(rk, valids);
        TEST_ASSERT(!err, "valids failed: %s", rd_kafka_err2str(err));

        err = rd_kafka_subscribe(rk, invalids);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
                    "invalids failed with wrong return: %s",
                    rd_kafka_err2str(err));

        err = rd_kafka_subscribe(rk, none);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
                    "none failed with wrong return: %s", rd_kafka_err2str(err));

        err = rd_kafka_subscribe(rk, empty);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
                    "empty failed with wrong return: %s",
                    rd_kafka_err2str(err));

        err = rd_kafka_subscribe(rk, alot);
        TEST_ASSERT(!err, "alot failed: %s", rd_kafka_err2str(err));

        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);

        rd_kafka_topic_partition_list_destroy(valids);
        rd_kafka_topic_partition_list_destroy(invalids);
        rd_kafka_topic_partition_list_destroy(none);
        rd_kafka_topic_partition_list_destroy(empty);
        rd_kafka_topic_partition_list_destroy(alot);

        return 0;
}
