/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdkafka.h"

#include <stdarg.h>

/**
 * Various partitioner tests
 *
 * - Issue #797 - deadlock on failed partitioning
 * - Verify that partitioning works across partitioners.
 */

int32_t my_invalid_partitioner(const rd_kafka_topic_t *rkt,
                               const void *keydata,
                               size_t keylen,
                               int32_t partition_cnt,
                               void *rkt_opaque,
                               void *msg_opaque) {
        int32_t partition = partition_cnt + 10;
        TEST_SAYL(4, "partition \"%.*s\" to %" PRId32 "\n", (int)keylen,
                  (const char *)keydata, partition);
        return partition;
}


/* FIXME: This doesn't seem to trigger the bug in #797.
 *        Still a useful test though. */
static void do_test_failed_partitioning(void) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_t *rkt;
        rd_kafka_topic_conf_t *tconf;
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
        int i;
        int msgcnt = test_quick ? 100 : 10000;

        test_conf_init(&conf, &tconf, 0);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        test_conf_set(conf, "sticky.partitioning.linger.ms", "0");
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        rd_kafka_topic_conf_set_partitioner_cb(tconf, my_invalid_partitioner);
        test_topic_conf_set(tconf, "message.timeout.ms",
                            tsprintf("%d", tmout_multip(10000)));
        rkt = rd_kafka_topic_new(rk, topic, tconf);
        TEST_ASSERT(rkt != NULL, "%s", rd_kafka_err2str(rd_kafka_last_error()));

        /* Produce some messages (to p 0) to create topic */
        test_produce_msgs(rk, rkt, 0, 0, 0, 2, NULL, 0);

        /* Now use partitioner */
        for (i = 0; i < msgcnt; i++) {
                rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
                if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, 0, NULL, 0,
                                     NULL, 0, NULL) == -1)
                        err = rd_kafka_last_error();
                if (err != RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
                        TEST_FAIL(
                            "produce(): "
                            "Expected UNKNOWN_PARTITION, got %s\n",
                            rd_kafka_err2str(err));
        }
        test_flush(rk, 5000);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
}


static void part_dr_msg_cb(rd_kafka_t *rk,
                           const rd_kafka_message_t *rkmessage,
                           void *opaque) {
        int32_t *partp = rkmessage->_private;
        int *remainsp  = opaque;

        if (rkmessage->err) {
                /* Will fail later */
                TEST_WARN("Delivery failed: %s\n",
                          rd_kafka_err2str(rkmessage->err));
                *partp = -1;
        } else {
                *partp = rkmessage->partition;
        }

        (*remainsp)--;
}

/**
 * @brief Test single \p partitioner
 */
static void do_test_partitioner(const char *topic,
                                const char *partitioner,
                                int msgcnt,
                                const char **keys,
                                const int32_t *exp_part) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        int i;
        int32_t *parts;
        int remains = msgcnt;
        int randcnt = 0;
        int fails   = 0;

        TEST_SAY(_C_MAG "Test partitioner \"%s\"\n", partitioner);

        test_conf_init(&conf, NULL, 30);
        rd_kafka_conf_set_opaque(conf, &remains);
        rd_kafka_conf_set_dr_msg_cb(conf, part_dr_msg_cb);
        test_conf_set(conf, "partitioner", partitioner);
        test_conf_set(conf, "sticky.partitioning.linger.ms", "0");

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        parts = malloc(msgcnt * sizeof(*parts));
        for (i = 0; i < msgcnt; i++)
                parts[i] = -1;

        /*
         * Produce messages
         */
        for (i = 0; i < msgcnt; i++) {
                rd_kafka_resp_err_t err;

                err = rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC(topic),
                    RD_KAFKA_V_KEY(keys[i], keys[i] ? strlen(keys[i]) : 0),
                    RD_KAFKA_V_OPAQUE(&parts[i]), RD_KAFKA_V_END);
                TEST_ASSERT(!err, "producev() failed: %s",
                            rd_kafka_err2str(err));

                randcnt += exp_part[i] == -1;
        }

        rd_kafka_flush(rk, tmout_multip(10000));

        TEST_ASSERT(remains == 0, "Expected remains=%d, not %d for %d messages",
                    0, remains, msgcnt);

        /*
         * Verify produced partitions to expected partitions.
         */

        /* First look for produce failures */
        for (i = 0; i < msgcnt; i++) {
                if (parts[i] == -1) {
                        TEST_WARN("Message #%d (exp part %" PRId32
                                  ") "
                                  "was not successfully produced\n",
                                  i, exp_part[i]);
                        fails++;
                }
        }

        TEST_ASSERT(!fails, "See %d previous failure(s)", fails);


        if (randcnt == msgcnt) {
                /* If all expected partitions are random make sure
                 * the produced partitions have some form of
                 * random distribution */
                int32_t last_part = parts[0];
                int samecnt       = 0;

                for (i = 0; i < msgcnt; i++) {
                        samecnt += parts[i] == last_part;
                        last_part = parts[i];
                }

                TEST_ASSERT(samecnt < msgcnt,
                            "No random distribution, all on partition %" PRId32,
                            last_part);
        } else {
                for (i = 0; i < msgcnt; i++) {
                        if (exp_part[i] != -1 && parts[i] != exp_part[i]) {
                                TEST_WARN(
                                    "Message #%d expected partition "
                                    "%" PRId32 " but got %" PRId32 ": %s\n",
                                    i, exp_part[i], parts[i], keys[i]);
                                fails++;
                        }
                }


                TEST_ASSERT(!fails, "See %d previous failure(s)", fails);
        }

        free(parts);

        rd_kafka_destroy(rk);

        TEST_SAY(_C_GRN "Test partitioner \"%s\": PASS\n", partitioner);
}

extern uint32_t rd_crc32(const char *, size_t);

/**
 * @brief Test all builtin partitioners
 */
static void do_test_partitioners(void) {
        int part_cnt = test_quick ? 7 : 17;
#define _MSG_CNT 5
        const char *unaligned = "123456";
        /* Message keys */
        const char *keys[_MSG_CNT] = {
            NULL,
            "",  // empty
            unaligned + 1,
            "this is another string with more length to it perhaps", "hejsan"};
        struct {
                const char *partitioner;
                /* Expected partition per message (see keys above) */
                int32_t exp_part[_MSG_CNT];
        } ptest[] = {{"random", {-1, -1, -1, -1, -1}},
                     {"consistent",
                      {/* These constants were acquired using
                        * the 'crc32' command on OSX */
                       0x0 % part_cnt, 0x0 % part_cnt, 0xb1b451d7 % part_cnt,
                       0xb0150df7 % part_cnt, 0xd077037e % part_cnt}},
                     {"consistent_random",
                      {-1, -1, 0xb1b451d7 % part_cnt, 0xb0150df7 % part_cnt,
                       0xd077037e % part_cnt}},
                     {"murmur2",
                      {/* .. using tests/java/Murmur2Cli */
                       0x106e08d9 % part_cnt, 0x106e08d9 % part_cnt,
                       0x058d780f % part_cnt, 0x4f7703da % part_cnt,
                       0x5ec19395 % part_cnt}},
                     {"murmur2_random",
                      {-1, 0x106e08d9 % part_cnt, 0x058d780f % part_cnt,
                       0x4f7703da % part_cnt, 0x5ec19395 % part_cnt}},
                     {"fnv1a",
                      {/* .. using https://play.golang.org/p/hRkA4xtYyJ6 */
                       0x7ee3623b % part_cnt, 0x7ee3623b % part_cnt,
                       0x27e6f469 % part_cnt, 0x155e3e5f % part_cnt,
                       0x17b1e27a % part_cnt}},
                     {"fnv1a_random",
                      {-1, 0x7ee3623b % part_cnt, 0x27e6f469 % part_cnt,
                       0x155e3e5f % part_cnt, 0x17b1e27a % part_cnt}},
                     {NULL}};
        int pi;
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);

        test_create_topic(NULL, topic, part_cnt, 1);

        for (pi = 0; ptest[pi].partitioner; pi++) {
                do_test_partitioner(topic, ptest[pi].partitioner, _MSG_CNT,
                                    keys, ptest[pi].exp_part);
        }
}

int main_0048_partitioner(int argc, char **argv) {
        if (test_can_create_topics(0))
                do_test_partitioners();
        do_test_failed_partitioning();
        return 0;
}
