/*
 * librdkafka - The Apache Kafka C/C++ library
 *
 * Copyright (c) 2020 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


/**
 * @name Consumer assignment state.
 *
 * Responsible for managing the state of assigned partitions.
 *
 *
 ******************************************************************************
 * rd_kafka_assignment_serve()
 * ---------------------------
 *
 * It is important to call rd_kafka_assignment_serve() after each change
 * to the assignment through assignment_add, assignment_subtract or
 * assignment_clear as those functions only modify the assignment but do
 * not take any action to transition partitions to or from the assignment
 * states.
 *
 * The reason assignment_serve() is not automatically called from these
 * functions is for the caller to be able to set the current state before
 * the side-effects of serve() kick in, such as the call to
 * rd_kafka_cgrp_assignment_done() that in turn will set the cgrp state.
 *
 *
 *
 ******************************************************************************
 * Querying for committed offsets (.queried list)
 * ----------------------------------------------
 *
 * We only allow one outstanding query (fetch committed offset); this avoids
 * complex handling of partitions that are assigned, unassigned and reassigned
 * all within the window of an OffsetFetch request.
 * Consider the following case:
 *
 *  1. tp1 and tp2 are incrementally assigned.
 *  2. An OffsetFetchRequest is sent for tp1 and tp2.
 *  3. tp2 is incrementally unassigned.
 *  4. Broker sends OffsetFetchResponse with offsets tp1=10, tp2=20.
 *  5. Some other consumer commits offset 30 for tp2.
 *  6. tp2 is incrementally assigned again.
 *  7. The OffsetFetchResponse is received.
 *
 * Without extra handling the consumer would start fetching tp1 at offset 10
 * (which is correct) and tp2 at offset 20 (which is incorrect, the last
 * committed offset is now 30).
 *
 * To alleviate this situation we remove unassigned partitions from the
 * .queried list, and in the OffsetFetch response handler we only use offsets
 * for partitions that are on the .queried list.
 *
 * To make sure the tp1 offset is used and not re-queried we only allow
 * one outstanding OffsetFetch request at a time, meaning that at step 6
 * a new OffsetFetch request will not be sent and tp2 will remain in the
 * .pending list until the outstanding OffsetFetch response is received in
 * step 7. At this point tp2 will transition to .queried and a new
 * OffsetFetch request will be sent.
 *
 * This explanation is more verbose than the code involved.
 *
 ******************************************************************************
 *
 *
 * @remark Try to keep any cgrp state out of this file.
 *
 * FIXME: There are some pretty obvious optimizations that need to be done here
 *        with regards to partition_list_t lookups. But we can do that when
 *        we know the current implementation works correctly.
 */

#include "rdkafka_int.h"
#include "rdkafka_offset.h"
#include "rdkafka_request.h"


/**
 * @brief Emit a CGRP-level debug snapshot of the assignment: the
 *        started/wait-stop counters followed by the contents of the
 *        .all, .pending, .queried and .removed partition lists.
 */
static void rd_kafka_assignment_dump (rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *all_parts =
                rk->rk_consumer.assignment.all;
        rd_kafka_topic_partition_list_t *pending_parts =
                rk->rk_consumer.assignment.pending;
        rd_kafka_topic_partition_list_t *queried_parts =
                rk->rk_consumer.assignment.queried;
        rd_kafka_topic_partition_list_t *removed_parts =
                rk->rk_consumer.assignment.removed;

        rd_kafka_dbg(rk, CGRP, "DUMP",
                     "Assignment dump (started_cnt=%d, wait_stop_cnt=%d)",
                     rk->rk_consumer.assignment.started_cnt,
                     rk->rk_consumer.assignment.wait_stop_cnt);

        /* One log line group per assignment list, in a fixed order. */
        rd_kafka_topic_partition_list_log(rk, "DUMP_ALL", RD_KAFKA_DBG_CGRP,
                                          all_parts);
        rd_kafka_topic_partition_list_log(rk, "DUMP_PND", RD_KAFKA_DBG_CGRP,
                                          pending_parts);
        rd_kafka_topic_partition_list_log(rk, "DUMP_QRY", RD_KAFKA_DBG_CGRP,
                                          queried_parts);
        rd_kafka_topic_partition_list_log(rk, "DUMP_REM", RD_KAFKA_DBG_CGRP,
                                          removed_parts);
}

/**
 * @brief Apply the fetched committed offsets to the current assignment's
 *        queried partitions.
 *
 * @param err is the request-level error, if any. The caller is responsible
 *            for raising this error to the application. It is only used here
 *            to avoid taking actions.
 *
 * Called from the FetchOffsets response handler below.
 */
static void
rd_kafka_assignment_apply_offsets (rd_kafka_t *rk,
                                   rd_kafka_topic_partition_list_t *offsets,
                                   rd_kafka_resp_err_t err) {
        rd_kafka_topic_partition_t *rktpar;

        RD_KAFKA_TPLIST_FOREACH(rktpar, offsets) {
                rd_kafka_toppar_t *rktp = rktpar->_private; /* May be NULL */

                /* Only partitions still on .queried are acted upon; anything
                 * else was unassigned while the request was in flight. */
                if (!rd_kafka_topic_partition_list_del(
                            rk->rk_consumer.assignment.queried,
                            rktpar->topic, rktpar->partition)) {
                        rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
                                     "Ignoring OffsetFetch "
                                     "response for %s [%"PRId32"] which is no "
                                     "longer in the queried list "
                                     "(possibly unassigned?)",
                                     rktpar->topic, rktpar->partition);
                        continue;
                }

                if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
                    rktpar->err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
                        /* Ongoing transactions are blocking offset retrieval.
                         * This is typically retried from the OffsetFetch
                         * handler but we can come here if the assignment
                         * (and thus the assignment.version) was changed while
                         * the OffsetFetch request was in-flight, in which case
                         * we put this partition back on the pending list for
                         * later handling by the assignment state machine. */

                        rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
                                     "Adding %s [%"PRId32"] back to pending "
                                     "list because on-going transaction is "
                                     "blocking offset retrieval",
                                     rktpar->topic,
                                     rktpar->partition);

                        /* The offset that accompanied this error is not a
                         * usable committed offset. serve_pending() starts any
                         * partition whose offset is not STORED, so reset it
                         * to STORED to make sure the committed offset is
                         * queried again instead of the fetcher being started
                         * at a bogus (or auto.offset.reset) position. */
                        rktpar->offset = RD_KAFKA_OFFSET_STORED;

                        rd_kafka_topic_partition_list_add_copy(
                                rk->rk_consumer.assignment.pending, rktpar);

                } else if (rktpar->err) {
                        /* Partition-level error */
                        rd_kafka_consumer_err(
                                rk->rk_consumer.q, RD_KAFKA_NODEID_UA,
                                rktpar->err, 0,
                                rktpar->topic, rktp,
                                RD_KAFKA_OFFSET_INVALID,
                                "Failed to fetch committed offset for "
                                "group \"%s\" topic %s [%"PRId32"]: %s",
                                rk->rk_group_id->str,
                                rktpar->topic, rktpar->partition,
                                rd_kafka_err2str(rktpar->err));

                        /* The partition will not be added back to .pending
                         * and thus only reside on .all until the application
                         * unassigns it and possible re-assigns it. */

                } else if (!err) {
                        /* If rktpar->offset is RD_KAFKA_OFFSET_INVALID it means
                         * there was no committed offset for this partition.
                         * serve_pending() will now start this partition
                         * since the offset is set to INVALID (rather than
                         * STORED) and the partition fetcher will employ
                         * auto.offset.reset to know what to do. */

                        /* Add partition to pending list where serve()
                         * will start the fetcher. */
                        rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
                                     "Adding %s [%"PRId32"] back to pending "
                                     "list with offset %s",
                                     rktpar->topic,
                                     rktpar->partition,
                                     rd_kafka_offset2str(rktpar->offset));

                        rd_kafka_topic_partition_list_add_copy(
                                rk->rk_consumer.assignment.pending, rktpar);
                }
                /* Do nothing for request-level errors (err is set). */

        }

        if (offsets->cnt > 0)
                rd_kafka_assignment_serve(rk);
}



/**
 * @brief Reply handler for OffsetFetch queries from the assignment code.
 *
 * @param opaque Is a malloced int64_t* containing the assignment version at the
 *               time of the request.
 *
 * @locality rdkafka main thread
 */
static void
rd_kafka_assignment_handle_OffsetFetch (rd_kafka_t *rk,
                                        rd_kafka_broker_t *rkb,
                                        rd_kafka_resp_err_t err,
                                        rd_kafka_buf_t *reply,
                                        rd_kafka_buf_t *request,
                                        void *opaque) {
        rd_kafka_topic_partition_list_t *offsets = NULL;
        int64_t *req_assignment_version = (int64_t *)opaque;
        /* Only allow retries if there's been no change to the assignment,
         * otherwise rely on assignment state machine to retry. */
        rd_bool_t allow_retry = *req_assignment_version ==
                rk->rk_consumer.assignment.version;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* Termination, quick cleanup. */
                rd_free(req_assignment_version);
                return;
        }

        err = rd_kafka_handle_OffsetFetch(rk, rkb, err,
                                          reply, request, &offsets,
                                          rd_true/* Update toppars */,
                                          rd_true/* Add parts */,
                                          allow_retry);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                /* The request is being retried and will come back to this
                 * handler with the same opaque, so keep it alive. */
                if (offsets)
                        rd_kafka_topic_partition_list_destroy(offsets);
                return; /* retrying */
        }

        rd_free(req_assignment_version);

        /* offsets may be NULL for certain errors, such
         * as ERR__TRANSPORT. This holds whether or not retries were
         * allowed (a retry may still not be performed, e.g. once the
         * retry limit is reached), and everything below dereferences
         * offsets, so bail out on any NULL offsets list.
         * NOTE(review): partitions still on .queried are not released on
         * this path. */
        if (!offsets) {
                rd_dassert(err);
                if (!err)
                        err = RD_KAFKA_RESP_ERR__NO_OFFSET;

                rd_kafka_dbg(rk, CGRP, "OFFSET",
                             "Offset fetch error: %s",
                             rd_kafka_err2str(err));
                rd_kafka_consumer_err(rk->rk_consumer.q,
                                      rd_kafka_broker_id(rkb),
                                      err, 0, NULL, NULL,
                                      RD_KAFKA_OFFSET_INVALID,
                                      "Failed to fetch committed "
                                      "offsets for partitions "
                                      "in group \"%s\": %s",
                                      rk->rk_group_id->str,
                                      rd_kafka_err2str(err));

                return;
        }

        if (err) {
                /* Request-level error with a (possibly partial) offsets
                 * list: report it, then let apply_offsets() drop the
                 * partitions from .queried. */
                rd_kafka_dbg(rk, CGRP, "OFFSET",
                             "Offset fetch error for %d partition(s): %s",
                             offsets->cnt, rd_kafka_err2str(err));
                rd_kafka_consumer_err(rk->rk_consumer.q,
                                      rd_kafka_broker_id(rkb),
                                      err, 0, NULL, NULL,
                                      RD_KAFKA_OFFSET_INVALID,
                                      "Failed to fetch committed offsets for "
                                      "%d partition(s) in group \"%s\": %s",
                                      offsets->cnt,
                                      rk->rk_group_id->str,
                                      rd_kafka_err2str(err));
        }

        /* Apply the fetched offsets to the assignment */
        rd_kafka_assignment_apply_offsets(rk, offsets, err);

        rd_kafka_topic_partition_list_destroy(offsets);
}


/**
 * @brief Decommission all partitions in the removed list.
 *
 * @returns >0 if there are removal operations in progress, else 0.
 */
static int
rd_kafka_assignment_serve_removals (rd_kafka_t *rk) {
        rd_kafka_topic_partition_t *rktpar;
        /* Number of removed partitions whose stored offset is absolute
         * (non-logical) and therefore worth committing. */
        int valid_offsets = 0;

        RD_KAFKA_TPLIST_FOREACH(rktpar, rk->rk_consumer.assignment.removed) {
                rd_kafka_toppar_t *rktp = rktpar->_private; /* Borrow ref */
                int was_pending, was_queried;

                /* Remove partition from pending and querying lists,
                 * if it happens to be there.
                 * Outstanding OffsetFetch query results will be ignored
                 * for partitions that are no longer on the .queried list. */
                was_pending = rd_kafka_topic_partition_list_del(
                        rk->rk_consumer.assignment.pending,
                        rktpar->topic, rktpar->partition);
                was_queried = rd_kafka_topic_partition_list_del(
                        rk->rk_consumer.assignment.queried,
                        rktpar->topic, rktpar->partition);

                if (rktp->rktp_started) {
                        /* Partition was started, stop the fetcher.
                         * The stop reply is delivered to rk_ops; until then
                         * the partition is counted in wait_stop_cnt. */
                        rd_assert(rk->rk_consumer.assignment.started_cnt > 0);

                        rd_kafka_toppar_op_fetch_stop(
                                rktp, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
                        rk->rk_consumer.assignment.wait_stop_cnt++;
                }

                /* Reset the (lib) pause flag which may have been set by
                 * the cgrp when scheduling the rebalance callback. */
                rd_kafka_toppar_op_pause_resume(rktp,
                                                rd_false/*resume*/,
                                                RD_KAFKA_TOPPAR_F_LIB_PAUSE,
                                                RD_KAFKA_NO_REPLYQ);

                rd_kafka_toppar_lock(rktp);

                /* Save the currently stored offset on .removed
                 * so it will be committed below. */
                rktpar->offset = rktp->rktp_stored_offset;
                valid_offsets += !RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset);

                /* Reset the stored offset to invalid so that
                 * a manual offset-less commit() or the auto-committer
                 * will not commit a stored offset from a previous
                 * assignment (issue #2782). */
                rd_kafka_offset_store0(rktp, RD_KAFKA_OFFSET_INVALID,
                                       RD_DONT_LOCK);

                /* Partition is no longer desired */
                rd_kafka_toppar_desired_del(rktp);
                rd_kafka_toppar_unlock(rktp);

                rd_kafka_dbg(rk, CGRP, "REMOVE",
                             "Removing %s [%"PRId32"] from assignment "
                             "(started=%s, pending=%s, queried=%s, "
                             "stored offset=%s)",
                             rktpar->topic, rktpar->partition,
                             RD_STR_ToF(rktp->rktp_started),
                             RD_STR_ToF(was_pending),
                             RD_STR_ToF(was_queried),
                             rd_kafka_offset2str(rktpar->offset));
        }

        rd_kafka_dbg(rk, CONSUMER|RD_KAFKA_DBG_CGRP, "REMOVE",
                     "Served %d removed partition(s), "
                     "with %d offset(s) to commit",
                     rk->rk_consumer.assignment.removed->cnt, valid_offsets);

        /* If enable.auto.commit=true:
         * Commit final offsets to broker for the removed partitions,
         * unless this is a consumer destruction without a close() call
         * (rd_kafka_destroy_flags_no_consumer_close()).
         * This is only done when at least one saved offset is absolute,
         * offsets are stored on the broker, and there is a cgrp. */
        if (valid_offsets > 0 &&
            rk->rk_conf.offset_store_method ==
            RD_KAFKA_OFFSET_METHOD_BROKER &&
            rk->rk_cgrp &&
            rk->rk_conf.enable_auto_commit &&
            !rd_kafka_destroy_flags_no_consumer_close(rk))
                rd_kafka_cgrp_assigned_offsets_commit(
                        rk->rk_cgrp,
                        rk->rk_consumer.assignment.removed,
                        rd_false /* use offsets from .removed */,
                        "unassigned partitions");

        rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.removed);

        /* In-progress work: fetchers still stopping plus outstanding
         * offset commits. */
        return rk->rk_consumer.assignment.wait_stop_cnt +
                rk->rk_consumer.wait_commit_cnt;
}


/**
 * @brief Serve all partitions in the pending list.
 *
 * This either (asynchronously) queries the partition's committed offset, or
 * if the start offset is known, starts the partition fetcher.
 *
 * @returns >0 if there are pending operations in progress for the current
 *          assignment, else 0.
 */
static int
rd_kafka_assignment_serve_pending (rd_kafka_t *rk) {
        /* Partitions to include in this round's OffsetFetch request.
         * Only allocated when can_query_offsets is true. */
        rd_kafka_topic_partition_list_t *partitions_to_query = NULL;
        /* We can query committed offsets only if all of the following are true:
         *  - We have a group coordinator.
         *  - There are no outstanding commits (since we might need to
         *    read back those commits as our starting position).
         *  - There are no outstanding queries already (since we want to
         *    avoid using a earlier queries response for a partition that
         *    is unassigned and then assigned again).
         */
        /* A non-NULL coord is released with rd_kafka_broker_destroy()
         * on both return paths below. */
        rd_kafka_broker_t *coord =
                rk->rk_cgrp ? rd_kafka_cgrp_get_coord(rk->rk_cgrp) : NULL;
        rd_bool_t can_query_offsets =
                coord &&
                rk->rk_consumer.wait_commit_cnt == 0 &&
                rk->rk_consumer.assignment.queried->cnt == 0;
        int i;

        if (can_query_offsets)
                partitions_to_query = rd_kafka_topic_partition_list_new(
                        rk->rk_consumer.assignment.pending->cnt);

        /* Scan the list backwards so removals are cheap (no array shuffle) */
        for (i = rk->rk_consumer.assignment.pending->cnt - 1 ; i >= 0 ; i--) {
                rd_kafka_topic_partition_t *rktpar =
                        &rk->rk_consumer.assignment.pending->elems[i];
                rd_kafka_toppar_t *rktp = rktpar->_private; /* Borrow ref */

                rd_assert(!rktp->rktp_started);

                /* Every start offset except STORED can be handed to the
                 * fetcher directly; STORED requires an OffsetFetch. */
                if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) ||
                    rktpar->offset == RD_KAFKA_OFFSET_BEGINNING ||
                    rktpar->offset == RD_KAFKA_OFFSET_END ||
                    rktpar->offset == RD_KAFKA_OFFSET_INVALID ||
                    rktpar->offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
                        /* The partition fetcher can handle absolute
                         * as well as beginning/end/tail start offsets, so we're
                         * ready to start the fetcher now.
                         * The INVALID offset means there was no committed
                         * offset and the partition fetcher will employ
                         * auto.offset.reset.
                         *
                         * Start fetcher for partition and forward partition's
                         * fetchq to consumer group's queue. */

                        rd_kafka_dbg(rk, CGRP, "SRVPEND",
                                     "Starting pending assigned partition "
                                     "%s [%"PRId32"] at offset %s",
                                     rktpar->topic, rktpar->partition,
                                     rd_kafka_offset2str(rktpar->offset));

                        /* Reset the (lib) pause flag which may have been set by
                         * the cgrp when scheduling the rebalance callback. */
                        rd_kafka_toppar_op_pause_resume(
                                rktp,
                                rd_false/*resume*/,
                                RD_KAFKA_TOPPAR_F_LIB_PAUSE,
                                RD_KAFKA_NO_REPLYQ);

                        /* Start the fetcher */
                        rktp->rktp_started = rd_true;
                        rk->rk_consumer.assignment.started_cnt++;

                        rd_kafka_toppar_op_fetch_start(
                                rktp, rktpar->offset,
                                rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ);


                } else if (can_query_offsets) {
                        /* Else use the last committed offset for partition.
                         * We can't rely on any internal cached committed offset
                         * so we'll accumulate a list of partitions that need
                         * to be queried and then send FetchOffsetsRequest
                         * to the group coordinator. */

                        rd_dassert(!rd_kafka_topic_partition_list_find(
                                           rk->rk_consumer.assignment.queried,
                                           rktpar->topic, rktpar->partition));

                        rd_kafka_topic_partition_list_add_copy(
                                partitions_to_query, rktpar);

                        /* Track it on .queried so the response handler will
                         * accept the result for this partition. */
                        rd_kafka_topic_partition_list_add_copy(
                                rk->rk_consumer.assignment.queried, rktpar);

                        rd_kafka_dbg(rk, CGRP, "SRVPEND",
                                     "Querying committed offset for pending "
                                     "assigned partition %s [%"PRId32"]",
                                     rktpar->topic, rktpar->partition);


                } else {
                        /* Needs a committed offset but querying is not
                         * possible right now (no coordinator, commits in
                         * flight, or a query already outstanding). */
                        rd_kafka_dbg(rk, CGRP, "SRVPEND",
                                     "Pending assignment partition "
                                     "%s [%"PRId32"] can't fetch committed "
                                     "offset yet "
                                     "(cgrp state %s, awaiting %d commits, "
                                     "%d partition(s) already being queried)",
                                     rktpar->topic, rktpar->partition,
                                     rk->rk_cgrp ?
                                     rd_kafka_cgrp_state_names[
                                             rk->rk_cgrp->rkcg_state] :
                                     "n/a",
                                     rk->rk_consumer.wait_commit_cnt,
                                     rk->rk_consumer.assignment.queried->cnt);

                        continue; /* Keep rktpar on pending list */
                }

                /* Remove rktpar from the pending list */
                rd_kafka_topic_partition_list_del_by_idx(
                        rk->rk_consumer.assignment.pending, i);
        }


        /* No query list was allocated: just drop the coordinator
         * reference (if any) and report what remains in progress. */
        if (!can_query_offsets) {
                if (coord)
                        rd_kafka_broker_destroy(coord);
                return rk->rk_consumer.assignment.pending->cnt +
                        rk->rk_consumer.assignment.queried->cnt;
        }


        /* Send a single OffsetFetch request for everything collected above.
         * The assignment version is captured so the reply handler can tell
         * whether the assignment changed while the request was in flight. */
        if (partitions_to_query->cnt > 0) {
                int64_t *req_assignment_version = rd_malloc(sizeof(int64_t));
                *req_assignment_version = rk->rk_consumer.assignment.version;

                rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
                             "Fetching committed offsets for "
                             "%d pending partition(s) in assignment",
                             partitions_to_query->cnt);

                rd_kafka_OffsetFetchRequest(
                        coord,
                        partitions_to_query,
                        rk->rk_conf.isolation_level ==
                        RD_KAFKA_READ_COMMITTED/*require_stable*/,
                        RD_KAFKA_REPLYQ(rk->rk_ops, 0),
                        rd_kafka_assignment_handle_OffsetFetch,
                        /* Must be freed by handler */
                        (void *)req_assignment_version);
        }

        if (coord)
                rd_kafka_broker_destroy(coord);

        rd_kafka_topic_partition_list_destroy(partitions_to_query);

        /* Partitions still waiting to be queried plus those being queried. */
        return rk->rk_consumer.assignment.pending->cnt +
                rk->rk_consumer.assignment.queried->cnt;
}



/**
 * @brief Serve updates to the assignment.
 *
 * Call on:
 * - assignment changes
 * - wait_commit_cnt reaches 0
 * - partition fetcher is stopped
 */
void rd_kafka_assignment_serve (rd_kafka_t *rk) {
        /* Count of removal operations (fetcher stops + commits) in progress,
         * as returned by serve_removals(). */
        int inp_removals = 0;
        /* .pending + .queried counts as returned by serve_pending(), or 0
         * if serve_pending() was not called. */
        int inp_pending = 0;

        rd_kafka_assignment_dump(rk);

        /* Serve any partitions that should be removed */
        if (rk->rk_consumer.assignment.removed->cnt > 0)
                inp_removals = rd_kafka_assignment_serve_removals(rk);

        /* Serve any partitions in the pending list that need further action,
         * unless we're waiting for a previous assignment change (an unassign
         * in some form) to propagate, or outstanding offset commits
         * to finish (since we might need the committed offsets as start
         * offsets). */
        if (rk->rk_consumer.assignment.wait_stop_cnt == 0 &&
            rk->rk_consumer.wait_commit_cnt == 0 &&
            inp_removals == 0 &&
            rk->rk_consumer.assignment.pending->cnt > 0)
                inp_pending = rd_kafka_assignment_serve_pending(rk);

        if (inp_removals + inp_pending +
            rk->rk_consumer.assignment.queried->cnt +
            rk->rk_consumer.assignment.wait_stop_cnt +
            rk->rk_consumer.wait_commit_cnt == 0) {
                /* No assignment operations in progress,
                 * signal assignment done back to cgrp to let it
                 * transition to its next state if necessary.
                 * We may emit this signalling more than necessary and it is
                 * up to the cgrp to only take action if needed, based on its
                 * state.
                 * The rest of this file treats rk_cgrp as optional (see the
                 * rk->rk_cgrp checks in serve_removals() and
                 * serve_pending()), so only signal when there is one. */
                if (rk->rk_cgrp)
                        rd_kafka_cgrp_assignment_done(rk->rk_cgrp);
        } else {
                /* Log the actual .pending count: inp_pending is 0 when
                 * serve_pending() was skipped, and otherwise also includes
                 * the .queried count which is logged separately. */
                rd_kafka_dbg(rk, CGRP, "ASSIGNMENT",
                             "Current assignment of %d partition(s) "
                             "with %d pending adds, %d offset queries, "
                             "%d partitions awaiting stop and "
                             "%d offset commits in progress",
                             rk->rk_consumer.assignment.all->cnt,
                             rk->rk_consumer.assignment.pending->cnt,
                             rk->rk_consumer.assignment.queried->cnt,
                             rk->rk_consumer.assignment.wait_stop_cnt,
                             rk->rk_consumer.wait_commit_cnt);
        }
}


/**
 * @returns true if the current or previous assignment has operations in
 *          progress, such as waiting for partition fetchers to stop.
 */
rd_bool_t rd_kafka_assignment_in_progress (rd_kafka_t *rk) {
        /* Counters: offset commits in flight, fetchers still stopping. */
        if (rk->rk_consumer.wait_commit_cnt > 0)
                return rd_true;
        if (rk->rk_consumer.assignment.wait_stop_cnt > 0)
                return rd_true;

        /* Any partition left on a transitional list means the assignment
         * has not settled yet. */
        return rk->rk_consumer.assignment.pending->cnt > 0 ||
                rk->rk_consumer.assignment.queried->cnt > 0 ||
                rk->rk_consumer.assignment.removed->cnt > 0;
}


/**
 * @brief Clear the current assignment.
 *
 * @remark Make sure to call rd_kafka_assignment_serve() after successful
 *         return from this function.
 *
 * @returns the number of partitions removed.
 */
int rd_kafka_assignment_clear (rd_kafka_t *rk) {
        /* Snapshot the size before .all is emptied below: this is both
         * the log value and the return value. */
        const int removed_cnt = rk->rk_consumer.assignment.all->cnt;

        if (!removed_cnt) {
                rd_kafka_dbg(rk, CONSUMER|RD_KAFKA_DBG_CGRP,
                             "CLEARASSIGN",
                             "No current assignment to clear");
                return 0;
        }

        rd_kafka_dbg(rk, CONSUMER|RD_KAFKA_DBG_CGRP, "CLEARASSIGN",
                     "Clearing current assignment of %d partition(s)",
                     removed_cnt);

        /* Drop any partitions still waiting to be started or to have
         * their committed offsets fetched. */
        rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.pending);
        rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.queried);

        /* Hand the entire assignment over to the removal list and
         * empty the assignment itself. */
        rd_kafka_topic_partition_list_add_list(
                rk->rk_consumer.assignment.removed,
                rk->rk_consumer.assignment.all);
        rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.all);

        rk->rk_consumer.assignment.version++;

        return removed_cnt;
}


/**
 * @brief Adds \p partitions to the current assignment.
 *
 * Will return error if trying to add a partition that is already in the
 * assignment.
 *
 * @remark Make sure to call rd_kafka_assignment_serve() after successful
 *         return from this function.
 */
rd_kafka_error_t *
rd_kafka_assignment_add (rd_kafka_t *rk,
                         rd_kafka_topic_partition_list_t *partitions) {
        rd_bool_t was_empty = rk->rk_consumer.assignment.all->cnt == 0;
        int i;

        /* Sort the input so that duplicate entries end up adjacent and
         * can be detected by comparing each element to its predecessor. */
        rd_kafka_topic_partition_list_sort(partitions, NULL, NULL);

        /* First pass: validation only.
         * Neither \p partitions nor the assignment is modified, and no
         * toppar objects are looked up, until the whole input list has
         * been verified. An error return therefore leaves the caller's
         * list and the consumer state untouched. */
        for (i = 0 ; i < partitions->cnt ; i++) {
                const rd_kafka_topic_partition_t *rktpar =
                        &partitions->elems[i];
                const rd_kafka_topic_partition_t *prev =
                        i > 0 ? &partitions->elems[i-1] : NULL;

                /* Accept absolute offsets, the BEGINNING/END/STORED/INVALID
                 * logical offsets, and offsets at or below
                 * RD_KAFKA_OFFSET_TAIL_BASE. Any other logical offset
                 * is rejected. */
                if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) &&
                    rktpar->offset != RD_KAFKA_OFFSET_BEGINNING &&
                    rktpar->offset != RD_KAFKA_OFFSET_END &&
                    rktpar->offset != RD_KAFKA_OFFSET_STORED &&
                    rktpar->offset != RD_KAFKA_OFFSET_INVALID &&
                    rktpar->offset > RD_KAFKA_OFFSET_TAIL_BASE)
                        return rd_kafka_error_new(
                                RD_KAFKA_RESP_ERR__INVALID_ARG,
                                "%s [%"PRId32"] has invalid start offset %"
                                PRId64,
                                rktpar->topic, rktpar->partition,
                                rktpar->offset);

                if (prev && !rd_kafka_topic_partition_cmp(rktpar, prev))
                        return rd_kafka_error_new(
                                RD_KAFKA_RESP_ERR__INVALID_ARG,
                                "Duplicate %s [%"PRId32"] in input list",
                                rktpar->topic, rktpar->partition);

                if (rd_kafka_topic_partition_list_find(
                            rk->rk_consumer.assignment.all,
                            rktpar->topic, rktpar->partition))
                        return rd_kafka_error_new(
                                RD_KAFKA_RESP_ERR__CONFLICT,
                                "%s [%"PRId32"] is already part of the "
                                "current assignment",
                                rktpar->topic, rktpar->partition);
        }

        /* Second pass: the input is known to be valid, so normalize it. */
        for (i = 0 ; i < partitions->cnt ; i++) {
                rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];

                /* Translate RD_KAFKA_OFFSET_INVALID to RD_KAFKA_OFFSET_STORED,
                 * i.e., read from committed offset, since we use INVALID
                 * internally to differentiate between querying for
                 * committed offset (STORED) and no committed offset (INVALID).
                 */
                if (rktpar->offset == RD_KAFKA_OFFSET_INVALID)
                        rktpar->offset = RD_KAFKA_OFFSET_STORED;

                /* Get the toppar object for each partition.
                 * This is to make sure the rktp stays alive while the
                 * partition is tracked by the assignment lists below. */
                rd_kafka_topic_partition_ensure_toppar(rk, rktpar,
                                                       rd_true);
        }

        /* Add the new list of partitions to the current assignment.
         * Only need to sort the final assignment if it was non-empty
         * to begin with since \p partitions is sorted above. */
        rd_kafka_topic_partition_list_add_list(rk->rk_consumer.assignment.all,
                                               partitions);
        if (!was_empty)
                rd_kafka_topic_partition_list_sort(rk->rk_consumer.
                                                   assignment.all,
                                                   NULL, NULL);

        /* And add to .pending for serve_pending() to handle. */
        rd_kafka_topic_partition_list_add_list(rk->rk_consumer.
                                               assignment.pending,
                                               partitions);


        rd_kafka_dbg(rk, CONSUMER|RD_KAFKA_DBG_CGRP, "ASSIGNMENT",
                     "Added %d partition(s) to assignment which "
                     "now consists of %d partition(s) where of %d are in "
                     "pending state and %d are being queried",
                     partitions->cnt,
                     rk->rk_consumer.assignment.all->cnt,
                     rk->rk_consumer.assignment.pending->cnt,
                     rk->rk_consumer.assignment.queried->cnt);

        rk->rk_consumer.assignment.version++;

        return NULL;
}


/**
 * @brief Remove \p partitions from the current assignment.
 *
 * Will return error if trying to remove a partition that is not in the
 * assignment.
 *
 * @remark Make sure to call rd_kafka_assignment_serve() after successful
 *         return from this function.
 */
rd_kafka_error_t *
rd_kafka_assignment_subtract (rd_kafka_t *rk,
                              rd_kafka_topic_partition_list_t *partitions) {
        int i;
        int matched_queried_partitions = 0;
        int assignment_pre_cnt;

        if (rk->rk_consumer.assignment.all->cnt == 0 && partitions->cnt > 0)
                return rd_kafka_error_new(
                        RD_KAFKA_RESP_ERR__INVALID_ARG,
                        "Can't subtract from empty assignment");

        /* Verify that all partitions in \p partitions are in the assignment
         * before starting to modify the assignment.
         * Sorting also places duplicate entries next to each other. */
        rd_kafka_topic_partition_list_sort(partitions, NULL, NULL);

        for (i = 0 ; i < partitions->cnt ; i++) {
                const rd_kafka_topic_partition_t *rktpar =
                        &partitions->elems[i];
                const rd_kafka_topic_partition_t *prev =
                        i > 0 ? &partitions->elems[i-1] : NULL;

                /* A duplicate entry would pass the membership check below
                 * twice. The second removal from .all in the loop further
                 * down would then fail and hit RD_BUG(), so reject
                 * duplicates here instead. */
                if (prev && !rd_kafka_topic_partition_cmp(rktpar, prev))
                        return rd_kafka_error_new(
                                RD_KAFKA_RESP_ERR__INVALID_ARG,
                                "Duplicate %s [%"PRId32"] in input list",
                                rktpar->topic, rktpar->partition);

                if (!rd_kafka_topic_partition_list_find(
                            rk->rk_consumer.assignment.all,
                            rktpar->topic, rktpar->partition))
                        return rd_kafka_error_new(
                                RD_KAFKA_RESP_ERR__INVALID_ARG,
                                "%s [%"PRId32"] can't be unassigned since "
                                "it is not in the current assignment",
                                rktpar->topic, rktpar->partition);
        }

        /* The input is valid: acquire the toppar object for each
         * partition. */
        for (i = 0 ; i < partitions->cnt ; i++)
                rd_kafka_topic_partition_ensure_toppar(rk,
                                                       &partitions->elems[i],
                                                       rd_true);


        assignment_pre_cnt = rk->rk_consumer.assignment.all->cnt;

        /* Remove partitions in reverse order to avoid excessive
         * array shuffling of .all.
         * Each partition is also dropped from .queried (or else .pending)
         * and added to .removed for serve_removals() to handle. */
        for (i = partitions->cnt-1 ; i >= 0 ; i--) {
                const rd_kafka_topic_partition_t *rktpar =
                        &partitions->elems[i];

                if (!rd_kafka_topic_partition_list_del(
                            rk->rk_consumer.assignment.all,
                            rktpar->topic, rktpar->partition))
                        RD_BUG("Removed partition %s [%"PRId32"] not found "
                               "in assignment.all",
                               rktpar->topic, rktpar->partition);

                if (rd_kafka_topic_partition_list_del(
                            rk->rk_consumer.assignment.queried,
                            rktpar->topic, rktpar->partition))
                        matched_queried_partitions++;
                else
                        rd_kafka_topic_partition_list_del(
                                rk->rk_consumer.assignment.pending,
                                rktpar->topic, rktpar->partition);

                /* Add to .removed list which will be served by
                 * serve_removals(). */
                rd_kafka_topic_partition_list_add_copy(
                        rk->rk_consumer.assignment.removed, rktpar);
        }

        rd_kafka_dbg(rk, CGRP, "REMOVEASSIGN",
                     "Removed %d partition(s) "
                     "(%d with outstanding offset queries) from assignment "
                     "of %d partition(s)",
                     partitions->cnt,
                     matched_queried_partitions, assignment_pre_cnt);

        if (rk->rk_consumer.assignment.all->cnt == 0) {
                /* Sanity check: with nothing assigned, nothing can be
                 * pending or queried. */
                rd_assert(rk->rk_consumer.assignment.pending->cnt == 0);
                rd_assert(rk->rk_consumer.assignment.queried->cnt == 0);
        }

        rk->rk_consumer.assignment.version++;

        return NULL;
}


/**
 * @brief Call when partition fetcher has stopped.
 */
void rd_kafka_assignment_partition_stopped (rd_kafka_t *rk,
                                            rd_kafka_toppar_t *rktp) {
        /* One fewer fetcher to wait for. */
        rd_assert(rk->rk_consumer.assignment.wait_stop_cnt > 0);
        rk->rk_consumer.assignment.wait_stop_cnt--;

        /* The partition is no longer running. */
        rd_assert(rktp->rktp_started);
        rktp->rktp_started = rd_false;

        rd_assert(rk->rk_consumer.assignment.started_cnt > 0);
        rk->rk_consumer.assignment.started_cnt--;

        /* Other partitions are still stopping: wait for the last one
         * before moving the assignment forward. */
        if (rk->rk_consumer.assignment.wait_stop_cnt != 0)
                return;

        rd_kafka_dbg(rk, CGRP, "STOPSERVE",
                     "All partitions awaiting stop are now "
                     "stopped: serving assignment");
        rd_kafka_assignment_serve(rk);
}


/**
 * @brief Pause fetching of the currently assigned partitions.
 *
 * Partitions will be resumed by calling rd_kafka_assignment_resume() or
 * from either serve_removals() or serve_pending() above.
 */
void rd_kafka_assignment_pause (rd_kafka_t *rk, const char *reason) {
        const int assigned_cnt = rk->rk_consumer.assignment.all->cnt;

        if (!assigned_cnt)
                return; /* Nothing assigned: nothing to pause. */

        rd_kafka_dbg(rk, CGRP, "PAUSE",
                     "Pausing fetchers for %d assigned partition(s): %s",
                     assigned_cnt, reason);

        /* Library-initiated pause (LIB_PAUSE flag), applied asynchronously
         * to every partition in the current assignment. */
        rd_kafka_toppars_pause_resume(rk, rd_true/*pause*/, RD_ASYNC,
                                      RD_KAFKA_TOPPAR_F_LIB_PAUSE,
                                      rk->rk_consumer.assignment.all);
}

/**
 * @brief Resume fetching of the currently assigned partitions which have
 *        previously been paused by rd_kafka_assignment_pause().
 */
void rd_kafka_assignment_resume (rd_kafka_t *rk, const char *reason) {
        const int assigned_cnt = rk->rk_consumer.assignment.all->cnt;

        if (!assigned_cnt)
                return; /* Nothing assigned: nothing to resume. */

        rd_kafka_dbg(rk, CGRP, "PAUSE",
                     "Resuming fetchers for %d assigned partition(s): %s",
                     assigned_cnt, reason);

        /* Undo a library-initiated pause (LIB_PAUSE flag) asynchronously
         * for every partition in the current assignment. */
        rd_kafka_toppars_pause_resume(rk, rd_false/*resume*/, RD_ASYNC,
                                      RD_KAFKA_TOPPAR_F_LIB_PAUSE,
                                      rk->rk_consumer.assignment.all);
}



/**
 * @brief Destroy assignment state (but not \p assignment itself)
 */
void rd_kafka_assignment_destroy (rd_kafka_t *rk) {
        /* .all is the sentinel for the whole set of lists: they are
         * created together by rd_kafka_assignment_init() and reset
         * together below. */
        if (!rk->rk_consumer.assignment.all)
                return; /* rd_kafka_assignment_init() not called,
                         * or state already destroyed. */

        /* Reset each pointer right after destroying it so that a repeated
         * call hits the guard above instead of double-freeing, and any
         * stray later access fails on NULL rather than freed memory. */
        rd_kafka_topic_partition_list_destroy(
                rk->rk_consumer.assignment.all);
        rk->rk_consumer.assignment.all = NULL;

        rd_kafka_topic_partition_list_destroy(
                rk->rk_consumer.assignment.pending);
        rk->rk_consumer.assignment.pending = NULL;

        rd_kafka_topic_partition_list_destroy(
                rk->rk_consumer.assignment.queried);
        rk->rk_consumer.assignment.queried = NULL;

        rd_kafka_topic_partition_list_destroy(
                rk->rk_consumer.assignment.removed);
        rk->rk_consumer.assignment.removed = NULL;
}


/**
 * @brief Initialize the assignment struct.
 */
void rd_kafka_assignment_init (rd_kafka_t *rk) {
        /* Initial capacity hint for each list; the lists grow as needed. */
        const int initial_size = 100;

        rk->rk_consumer.assignment.all =
                rd_kafka_topic_partition_list_new(initial_size);
        rk->rk_consumer.assignment.pending =
                rd_kafka_topic_partition_list_new(initial_size);
        rk->rk_consumer.assignment.queried =
                rd_kafka_topic_partition_list_new(initial_size);
        rk->rk_consumer.assignment.removed =
                rd_kafka_topic_partition_list_new(initial_size);
}
