/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2020-2022, Magnus Edenhill
 *               2023, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Mocks
 *
 */

#include "rdkafka_int.h"
#include "rdbuf.h"
#include "rdkafka_mock_int.h"


static const char *rd_kafka_mock_cgrp_state_names[] = {
    "Empty", "Joining", "Syncing", "Rebalancing", "Up"};


static void rd_kafka_mock_cgrp_rebalance(rd_kafka_mock_cgrp_t *mcgrp,
                                         const char *reason);
static void
rd_kafka_mock_cgrp_member_destroy(rd_kafka_mock_cgrp_t *mcgrp,
                                  rd_kafka_mock_cgrp_member_t *member);

static void rd_kafka_mock_cgrp_set_state(rd_kafka_mock_cgrp_t *mcgrp,
                                         unsigned int new_state,
                                         const char *reason) {
        if (mcgrp->state == new_state)
                return;

        rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
                     "Mock consumer group %s with %d member(s) "
                     "changing state %s -> %s: %s",
                     mcgrp->id, mcgrp->member_cnt,
                     rd_kafka_mock_cgrp_state_names[mcgrp->state],
                     rd_kafka_mock_cgrp_state_names[new_state], reason);

        mcgrp->state = new_state;
}


/**
 * @brief Mark member as active (restart session timer)
 */
void rd_kafka_mock_cgrp_member_active(rd_kafka_mock_cgrp_t *mcgrp,
                                      rd_kafka_mock_cgrp_member_t *member) {
        rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
                     "Marking mock consumer group member %s as active",
                     member->id);
        member->ts_last_activity = rd_clock();
}


/**
 * @brief Verify that the protocol request is valid in the current state.
 *
 * @param member may be NULL.
 */
rd_kafka_resp_err_t
rd_kafka_mock_cgrp_check_state(rd_kafka_mock_cgrp_t *mcgrp,
                               rd_kafka_mock_cgrp_member_t *member,
                               const rd_kafka_buf_t *request,
                               int32_t generation_id) {
        int16_t ApiKey              = request->rkbuf_reqhdr.ApiKey;
        rd_bool_t has_generation_id = ApiKey == RD_KAFKAP_SyncGroup ||
                                      ApiKey == RD_KAFKAP_Heartbeat ||
                                      ApiKey == RD_KAFKAP_OffsetCommit;

        if (has_generation_id && generation_id != mcgrp->generation_id)
                return RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION;

        if (ApiKey == RD_KAFKAP_OffsetCommit && !member)
                return RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;

        switch (mcgrp->state) {
        case RD_KAFKA_MOCK_CGRP_STATE_EMPTY:
                if (ApiKey == RD_KAFKAP_JoinGroup)
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                break;

        case RD_KAFKA_MOCK_CGRP_STATE_JOINING:
                if (ApiKey == RD_KAFKAP_JoinGroup ||
                    ApiKey == RD_KAFKAP_LeaveGroup)
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                else
                        return RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS;

        case RD_KAFKA_MOCK_CGRP_STATE_SYNCING:
                if (ApiKey == RD_KAFKAP_SyncGroup ||
                    ApiKey == RD_KAFKAP_JoinGroup ||
                    ApiKey == RD_KAFKAP_LeaveGroup)
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                else
                        return RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS;

        case RD_KAFKA_MOCK_CGRP_STATE_REBALANCING:
                if (ApiKey == RD_KAFKAP_JoinGroup ||
                    ApiKey == RD_KAFKAP_LeaveGroup ||
                    ApiKey == RD_KAFKAP_OffsetCommit)
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                else
                        return RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS;

        case RD_KAFKA_MOCK_CGRP_STATE_UP:
                if (ApiKey == RD_KAFKAP_JoinGroup ||
                    ApiKey == RD_KAFKAP_LeaveGroup ||
                    ApiKey == RD_KAFKAP_Heartbeat ||
                    ApiKey == RD_KAFKAP_OffsetCommit)
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                break;
        }

        return RD_KAFKA_RESP_ERR_INVALID_REQUEST;
}


/**
 * @brief Set a member's assignment (from leader's SyncGroupRequest)
 */
void rd_kafka_mock_cgrp_member_assignment_set(
    rd_kafka_mock_cgrp_t *mcgrp,
    rd_kafka_mock_cgrp_member_t *member,
    const rd_kafkap_bytes_t *Metadata) {
        if (member->assignment) {
                rd_assert(mcgrp->assignment_cnt > 0);
                mcgrp->assignment_cnt--;
                rd_kafkap_bytes_destroy(member->assignment);
                member->assignment = NULL;
        }

        if (Metadata) {
                mcgrp->assignment_cnt++;
                member->assignment = rd_kafkap_bytes_copy(Metadata);
        }
}


/**
 * @brief Sync done (successfully) or failed, send responses back to members.
 */
static void rd_kafka_mock_cgrp_sync_done(rd_kafka_mock_cgrp_t *mcgrp,
                                         rd_kafka_resp_err_t err) {
        rd_kafka_mock_cgrp_member_t *member;

        TAILQ_FOREACH(member, &mcgrp->members, link) {
                rd_kafka_buf_t *resp;

                if ((resp = member->resp)) {
                        member->resp = NULL;
                        rd_assert(resp->rkbuf_reqhdr.ApiKey ==
                                  RD_KAFKAP_SyncGroup);

                        rd_kafka_buf_write_i16(resp, err); /* ErrorCode */
                        /* MemberState */
                        rd_kafka_buf_write_kbytes(
                            resp, !err ? member->assignment : NULL);
                }

                rd_kafka_mock_cgrp_member_assignment_set(mcgrp, member, NULL);

                if (member->conn) {
                        rd_kafka_mock_connection_set_blocking(member->conn,
                                                              rd_false);
                        if (resp)
                                rd_kafka_mock_connection_send_response(
                                    member->conn, resp);
                } else if (resp) {
                        /* Member has disconnected. */
                        rd_kafka_buf_destroy(resp);
                }
        }
}


/**
 * @brief Check if all members have sent SyncGroupRequests, if so, propagate
 *        assignment to members.
 */
static void rd_kafka_mock_cgrp_sync_check(rd_kafka_mock_cgrp_t *mcgrp) {

        rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
                     "Mock consumer group %s: awaiting %d/%d syncing members "
                     "in state %s",
                     mcgrp->id, mcgrp->assignment_cnt, mcgrp->member_cnt,
                     rd_kafka_mock_cgrp_state_names[mcgrp->state]);

        if (mcgrp->assignment_cnt < mcgrp->member_cnt)
                return;

        rd_kafka_mock_cgrp_sync_done(mcgrp, RD_KAFKA_RESP_ERR_NO_ERROR);
        rd_kafka_mock_cgrp_set_state(mcgrp, RD_KAFKA_MOCK_CGRP_STATE_UP,
                                     "all members synced");
}


/**
 * @brief Member has sent SyncGroupRequest and is waiting for a response,
 *        which will be sent when the all group member SyncGroupRequest are
 *        received.
 */
rd_kafka_resp_err_t
rd_kafka_mock_cgrp_member_sync_set(rd_kafka_mock_cgrp_t *mcgrp,
                                   rd_kafka_mock_cgrp_member_t *member,
                                   rd_kafka_mock_connection_t *mconn,
                                   rd_kafka_buf_t *resp) {

        if (mcgrp->state != RD_KAFKA_MOCK_CGRP_STATE_SYNCING)
                return RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS; /* FIXME */

        rd_kafka_mock_cgrp_member_active(mcgrp, member);

        rd_assert(!member->resp);

        member->resp = resp;
        member->conn = mconn;
        rd_kafka_mock_connection_set_blocking(member->conn, rd_true);

        /* Check if all members now have an assignment, if so, send responses */
        rd_kafka_mock_cgrp_sync_check(mcgrp);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief Member is explicitly leaving the group (through LeaveGroupRequest)
 */
rd_kafka_resp_err_t
rd_kafka_mock_cgrp_member_leave(rd_kafka_mock_cgrp_t *mcgrp,
                                rd_kafka_mock_cgrp_member_t *member) {

        rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
                     "Member %s is leaving group %s", member->id, mcgrp->id);

        rd_kafka_mock_cgrp_member_destroy(mcgrp, member);

        rd_kafka_mock_cgrp_rebalance(mcgrp, "explicit member leave");

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
 * @brief Destroys/frees an array of protocols, including the array itself.
 */
void rd_kafka_mock_cgrp_protos_destroy(rd_kafka_mock_cgrp_proto_t *protos,
                                       int proto_cnt) {
        int i;

        for (i = 0; i < proto_cnt; i++) {
                rd_free(protos[i].name);
                if (protos[i].metadata)
                        rd_free(protos[i].metadata);
        }

        rd_free(protos);
}

static void
rd_kafka_mock_cgrp_rebalance_timer_restart(rd_kafka_mock_cgrp_t *mcgrp,
                                           int timeout_ms);

/**
 * @brief Elect consumer group leader and send JoinGroup responses
 */
static void rd_kafka_mock_cgrp_elect_leader(rd_kafka_mock_cgrp_t *mcgrp) {
        rd_kafka_mock_cgrp_member_t *member;

        rd_assert(mcgrp->state == RD_KAFKA_MOCK_CGRP_STATE_JOINING);
        rd_assert(!TAILQ_EMPTY(&mcgrp->members));

        mcgrp->generation_id++;

        /* Elect a leader deterministically if the group.instance.id is
         * available, using the lexicographic order of group.instance.ids.
         * This is not how it's done on a real broker, which uses the first
         * member joined. But we use a determinstic method for better testing,
         * (in case we want to enforce a some consumer to be the group leader).
         * If group.instance.id is not specified for any consumer, we use the
         * first one joined, similar to the real broker. */
        mcgrp->leader = NULL;
        TAILQ_FOREACH(member, &mcgrp->members, link) {
                if (!mcgrp->leader)
                        mcgrp->leader = member;
                else if (mcgrp->leader->group_instance_id &&
                         member->group_instance_id &&
                         (rd_strcmp(mcgrp->leader->group_instance_id,
                                    member->group_instance_id) > 0))
                        mcgrp->leader = member;
        }

        rd_kafka_dbg(
            mcgrp->cluster->rk, MOCK, "MOCK",
            "Consumer group %s with %d member(s) is rebalancing: "
            "elected leader is %s (group.instance.id = %s), generation id %d",
            mcgrp->id, mcgrp->member_cnt, mcgrp->leader->id,
            mcgrp->leader->group_instance_id, mcgrp->generation_id);

        /* Find the most commonly supported protocol name among the members.
         * FIXME: For now we'll blindly use the first protocol of the leader. */
        if (mcgrp->protocol_name)
                rd_free(mcgrp->protocol_name);
        mcgrp->protocol_name = RD_KAFKAP_STR_DUP(mcgrp->leader->protos[0].name);

        /* Send JoinGroupResponses to all members */
        TAILQ_FOREACH(member, &mcgrp->members, link) {
                rd_bool_t is_leader = member == mcgrp->leader;
                int member_cnt      = is_leader ? mcgrp->member_cnt : 0;
                rd_kafka_buf_t *resp;
                rd_kafka_mock_cgrp_member_t *member2;
                rd_kafka_mock_connection_t *mconn;

                /* Member connection has been closed, it will eventually
                 * reconnect or time out from the group. */
                if (!member->conn || !member->resp)
                        continue;
                mconn        = member->conn;
                member->conn = NULL;
                resp         = member->resp;
                member->resp = NULL;

                rd_assert(resp->rkbuf_reqhdr.ApiKey == RD_KAFKAP_JoinGroup);

                rd_kafka_buf_write_i16(resp, 0); /* ErrorCode */
                rd_kafka_buf_write_i32(resp, mcgrp->generation_id);
                rd_kafka_buf_write_str(resp, mcgrp->protocol_name, -1);
                rd_kafka_buf_write_str(resp, mcgrp->leader->id, -1);
                rd_kafka_buf_write_str(resp, member->id, -1);
                rd_kafka_buf_write_i32(resp, member_cnt);

                /* Send full member list to leader */
                if (member_cnt > 0) {
                        TAILQ_FOREACH(member2, &mcgrp->members, link) {
                                rd_kafka_buf_write_str(resp, member2->id, -1);
                                if (resp->rkbuf_reqhdr.ApiVersion >= 5)
                                        rd_kafka_buf_write_str(
                                            resp, member2->group_instance_id,
                                            -1);
                                /* FIXME: look up correct protocol name */
                                rd_assert(!rd_kafkap_str_cmp_str(
                                    member2->protos[0].name,
                                    mcgrp->protocol_name));

                                rd_kafka_buf_write_kbytes(
                                    resp, member2->protos[0].metadata);
                        }
                }

                /* Mark each member as active to avoid them timing out
                 * at the same time as a JoinGroup handler that blocks
                 * session.timeout.ms to elect a leader. */
                rd_kafka_mock_cgrp_member_active(mcgrp, member);

                rd_kafka_mock_connection_set_blocking(mconn, rd_false);
                rd_kafka_mock_connection_send_response(mconn, resp);
        }

        mcgrp->last_member_cnt = mcgrp->member_cnt;

        rd_kafka_mock_cgrp_set_state(mcgrp, RD_KAFKA_MOCK_CGRP_STATE_SYNCING,
                                     "leader elected, waiting for all "
                                     "members to sync");

        rd_kafka_mock_cgrp_rebalance_timer_restart(mcgrp,
                                                   mcgrp->session_timeout_ms);
}


/**
 * @brief Trigger group rebalance.
 */
static void rd_kafka_mock_cgrp_rebalance(rd_kafka_mock_cgrp_t *mcgrp,
                                         const char *reason) {
        int timeout_ms;

        if (mcgrp->state == RD_KAFKA_MOCK_CGRP_STATE_JOINING)
                return; /* Do nothing, group is already rebalancing. */
        else if (mcgrp->state == RD_KAFKA_MOCK_CGRP_STATE_EMPTY)
                timeout_ms = 3000; /* First join, low timeout.
                                    * Same as group.initial.rebalance.delay.ms
                                    * on the broker. */
        else if (mcgrp->state == RD_KAFKA_MOCK_CGRP_STATE_REBALANCING &&
                 mcgrp->member_cnt == mcgrp->last_member_cnt)
                timeout_ms = 100; /* All members rejoined, quickly transition
                                   * to election. */
        else /* Let the rebalance delay be a bit shorter than the
              * session timeout so that we don't time out waiting members
              * who are also subject to the session timeout. */
                timeout_ms = mcgrp->session_timeout_ms > 1000
                                 ? mcgrp->session_timeout_ms - 1000
                                 : mcgrp->session_timeout_ms;

        if (mcgrp->state == RD_KAFKA_MOCK_CGRP_STATE_SYNCING)
                /* Abort current Syncing state */
                rd_kafka_mock_cgrp_sync_done(
                    mcgrp, RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS);

        rd_kafka_mock_cgrp_set_state(mcgrp, RD_KAFKA_MOCK_CGRP_STATE_JOINING,
                                     reason);
        rd_kafka_mock_cgrp_rebalance_timer_restart(mcgrp, timeout_ms);
}

/**
 * @brief Consumer group state machine triggered by timer events.
 */
static void rd_kafka_mock_cgrp_fsm_timeout(rd_kafka_mock_cgrp_t *mcgrp) {
        rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
                     "Mock consumer group %s FSM timeout in state %s",
                     mcgrp->id, rd_kafka_mock_cgrp_state_names[mcgrp->state]);

        switch (mcgrp->state) {
        case RD_KAFKA_MOCK_CGRP_STATE_EMPTY:
                /* No members, do nothing */
                break;
        case RD_KAFKA_MOCK_CGRP_STATE_JOINING:
                /* Timed out waiting for more members, elect a leader */
                if (mcgrp->member_cnt > 0)
                        rd_kafka_mock_cgrp_elect_leader(mcgrp);
                else
                        rd_kafka_mock_cgrp_set_state(
                            mcgrp, RD_KAFKA_MOCK_CGRP_STATE_EMPTY,
                            "no members joined");
                break;

        case RD_KAFKA_MOCK_CGRP_STATE_SYNCING:
                /* Timed out waiting for all members to sync */

                /* Send error response to all waiting members */
                rd_kafka_mock_cgrp_sync_done(
                    mcgrp, RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS /* FIXME */);

                rd_kafka_mock_cgrp_set_state(
                    mcgrp, RD_KAFKA_MOCK_CGRP_STATE_REBALANCING,
                    "timed out waiting for all members to synchronize");
                break;

        case RD_KAFKA_MOCK_CGRP_STATE_REBALANCING:
                /* Timed out waiting for all members to Leave or re-Join */
                rd_kafka_mock_cgrp_set_state(mcgrp,
                                             RD_KAFKA_MOCK_CGRP_STATE_JOINING,
                                             "timed out waiting for all "
                                             "members to re-Join or Leave");
                break;

        case RD_KAFKA_MOCK_CGRP_STATE_UP:
                /* No fsm timers triggered in this state, see
                 * the session_tmr instead */
                break;
        }
}

static void rd_kafka_mcgrp_rebalance_timer_cb(rd_kafka_timers_t *rkts,
                                              void *arg) {
        rd_kafka_mock_cgrp_t *mcgrp = arg;

        rd_kafka_mock_cgrp_fsm_timeout(mcgrp);
}


/**
 * @brief Restart the rebalance timer, postponing leader election.
 */
static void
rd_kafka_mock_cgrp_rebalance_timer_restart(rd_kafka_mock_cgrp_t *mcgrp,
                                           int timeout_ms) {
        rd_kafka_timer_start_oneshot(
            &mcgrp->cluster->timers, &mcgrp->rebalance_tmr, rd_true,
            timeout_ms * 1000, rd_kafka_mcgrp_rebalance_timer_cb, mcgrp);
}


static void
rd_kafka_mock_cgrp_member_destroy(rd_kafka_mock_cgrp_t *mcgrp,
                                  rd_kafka_mock_cgrp_member_t *member) {
        rd_assert(mcgrp->member_cnt > 0);
        TAILQ_REMOVE(&mcgrp->members, member, link);
        mcgrp->member_cnt--;

        rd_free(member->id);

        if (member->resp)
                rd_kafka_buf_destroy(member->resp);

        if (member->group_instance_id)
                rd_free(member->group_instance_id);

        rd_kafka_mock_cgrp_member_assignment_set(mcgrp, member, NULL);

        rd_kafka_mock_cgrp_protos_destroy(member->protos, member->proto_cnt);

        rd_free(member);
}


/**
 * @brief Find member in group.
 */
rd_kafka_mock_cgrp_member_t *
rd_kafka_mock_cgrp_member_find(const rd_kafka_mock_cgrp_t *mcgrp,
                               const rd_kafkap_str_t *MemberId) {
        const rd_kafka_mock_cgrp_member_t *member;
        TAILQ_FOREACH(member, &mcgrp->members, link) {
                if (!rd_kafkap_str_cmp_str(MemberId, member->id))
                        return (rd_kafka_mock_cgrp_member_t *)member;
        }

        return NULL;
}


/**
 * @brief Update or add member to consumer group
 */
rd_kafka_resp_err_t
rd_kafka_mock_cgrp_member_add(rd_kafka_mock_cgrp_t *mcgrp,
                              rd_kafka_mock_connection_t *mconn,
                              rd_kafka_buf_t *resp,
                              const rd_kafkap_str_t *MemberId,
                              const rd_kafkap_str_t *ProtocolType,
                              const rd_kafkap_str_t *GroupInstanceId,
                              rd_kafka_mock_cgrp_proto_t *protos,
                              int proto_cnt,
                              int session_timeout_ms) {
        rd_kafka_mock_cgrp_member_t *member;
        rd_kafka_resp_err_t err;

        err = rd_kafka_mock_cgrp_check_state(mcgrp, NULL, resp, -1);
        if (err)
                return err;

        /* Find member */
        member = rd_kafka_mock_cgrp_member_find(mcgrp, MemberId);
        if (!member) {
                /* Not found, add member */
                member = rd_calloc(1, sizeof(*member));

                if (!RD_KAFKAP_STR_LEN(MemberId)) {
                        /* Generate a member id */
                        char memberid[32];
                        rd_snprintf(memberid, sizeof(memberid), "%p", member);
                        member->id = rd_strdup(memberid);
                } else
                        member->id = RD_KAFKAP_STR_DUP(MemberId);

                if (RD_KAFKAP_STR_LEN(GroupInstanceId))
                        member->group_instance_id =
                            RD_KAFKAP_STR_DUP(GroupInstanceId);

                TAILQ_INSERT_TAIL(&mcgrp->members, member, link);
                mcgrp->member_cnt++;
        }

        if (mcgrp->state != RD_KAFKA_MOCK_CGRP_STATE_JOINING)
                rd_kafka_mock_cgrp_rebalance(mcgrp, "member join");

        mcgrp->session_timeout_ms = session_timeout_ms;

        if (member->protos)
                rd_kafka_mock_cgrp_protos_destroy(member->protos,
                                                  member->proto_cnt);
        member->protos    = protos;
        member->proto_cnt = proto_cnt;

        rd_assert(!member->resp);
        member->resp = resp;
        member->conn = mconn;
        rd_kafka_mock_cgrp_member_active(mcgrp, member);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
 * @brief Check if any members have exceeded the session timeout.
 */
static void rd_kafka_mock_cgrp_session_tmr_cb(rd_kafka_timers_t *rkts,
                                              void *arg) {
        rd_kafka_mock_cgrp_t *mcgrp = arg;
        rd_kafka_mock_cgrp_member_t *member, *tmp;
        rd_ts_t now     = rd_clock();
        int timeout_cnt = 0;

        TAILQ_FOREACH_SAFE(member, &mcgrp->members, link, tmp) {
                if (member->ts_last_activity +
                        (mcgrp->session_timeout_ms * 1000) >
                    now)
                        continue;

                rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
                             "Member %s session timed out for group %s",
                             member->id, mcgrp->id);

                rd_kafka_mock_cgrp_member_destroy(mcgrp, member);
                timeout_cnt++;
        }

        if (timeout_cnt)
                rd_kafka_mock_cgrp_rebalance(mcgrp, "member timeout");
}


void rd_kafka_mock_cgrp_destroy(rd_kafka_mock_cgrp_t *mcgrp) {
        rd_kafka_mock_cgrp_member_t *member;

        TAILQ_REMOVE(&mcgrp->cluster->cgrps, mcgrp, link);

        rd_kafka_timer_stop(&mcgrp->cluster->timers, &mcgrp->rebalance_tmr,
                            rd_true);
        rd_kafka_timer_stop(&mcgrp->cluster->timers, &mcgrp->session_tmr,
                            rd_true);
        rd_free(mcgrp->id);
        rd_free(mcgrp->protocol_type);
        if (mcgrp->protocol_name)
                rd_free(mcgrp->protocol_name);
        while ((member = TAILQ_FIRST(&mcgrp->members)))
                rd_kafka_mock_cgrp_member_destroy(mcgrp, member);
        rd_free(mcgrp);
}


rd_kafka_mock_cgrp_t *rd_kafka_mock_cgrp_find(rd_kafka_mock_cluster_t *mcluster,
                                              const rd_kafkap_str_t *GroupId) {
        rd_kafka_mock_cgrp_t *mcgrp;
        TAILQ_FOREACH(mcgrp, &mcluster->cgrps, link) {
                if (!rd_kafkap_str_cmp_str(GroupId, mcgrp->id))
                        return mcgrp;
        }

        return NULL;
}


/**
 * @brief Find or create a consumer group
 */
rd_kafka_mock_cgrp_t *
rd_kafka_mock_cgrp_get(rd_kafka_mock_cluster_t *mcluster,
                       const rd_kafkap_str_t *GroupId,
                       const rd_kafkap_str_t *ProtocolType) {
        rd_kafka_mock_cgrp_t *mcgrp;

        mcgrp = rd_kafka_mock_cgrp_find(mcluster, GroupId);
        if (mcgrp)
                return mcgrp;

        /* FIXME: What to do with mismatching ProtocolTypes? */

        mcgrp = rd_calloc(1, sizeof(*mcgrp));

        mcgrp->cluster       = mcluster;
        mcgrp->id            = RD_KAFKAP_STR_DUP(GroupId);
        mcgrp->protocol_type = RD_KAFKAP_STR_DUP(ProtocolType);
        mcgrp->generation_id = 1;
        TAILQ_INIT(&mcgrp->members);
        rd_kafka_timer_start(&mcluster->timers, &mcgrp->session_tmr,
                             1000 * 1000 /*1s*/,
                             rd_kafka_mock_cgrp_session_tmr_cb, mcgrp);

        TAILQ_INSERT_TAIL(&mcluster->cgrps, mcgrp, link);

        return mcgrp;
}


/**
 * @brief A client connection closed, check if any cgrp has any state
 *        for this connection that needs to be cleared.
 */
void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster,
                                           rd_kafka_mock_connection_t *mconn) {
        rd_kafka_mock_cgrp_t *mcgrp;

        TAILQ_FOREACH(mcgrp, &mcluster->cgrps, link) {
                rd_kafka_mock_cgrp_member_t *member, *tmp;
                TAILQ_FOREACH_SAFE(member, &mcgrp->members, link, tmp) {
                        if (member->conn == mconn) {
                                member->conn = NULL;
                                if (member->resp) {
                                        rd_kafka_buf_destroy(member->resp);
                                        member->resp = NULL;
                                }
                        }
                }
        }
}
