/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2018-2022, Magnus Edenhill
 *               2023, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * @name Test rd_kafka_destroy_flags()
 */


#include "test.h"


static RD_TLS int rebalance_cnt = 0;

static void destroy_flags_rebalance_cb(rd_kafka_t *rk,
                                       rd_kafka_resp_err_t err,
                                       rd_kafka_topic_partition_list_t *parts,
                                       void *opaque) {
        rebalance_cnt++;

        TEST_SAY("rebalance_cb: %s with %d partition(s)\n",
                 rd_kafka_err2str(err), parts->cnt);

        switch (err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                test_consumer_assign("rebalance", rk, parts);
                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                test_consumer_unassign("rebalance", rk);
                break;

        default:
                TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err));
        }
}

struct df_args {
        rd_kafka_type_t client_type;
        int produce_cnt;
        int consumer_subscribe;
        int consumer_unsubscribe;
};

static void do_test_destroy_flags(const char *topic,
                                  int destroy_flags,
                                  int local_mode,
                                  const struct df_args *args) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        test_timing_t t_destroy;

        TEST_SAY(_C_MAG
                 "[ test destroy_flags 0x%x for client_type %d, "
                 "produce_cnt %d, subscribe %d, unsubscribe %d, "
                 "%s mode ]\n" _C_CLR,
                 destroy_flags, args->client_type, args->produce_cnt,
                 args->consumer_subscribe, args->consumer_unsubscribe,
                 local_mode ? "local" : "broker");

        test_conf_init(&conf, NULL, 20);

        if (local_mode)
                test_conf_set(conf, "bootstrap.servers", "");

        if (args->client_type == RD_KAFKA_PRODUCER) {

                rk = test_create_handle(args->client_type, conf);

                if (args->produce_cnt > 0) {
                        rd_kafka_topic_t *rkt;
                        int msgcounter = 0;

                        rkt = test_create_producer_topic(rk, topic, NULL);
                        test_produce_msgs_nowait(
                            rk, rkt, 0, RD_KAFKA_PARTITION_UA, 0,
                            args->produce_cnt, NULL, 100, 0, &msgcounter);
                        rd_kafka_topic_destroy(rkt);
                }

        } else {
                int i;

                TEST_ASSERT(args->client_type == RD_KAFKA_CONSUMER);

                rk = test_create_consumer(topic, destroy_flags_rebalance_cb,
                                          conf, NULL);

                if (args->consumer_subscribe) {
                        test_consumer_subscribe(rk, topic);

                        if (!local_mode) {
                                TEST_SAY("Waiting for assignment\n");
                                while (rebalance_cnt == 0)
                                        test_consumer_poll_once(rk, NULL, 1000);
                        }
                }

                for (i = 0; i < 5; i++)
                        test_consumer_poll_once(rk, NULL, 100);

                if (args->consumer_unsubscribe) {
                        /* Test that calling rd_kafka_unsubscribe immediately
                         * prior to rd_kafka_destroy_flags doesn't cause the
                         * latter to hang. */
                        TEST_SAY(_C_YEL "Calling rd_kafka_unsubscribe\n"_C_CLR);
                        rd_kafka_unsubscribe(rk);
                }
        }

        rebalance_cnt = 0;
        TEST_SAY(_C_YEL "Calling rd_kafka_destroy_flags(0x%x)\n" _C_CLR,
                 destroy_flags);
        TIMING_START(&t_destroy, "rd_kafka_destroy_flags(0x%x)", destroy_flags);
        rd_kafka_destroy_flags(rk, destroy_flags);
        TIMING_STOP(&t_destroy);

        if (destroy_flags & RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)
                TIMING_ASSERT_LATER(&t_destroy, 0, 200);
        else
                TIMING_ASSERT_LATER(&t_destroy, 0, 1000);

        if (args->consumer_subscribe &&
            !(destroy_flags & RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)) {
                if (!local_mode)
                        TEST_ASSERT(rebalance_cnt > 0,
                                    "expected final rebalance callback");
        } else
                TEST_ASSERT(rebalance_cnt == 0,
                            "expected no rebalance callbacks, got %d",
                            rebalance_cnt);

        TEST_SAY(_C_GRN
                 "[ test destroy_flags 0x%x for client_type %d, "
                 "produce_cnt %d, subscribe %d, unsubscribe %d, "
                 "%s mode: PASS ]\n" _C_CLR,
                 destroy_flags, args->client_type, args->produce_cnt,
                 args->consumer_subscribe, args->consumer_unsubscribe,
                 local_mode ? "local" : "broker");
}


/**
 * @brief Destroy with flags
 */
static void destroy_flags(int local_mode) {
        const struct df_args args[] = {
            {RD_KAFKA_PRODUCER, 0, 0, 0},
            {RD_KAFKA_PRODUCER, test_quick ? 100 : 10000, 0, 0},
            {RD_KAFKA_CONSUMER, 0, 1, 0},
            {RD_KAFKA_CONSUMER, 0, 1, 1},
            {RD_KAFKA_CONSUMER, 0, 0, 0}};
        const int flag_combos[] = {0, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE};
        const char *topic       = test_mk_topic_name(__FUNCTION__, 1);
        const rd_bool_t can_subscribe =
            test_broker_version >= TEST_BRKVER(0, 9, 0, 0);
        int i, j;

        /* Create the topic to avoid not-yet-auto-created-topics being
         * subscribed to (and thus raising an error). */
        if (!local_mode) {
                test_create_topic(NULL, topic, 3, 1);
                test_wait_topic_exists(NULL, topic, 5000);
        }

        for (i = 0; i < (int)RD_ARRAYSIZE(args); i++) {
                for (j = 0; j < (int)RD_ARRAYSIZE(flag_combos); j++) {
                        if (!can_subscribe && (args[i].consumer_subscribe ||
                                               args[i].consumer_unsubscribe))
                                continue;
                        do_test_destroy_flags(topic, flag_combos[j], local_mode,
                                              &args[i]);
                }
        }
}



int main_0084_destroy_flags_local(int argc, char **argv) {
        destroy_flags(1 /*no brokers*/);
        return 0;
}

int main_0084_destroy_flags(int argc, char **argv) {
        destroy_flags(0 /*with brokers*/);
        return 0;
}
