/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2022, Magnus Edenhill
 *               2023, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rd.h"
#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_broker.h"
#include "rdkafka_cgrp.h"
#include "rdkafka_metadata.h"
#include "rdkafka_offset.h"
#include "rdlog.h"
#include "rdsysqueue.h"
#include "rdtime.h"
#include "rdregex.h"
#include "rdkafka_fetcher.h"

#if WITH_ZSTD
#include <zstd.h>
#endif


const char *rd_kafka_topic_state_names[] = {"unknown", "exists", "notexists",
                                            "error"};


static int
rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
                               const struct rd_kafka_metadata_topic *mdt,
                               const rd_kafka_metadata_topic_internal_t *mdit,
                               rd_ts_t ts_age);


/**
 * @brief Increases the app's topic reference count.
 *
 * The app refcounts are implemented separately from the librdkafka refcounts,
 * they are increased/decreased in a separate rkt_app_refcnt to keep track of
 * its use.
 *
 * This only covers topic_new() & topic_destroy().
 * The topic_t exposed in rd_kafka_message_t is NOT covered and is handled
 * like a standard internal -> app pointer conversion (keep_a()).
 */
static void rd_kafka_topic_keep_app(rd_kafka_topic_t *rkt) {
        if (rd_refcnt_add(&rkt->rkt_app_refcnt) == 1)
                rd_kafka_topic_keep(rkt);
}

/**
 * @brief drop rkt app reference
 */
static void rd_kafka_topic_destroy_app(rd_kafka_topic_t *app_rkt) {
        rd_kafka_topic_t *rkt = app_rkt;

        rd_assert(!rd_kafka_rkt_is_lw(app_rkt));

        if (unlikely(rd_refcnt_sub(&rkt->rkt_app_refcnt) == 0))
                rd_kafka_topic_destroy0(rkt); /* final app reference lost,
                                               * loose reference from
                                               * keep_app() */
}


/**
 * Final destructor for topic. Refcnt must be 0.
 */
void rd_kafka_topic_destroy_final(rd_kafka_topic_t *rkt) {
        rd_kafka_partition_msgid_t *partmsgid, *partmsgid_tmp;

        rd_kafka_assert(rkt->rkt_rk, rd_refcnt_get(&rkt->rkt_refcnt) == 0);

        rd_kafka_wrlock(rkt->rkt_rk);
        TAILQ_REMOVE(&rkt->rkt_rk->rk_topics, rkt, rkt_link);
        rkt->rkt_rk->rk_topic_cnt--;
        rd_kafka_wrunlock(rkt->rkt_rk);

        TAILQ_FOREACH_SAFE(partmsgid, &rkt->rkt_saved_partmsgids, link,
                           partmsgid_tmp) {
                rd_free(partmsgid);
        }

        rd_kafka_assert(rkt->rkt_rk, rd_list_empty(&rkt->rkt_desp));
        rd_list_destroy(&rkt->rkt_desp);

        rd_avg_destroy(&rkt->rkt_avg_batchsize);
        rd_avg_destroy(&rkt->rkt_avg_batchcnt);

        if (rkt->rkt_topic)
                rd_kafkap_str_destroy(rkt->rkt_topic);

        rd_kafka_anyconf_destroy(_RK_TOPIC, &rkt->rkt_conf);

        rwlock_destroy(&rkt->rkt_lock);
        rd_refcnt_destroy(&rkt->rkt_app_refcnt);
        rd_refcnt_destroy(&rkt->rkt_refcnt);

        rd_free(rkt);
}

/**
 * @brief Application topic object destroy.
 * @warning MUST ONLY BE CALLED BY THE APPLICATION.
 *          Use rd_kafka_topic_destroy0() for all internal use.
 */
void rd_kafka_topic_destroy(rd_kafka_topic_t *app_rkt) {
        rd_kafka_lwtopic_t *lrkt;
        if (unlikely((lrkt = rd_kafka_rkt_get_lw(app_rkt)) != NULL))
                rd_kafka_lwtopic_destroy(lrkt);
        else
                rd_kafka_topic_destroy_app(app_rkt);
}


/**
 * Finds and returns a topic based on its name, or NULL if not found.
 * The 'rkt' refcount is increased by one and the caller must call
 * rd_kafka_topic_destroy() when it is done with the topic to decrease
 * the refcount.
 *
 * Locality: any thread
 */
rd_kafka_topic_t *rd_kafka_topic_find_fl(const char *func,
                                         int line,
                                         rd_kafka_t *rk,
                                         const char *topic,
                                         int do_lock) {
        rd_kafka_topic_t *rkt;

        if (do_lock)
                rd_kafka_rdlock(rk);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                if (!rd_kafkap_str_cmp_str(rkt->rkt_topic, topic)) {
                        rd_kafka_topic_keep(rkt);
                        break;
                }
        }
        if (do_lock)
                rd_kafka_rdunlock(rk);

        return rkt;
}

/**
 * Same semantics as ..find() but takes a Kafka protocol string instead.
 */
rd_kafka_topic_t *rd_kafka_topic_find0_fl(const char *func,
                                          int line,
                                          rd_kafka_t *rk,
                                          const rd_kafkap_str_t *topic) {
        rd_kafka_topic_t *rkt;

        rd_kafka_rdlock(rk);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                if (!rd_kafkap_str_cmp(rkt->rkt_topic, topic)) {
                        rd_kafka_topic_keep(rkt);
                        break;
                }
        }
        rd_kafka_rdunlock(rk);

        return rkt;
}


/**
 * @brief rd_kafka_topic_t comparator.
 */
int rd_kafka_topic_cmp_rkt(const void *_a, const void *_b) {
        rd_kafka_topic_t *rkt_a = (void *)_a, *rkt_b = (void *)_b;

        if (rkt_a == rkt_b)
                return 0;

        return rd_kafkap_str_cmp(rkt_a->rkt_topic, rkt_b->rkt_topic);
}


/**
 * @brief Destroy/free a light-weight topic object.
 */
void rd_kafka_lwtopic_destroy(rd_kafka_lwtopic_t *lrkt) {
        rd_assert(rd_kafka_rkt_is_lw((const rd_kafka_topic_t *)lrkt));
        if (rd_refcnt_sub(&lrkt->lrkt_refcnt) > 0)
                return;

        rd_refcnt_destroy(&lrkt->lrkt_refcnt);
        rd_free(lrkt);
}


/**
 * @brief Create a new light-weight topic name-only handle.
 *
 * This type of object is a light-weight non-linked alternative
 * to the proper rd_kafka_itopic_t for outgoing APIs
 * (such as rd_kafka_message_t) when there is no full topic object available.
 */
rd_kafka_lwtopic_t *rd_kafka_lwtopic_new(rd_kafka_t *rk, const char *topic) {
        rd_kafka_lwtopic_t *lrkt;
        size_t topic_len = strlen(topic);

        lrkt = rd_malloc(sizeof(*lrkt) + topic_len + 1);

        memcpy(lrkt->lrkt_magic, "LRKT", 4);
        lrkt->lrkt_rk = rk;
        rd_refcnt_init(&lrkt->lrkt_refcnt, 1);
        lrkt->lrkt_topic = (char *)(lrkt + 1);
        memcpy(lrkt->lrkt_topic, topic, topic_len + 1);

        return lrkt;
}


/**
 * @returns a proper rd_kafka_topic_t object (not light-weight)
 *          based on the input rd_kafka_topic_t app object which may
 *          either be a proper topic (which is then returned) or a light-weight
 *          topic in which case it will look up or create the proper topic
 *          object.
 *
 *          This allows the application to (unknowingly) pass a light-weight
 *          topic object to any proper-aware public API.
 */
rd_kafka_topic_t *rd_kafka_topic_proper(rd_kafka_topic_t *app_rkt) {
        rd_kafka_lwtopic_t *lrkt;

        if (likely(!(lrkt = rd_kafka_rkt_get_lw(app_rkt))))
                return app_rkt;

        /* Create proper topic object */
        return rd_kafka_topic_new0(lrkt->lrkt_rk, lrkt->lrkt_topic, NULL, NULL,
                                   0);
}


/**
 * @brief Create new topic handle.
 *
 * @locality any
 */
rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk,
                                      const char *topic,
                                      rd_kafka_topic_conf_t *conf,
                                      int *existing,
                                      int do_lock) {
        rd_kafka_topic_t *rkt;
        const struct rd_kafka_metadata_cache_entry *rkmce;
        const char *conf_err;
        const char *used_conf_str;

        /* Verify configuration.
         * Maximum topic name size + headers must never exceed message.max.bytes
         * which is min-capped to 1000.
         * See rd_kafka_broker_produce_toppar() and rdkafka_conf.c */
        if (!topic || strlen(topic) > 512) {
                if (conf)
                        rd_kafka_topic_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return NULL;
        }

        if (do_lock)
                rd_kafka_wrlock(rk);
        if ((rkt = rd_kafka_topic_find(rk, topic, 0 /*no lock*/))) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                if (conf)
                        rd_kafka_topic_conf_destroy(conf);
                if (existing)
                        *existing = 1;
                return rkt;
        }

        if (!conf) {
                if (rk->rk_conf.topic_conf) {
                        conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
                        used_conf_str = "default_topic_conf";
                } else {
                        conf          = rd_kafka_topic_conf_new();
                        used_conf_str = "empty";
                }
        } else {
                used_conf_str = "user-supplied";
        }


        /* Verify and finalize topic configuration */
        if ((conf_err = rd_kafka_topic_conf_finalize(rk->rk_type, &rk->rk_conf,
                                                     conf))) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                /* Incompatible configuration settings */
                rd_kafka_log(rk, LOG_ERR, "TOPICCONF",
                             "Incompatible configuration settings "
                             "for topic \"%s\": %s",
                             topic, conf_err);
                rd_kafka_topic_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return NULL;
        }

        if (existing)
                *existing = 0;

        rkt = rd_calloc(1, sizeof(*rkt));

        memcpy(rkt->rkt_magic, "IRKT", 4);

        rkt->rkt_topic = rd_kafkap_str_new(topic, -1);
        rkt->rkt_rk    = rk;

        rkt->rkt_ts_create = rd_clock();

        rkt->rkt_conf = *conf;
        rd_free(conf); /* explicitly not rd_kafka_topic_destroy()
                        * since we dont want to rd_free internal members,
                        * just the placeholder. The internal members
                        * were copied on the line above. */

        /* Partitioner */
        if (!rkt->rkt_conf.partitioner) {
                const struct {
                        const char *str;
                        void *part;
                } part_map[] = {
                    {"random", (void *)rd_kafka_msg_partitioner_random},
                    {"consistent", (void *)rd_kafka_msg_partitioner_consistent},
                    {"consistent_random",
                     (void *)rd_kafka_msg_partitioner_consistent_random},
                    {"murmur2", (void *)rd_kafka_msg_partitioner_murmur2},
                    {"murmur2_random",
                     (void *)rd_kafka_msg_partitioner_murmur2_random},
                    {"fnv1a", (void *)rd_kafka_msg_partitioner_fnv1a},
                    {"fnv1a_random",
                     (void *)rd_kafka_msg_partitioner_fnv1a_random},
                    {NULL}};
                int i;

                /* Use "partitioner" configuration property string, if set */
                for (i = 0; rkt->rkt_conf.partitioner_str && part_map[i].str;
                     i++) {
                        if (!strcmp(rkt->rkt_conf.partitioner_str,
                                    part_map[i].str)) {
                                rkt->rkt_conf.partitioner = part_map[i].part;
                                break;
                        }
                }

                /* Default partitioner: consistent_random */
                if (!rkt->rkt_conf.partitioner) {
                        /* Make sure part_map matched something, otherwise
                         * there is a discreprency between this code
                         * and the validator in rdkafka_conf.c */
                        assert(!rkt->rkt_conf.partitioner_str);

                        rkt->rkt_conf.partitioner =
                            rd_kafka_msg_partitioner_consistent_random;
                }
        }

        if (rkt->rkt_rk->rk_conf.sticky_partition_linger_ms > 0 &&
            rkt->rkt_conf.partitioner != rd_kafka_msg_partitioner_consistent &&
            rkt->rkt_conf.partitioner != rd_kafka_msg_partitioner_murmur2 &&
            rkt->rkt_conf.partitioner != rd_kafka_msg_partitioner_fnv1a) {
                rkt->rkt_conf.random_partitioner = rd_false;
        } else {
                rkt->rkt_conf.random_partitioner = rd_true;
        }

        /* Sticky partition assignment interval */
        rd_interval_init(&rkt->rkt_sticky_intvl);

        if (rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO)
                rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid;
        else
                rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid_lifo;

        if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
                rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;

        /* Translate compression level to library-specific level and check
         * upper bound */
        switch (rkt->rkt_conf.compression_codec) {
#if WITH_ZLIB
        case RD_KAFKA_COMPRESSION_GZIP:
                if (rkt->rkt_conf.compression_level ==
                    RD_KAFKA_COMPLEVEL_DEFAULT)
                        rkt->rkt_conf.compression_level = Z_DEFAULT_COMPRESSION;
                else if (rkt->rkt_conf.compression_level >
                         RD_KAFKA_COMPLEVEL_GZIP_MAX)
                        rkt->rkt_conf.compression_level =
                            RD_KAFKA_COMPLEVEL_GZIP_MAX;
                break;
#endif
        case RD_KAFKA_COMPRESSION_LZ4:
                if (rkt->rkt_conf.compression_level ==
                    RD_KAFKA_COMPLEVEL_DEFAULT)
                        /* LZ4 has no notion of system-wide default compression
                         * level, use zero in this case */
                        rkt->rkt_conf.compression_level = 0;
                else if (rkt->rkt_conf.compression_level >
                         RD_KAFKA_COMPLEVEL_LZ4_MAX)
                        rkt->rkt_conf.compression_level =
                            RD_KAFKA_COMPLEVEL_LZ4_MAX;
                break;
#if WITH_ZSTD
        case RD_KAFKA_COMPRESSION_ZSTD:
                if (rkt->rkt_conf.compression_level ==
                    RD_KAFKA_COMPLEVEL_DEFAULT)
                        rkt->rkt_conf.compression_level = 3;
                else if (rkt->rkt_conf.compression_level >
                         RD_KAFKA_COMPLEVEL_ZSTD_MAX)
                        rkt->rkt_conf.compression_level =
                            RD_KAFKA_COMPLEVEL_ZSTD_MAX;
                break;
#endif
        case RD_KAFKA_COMPRESSION_SNAPPY:
        default:
                /* Compression level has no effect in this case */
                rkt->rkt_conf.compression_level = RD_KAFKA_COMPLEVEL_DEFAULT;
        }

        rd_avg_init(&rkt->rkt_avg_batchsize, RD_AVG_GAUGE, 0,
                    rk->rk_conf.max_msg_size, 2,
                    rk->rk_conf.stats_interval_ms ? 1 : 0);
        rd_avg_init(&rkt->rkt_avg_batchcnt, RD_AVG_GAUGE, 0,
                    rk->rk_conf.batch_num_messages, 2,
                    rk->rk_conf.stats_interval_ms ? 1 : 0);

        rd_kafka_dbg(rk, TOPIC, "TOPIC", "New local topic: %.*s",
                     RD_KAFKAP_STR_PR(rkt->rkt_topic));

        rd_list_init(&rkt->rkt_desp, 16, NULL);
        rd_interval_init(&rkt->rkt_desp_refresh_intvl);
        TAILQ_INIT(&rkt->rkt_saved_partmsgids);
        rd_refcnt_init(&rkt->rkt_refcnt, 0);
        rd_refcnt_init(&rkt->rkt_app_refcnt, 0);

        rd_kafka_topic_keep(rkt);

        rwlock_init(&rkt->rkt_lock);

        /* Create unassigned partition */
        rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);

        TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
        rk->rk_topic_cnt++;

        /* Populate from metadata cache. */
        if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1 /*valid*/)) &&
            !rkmce->rkmce_mtopic.err) {
                if (existing)
                        *existing = 1;

                rd_kafka_topic_metadata_update(
                    rkt, &rkmce->rkmce_mtopic,
                    &rkmce->rkmce_metadata_internal_topic,
                    rkmce->rkmce_ts_insert);
        }

        if (do_lock)
                rd_kafka_wrunlock(rk);

        if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) {
                char desc[256];
                rd_snprintf(desc, sizeof(desc),
                            "Topic \"%s\" configuration (%s)", topic,
                            used_conf_str);
                rd_kafka_anyconf_dump_dbg(rk, _RK_TOPIC, &rkt->rkt_conf, desc);
        }

        return rkt;
}



/**
 * @brief Create new app topic handle.
 *
 * @locality application thread
 */
rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk,
                                     const char *topic,
                                     rd_kafka_topic_conf_t *conf) {
        rd_kafka_topic_t *rkt;
        int existing;

        rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1 /*lock*/);
        if (!rkt)
                return NULL;

        /* Increase application refcount. */
        rd_kafka_topic_keep_app(rkt);

        /* Query for the topic leader (async) */
        if (!existing)
                rd_kafka_topic_leader_query(rk, rkt);

        /* Drop our reference since there is already/now an app refcnt */
        rd_kafka_topic_destroy0(rkt);

        return rkt;
}



/**
 * Sets the state for topic.
 * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held
 */
static void rd_kafka_topic_set_state(rd_kafka_topic_t *rkt, int state) {

        if ((int)rkt->rkt_state == state)
                return;

        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "STATE",
                     "Topic %s changed state %s -> %s", rkt->rkt_topic->str,
                     rd_kafka_topic_state_names[rkt->rkt_state],
                     rd_kafka_topic_state_names[state]);

        if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR)
                rkt->rkt_err = RD_KAFKA_RESP_ERR_NO_ERROR;

        rkt->rkt_state = state;
}

/**
 * Returns the name of a topic.
 * NOTE:
 *   The topic Kafka String representation is crafted with an extra byte
 *   at the end for the Nul that is not included in the length, this way
 *   we can use the topic's String directly.
 *   This is not true for Kafka Strings read from the network.
 */
const char *rd_kafka_topic_name(const rd_kafka_topic_t *app_rkt) {
        if (rd_kafka_rkt_is_lw(app_rkt))
                return rd_kafka_rkt_lw_const(app_rkt)->lrkt_topic;
        else
                return app_rkt->rkt_topic->str;
}


/**
 * @brief Update the broker that a topic+partition is delegated to.
 *
 * @param broker_id The id of the broker to associate the toppar with.
 * @param rkb A reference to the broker to delegate to (must match
 *        broker_id) or NULL if the toppar should be undelegated for
 *        any reason.
 * @param reason Human-readable reason for the update, included in debug log.
 *
 * @returns 1 if the broker delegation was changed, -1 if the broker
 *          delegation was changed and is now undelegated, else 0.
 *
 * @locks caller must have rd_kafka_toppar_lock(rktp)
 * @locality any
 */
int rd_kafka_toppar_broker_update(rd_kafka_toppar_t *rktp,
                                  int32_t broker_id,
                                  rd_kafka_broker_t *rkb,
                                  const char *reason) {

        rktp->rktp_broker_id = broker_id;

        if (!rkb) {
                int had_broker = rktp->rktp_broker ? 1 : 0;
                rd_kafka_toppar_broker_delegate(rktp, NULL);
                return had_broker ? -1 : 0;
        }

        if (rktp->rktp_broker) {
                if (rktp->rktp_broker == rkb) {
                        /* No change in broker */
                        return 0;
                }

                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC | RD_KAFKA_DBG_FETCH,
                             "TOPICUPD",
                             "Topic %s [%" PRId32
                             "]: migrating from "
                             "broker %" PRId32 " to %" PRId32
                             " (leader is "
                             "%" PRId32 "): %s",
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             rktp->rktp_broker->rkb_nodeid, rkb->rkb_nodeid,
                             rktp->rktp_leader_id, reason);
        }

        rd_kafka_toppar_broker_delegate(rktp, rkb);

        return 1;
}


/**
 * @brief Update a topic+partition for a new leader.
 *
 * @remark If a toppar is currently delegated to a preferred replica,
 *         it will not be delegated to the leader broker unless there
 *         has been a leader change.
 *
 * @param leader_id The id of the new leader broker.
 * @param leader A reference to the leader broker or NULL if the
 *        toppar should be undelegated for any reason.
 * @param leader_epoch Partition leader's epoch (KIP-320), or -1 if not known.
 *
 * @returns 1 if the broker delegation was changed, -1 if the broker
 *        delegation was changed and is now undelegated, else 0.
 *
 * @locks caller must have rd_kafka_topic_wrlock(rkt)
 *        AND NOT rd_kafka_toppar_lock(rktp)
 * @locality any
 */
static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
                                         int32_t partition,
                                         int32_t leader_id,
                                         rd_kafka_broker_t *leader,
                                         int32_t leader_epoch) {
        rd_kafka_toppar_t *rktp;
        rd_bool_t fetching_from_follower, need_epoch_validation = rd_false;
        int r = 0;

        rktp = rd_kafka_toppar_get(rkt, partition, 0);
        if (unlikely(!rktp)) {
                /* Have only seen this in issue #132.
                 * Probably caused by corrupt broker state. */
                rd_kafka_log(rkt->rkt_rk, LOG_WARNING, "BROKER",
                             "%s [%" PRId32
                             "] is unknown "
                             "(partition_cnt %i): "
                             "ignoring leader (%" PRId32 ") update",
                             rkt->rkt_topic->str, partition,
                             rkt->rkt_partition_cnt, leader_id);
                return -1;
        }

        rd_kafka_toppar_lock(rktp);

        if (leader_epoch < rktp->rktp_leader_epoch) {
                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
                             "%s [%" PRId32
                             "]: ignoring outdated metadata update with "
                             "leader epoch %" PRId32
                             " which is older than "
                             "our cached epoch %" PRId32,
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition, leader_epoch,
                             rktp->rktp_leader_epoch);
                if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
                        rd_kafka_toppar_unlock(rktp);
                        rd_kafka_toppar_destroy(rktp); /* from get() */
                        return 0;
                }
        }

        if (leader_epoch > rktp->rktp_leader_epoch) {
                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
                             "%s [%" PRId32 "]: leader %" PRId32
                             " epoch %" PRId32 " -> leader %" PRId32
                             " epoch %" PRId32,
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition, rktp->rktp_leader_id,
                             rktp->rktp_leader_epoch, leader_id, leader_epoch);
                rktp->rktp_leader_epoch = leader_epoch;
                need_epoch_validation   = rd_true;
        } else if (rktp->rktp_fetch_state ==
                   RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
                need_epoch_validation = rd_true;

        fetching_from_follower =
            leader != NULL && rktp->rktp_broker != NULL &&
            rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL &&
            rktp->rktp_broker != leader;

        if (fetching_from_follower && rktp->rktp_leader_id == leader_id) {
                rd_kafka_dbg(
                    rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
                    "Topic %s [%" PRId32 "]: leader %" PRId32
                    " unchanged, "
                    "not migrating away from preferred replica %" PRId32,
                    rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                    leader_id, rktp->rktp_broker_id);
                r = 0;

        } else {

                if (rktp->rktp_leader_id != leader_id ||
                    rktp->rktp_leader != leader) {
                        /* Update leader if it has changed */
                        rktp->rktp_leader_id = leader_id;
                        if (rktp->rktp_leader)
                                rd_kafka_broker_destroy(rktp->rktp_leader);
                        if (leader)
                                rd_kafka_broker_keep(leader);
                        rktp->rktp_leader = leader;
                }

                /* Update handling broker */
                r = rd_kafka_toppar_broker_update(rktp, leader_id, leader,
                                                  "leader updated");
        }

        if (need_epoch_validation) {
                /* Set offset validation position,
                 * depending it if should continue with current position or
                 * with next fetch start position. */
                if (rd_kafka_toppar_fetch_decide_start_from_next_fetch_start(
                        rktp)) {
                        rd_kafka_toppar_set_offset_validation_position(
                            rktp, rktp->rktp_next_fetch_start);
                } else {
                        rd_kafka_toppar_set_offset_validation_position(
                            rktp, rktp->rktp_offsets.fetch_pos);
                }
                rd_kafka_offset_validate(rktp, "epoch updated from metadata");
        }

        rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(rktp); /* from get() */

        return r;
}


/**
 * @brief Revert the topic+partition delegation to the leader from
 *        a preferred replica.
 *
 * @returns 1 if the broker delegation was changed, -1 if the broker
 *          delegation was changed and is now undelegated, else 0.
 *
 * @locks none
 * @locality any
 */
int rd_kafka_toppar_delegate_to_leader(rd_kafka_toppar_t *rktp) {
        rd_kafka_broker_t *leader;
        int r;

        rd_kafka_rdlock(rktp->rktp_rkt->rkt_rk);
        rd_kafka_toppar_lock(rktp);

        rd_assert(rktp->rktp_leader_id != rktp->rktp_broker_id);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
                     "Topic %s [%" PRId32
                     "]: Reverting from preferred "
                     "replica %" PRId32 " to leader %" PRId32,
                     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                     rktp->rktp_broker_id, rktp->rktp_leader_id);

        leader = rd_kafka_broker_find_by_nodeid(rktp->rktp_rkt->rkt_rk,
                                                rktp->rktp_leader_id);

        rd_kafka_toppar_unlock(rktp);
        rd_kafka_rdunlock(rktp->rktp_rkt->rkt_rk);

        rd_kafka_toppar_lock(rktp);
        r = rd_kafka_toppar_broker_update(
            rktp, rktp->rktp_leader_id, leader,
            "reverting from preferred replica to leader");
        rd_kafka_toppar_unlock(rktp);

        if (leader)
                rd_kafka_broker_destroy(leader);

        return r;
}



/**
 * @brief Save idempotent producer state for a partition that is about to
 *        be removed.
 *
 * @locks_required rd_kafka_wrlock(rkt), rd_kafka_toppar_lock(rktp)
 */
static void rd_kafka_toppar_idemp_msgid_save(rd_kafka_topic_t *rkt,
                                             const rd_kafka_toppar_t *rktp) {
        rd_kafka_partition_msgid_t *partmsgid = rd_malloc(sizeof(*partmsgid));
        partmsgid->partition                  = rktp->rktp_partition;
        partmsgid->msgid                      = rktp->rktp_msgid;
        partmsgid->pid                        = rktp->rktp_eos.pid;
        partmsgid->epoch_base_msgid           = rktp->rktp_eos.epoch_base_msgid;
        partmsgid->ts                         = rd_clock();

        TAILQ_INSERT_TAIL(&rkt->rkt_saved_partmsgids, partmsgid, link);
}


/**
 * @brief Restore idempotent producer state for a new/resurfacing partition.
 *
 * @locks_required rd_kafka_wrlock(rkt), rd_kafka_toppar_lock(rktp)
 */
static void rd_kafka_toppar_idemp_msgid_restore(rd_kafka_topic_t *rkt,
                                                rd_kafka_toppar_t *rktp) {
        rd_kafka_partition_msgid_t *partmsgid;

        TAILQ_FOREACH(partmsgid, &rkt->rkt_saved_partmsgids, link) {
                if (partmsgid->partition == rktp->rktp_partition)
                        break;
        }

        if (!partmsgid)
                return;

        rktp->rktp_msgid                = partmsgid->msgid;
        rktp->rktp_eos.pid              = partmsgid->pid;
        rktp->rktp_eos.epoch_base_msgid = partmsgid->epoch_base_msgid;

        rd_kafka_dbg(rkt->rkt_rk, EOS | RD_KAFKA_DBG_TOPIC, "MSGID",
                     "Topic %s [%" PRId32 "]: restored %s with MsgId %" PRIu64
                     " and "
                     "epoch base MsgId %" PRIu64
                     " that was saved upon removal %dms ago",
                     rkt->rkt_topic->str, rktp->rktp_partition,
                     rd_kafka_pid2str(partmsgid->pid), partmsgid->msgid,
                     partmsgid->epoch_base_msgid,
                     (int)((rd_clock() - partmsgid->ts) / 1000));

        TAILQ_REMOVE(&rkt->rkt_saved_partmsgids, partmsgid, link);
        rd_free(partmsgid);
}


/**
 * @brief Update the number of partitions for a topic and takes actions
 *        accordingly.
 *
 * @returns 1 if the partition count changed, else 0.
 *
 * @locks rd_kafka_topic_wrlock(rkt) MUST be held.
 */
static int rd_kafka_topic_partition_cnt_update(rd_kafka_topic_t *rkt,
                                               int32_t partition_cnt) {
        rd_kafka_t *rk = rkt->rkt_rk;
        rd_kafka_toppar_t **rktps;
        rd_kafka_toppar_t *rktp;
        rd_bool_t is_idempodent = rd_kafka_is_idempotent(rk);
        int32_t i;

        if (likely(rkt->rkt_partition_cnt == partition_cnt))
                return 0; /* No change in partition count */

        if (unlikely(rkt->rkt_partition_cnt != 0 &&
                     !rd_kafka_terminating(rkt->rkt_rk)))
                rd_kafka_log(rk, LOG_NOTICE, "PARTCNT",
                             "Topic %s partition count changed "
                             "from %" PRId32 " to %" PRId32,
                             rkt->rkt_topic->str, rkt->rkt_partition_cnt,
                             partition_cnt);
        else
                rd_kafka_dbg(rk, TOPIC, "PARTCNT",
                             "Topic %s partition count changed "
                             "from %" PRId32 " to %" PRId32,
                             rkt->rkt_topic->str, rkt->rkt_partition_cnt,
                             partition_cnt);


        /* Create and assign new partition list */
        if (partition_cnt > 0)
                rktps = rd_calloc(partition_cnt, sizeof(*rktps));
        else
                rktps = NULL;

        for (i = 0; i < partition_cnt; i++) {
                if (i >= rkt->rkt_partition_cnt) {
                        /* New partition. Check if its in the list of
                         * desired partitions first. */

                        rktp = rd_kafka_toppar_desired_get(rkt, i);
                        if (rktp) {
                                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_flags &=
                                    ~(RD_KAFKA_TOPPAR_F_UNKNOWN |
                                      RD_KAFKA_TOPPAR_F_REMOVE);

                                /* Remove from desp list since the
                                 * partition is now known. */
                                rd_kafka_toppar_desired_unlink(rktp);
                        } else {
                                rktp = rd_kafka_toppar_new(rkt, i);

                                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_flags &=
                                    ~(RD_KAFKA_TOPPAR_F_UNKNOWN |
                                      RD_KAFKA_TOPPAR_F_REMOVE);
                        }
                        rktps[i] = rktp;

                        if (is_idempodent)
                                /* Restore idempotent producer state for
                                 * this partition, if any. */
                                rd_kafka_toppar_idemp_msgid_restore(rkt, rktp);

                        rd_kafka_toppar_unlock(rktp);

                } else {
                        /* Existing partition, grab our own reference. */
                        rktps[i] = rd_kafka_toppar_keep(rkt->rkt_p[i]);
                        /* Loose previous ref */
                        rd_kafka_toppar_destroy(rkt->rkt_p[i]);
                }
        }

        /* Propagate notexist errors for desired partitions */
        RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) {
                rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
                             "%s [%" PRId32
                             "]: "
                             "desired partition does not exist in cluster",
                             rkt->rkt_topic->str, rktp->rktp_partition);
                rd_kafka_toppar_enq_error(
                    rktp,
                    rkt->rkt_err ? rkt->rkt_err
                                 : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                    "desired partition is not available");
        }

        /* Remove excessive partitions */
        for (i = partition_cnt; i < rkt->rkt_partition_cnt; i++) {
                rktp = rkt->rkt_p[i];

                rd_kafka_dbg(rkt->rkt_rk, TOPIC, "REMOVE",
                             "%s [%" PRId32 "] no longer reported in metadata",
                             rkt->rkt_topic->str, rktp->rktp_partition);

                rd_kafka_toppar_lock(rktp);

                /* Idempotent/Transactional producer:
                 * We need to save each removed partition's base msgid for
                 * the (rare) chance the partition comes back,
                 * in which case we must continue with the correct msgid
                 * in future ProduceRequests.
                 *
                 * These base msgsid are restored (above) if/when partitions
                 * come back and the PID,Epoch hasn't changed.
                 *
                 * One situation where this might happen is if a broker goes
                 * out of sync and starts to wrongfully report an existing
                 * topic as non-existent, triggering the removal of partitions
                 * on the producer client. When metadata is eventually correct
                 * again and the topic is "re-created" on the producer, it
                 * must continue with the next msgid/baseseq. */
                if (is_idempodent && rd_kafka_pid_valid(rktp->rktp_eos.pid))
                        rd_kafka_toppar_idemp_msgid_save(rkt, rktp);

                rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;

                if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
                        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
                                     "Topic %s [%" PRId32
                                     "] is desired "
                                     "but no longer known: "
                                     "moving back on desired list",
                                     rkt->rkt_topic->str, rktp->rktp_partition);

                        /* If this is a desired partition move it back on to
                         * the desired list since partition is no longer known*/
                        rd_kafka_toppar_desired_link(rktp);

                        if (!rd_kafka_terminating(rkt->rkt_rk))
                                rd_kafka_toppar_enq_error(
                                    rktp,
                                    rkt->rkt_err
                                        ? rkt->rkt_err
                                        : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                    "desired partition is no longer "
                                    "available");

                        rd_kafka_toppar_broker_delegate(rktp, NULL);

                } else {
                        /* Tell handling broker to let go of the toppar */
                        rd_kafka_toppar_broker_leave_for_remove(rktp);
                }

                rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(rktp);
        }

        if (rkt->rkt_p)
                rd_free(rkt->rkt_p);

        rkt->rkt_p = rktps;

        rkt->rkt_partition_cnt = partition_cnt;

        return 1;
}



/**
 * Topic 'rkt' does not exist: propagate to interested parties.
 * The topic's state must have been set to NOTEXISTS and
 * rd_kafka_topic_partition_cnt_update() must have been called prior to
 * calling this function.
 *
 * Locks: rd_kafka_topic_*lock() must be held.
 */
static void rd_kafka_topic_propagate_notexists(rd_kafka_topic_t *rkt,
                                               rd_kafka_resp_err_t err) {
        rd_kafka_toppar_t *rktp;
        int i;

        if (rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER)
                return;


        /* Notify consumers that the topic doesn't exist. */
        RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
        rd_kafka_toppar_enq_error(rktp, err, "topic does not exist");
}


/**
 * Assign messages on the UA partition to available partitions.
 * Locks: rd_kafka_topic_*lock() must be held.
 */
static void rd_kafka_topic_assign_uas(rd_kafka_topic_t *rkt,
                                      rd_kafka_resp_err_t err) {
        rd_kafka_t *rk = rkt->rkt_rk;
        rd_kafka_toppar_t *rktp_ua;
        rd_kafka_msg_t *rkm, *tmp;
        rd_kafka_msgq_t uas         = RD_KAFKA_MSGQ_INITIALIZER(uas);
        rd_kafka_msgq_t failed      = RD_KAFKA_MSGQ_INITIALIZER(failed);
        rd_kafka_resp_err_t err_all = RD_KAFKA_RESP_ERR_NO_ERROR;
        int cnt;

        if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
                return;

        rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
        if (unlikely(!rktp_ua)) {
                rd_kafka_dbg(rk, TOPIC, "ASSIGNUA",
                             "No UnAssigned partition available for %s",
                             rkt->rkt_topic->str);
                return;
        }

        /* Assign all unassigned messages to new topics. */
        rd_kafka_toppar_lock(rktp_ua);

        if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR) {
                err_all = rkt->rkt_err;
                rd_kafka_dbg(rk, TOPIC, "PARTCNT",
                             "Failing all %i unassigned messages in "
                             "topic %.*s due to permanent topic error: %s",
                             rktp_ua->rktp_msgq.rkmq_msg_cnt,
                             RD_KAFKAP_STR_PR(rkt->rkt_topic),
                             rd_kafka_err2str(err_all));
        } else if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) {
                err_all = err;
                rd_kafka_dbg(rk, TOPIC, "PARTCNT",
                             "Failing all %i unassigned messages in "
                             "topic %.*s since topic does not exist: %s",
                             rktp_ua->rktp_msgq.rkmq_msg_cnt,
                             RD_KAFKAP_STR_PR(rkt->rkt_topic),
                             rd_kafka_err2str(err_all));
        } else {
                rd_kafka_dbg(rk, TOPIC, "PARTCNT",
                             "Partitioning %i unassigned messages in "
                             "topic %.*s to %" PRId32 " partitions",
                             rktp_ua->rktp_msgq.rkmq_msg_cnt,
                             RD_KAFKAP_STR_PR(rkt->rkt_topic),
                             rkt->rkt_partition_cnt);
        }

        rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
        cnt = uas.rkmq_msg_cnt;
        rd_kafka_toppar_unlock(rktp_ua);

        TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
                /* Fast-path for failing messages with forced partition or
                 * when all messages are to fail. */
                if (err_all || (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
                                rkm->rkm_partition >= rkt->rkt_partition_cnt &&
                                rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN)) {
                        rd_kafka_msgq_enq(&failed, rkm);
                        continue;
                }

                if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
                        /* Desired partition not available */
                        rd_kafka_msgq_enq(&failed, rkm);
                }
        }

        rd_kafka_dbg(rk, TOPIC, "UAS",
                     "%i/%i messages were partitioned in topic %s",
                     cnt - failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str);

        if (failed.rkmq_msg_cnt > 0) {
                /* Fail the messages */
                rd_kafka_dbg(rk, TOPIC, "UAS",
                             "%" PRId32
                             "/%i messages failed partitioning "
                             "in topic %s",
                             failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str);
                rd_kafka_dr_msgq(
                    rkt, &failed,
                    err_all ? err_all : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
        }

        rd_kafka_toppar_destroy(rktp_ua); /* from get() */
}


/**
 * @brief Mark topic as non-existent, unless metadata propagation configuration
 *        disallows it.
 *
 * @param err Propagate non-existent topic using this error code.
 *            If \p err is RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION it means the
 *            topic is invalid and no propagation delay will be used.
 *
 * @returns true if the topic was marked as non-existent, else false.
 *
 * @locks topic_wrlock() MUST be held.
 */
rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt,
                                       rd_kafka_resp_err_t err) {
        rd_ts_t remains_us;
        rd_bool_t permanent = err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION;

        if (unlikely(rd_kafka_terminating(rkt->rkt_rk))) {
                /* Dont update metadata while terminating. */
                return rd_false;
        }

        rd_assert(err != RD_KAFKA_RESP_ERR_NO_ERROR);

        remains_us =
            (rkt->rkt_ts_create +
             (rkt->rkt_rk->rk_conf.metadata_propagation_max_ms * 1000)) -
            rkt->rkt_ts_metadata;

        if (!permanent && rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN &&
            remains_us > 0) {
                /* Still allowing topic metadata to propagate. */
                rd_kafka_dbg(
                    rkt->rkt_rk, TOPIC | RD_KAFKA_DBG_METADATA, "TOPICPROP",
                    "Topic %.*s does not exist, allowing %dms "
                    "for metadata propagation before marking topic "
                    "as non-existent",
                    RD_KAFKAP_STR_PR(rkt->rkt_topic), (int)(remains_us / 1000));
                return rd_false;
        }

        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS);

        rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

        /* Update number of partitions */
        rd_kafka_topic_partition_cnt_update(rkt, 0);

        /* Purge messages with forced partition */
        rd_kafka_topic_assign_uas(rkt, err);

        /* Propagate nonexistent topic info */
        rd_kafka_topic_propagate_notexists(rkt, err);

        return rd_true;
}

/**
 * @brief Mark topic as errored, such as when topic authorization fails.
 *
 * @param err Propagate error using this error code.
 *
 * @returns true if the topic was marked as errored, else false.
 *
 * @locality any
 * @locks topic_wrlock() MUST be held.
 */
rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt,
                                   rd_kafka_resp_err_t err) {

        if (unlikely(rd_kafka_terminating(rkt->rkt_rk))) {
                /* Dont update metadata while terminating. */
                return rd_false;
        }

        rd_assert(err != RD_KAFKA_RESP_ERR_NO_ERROR);

        /* Same error, ignore. */
        if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR && rkt->rkt_err == err)
                return rd_true;

        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPICERROR",
                     "Topic %s has permanent error: %s", rkt->rkt_topic->str,
                     rd_kafka_err2str(err));

        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_ERROR);

        rkt->rkt_err = err;

        /* Update number of partitions */
        rd_kafka_topic_partition_cnt_update(rkt, 0);

        /* Purge messages with forced partition */
        rd_kafka_topic_assign_uas(rkt, err);

        return rd_true;
}



/**
 * @brief Update a topic from metadata.
 *
 * @param mdt Topic metadata.
 * @param mdit Topic internal metadata.
 * @param ts_age absolute age (timestamp) of metadata.
 * @returns 1 if the number of partitions changed, 0 if not, and -1 if the
 *          topic is unknown.

 *
 * @locks_required rd_kafka_*lock() MUST be held.
 */
static int
rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
                               const struct rd_kafka_metadata_topic *mdt,
                               const rd_kafka_metadata_topic_internal_t *mdit,
                               rd_ts_t ts_age) {
        rd_kafka_t *rk = rkt->rkt_rk;
        int upd        = 0;
        int j;
        rd_kafka_broker_t **partbrokers;
        int leader_cnt = 0;
        int old_state;
        rd_bool_t partition_exists_with_no_leader_epoch      = rd_false;
        rd_bool_t partition_exists_with_updated_leader_epoch = rd_false;

        if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
                rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
                             "Error in metadata reply for "
                             "topic %s (PartCnt %i): %s",
                             rkt->rkt_topic->str, mdt->partition_cnt,
                             rd_kafka_err2str(mdt->err));

        if (unlikely(rd_kafka_terminating(rk))) {
                /* Dont update metadata while terminating, do this
                 * after acquiring lock for proper synchronisation */
                return -1;
        }

        /* Look up brokers before acquiring rkt lock to preserve lock order */
        partbrokers = rd_malloc(mdt->partition_cnt * sizeof(*partbrokers));

        for (j = 0; j < mdt->partition_cnt; j++) {
                if (mdt->partitions[j].leader == -1) {
                        partbrokers[j] = NULL;
                        continue;
                }

                partbrokers[j] = rd_kafka_broker_find_by_nodeid(
                    rk, mdt->partitions[j].leader);
        }


        rd_kafka_topic_wrlock(rkt);

        old_state            = rkt->rkt_state;
        rkt->rkt_ts_metadata = ts_age;

        /* Set topic state.
         * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */
        if (mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION /*invalid topic*/ ||
            mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
                rd_kafka_topic_set_notexists(rkt, mdt->err);
        else if (mdt->partition_cnt > 0)
                rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS);
        else if (mdt->err)
                rd_kafka_topic_set_error(rkt, mdt->err);

        /* Update number of partitions, but not if there are
         * (possibly intermittent) errors (e.g., "Leader not available"). */
        if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
                upd += rd_kafka_topic_partition_cnt_update(rkt,
                                                           mdt->partition_cnt);

                /* If the metadata times out for a topic (because all brokers
                 * are down) the state will transition to S_UNKNOWN.
                 * When updated metadata is eventually received there might
                 * not be any change to partition count or leader,
                 * but there may still be messages in the UA partition that
                 * needs to be assigned, so trigger an update for this case too.
                 * Issue #1985. */
                if (old_state == RD_KAFKA_TOPIC_S_UNKNOWN)
                        upd++;
        }

        /* Update leader for each partition */
        for (j = 0; j < mdt->partition_cnt; j++) {
                int r;
                rd_kafka_broker_t *leader;
                int32_t leader_epoch = mdit->partitions[j].leader_epoch;
                rd_kafka_toppar_t *rktp =
                    rd_kafka_toppar_get(rkt, mdt->partitions[j].id, 0);

                rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
                             "  Topic %s partition %i Leader %" PRId32
                             " Epoch %" PRId32,
                             rkt->rkt_topic->str, mdt->partitions[j].id,
                             mdt->partitions[j].leader, leader_epoch);

                leader         = partbrokers[j];
                partbrokers[j] = NULL;

                /* If broker does not support leaderEpoch(KIP 320) then it is
                 * set to -1, we assume that metadata is not stale. */
                if (leader_epoch == -1)
                        partition_exists_with_no_leader_epoch = rd_true;
                else if (rktp->rktp_leader_epoch < leader_epoch)
                        partition_exists_with_updated_leader_epoch = rd_true;


                /* Update leader for partition */
                r = rd_kafka_toppar_leader_update(rkt, mdt->partitions[j].id,
                                                  mdt->partitions[j].leader,
                                                  leader, leader_epoch);

                upd += (r != 0 ? 1 : 0);

                if (leader) {
                        if (r != -1)
                                leader_cnt++;
                        /* Drop reference to broker (from find()) */
                        rd_kafka_broker_destroy(leader);
                }
                RD_IF_FREE(rktp, rd_kafka_toppar_destroy);
        }

        /* If all partitions have leaders, and this metadata update was not
         * stale, we can turn off fast leader query. */
        if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt &&
            (partition_exists_with_no_leader_epoch ||
             partition_exists_with_updated_leader_epoch))
                rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

        if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {
                /* (Possibly intermittent) topic-wide error:
                 * remove leaders for partitions */

                for (j = 0; j < rkt->rkt_partition_cnt; j++) {
                        rd_kafka_toppar_t *rktp;
                        if (!rkt->rkt_p[j])
                                continue;

                        rktp = rkt->rkt_p[j];
                        rd_kafka_toppar_lock(rktp);
                        rd_kafka_toppar_broker_delegate(rktp, NULL);
                        rd_kafka_toppar_unlock(rktp);
                }
        }

        /* If there was an update to the partitions try to assign
         * unassigned messages to new partitions, or fail them */
        if (upd > 0)
                rd_kafka_topic_assign_uas(
                    rkt,
                    mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);

        rd_kafka_topic_wrunlock(rkt);

        /* Loose broker references */
        for (j = 0; j < mdt->partition_cnt; j++)
                if (partbrokers[j])
                        rd_kafka_broker_destroy(partbrokers[j]);

        rd_free(partbrokers);

        return upd;
}

/**
 * @brief Update topic by metadata, if topic is locally known.
 * @sa rd_kafka_topic_metadata_update()
 * @locks none
 */
int rd_kafka_topic_metadata_update2(
    rd_kafka_broker_t *rkb,
    const struct rd_kafka_metadata_topic *mdt,
    const rd_kafka_metadata_topic_internal_t *mdit) {
        rd_kafka_topic_t *rkt;
        int r;

        rd_kafka_wrlock(rkb->rkb_rk);
        if (!(rkt =
                  rd_kafka_topic_find(rkb->rkb_rk, mdt->topic, 0 /*!lock*/))) {
                rd_kafka_wrunlock(rkb->rkb_rk);
                return -1; /* Ignore topics that we dont have locally. */
        }

        r = rd_kafka_topic_metadata_update(rkt, mdt, mdit, rd_clock());

        rd_kafka_wrunlock(rkb->rkb_rk);

        rd_kafka_topic_destroy0(rkt); /* from find() */

        return r;
}



/**
 * @returns a list of all partitions (rktp's) for a topic.
 * @remark rd_kafka_topic_*lock() MUST be held.
 */
static rd_list_t *rd_kafka_topic_get_all_partitions(rd_kafka_topic_t *rkt) {
        rd_list_t *list;
        rd_kafka_toppar_t *rktp;
        int i;

        list = rd_list_new(rkt->rkt_partition_cnt +
                               rd_list_cnt(&rkt->rkt_desp) + 1 /*ua*/,
                           NULL);

        for (i = 0; i < rkt->rkt_partition_cnt; i++)
                rd_list_add(list, rd_kafka_toppar_keep(rkt->rkt_p[i]));

        RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
        rd_list_add(list, rd_kafka_toppar_keep(rktp));

        if (rkt->rkt_ua)
                rd_list_add(list, rd_kafka_toppar_keep(rkt->rkt_ua));

        return list;
}



/**
 * Remove all partitions from a topic, including the ua.
 * Must only be called during rd_kafka_t termination.
 *
 * Locality: main thread
 */
void rd_kafka_topic_partitions_remove(rd_kafka_topic_t *rkt) {
        rd_kafka_toppar_t *rktp;
        rd_list_t *partitions;
        int i;

        /* Purge messages for all partitions outside the topic_wrlock since
         * a message can hold a reference to the topic_t and thus
         * would trigger a recursive lock dead-lock. */
        rd_kafka_topic_rdlock(rkt);
        partitions = rd_kafka_topic_get_all_partitions(rkt);
        rd_kafka_topic_rdunlock(rkt);

        RD_LIST_FOREACH(rktp, partitions, i) {
                rd_kafka_toppar_lock(rktp);
                rd_kafka_msgq_purge(rkt->rkt_rk, &rktp->rktp_msgq);
                rd_kafka_toppar_purge_and_disable_queues(rktp);
                rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(rktp);
        }
        rd_list_destroy(partitions);

        rd_kafka_topic_keep(rkt);
        rd_kafka_topic_wrlock(rkt);

        /* Setting the partition count to 0 moves all partitions to
         * the desired list (rktp_desp). */
        rd_kafka_topic_partition_cnt_update(rkt, 0);

        /* Now clean out the desired partitions list.
         * Use reverse traversal to avoid excessive memory shuffling
         * in rd_list_remove() */
        RD_LIST_FOREACH_REVERSE(rktp, &rkt->rkt_desp, i) {
                /* Keep a reference while deleting from desired list */
                rd_kafka_toppar_keep(rktp);

                rd_kafka_toppar_lock(rktp);
                rd_kafka_toppar_desired_del(rktp);
                rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(rktp);
        }

        rd_kafka_assert(rkt->rkt_rk, rkt->rkt_partition_cnt == 0);

        if (rkt->rkt_p)
                rd_free(rkt->rkt_p);

        rkt->rkt_p             = NULL;
        rkt->rkt_partition_cnt = 0;

        if ((rktp = rkt->rkt_ua)) {
                rkt->rkt_ua = NULL;
                rd_kafka_toppar_destroy(rktp);
        }

        rd_kafka_topic_wrunlock(rkt);

        rd_kafka_topic_destroy0(rkt);
}



/**
 * @returns the broker state (as a human readable string) if a query
 *          for the partition leader is necessary, else NULL.
 * @locality any
 * @locks rd_kafka_toppar_lock MUST be held
 */
static const char *rd_kafka_toppar_needs_query(rd_kafka_t *rk,
                                               rd_kafka_toppar_t *rktp) {
        int broker_state;

        if (!rktp->rktp_broker)
                return "not delegated";

        if (rktp->rktp_broker->rkb_source == RD_KAFKA_INTERNAL)
                return "internal";

        broker_state = rd_kafka_broker_get_state(rktp->rktp_broker);

        if (broker_state >= RD_KAFKA_BROKER_STATE_UP)
                return NULL;

        if (!rk->rk_conf.sparse_connections)
                return "down";

        /* Partition assigned to broker but broker does not
         * need a persistent connection, this typically means
         * the partition is not being fetched or not being produced to,
         * so there is no need to re-query the leader. */
        if (broker_state == RD_KAFKA_BROKER_STATE_INIT)
                return NULL;

        /* This is most likely a persistent broker,
         * which means the partition leader should probably
         * be re-queried to see if it needs changing. */
        return "down";
}



/**
 * @brief Scan all topics and partitions for:
 *  - timed out messages in UA partitions.
 *  - topics that needs to be created on the broker.
 *  - topics who's metadata is too old.
 *  - partitions with unknown leaders that require leader query.
 *
 * @locality rdkafka main thread
 */
void rd_kafka_topic_scan_all(rd_kafka_t *rk, rd_ts_t now) {
        rd_kafka_topic_t *rkt;
        rd_kafka_toppar_t *rktp;
        rd_list_t query_topics;

        rd_list_init(&query_topics, 0, rd_free);

        rd_kafka_rdlock(rk);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                int p;
                int query_this           = 0;
                rd_kafka_msgq_t timedout = RD_KAFKA_MSGQ_INITIALIZER(timedout);

                rd_kafka_topic_wrlock(rkt);

                /* Check if metadata information has timed out. */
                if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
                    !rd_kafka_metadata_cache_topic_get(rk, rkt->rkt_topic->str,
                                                       1 /*only valid*/)) {
                        rd_kafka_dbg(rk, TOPIC, "NOINFO",
                                     "Topic %s metadata information timed out "
                                     "(%" PRId64 "ms old)",
                                     rkt->rkt_topic->str,
                                     (rd_clock() - rkt->rkt_ts_metadata) /
                                         1000);
                        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);

                        query_this = 1;
                } else if (rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN) {
                        rd_kafka_dbg(rk, TOPIC, "NOINFO",
                                     "Topic %s metadata information unknown",
                                     rkt->rkt_topic->str);
                        query_this = 1;
                }

                /* Just need a read-lock from here on. */
                rd_kafka_topic_wrunlock(rkt);
                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_partition_cnt == 0) {
                        /* If this topic is unknown by brokers try
                         * to create it by sending a topic-specific
                         * metadata request.
                         * This requires "auto.create.topics.enable=true"
                         * on the brokers. */
                        rd_kafka_dbg(rk, TOPIC, "NOINFO",
                                     "Topic %s partition count is zero: "
                                     "should refresh metadata",
                                     rkt->rkt_topic->str);

                        query_this = 1;

                } else if (!rd_list_empty(&rkt->rkt_desp) &&
                           rd_interval_immediate(&rkt->rkt_desp_refresh_intvl,
                                                 10 * 1000 * 1000, 0) > 0) {
                        /* Query topic metadata if there are
                         * desired (non-existent) partitions.
                         * At most every 10 seconds. */
                        rd_kafka_dbg(rk, TOPIC, "DESIRED",
                                     "Topic %s has %d desired partition(s): "
                                     "should refresh metadata",
                                     rkt->rkt_topic->str,
                                     rd_list_cnt(&rkt->rkt_desp));

                        query_this = 1;
                }

                for (p = RD_KAFKA_PARTITION_UA; p < rkt->rkt_partition_cnt;
                     p++) {

                        if (!(rktp = rd_kafka_toppar_get(
                                  rkt, p,
                                  p == RD_KAFKA_PARTITION_UA ? rd_true
                                                             : rd_false)))
                                continue;

                        rd_kafka_toppar_lock(rktp);

                        /* Check that partition is delegated to a broker that
                         * is up, else add topic to query list. */
                        if (p != RD_KAFKA_PARTITION_UA) {
                                const char *leader_reason =
                                    rd_kafka_toppar_needs_query(rk, rktp);

                                if (leader_reason) {
                                        rd_kafka_dbg(rk, TOPIC, "QRYLEADER",
                                                     "Topic %s [%" PRId32
                                                     "]: "
                                                     "broker is %s: re-query",
                                                     rkt->rkt_topic->str,
                                                     rktp->rktp_partition,
                                                     leader_reason);
                                        query_this = 1;
                                }
                        } else {
                                if (rk->rk_type == RD_KAFKA_PRODUCER) {
                                        /* Scan UA partition for message
                                         * timeouts.
                                         * Proper partitions are scanned by
                                         * their toppar broker thread. */
                                        rd_kafka_msgq_age_scan(
                                            rktp, &rktp->rktp_msgq, &timedout,
                                            now, NULL);
                                }
                        }

                        rd_kafka_toppar_unlock(rktp);
                        rd_kafka_toppar_destroy(rktp);
                }

                rd_kafka_topic_rdunlock(rkt);

                /* Propagate delivery reports for timed out messages */
                if (rd_kafka_msgq_len(&timedout) > 0) {
                        rd_kafka_dbg(
                            rk, MSG, "TIMEOUT", "%s: %d message(s) timed out",
                            rkt->rkt_topic->str, rd_kafka_msgq_len(&timedout));
                        rd_kafka_dr_msgq(rkt, &timedout,
                                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
                }

                /* Need to re-query this topic's leader. */
                if (query_this &&
                    !rd_list_find(&query_topics, rkt->rkt_topic->str,
                                  (void *)strcmp))
                        rd_list_add(&query_topics,
                                    rd_strdup(rkt->rkt_topic->str));
        }
        rd_kafka_rdunlock(rk);

        if (!rd_list_empty(&query_topics))
                rd_kafka_metadata_refresh_topics(
                    rk, NULL, &query_topics, rd_true /*force even if cached
                                                      * info exists*/
                    ,
                    rk->rk_conf.allow_auto_create_topics,
                    rd_false /*!cgrp_update*/, "refresh unavailable topics");
        rd_list_destroy(&query_topics);
}


/**
 * Locks: rd_kafka_topic_*lock() must be held.
 */
int rd_kafka_topic_partition_available(const rd_kafka_topic_t *app_rkt,
                                       int32_t partition) {
        int avail;
        rd_kafka_toppar_t *rktp;
        rd_kafka_broker_t *rkb;

        /* This API must only be called from a partitioner and the
         * partitioner is always passed a proper topic */
        rd_assert(!rd_kafka_rkt_is_lw(app_rkt));

        rktp = rd_kafka_toppar_get(app_rkt, partition, 0 /*no ua-on-miss*/);
        if (unlikely(!rktp))
                return 0;

        rkb   = rd_kafka_toppar_broker(rktp, 1 /*proper broker*/);
        avail = rkb ? 1 : 0;
        if (rkb)
                rd_kafka_broker_destroy(rkb);
        rd_kafka_toppar_destroy(rktp);
        return avail;
}


void *rd_kafka_topic_opaque(const rd_kafka_topic_t *app_rkt) {
        const rd_kafka_lwtopic_t *lrkt;

        lrkt = rd_kafka_rkt_get_lw((rd_kafka_topic_t *)app_rkt);
        if (unlikely(lrkt != NULL)) {
                void *opaque;
                rd_kafka_topic_t *rkt;

                if (!(rkt = rd_kafka_topic_find(lrkt->lrkt_rk, lrkt->lrkt_topic,
                                                1 /*lock*/)))
                        return NULL;

                opaque = rkt->rkt_conf.opaque;

                rd_kafka_topic_destroy0(rkt); /* loose refcnt from find() */

                return opaque;
        }

        return app_rkt->rkt_conf.opaque;
}


int rd_kafka_topic_info_cmp(const void *_a, const void *_b) {
        const rd_kafka_topic_info_t *a = _a, *b = _b;
        int r, i;

        if ((r = strcmp(a->topic, b->topic)))
                return r;

        if ((r = RD_CMP(a->partition_cnt, b->partition_cnt)))
                return r;

        if (a->partitions_internal == NULL && b->partitions_internal == NULL)
                return 0;

        if (a->partitions_internal == NULL || b->partitions_internal == NULL)
                return (a->partitions_internal == NULL) ? 1 : -1;

        /* We're certain partitions_internal exist for a/b and have the same
         * count. */
        for (i = 0; i < a->partition_cnt; i++) {
                size_t k;
                if ((r = RD_CMP(a->partitions_internal[i].racks_cnt,
                                b->partitions_internal[i].racks_cnt)))
                        return r;

                for (k = 0; k < a->partitions_internal[i].racks_cnt; k++) {
                        if ((r = rd_strcmp(a->partitions_internal[i].racks[k],
                                           b->partitions_internal[i].racks[k])))
                                return r;
                }
        }

        return 0;
}


/**
 * @brief string compare two topics.
 *
 * @param _a topic string (type char *)
 * @param _b rd_kafka_topic_info_t * pointer.
 */
int rd_kafka_topic_info_topic_cmp(const void *_a, const void *_b) {
        const char *a                  = _a;
        const rd_kafka_topic_info_t *b = _b;
        return strcmp(a, b->topic);
}


/**
 * Allocate new topic_info.
 * \p topic is copied.
 */
rd_kafka_topic_info_t *rd_kafka_topic_info_new(const char *topic,
                                               int partition_cnt) {
        rd_kafka_topic_info_t *ti;
        size_t tlen = strlen(topic) + 1;

        /* Allocate space for the topic along with the struct */
        ti        = rd_malloc(sizeof(*ti) + tlen);
        ti->topic = (char *)(ti + 1);
        memcpy((char *)ti->topic, topic, tlen);
        ti->partition_cnt       = partition_cnt;
        ti->partitions_internal = NULL;

        return ti;
}

/**
 * Allocate new topic_info, including rack information.
 * \p topic is copied.
 */
rd_kafka_topic_info_t *rd_kafka_topic_info_new_with_rack(
    const char *topic,
    int partition_cnt,
    const rd_kafka_metadata_partition_internal_t *mdpi) {
        rd_kafka_topic_info_t *ti;
        rd_tmpabuf_t tbuf;
        int i;
        rd_bool_t has_racks = rd_false;

        rd_tmpabuf_new(&tbuf, 0, rd_true /* assert on fail */);

        rd_tmpabuf_add_alloc(&tbuf, sizeof(*ti));
        rd_tmpabuf_add_alloc(&tbuf, strlen(topic) + 1);
        for (i = 0; i < partition_cnt; i++) {
                size_t j;
                if (!mdpi[i].racks)
                        continue;

                if (unlikely(!has_racks))
                        has_racks = rd_true;

                for (j = 0; j < mdpi[i].racks_cnt; j++) {
                        rd_tmpabuf_add_alloc(&tbuf,
                                             strlen(mdpi[i].racks[j]) + 1);
                }
                rd_tmpabuf_add_alloc(&tbuf, sizeof(char *) * mdpi[i].racks_cnt);
        }

        /* Only bother allocating this if at least one
         * rack is there. */
        if (has_racks) {
                rd_tmpabuf_add_alloc(
                    &tbuf, sizeof(rd_kafka_metadata_partition_internal_t) *
                               partition_cnt);
        }

        rd_tmpabuf_finalize(&tbuf);

        ti                      = rd_tmpabuf_alloc(&tbuf, sizeof(*ti));
        ti->topic               = rd_tmpabuf_write_str(&tbuf, topic);
        ti->partition_cnt       = partition_cnt;
        ti->partitions_internal = NULL;

        if (has_racks) {
                ti->partitions_internal = rd_tmpabuf_alloc(
                    &tbuf, sizeof(*ti->partitions_internal) * partition_cnt);

                for (i = 0; i < partition_cnt; i++) {
                        size_t j;
                        ti->partitions_internal[i].id    = mdpi[i].id;
                        ti->partitions_internal[i].racks = NULL;

                        if (!mdpi[i].racks)
                                continue;

                        ti->partitions_internal[i].racks_cnt =
                            mdpi[i].racks_cnt;
                        ti->partitions_internal[i].racks = rd_tmpabuf_alloc(
                            &tbuf, sizeof(char *) * mdpi[i].racks_cnt);

                        for (j = 0; j < mdpi[i].racks_cnt; j++) {
                                ti->partitions_internal[i].racks[j] =
                                    rd_tmpabuf_write_str(&tbuf,
                                                         mdpi[i].racks[j]);
                        }
                }
        }

        return ti;
}

/**
 * Destroy/free topic_info
 */
void rd_kafka_topic_info_destroy(rd_kafka_topic_info_t *ti) {
        rd_free(ti);
}


/**
 * @brief Match \p topic to \p pattern.
 *
 * If pattern begins with "^" it is considered a regexp,
 * otherwise a simple string comparison is performed.
 *
 * @returns 1 on match, else 0.
 */
int rd_kafka_topic_match(rd_kafka_t *rk,
                         const char *pattern,
                         const char *topic) {
        char errstr[128];

        if (*pattern == '^') {
                int r = rd_regex_match(pattern, topic, errstr, sizeof(errstr));
                if (unlikely(r == -1))
                        rd_kafka_dbg(rk, TOPIC, "TOPICREGEX",
                                     "Topic \"%s\" regex \"%s\" "
                                     "matching failed: %s",
                                     topic, pattern, errstr);
                return r == 1;
        } else
                return !strcmp(pattern, topic);
}



/**
 * @brief Trigger broker metadata query for topic leader.
 *
 * @locks none
 */
void rd_kafka_topic_leader_query0(rd_kafka_t *rk,
                                  rd_kafka_topic_t *rkt,
                                  int do_rk_lock,
                                  rd_bool_t force) {
        rd_list_t topics;

        rd_list_init(&topics, 1, rd_free);
        rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str));

        rd_kafka_metadata_refresh_topics(
            rk, NULL, &topics, force, rk->rk_conf.allow_auto_create_topics,
            rd_false /*!cgrp_update*/, "leader query");

        rd_list_destroy(&topics);
}



/**
 * @brief Populate list \p topics with the topic names (strdupped char *) of
 *        all locally known or cached topics.
 *
 * @param cache_cntp is an optional pointer to an int that will be set to the
 *                   number of entries added to \p topics from the
 *                   metadata cache.
 * @remark \p rk lock MUST NOT be held
 */
void rd_kafka_local_topics_to_list(rd_kafka_t *rk,
                                   rd_list_t *topics,
                                   int *cache_cntp) {
        rd_kafka_topic_t *rkt;
        int cache_cnt;

        rd_kafka_rdlock(rk);
        rd_list_grow(topics, rk->rk_topic_cnt);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
        rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
        cache_cnt = rd_kafka_metadata_cache_topics_to_list(rk, topics);
        if (cache_cntp)
                *cache_cntp = cache_cnt;
        rd_kafka_rdunlock(rk);
}


/**
 * @brief Unit test helper to set a topic's state to EXISTS
 *        with the given number of partitions.
 */
void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,
                                        int partition_cnt,
                                        int32_t leader_id) {
        rd_kafka_metadata_partition_internal_t *partitions =
            rd_calloc(partition_cnt, sizeof(*partitions));
        struct rd_kafka_metadata_topic mdt      = {.topic =
                                                  (char *)rkt->rkt_topic->str,
                                              .partition_cnt = partition_cnt};
        rd_kafka_metadata_topic_internal_t mdit = {.partitions = partitions};
        int i;

        mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt);

        for (i = 0; i < partition_cnt; i++) {
                memset(&mdt.partitions[i], 0, sizeof(mdt.partitions[i]));
                mdt.partitions[i].id     = i;
                mdt.partitions[i].leader = leader_id;
        }

        rd_kafka_wrlock(rkt->rkt_rk);
        rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true,
                                             rd_false, NULL, 0);
        rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock());
        rd_kafka_wrunlock(rkt->rkt_rk);
        rd_free(partitions);
}
