/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2018-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Tests the queue callback IO event signalling.
 */


#include "test.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */


/**
 * @brief Thread safe event counter.
 *
 * Shared between event_cb(), which increments \c count, and wait_event_cb(),
 * which reads and resets it. Both do so while holding \c lock.
 */
static struct {
        /* Guards \c count. */
        mtx_t lock;
        /* Number of event_cb() invocations since the last reset. */
        int count;
} event_receiver;

/**
 * @brief Queue event callback.
 *
 * Verifies that \p opaque is the value registered for the callback
 * (0x1234) and then bumps the shared event counter under its lock.
 *
 * @param rk_p   Unused.
 * @param opaque Opaque pointer handed to the callback.
 */
static void event_cb(rd_kafka_t *rk_p, void *opaque) {
        (void)rk_p;

        TEST_ASSERT(opaque == (void *)0x1234,
                    "Opaque pointer is not as expected (got: %p)", opaque);

        mtx_lock(&event_receiver.lock);
        event_receiver.count++;
        mtx_unlock(&event_receiver.lock);
}

/**
 * @brief Poll the shared event counter once per second until at least one
 *        event has been counted or \p timeout_secs seconds have elapsed.
 *
 * The counter is reset to zero each time it is sampled.
 *
 * @param timeout_secs Maximum number of one-second sleeps between samples.
 *
 * @returns the number of events counted at the last sample, or 0 if none
 *          were seen within the timeout (or the timeout is negative).
 */
static int wait_event_cb(int timeout_secs) {
        int remaining = timeout_secs;

        while (remaining >= 0) {
                int seen;

                /* Snapshot and reset the counter under the lock. */
                mtx_lock(&event_receiver.lock);
                seen                 = event_receiver.count;
                event_receiver.count = 0;
                mtx_unlock(&event_receiver.lock);

                /* Done on the first hit, or when the time budget is spent. */
                if (seen > 0 || remaining == 0)
                        return seen;

                rd_sleep(1);
                remaining--;
        }

        /* Only reached when called with a negative timeout. */
        return 0;
}


/**
 * @brief Test entry point: verifies event-callback signalling on the
 *        consumer queue.
 *
 * A producer and a consumer are created for a test topic and event_cb() is
 * registered on the consumer queue. The main loop then alternates between
 * idle periods, during which no callback is expected, and produce bursts of
 * msgcnt/2 messages, after which a callback is expected. Every time the
 * callback has fired the queue is drained with non-blocking polls.
 *
 * @param argc Unused.
 * @param argv Unused.
 *
 * @returns 0 once all messages have been consumed. Failures are reported
 *          through TEST_ASSERT() / TEST_FAIL().
 */
int main_0083_cb_event(int argc, char **argv) {
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *tconf;
        rd_kafka_t *rk_p, *rk_c;
        const char *topic;
        rd_kafka_topic_t *rkt_p;
        rd_kafka_queue_t *queue;
        uint64_t testid;
        int msgcnt          = 100;
        int recvd           = 0;
        int wait_multiplier = 1;
        rd_kafka_resp_err_t err;
        /* What the next wait is expected to yield:
         *  _REBALANCE - a rebalance event (initial state),
         *  _NOPE      - nothing (idle period),
         *  _YEP       - fetched messages. */
        enum { _NOPE, _YEP, _REBALANCE } expecting_io = _REBALANCE;
        int callback_event_count;
        rd_kafka_event_t *rkev;
        int eventcnt = 0;

        /* The lock must be usable before event_cb() can be invoked. */
        mtx_init(&event_receiver.lock, mtx_plain);

        testid = test_id_generate();
        topic  = test_mk_topic_name(__FUNCTION__, 1);

        rk_p  = test_create_producer();
        rkt_p = test_create_producer_topic(rk_p, topic, NULL);
        err   = test_auto_create_topic_rkt(rk_p, rkt_p, tmout_multip(5000));
        TEST_ASSERT(!err, "Topic auto creation failed: %s",
                    rd_kafka_err2str(err));

        test_conf_init(&conf, &tconf, 0);
        rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
        test_conf_set(conf, "session.timeout.ms", "6000");
        test_conf_set(conf, "enable.partition.eof", "false");
        /* Speed up propagation of new topics */
        test_conf_set(conf, "metadata.max.age.ms", "5000");
        test_topic_conf_set(tconf, "auto.offset.reset", "earliest");
        rk_c = test_create_consumer(topic, NULL, conf, tconf);

        queue = rd_kafka_queue_get_consumer(rk_c);

        test_consumer_subscribe(rk_c, topic);

        /* NOTE(review): the callback is enabled after subscribing;
         * presumably the rebalance event cannot be enqueued before this
         * point - confirm. */
        rd_kafka_queue_cb_event_enable(queue, event_cb, (void *)0x1234);

        /**
         * 1) Wait for rebalance event
         * 2) Wait 1 interval (1s) expecting no IO (nothing produced).
         * 3) Produce half the messages
         * 4) Expect CB
         * 5) Consume the available messages
         * 6) Wait 1 interval expecting no CB.
         * 7) Produce remaining half
         * 8) Expect CB
         * 9) Done.
         */
        while (recvd < msgcnt) {
                TEST_SAY("Waiting for event\n");
                callback_event_count = wait_event_cb(1 * wait_multiplier);
                TEST_ASSERT(callback_event_count <= 1,
                            "Event cb called %d times", callback_event_count);

                if (callback_event_count == 1) {
                        TEST_SAY("Events received: %d\n", callback_event_count);

                        /* Drain everything currently on the queue without
                         * blocking. */
                        while ((rkev = rd_kafka_queue_poll(queue, 0))) {
                                eventcnt++;
                                switch (rd_kafka_event_type(rkev)) {
                                case RD_KAFKA_EVENT_REBALANCE:
                                        TEST_SAY(
                                            "Got %s: %s\n",
                                            rd_kafka_event_name(rkev),
                                            rd_kafka_err2str(
                                                rd_kafka_event_error(rkev)));
                                        if (expecting_io != _REBALANCE)
                                                TEST_FAIL(
                                                    "Got Rebalance when "
                                                    "expecting message\n");
                                        if (rd_kafka_event_error(rkev) ==
                                            RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                                                /* Fail right away if the
                                                 * assignment is rejected
                                                 * rather than timing out
                                                 * later waiting for IO. */
                                                err = rd_kafka_assign(
                                                    rk_c,
                                                    rd_kafka_event_topic_partition_list(
                                                        rkev));
                                                TEST_ASSERT(
                                                    !err,
                                                    "Failed to assign "
                                                    "partitions: %s",
                                                    rd_kafka_err2str(err));
                                                expecting_io = _NOPE;
                                        } else {
                                                err = rd_kafka_assign(rk_c,
                                                                      NULL);
                                                TEST_ASSERT(
                                                    !err,
                                                    "Failed to unassign "
                                                    "partitions: %s",
                                                    rd_kafka_err2str(err));
                                        }
                                        break;

                                case RD_KAFKA_EVENT_FETCH:
                                        if (expecting_io != _YEP)
                                                TEST_FAIL(
                                                    "Did not expect more "
                                                    "messages at %d/%d\n",
                                                    recvd, msgcnt);
                                        recvd++;
                                        /* A full batch has been consumed:
                                         * the next interval must be idle. */
                                        if (recvd == (msgcnt / 2) ||
                                            recvd == msgcnt)
                                                expecting_io = _NOPE;
                                        break;

                                case RD_KAFKA_EVENT_ERROR:
                                        TEST_FAIL(
                                            "Error: %s\n",
                                            rd_kafka_event_error_string(rkev));
                                        break;

                                default:
                                        TEST_SAY("Ignoring event %s\n",
                                                 rd_kafka_event_name(rkev));
                                        break;
                                }

                                rd_kafka_event_destroy(rkev);
                        }
                        TEST_SAY("%d events, Consumed %d/%d messages\n",
                                 eventcnt, recvd, msgcnt);

                        wait_multiplier = 1;

                } else {
                        if (expecting_io == _REBALANCE) {
                                /* Still waiting for the initial rebalance. */
                                continue;
                        } else if (expecting_io == _YEP) {
                                TEST_FAIL(
                                    "Did not see expected IO after %d/%d "
                                    "msgs\n",
                                    recvd, msgcnt);
                        }

                        TEST_SAY("Event wait timeout (good)\n");
                        TEST_SAY("Got idle period, producing\n");
                        test_produce_msgs(rk_p, rkt_p, testid, 0, recvd,
                                          msgcnt / 2, NULL, 10);

                        expecting_io = _YEP;
                        /* When running slowly (e.g., valgrind) it might take
                         * some time before the first message is received
                         * after producing. */
                        wait_multiplier = 3;
                }
        }
        TEST_SAY("Done\n");

        rd_kafka_topic_destroy(rkt_p);
        rd_kafka_destroy(rk_p);

        rd_kafka_queue_destroy(queue);
        err = rd_kafka_consumer_close(rk_c);
        TEST_ASSERT(!err, "Failed to close consumer: %s",
                    rd_kafka_err2str(err));
        rd_kafka_destroy(rk_c);

        mtx_destroy(&event_receiver.lock);

        return 0;
}
