/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2016-2022, Magnus Edenhill
 *               2023, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#include "rdkafka_int.h"
#include "rdkafka_feature.h"

#include <stdlib.h>

static const char *rd_kafka_feature_names[] = {"MsgVer1",
                                               "ApiVersion",
                                               "BrokerBalancedConsumer",
                                               "ThrottleTime",
                                               "Sasl",
                                               "SaslHandshake",
                                               "BrokerGroupCoordinator",
                                               "LZ4",
                                               "OffsetTime",
                                               "MsgVer2",
                                               "IdempotentProducer",
                                               "ZSTD",
                                               "SaslAuthReq",
                                               "UnitTest",
                                               NULL};


static const struct rd_kafka_feature_map {
        /* RD_KAFKA_FEATURE_... */
        int feature;

        /* Depends on the following ApiVersions overlapping with
         * what the broker supports: */
        struct rd_kafka_ApiVersion depends[RD_KAFKAP__NUM];

} rd_kafka_feature_map[] = {
    /**
     * @brief List of features and the ApiVersions they depend on.
     *
     * The dependency list consists of the ApiKey followed by this
     * client's supported minimum and maximum API versions.
     * As long as this list and its versions overlaps with the
     * broker supported API versions the feature will be enabled.
     */
    {

        /* @brief >=0.10.0: Message.MagicByte version 1:
         * Relative offsets (KIP-31) and message timestamps (KIP-32). */
        .feature = RD_KAFKA_FEATURE_MSGVER1,
        .depends =
            {
                {RD_KAFKAP_Produce, 2, 2},
                {RD_KAFKAP_Fetch, 2, 2},
                {-1},
            },
    },
    {
        /* @brief >=0.11.0: Message.MagicByte version 2 */
        .feature = RD_KAFKA_FEATURE_MSGVER2,
        .depends =
            {
                {RD_KAFKAP_Produce, 3, 3},
                {RD_KAFKAP_Fetch, 4, 4},
                {-1},
            },
    },
    {
        /* @brief >=0.10.0: ApiVersionQuery support.
         * @remark This is a bit of chicken-and-egg problem but needs to be
         *         set by feature_check() to avoid the feature being cleared
         *         even when broker supports it. */
        .feature = RD_KAFKA_FEATURE_APIVERSION,
        .depends =
            {
                {RD_KAFKAP_ApiVersion, 0, 0},
                {-1},
            },
    },
    {
        /* @brief >=0.8.2.0: Broker-based Group coordinator */
        .feature = RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
        .depends =
            {
                {RD_KAFKAP_FindCoordinator, 0, 0},
                {-1},
            },
    },
    {
        /* @brief >=0.9.0: Broker-based balanced consumer groups. */
        .feature = RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER,
        .depends =
            {
                {RD_KAFKAP_FindCoordinator, 0, 0},
                {RD_KAFKAP_OffsetCommit, 1, 2},
                {RD_KAFKAP_OffsetFetch, 1, 1},
                {RD_KAFKAP_JoinGroup, 0, 0},
                {RD_KAFKAP_SyncGroup, 0, 0},
                {RD_KAFKAP_Heartbeat, 0, 0},
                {RD_KAFKAP_LeaveGroup, 0, 0},
                {-1},
            },
    },
    {
        /* @brief >=0.9.0: ThrottleTime */
        .feature = RD_KAFKA_FEATURE_THROTTLETIME,
        .depends =
            {
                {RD_KAFKAP_Produce, 1, 2},
                {RD_KAFKAP_Fetch, 1, 2},
                {-1},
            },

    },
    {
        /* @brief >=0.9.0: SASL (GSSAPI) authentication.
         * Since SASL is not using the Kafka protocol
         * we must use something else to map us to the
         * proper broker version support:
         * JoinGroup was released along with SASL in 0.9.0. */
        .feature = RD_KAFKA_FEATURE_SASL_GSSAPI,
        .depends =
            {
                {RD_KAFKAP_JoinGroup, 0, 0},
                {-1},
            },
    },
    {
        /* @brief >=0.10.0: SASL mechanism handshake (KIP-43)
         *                  to automatically support other mechanisms
         *                  than GSSAPI, such as PLAIN. */
        .feature = RD_KAFKA_FEATURE_SASL_HANDSHAKE,
        .depends =
            {
                {RD_KAFKAP_SaslHandshake, 0, 0},
                {-1},
            },
    },
    {
        /* @brief >=0.8.2: LZ4 compression.
         * Since LZ4 initially did not rely on a specific API
         * type or version (it does in >=0.10.0)
         * we must use something else to map us to the
         * proper broker version support:
         * GrooupCoordinator was released in 0.8.2 */
        .feature = RD_KAFKA_FEATURE_LZ4,
        .depends =
            {
                {RD_KAFKAP_FindCoordinator, 0, 0},
                {-1},
            },
    },
    {/* @brief >=0.10.1.0: Offset v1 (KIP-79)
      * Time-based offset requests */
     .feature = RD_KAFKA_FEATURE_OFFSET_TIME,
     .depends =
         {
             {RD_KAFKAP_ListOffsets, 1, 1},
             {-1},
         }},
    {/* @brief >=0.11.0.0: Idempotent Producer*/
     .feature = RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER,
     .depends =
         {
             {RD_KAFKAP_InitProducerId, 0, 0},
             {-1},
         }},
    {
        /* @brief >=2.1.0-IV2: Support ZStandard Compression Codec (KIP-110) */
        .feature = RD_KAFKA_FEATURE_ZSTD,
        .depends =
            {
                {RD_KAFKAP_Produce, 7, 7},
                {RD_KAFKAP_Fetch, 10, 10},
                {-1},
            },
    },
    {
        /* @brief >=1.0.0: SaslAuthenticateRequest */
        .feature = RD_KAFKA_FEATURE_SASL_AUTH_REQ,
        .depends =
            {
                {RD_KAFKAP_SaslHandshake, 1, 1},
                {RD_KAFKAP_SaslAuthenticate, 0, 1},
                {-1},
            },
    },
    {.feature = 0}, /* sentinel */
};



/**
 * @brief In absence of KIP-35 support in earlier broker versions we provide
 * hardcoded lists that corresponds to older broker versions.
 */

/* >= 0.10.0.0: dummy for all future versions that support ApiVersionRequest */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_Queryable[] = {
    {RD_KAFKAP_ApiVersion, 0, 0}};


/* =~ 0.9.0 */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_9_0[] = {
    {RD_KAFKAP_Produce, 0, 1},         {RD_KAFKAP_Fetch, 0, 1},
    {RD_KAFKAP_ListOffsets, 0, 0},     {RD_KAFKAP_Metadata, 0, 0},
    {RD_KAFKAP_OffsetCommit, 0, 2},    {RD_KAFKAP_OffsetFetch, 0, 1},
    {RD_KAFKAP_FindCoordinator, 0, 0}, {RD_KAFKAP_JoinGroup, 0, 0},
    {RD_KAFKAP_Heartbeat, 0, 0},       {RD_KAFKAP_LeaveGroup, 0, 0},
    {RD_KAFKAP_SyncGroup, 0, 0},       {RD_KAFKAP_DescribeGroups, 0, 0},
    {RD_KAFKAP_ListGroups, 0, 0}};

/* =~ 0.8.2 */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_2[] = {
    {RD_KAFKAP_Produce, 0, 0},        {RD_KAFKAP_Fetch, 0, 0},
    {RD_KAFKAP_ListOffsets, 0, 0},    {RD_KAFKAP_Metadata, 0, 0},
    {RD_KAFKAP_OffsetCommit, 0, 1},   {RD_KAFKAP_OffsetFetch, 0, 1},
    {RD_KAFKAP_FindCoordinator, 0, 0}};

/* =~ 0.8.1 */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_1[] = {
    {RD_KAFKAP_Produce, 0, 0},      {RD_KAFKAP_Fetch, 0, 0},
    {RD_KAFKAP_ListOffsets, 0, 0},  {RD_KAFKAP_Metadata, 0, 0},
    {RD_KAFKAP_OffsetCommit, 0, 1}, {RD_KAFKAP_OffsetFetch, 0, 0}};

/* =~ 0.8.0 */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_0[] = {
    {RD_KAFKAP_Produce, 0, 0},
    {RD_KAFKAP_Fetch, 0, 0},
    {RD_KAFKAP_ListOffsets, 0, 0},
    {RD_KAFKAP_Metadata, 0, 0}};


/**
 * @brief Returns the ApiVersion list for legacy broker versions that do not
 *        support the ApiVersionQuery request. E.g., brokers <0.10.0.
 *
 * @param broker_version Broker version to match (longest prefix matching).
 * @param use_default If no match is found return the default APIs (but return
 * 0).
 *
 * @returns 1 if \p broker_version was recognized: \p *apisp will point to
 *          the ApiVersion list and *api_cntp will be set to its element count.
 *          0 if \p broker_version was not recognized: \p *apisp remains
 * unchanged.
 *
 */
int rd_kafka_get_legacy_ApiVersions(const char *broker_version,
                                    struct rd_kafka_ApiVersion **apisp,
                                    size_t *api_cntp,
                                    const char *fallback) {
        static const struct {
                const char *pfx;
                struct rd_kafka_ApiVersion *apis;
                size_t api_cnt;
        } vermap[] = {
#define _VERMAP(PFX, APIS) {PFX, APIS, RD_ARRAYSIZE(APIS)}
            _VERMAP("0.9.0", rd_kafka_ApiVersion_0_9_0),
            _VERMAP("0.8.2", rd_kafka_ApiVersion_0_8_2),
            _VERMAP("0.8.1", rd_kafka_ApiVersion_0_8_1),
            _VERMAP("0.8.0", rd_kafka_ApiVersion_0_8_0),
            {"0.7.", NULL}, /* Unsupported */
            {"0.6.", NULL}, /* Unsupported */
            _VERMAP("", rd_kafka_ApiVersion_Queryable),
            {NULL}};
        int i;
        int fallback_i = -1;
        int ret        = 0;

        *apisp    = NULL;
        *api_cntp = 0;

        for (i = 0; vermap[i].pfx; i++) {
                if (!strncmp(vermap[i].pfx, broker_version,
                             strlen(vermap[i].pfx))) {
                        if (!vermap[i].apis)
                                return 0;
                        *apisp    = vermap[i].apis;
                        *api_cntp = vermap[i].api_cnt;
                        ret       = 1;
                        break;
                } else if (fallback && !strcmp(vermap[i].pfx, fallback))
                        fallback_i = i;
        }

        if (!*apisp && fallback) {
                rd_kafka_assert(NULL, fallback_i != -1);
                *apisp    = vermap[fallback_i].apis;
                *api_cntp = vermap[fallback_i].api_cnt;
        }

        return ret;
}


/**
 * @returns 1 if the provided broker version (probably)
 *          supports api.version.request.
 */
int rd_kafka_ApiVersion_is_queryable(const char *broker_version) {
        struct rd_kafka_ApiVersion *apis;
        size_t api_cnt;


        if (!rd_kafka_get_legacy_ApiVersions(broker_version, &apis, &api_cnt,
                                             0))
                return 0;

        return apis == rd_kafka_ApiVersion_Queryable;
}



/**
 * @brief Check if match's versions overlaps with \p apis.
 *
 * @returns 1 if true, else 0.
 * @remark \p apis must be sorted using rd_kafka_ApiVersion_key_cmp()
 */
static RD_INLINE int
rd_kafka_ApiVersion_check(const struct rd_kafka_ApiVersion *apis,
                          size_t api_cnt,
                          const struct rd_kafka_ApiVersion *match) {
        const struct rd_kafka_ApiVersion *api;

        api = bsearch(match, apis, api_cnt, sizeof(*apis),
                      rd_kafka_ApiVersion_key_cmp);
        if (unlikely(!api))
                return 0;

        return match->MinVer <= api->MaxVer && api->MinVer <= match->MaxVer;
}


/**
 * @brief Compare broker's supported API versions to our feature request map
 *        and enable/disable features accordingly.
 *
 * @param broker_apis Broker's supported APIs. If NULL the
 *        \p broker.version.fallback configuration property will specify a
 *        default legacy version to use.
 * @param broker_api_cnt Number of elements in \p broker_apis
 *
 * @returns the supported features (bitmask) to enable.
 */
int rd_kafka_features_check(rd_kafka_broker_t *rkb,
                            struct rd_kafka_ApiVersion *broker_apis,
                            size_t broker_api_cnt) {
        int features = 0;
        int i;

        /* Scan through features. */
        for (i = 0; rd_kafka_feature_map[i].feature != 0; i++) {
                const struct rd_kafka_ApiVersion *match;
                int fails = 0;

                /* For each feature check that all its API dependencies
                 * can be fullfilled. */

                for (match = &rd_kafka_feature_map[i].depends[0];
                     match->ApiKey != -1; match++) {
                        int r;

                        r = rd_kafka_ApiVersion_check(broker_apis,
                                                      broker_api_cnt, match);

                        rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
                                   " Feature %s: %s (%hd..%hd) "
                                   "%ssupported by broker",
                                   rd_kafka_features2str(
                                       rd_kafka_feature_map[i].feature),
                                   rd_kafka_ApiKey2str(match->ApiKey),
                                   match->MinVer, match->MaxVer,
                                   r ? "" : "NOT ");

                        fails += !r;
                }

                rd_rkb_dbg(
                    rkb, FEATURE, "APIVERSION", "%s feature %s",
                    fails ? "Disabling" : "Enabling",
                    rd_kafka_features2str(rd_kafka_feature_map[i].feature));


                if (!fails)
                        features |= rd_kafka_feature_map[i].feature;
        }

        return features;
}



/**
 * @brief Make an allocated and sorted copy of \p src.
 */
void rd_kafka_ApiVersions_copy(const struct rd_kafka_ApiVersion *src,
                               size_t src_cnt,
                               struct rd_kafka_ApiVersion **dstp,
                               size_t *dst_cntp) {
        *dstp     = rd_memdup(src, sizeof(*src) * src_cnt);
        *dst_cntp = src_cnt;
        qsort(*dstp, *dst_cntp, sizeof(**dstp), rd_kafka_ApiVersion_key_cmp);
}



/**
 * @returns a human-readable feature flag string.
 */
const char *rd_kafka_features2str(int features) {
        static RD_TLS char ret[4][256];
        size_t of              = 0;
        static RD_TLS int reti = 0;
        int i;

        reti = (reti + 1) % 4;

        *ret[reti] = '\0';
        for (i = 0; rd_kafka_feature_names[i]; i++) {
                int r;
                if (!(features & (1 << i)))
                        continue;

                r = rd_snprintf(ret[reti] + of, sizeof(ret[reti]) - of, "%s%s",
                                of == 0 ? "" : ",", rd_kafka_feature_names[i]);
                if ((size_t)r > sizeof(ret[reti]) - of) {
                        /* Out of space */
                        memcpy(&ret[reti][sizeof(ret[reti]) - 3], "..", 3);
                        break;
                }

                of += r;
        }

        return ret[reti];
}
