/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2020-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

extern "C" {
#include "../src/rdkafka_protocol.h"
#include "test.h"
}
#include <iostream>
#include <map>
#include <set>
#include <algorithm>
#include <cstring>
#include <cstdlib>
#include <assert.h>
#include "testcpp.h"
#include <fstream>

using namespace std;

/** Topic+Partition helper class */
class Toppar {
 public:
  Toppar(const string &topic, int32_t partition) :
      topic(topic), partition(partition) {
  }

  Toppar(const RdKafka::TopicPartition *tp) :
      topic(tp->topic()), partition(tp->partition()) {
  }

  friend bool operator==(const Toppar &a, const Toppar &b) {
    return a.partition == b.partition && a.topic == b.topic;
  }

  friend bool operator<(const Toppar &a, const Toppar &b) {
    if (a.partition < b.partition)
      return true;
    return a.topic < b.topic;
  }

  string str() const {
    return tostr() << topic << "[" << partition << "]";
  }

  std::string topic;
  int32_t partition;
};



static std::string get_bootstrap_servers() {
  RdKafka::Conf *conf;
  std::string bootstrap_servers;
  Test::conf_init(&conf, NULL, 0);
  conf->get("bootstrap.servers", bootstrap_servers);
  delete conf;
  return bootstrap_servers;
}


class DrCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb(RdKafka::Message &msg) {
    if (msg.err())
      Test::Fail("Delivery failed: " + RdKafka::err2str(msg.err()));
  }
};


/**
 * @brief Produce messages to partitions.
 *
 * The pair is Toppar,msg_cnt_per_partition.
 * The Toppar is topic,partition_cnt.
 */
static void produce_msgs(vector<pair<Toppar, int> > partitions) {
  RdKafka::Conf *conf;
  Test::conf_init(&conf, NULL, 0);

  string errstr;
  DrCb dr;
  conf->set("dr_cb", &dr, errstr);
  RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
  if (!p)
    Test::Fail("Failed to create producer: " + errstr);
  delete conf;

  for (vector<pair<Toppar, int> >::iterator it = partitions.begin();
       it != partitions.end(); it++) {
    for (int part = 0; part < it->first.partition; part++) {
      for (int i = 0; i < it->second; i++) {
        RdKafka::ErrorCode err =
            p->produce(it->first.topic, part, RdKafka::Producer::RK_MSG_COPY,
                       (void *)"Hello there", 11, NULL, 0, 0, NULL);
        TEST_ASSERT(!err, "produce(%s, %d) failed: %s", it->first.topic.c_str(),
                    part, RdKafka::err2str(err).c_str());

        p->poll(0);
      }
    }
  }

  p->flush(10000);

  delete p;
}



static RdKafka::KafkaConsumer *make_consumer(
    string client_id,
    string group_id,
    string assignment_strategy,
    vector<pair<string, string> > *additional_conf,
    RdKafka::RebalanceCb *rebalance_cb,
    int timeout_s) {
  std::string bootstraps;
  std::string errstr;
  std::vector<std::pair<std::string, std::string> >::iterator itr;

  RdKafka::Conf *conf;
  Test::conf_init(&conf, NULL, timeout_s);
  Test::conf_set(conf, "client.id", client_id);
  Test::conf_set(conf, "group.id", group_id);
  Test::conf_set(conf, "auto.offset.reset", "earliest");
  Test::conf_set(conf, "enable.auto.commit", "false");
  Test::conf_set(conf, "partition.assignment.strategy", assignment_strategy);
  if (additional_conf != NULL) {
    for (itr = (*additional_conf).begin(); itr != (*additional_conf).end();
         itr++)
      Test::conf_set(conf, itr->first, itr->second);
  }

  if (rebalance_cb) {
    if (conf->set("rebalance_cb", rebalance_cb, errstr))
      Test::Fail("Failed to set rebalance_cb: " + errstr);
  }
  RdKafka::KafkaConsumer *consumer =
      RdKafka::KafkaConsumer::create(conf, errstr);
  if (!consumer)
    Test::Fail("Failed to create KafkaConsumer: " + errstr);
  delete conf;

  return consumer;
}

/**
 * @returns a CSV string of the vector
 */
static string string_vec_to_str(const vector<string> &v) {
  ostringstream ss;
  for (vector<string>::const_iterator it = v.begin(); it != v.end(); it++)
    ss << (it == v.begin() ? "" : ", ") << *it;
  return ss.str();
}

void expect_assignment(RdKafka::KafkaConsumer *consumer, size_t count) {
  std::vector<RdKafka::TopicPartition *> partitions;
  RdKafka::ErrorCode err;
  err = consumer->assignment(partitions);
  if (err)
    Test::Fail(consumer->name() +
               " assignment() failed: " + RdKafka::err2str(err));
  if (partitions.size() != count)
    Test::Fail(tostr() << "Expecting consumer " << consumer->name()
                       << " to have " << count
                       << " assigned partition(s), not: " << partitions.size());
  RdKafka::TopicPartition::destroy(partitions);
}


static bool TopicPartition_cmp(const RdKafka::TopicPartition *a,
                               const RdKafka::TopicPartition *b) {
  if (a->topic() < b->topic())
    return true;
  else if (a->topic() > b->topic())
    return false;
  return a->partition() < b->partition();
}


void expect_assignment(RdKafka::KafkaConsumer *consumer,
                       vector<RdKafka::TopicPartition *> &expected) {
  vector<RdKafka::TopicPartition *> partitions;
  RdKafka::ErrorCode err;
  err = consumer->assignment(partitions);
  if (err)
    Test::Fail(consumer->name() +
               " assignment() failed: " + RdKafka::err2str(err));

  if (partitions.size() != expected.size())
    Test::Fail(tostr() << "Expecting consumer " << consumer->name()
                       << " to have " << expected.size()
                       << " assigned partition(s), not " << partitions.size());

  sort(partitions.begin(), partitions.end(), TopicPartition_cmp);
  sort(expected.begin(), expected.end(), TopicPartition_cmp);

  int fails = 0;
  for (int i = 0; i < (int)partitions.size(); i++) {
    if (!TopicPartition_cmp(partitions[i], expected[i]))
      continue;

    Test::Say(tostr() << _C_RED << consumer->name() << ": expected assignment #"
                      << i << " " << expected[i]->topic() << " ["
                      << expected[i]->partition() << "], not "
                      << partitions[i]->topic() << " ["
                      << partitions[i]->partition() << "]\n");
    fails++;
  }

  if (fails)
    Test::Fail(consumer->name() + ": Expected assignment mismatch, see above");

  RdKafka::TopicPartition::destroy(partitions);
}


class DefaultRebalanceCb : public RdKafka::RebalanceCb {
 private:
  static string part_list_print(
      const vector<RdKafka::TopicPartition *> &partitions) {
    ostringstream ss;
    for (unsigned int i = 0; i < partitions.size(); i++)
      ss << (i == 0 ? "" : ", ") << partitions[i]->topic() << " ["
         << partitions[i]->partition() << "]";
    return ss.str();
  }

 public:
  int assign_call_cnt;
  int revoke_call_cnt;
  int nonempty_assign_call_cnt; /**< ASSIGN_PARTITIONS with partitions */
  int lost_call_cnt;
  int partitions_assigned_net;
  bool wait_rebalance;
  int64_t ts_last_assign;   /**< Timestamp of last rebalance assignment */
  map<Toppar, int> msg_cnt; /**< Number of consumed messages per partition. */

  ~DefaultRebalanceCb() {
    reset_msg_cnt();
  }

  DefaultRebalanceCb() :
      assign_call_cnt(0),
      revoke_call_cnt(0),
      nonempty_assign_call_cnt(0),
      lost_call_cnt(0),
      partitions_assigned_net(0),
      wait_rebalance(false),
      ts_last_assign(0) {
  }


  void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition *> &partitions) {
    wait_rebalance = false;

    std::string protocol = consumer->rebalance_protocol();

    TEST_ASSERT(protocol == "COOPERATIVE",
                "%s: Expected rebalance_protocol \"COOPERATIVE\", not %s",
                consumer->name().c_str(), protocol.c_str());

    const char *lost_str = consumer->assignment_lost() ? " (LOST)" : "";
    Test::Say(tostr() << _C_YEL "RebalanceCb " << protocol << ": "
                      << consumer->name() << " " << RdKafka::err2str(err)
                      << lost_str << ": " << part_list_print(partitions)
                      << "\n");

    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      if (consumer->assignment_lost())
        Test::Fail("unexpected lost assignment during ASSIGN rebalance");
      RdKafka::Error *error = consumer->incremental_assign(partitions);
      if (error)
        Test::Fail(tostr() << "consumer->incremental_assign() failed: "
                           << error->str());
      if (partitions.size() > 0)
        nonempty_assign_call_cnt++;
      assign_call_cnt += 1;
      partitions_assigned_net += (int)partitions.size();
      ts_last_assign = test_clock();

    } else {
      if (consumer->assignment_lost())
        lost_call_cnt += 1;
      RdKafka::Error *error = consumer->incremental_unassign(partitions);
      if (error)
        Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
                           << error->str());
      if (partitions.size() == 0)
        Test::Fail("revoked partitions size should never be 0");
      revoke_call_cnt += 1;
      partitions_assigned_net -= (int)partitions.size();
    }

    /* Reset message counters for the given partitions. */
    Test::Say(consumer->name() + ": resetting message counters:\n");
    reset_msg_cnt(partitions);
  }

  bool poll_once(RdKafka::KafkaConsumer *c, int timeout_ms) {
    RdKafka::Message *msg = c->consume(timeout_ms);
    bool ret              = msg->err() != RdKafka::ERR__TIMED_OUT;
    if (!msg->err())
      msg_cnt[Toppar(msg->topic_name(), msg->partition())]++;
    delete msg;
    return ret;
  }

  void reset_msg_cnt() {
    msg_cnt.clear();
  }

  void reset_msg_cnt(Toppar &tp) {
    int msgcnt = get_msg_cnt(tp);
    Test::Say(tostr() << " RESET " << tp.topic << " [" << tp.partition << "]"
                      << " with " << msgcnt << " messages\n");
    if (!msg_cnt.erase(tp) && msgcnt)
      Test::Fail("erase failed!");
  }

  void reset_msg_cnt(const vector<RdKafka::TopicPartition *> &partitions) {
    for (unsigned int i = 0; i < partitions.size(); i++) {
      Toppar tp(partitions[i]->topic(), partitions[i]->partition());
      reset_msg_cnt(tp);
    }
  }

  int get_msg_cnt(const Toppar &tp) {
    map<Toppar, int>::iterator it = msg_cnt.find(tp);
    if (it == msg_cnt.end())
      return 0;
    return it->second;
  }
};



/**
 * @brief Verify that the consumer's assignment is a subset of the
 *        subscribed topics.
 *
 * @param allow_mismatch Allow assignment of not subscribed topics.
 *                       This can happen when the subscription is updated
 *                       but a rebalance callback hasn't been seen yet.
 * @param all_assignments Accumulated assignments for all consumers.
 *                        If an assigned partition already exists it means
 *                        the partition is assigned to multiple consumers and
 *                        the test will fail.
 * @param exp_msg_cnt Expected message count per assigned partition, or -1
 *                    if not to check.
 *
 * @returns the number of assigned partitions, or fails if the
 *          assignment is empty or there is an assignment for
 *          topic that is not subscribed.
 */
static int verify_consumer_assignment(
    RdKafka::KafkaConsumer *consumer,
    DefaultRebalanceCb &rebalance_cb,
    const vector<string> &topics,
    bool allow_empty,
    bool allow_mismatch,
    map<Toppar, RdKafka::KafkaConsumer *> *all_assignments,
    int exp_msg_cnt) {
  vector<RdKafka::TopicPartition *> partitions;
  RdKafka::ErrorCode err;
  int fails = 0;
  int count;
  ostringstream ss;

  err = consumer->assignment(partitions);
  TEST_ASSERT(!err, "Failed to get assignment for consumer %s: %s",
              consumer->name().c_str(), RdKafka::err2str(err).c_str());

  count = (int)partitions.size();

  for (vector<RdKafka::TopicPartition *>::iterator it = partitions.begin();
       it != partitions.end(); it++) {
    RdKafka::TopicPartition *p = *it;

    if (find(topics.begin(), topics.end(), p->topic()) == topics.end()) {
      Test::Say(tostr() << (allow_mismatch ? _C_YEL "Warning (allowed)"
                                           : _C_RED "Error")
                        << ": " << consumer->name() << " is assigned "
                        << p->topic() << " [" << p->partition() << "] which is "
                        << "not in the list of subscribed topics: "
                        << string_vec_to_str(topics) << "\n");
      if (!allow_mismatch)
        fails++;
    }

    Toppar tp(p);
    pair<map<Toppar, RdKafka::KafkaConsumer *>::iterator, bool> ret;
    ret = all_assignments->insert(
        pair<Toppar, RdKafka::KafkaConsumer *>(tp, consumer));
    if (!ret.second) {
      Test::Say(tostr() << _C_RED << "Error: " << consumer->name()
                        << " is assigned " << p->topic() << " ["
                        << p->partition()
                        << "] which is "
                           "already assigned to consumer "
                        << ret.first->second->name() << "\n");
      fails++;
    }


    int msg_cnt = rebalance_cb.get_msg_cnt(tp);

    if (exp_msg_cnt != -1 && msg_cnt != exp_msg_cnt) {
      Test::Say(tostr() << _C_RED << "Error: " << consumer->name()
                        << " expected " << exp_msg_cnt << " messages on "
                        << p->topic() << " [" << p->partition() << "], not "
                        << msg_cnt << "\n");
      fails++;
    }

    ss << (it == partitions.begin() ? "" : ", ") << p->topic() << " ["
       << p->partition() << "] (" << msg_cnt << "msgs)";
  }

  RdKafka::TopicPartition::destroy(partitions);

  Test::Say(tostr() << "Consumer " << consumer->name() << " assignment ("
                    << count << "): " << ss.str() << "\n");

  if (count == 0 && !allow_empty)
    Test::Fail("Consumer " + consumer->name() +
               " has unexpected empty assignment");

  if (fails)
    Test::Fail(
        tostr() << "Consumer " + consumer->name()
                << " assignment verification failed (see previous error)");

  return count;
}



/* -------- a_assign_tests
 *
 * check behavior incremental assign / unassign outside the context of a
 * rebalance.
 */


/** Incremental assign, then assign(NULL).
 */
static void assign_test_1(RdKafka::KafkaConsumer *consumer,
                          std::vector<RdKafka::TopicPartition *> toppars1,
                          std::vector<RdKafka::TopicPartition *> toppars2) {
  RdKafka::ErrorCode err;
  RdKafka::Error *error;

  Test::Say("Incremental assign, then assign(NULL)\n");

  if ((error = consumer->incremental_assign(toppars1)))
    Test::Fail(tostr() << "Incremental assign failed: " << error->str());
  Test::check_assignment(consumer, 1, &toppars1[0]->topic());

  if ((err = consumer->unassign()))
    Test::Fail("Unassign failed: " + RdKafka::err2str(err));
  Test::check_assignment(consumer, 0, NULL);
}


/** Assign, then incremental unassign.
 */
static void assign_test_2(RdKafka::KafkaConsumer *consumer,
                          std::vector<RdKafka::TopicPartition *> toppars1,
                          std::vector<RdKafka::TopicPartition *> toppars2) {
  RdKafka::ErrorCode err;
  RdKafka::Error *error;

  Test::Say("Assign, then incremental unassign\n");

  if ((err = consumer->assign(toppars1)))
    Test::Fail("Assign failed: " + RdKafka::err2str(err));
  Test::check_assignment(consumer, 1, &toppars1[0]->topic());

  if ((error = consumer->incremental_unassign(toppars1)))
    Test::Fail("Incremental unassign failed: " + error->str());
  Test::check_assignment(consumer, 0, NULL);
}


/** Incremental assign, then incremental unassign.
 */
static void assign_test_3(RdKafka::KafkaConsumer *consumer,
                          std::vector<RdKafka::TopicPartition *> toppars1,
                          std::vector<RdKafka::TopicPartition *> toppars2) {
  RdKafka::Error *error;

  Test::Say("Incremental assign, then incremental unassign\n");

  if ((error = consumer->incremental_assign(toppars1)))
    Test::Fail("Incremental assign failed: " + error->str());
  Test::check_assignment(consumer, 1, &toppars1[0]->topic());

  if ((error = consumer->incremental_unassign(toppars1)))
    Test::Fail("Incremental unassign failed: " + error->str());
  Test::check_assignment(consumer, 0, NULL);
}


/** Multi-topic incremental assign and unassign + message consumption.
 */
static void assign_test_4(RdKafka::KafkaConsumer *consumer,
                          std::vector<RdKafka::TopicPartition *> toppars1,
                          std::vector<RdKafka::TopicPartition *> toppars2) {
  RdKafka::Error *error;

  Test::Say(
      "Multi-topic incremental assign and unassign + message consumption\n");

  if ((error = consumer->incremental_assign(toppars1)))
    Test::Fail("Incremental assign failed: " + error->str());
  Test::check_assignment(consumer, 1, &toppars1[0]->topic());

  RdKafka::Message *m = consumer->consume(5000);
  if (m->err() != RdKafka::ERR_NO_ERROR)
    Test::Fail("Expecting a consumed message.");
  if (m->len() != 100)
    Test::Fail(tostr() << "Expecting msg len to be 100, not: "
                       << m->len()); /* implies read from topic 1. */
  delete m;

  if ((error = consumer->incremental_unassign(toppars1)))
    Test::Fail("Incremental unassign failed: " + error->str());
  Test::check_assignment(consumer, 0, NULL);

  m = consumer->consume(100);
  if (m->err() != RdKafka::ERR__TIMED_OUT)
    Test::Fail("Not expecting a consumed message.");
  delete m;

  if ((error = consumer->incremental_assign(toppars2)))
    Test::Fail("Incremental assign failed: " + error->str());
  Test::check_assignment(consumer, 1, &toppars2[0]->topic());

  m = consumer->consume(5000);
  if (m->err() != RdKafka::ERR_NO_ERROR)
    Test::Fail("Expecting a consumed message.");
  if (m->len() != 200)
    Test::Fail(tostr() << "Expecting msg len to be 200, not: "
                       << m->len()); /* implies read from topic 2. */
  delete m;

  if ((error = consumer->incremental_assign(toppars1)))
    Test::Fail("Incremental assign failed: " + error->str());
  if (Test::assignment_partition_count(consumer, NULL) != 2)
    Test::Fail(tostr() << "Expecting current assignment to have size 2, not: "
                       << Test::assignment_partition_count(consumer, NULL));

  m = consumer->consume(5000);
  if (m->err() != RdKafka::ERR_NO_ERROR)
    Test::Fail("Expecting a consumed message.");
  delete m;

  if ((error = consumer->incremental_unassign(toppars2)))
    Test::Fail("Incremental unassign failed: " + error->str());
  if ((error = consumer->incremental_unassign(toppars1)))
    Test::Fail("Incremental unassign failed: " + error->str());
  Test::check_assignment(consumer, 0, NULL);
}


/** Incremental assign and unassign of empty collection.
 */
static void assign_test_5(RdKafka::KafkaConsumer *consumer,
                          std::vector<RdKafka::TopicPartition *> toppars1,
                          std::vector<RdKafka::TopicPartition *> toppars2) {
  RdKafka::Error *error;
  std::vector<RdKafka::TopicPartition *> toppars3;

  Test::Say("Incremental assign and unassign of empty collection\n");

  if ((error = consumer->incremental_assign(toppars3)))
    Test::Fail("Incremental assign failed: " + error->str());
  Test::check_assignment(consumer, 0, NULL);

  if ((error = consumer->incremental_unassign(toppars3)))
    Test::Fail("Incremental unassign failed: " + error->str());
  Test::check_assignment(consumer, 0, NULL);
}



static void run_test(
    const std::string &t1,
    const std::string &t2,
    void (*test)(RdKafka::KafkaConsumer *consumer,
                 std::vector<RdKafka::TopicPartition *> toppars1,
                 std::vector<RdKafka::TopicPartition *> toppars2)) {
  std::vector<RdKafka::TopicPartition *> toppars1;
  toppars1.push_back(RdKafka::TopicPartition::create(t1, 0));
  std::vector<RdKafka::TopicPartition *> toppars2;
  toppars2.push_back(RdKafka::TopicPartition::create(t2, 0));

  RdKafka::KafkaConsumer *consumer =
      make_consumer("C_1", t1, "cooperative-sticky", NULL, NULL, 10);

  test(consumer, toppars1, toppars2);

  RdKafka::TopicPartition::destroy(toppars1);
  RdKafka::TopicPartition::destroy(toppars2);

  consumer->close();
  delete consumer;
}


static void a_assign_tests() {
  SUB_TEST_QUICK();

  int msgcnt         = 1000;
  const int msgsize1 = 100;
  const int msgsize2 = 200;

  std::string topic1_str = Test::mk_topic_name("0113-a1", 1);
  test_create_topic(NULL, topic1_str.c_str(), 1, 1);
  std::string topic2_str = Test::mk_topic_name("0113-a2", 1);
  test_create_topic(NULL, topic2_str.c_str(), 1, 1);

  test_produce_msgs_easy_size(topic1_str.c_str(), 0, 0, msgcnt, msgsize1);
  test_produce_msgs_easy_size(topic2_str.c_str(), 0, 0, msgcnt, msgsize2);

  run_test(topic1_str, topic2_str, assign_test_1);
  run_test(topic1_str, topic2_str, assign_test_2);
  run_test(topic1_str, topic2_str, assign_test_3);
  run_test(topic1_str, topic2_str, assign_test_4);
  run_test(topic1_str, topic2_str, assign_test_5);

  SUB_TEST_PASS();
}



/**
 * @brief Quick Assign 1,2, Assign 2,3, Assign 1,2,3 test to verify
 *        that the correct OffsetFetch response is used.
 *        See note in rdkafka_assignment.c for details.
 *
 * Makes use of the mock cluster to induce latency.
 */
static void a_assign_rapid() {
  SUB_TEST_QUICK();

  std::string group_id = __FUNCTION__;

  rd_kafka_mock_cluster_t *mcluster;
  const char *bootstraps;

  mcluster         = test_mock_cluster_new(3, &bootstraps);
  int32_t coord_id = 1;
  rd_kafka_mock_coordinator_set(mcluster, "group", group_id.c_str(), coord_id);

  rd_kafka_mock_topic_create(mcluster, "topic1", 1, 1);
  rd_kafka_mock_topic_create(mcluster, "topic2", 1, 1);
  rd_kafka_mock_topic_create(mcluster, "topic3", 1, 1);

  /*
   * Produce messages to topics
   */
  const int msgs_per_partition = 1000;

  RdKafka::Conf *pconf;
  Test::conf_init(&pconf, NULL, 10);
  Test::conf_set(pconf, "bootstrap.servers", bootstraps);
  Test::conf_set(pconf, "security.protocol", "plaintext");
  std::string errstr;
  RdKafka::Producer *p = RdKafka::Producer::create(pconf, errstr);
  if (!p)
    Test::Fail(tostr() << __FUNCTION__
                       << ": Failed to create producer: " << errstr);
  delete pconf;

  Test::produce_msgs(p, "topic1", 0, msgs_per_partition, 10,
                     false /*no flush*/);
  Test::produce_msgs(p, "topic2", 0, msgs_per_partition, 10,
                     false /*no flush*/);
  Test::produce_msgs(p, "topic3", 0, msgs_per_partition, 10,
                     false /*no flush*/);
  p->flush(10 * 1000);

  delete p;

  vector<RdKafka::TopicPartition *> toppars1;
  toppars1.push_back(RdKafka::TopicPartition::create("topic1", 0));
  vector<RdKafka::TopicPartition *> toppars2;
  toppars2.push_back(RdKafka::TopicPartition::create("topic2", 0));
  vector<RdKafka::TopicPartition *> toppars3;
  toppars3.push_back(RdKafka::TopicPartition::create("topic3", 0));


  RdKafka::Conf *conf;
  Test::conf_init(&conf, NULL, 20);
  Test::conf_set(conf, "bootstrap.servers", bootstraps);
  Test::conf_set(conf, "security.protocol", "plaintext");
  Test::conf_set(conf, "client.id", __FUNCTION__);
  Test::conf_set(conf, "group.id", group_id);
  Test::conf_set(conf, "auto.offset.reset", "earliest");
  Test::conf_set(conf, "enable.auto.commit", "false");

  RdKafka::KafkaConsumer *consumer;
  consumer = RdKafka::KafkaConsumer::create(conf, errstr);
  if (!consumer)
    Test::Fail(tostr() << __FUNCTION__
                       << ": Failed to create consumer: " << errstr);
  delete conf;

  vector<RdKafka::TopicPartition *> toppars;
  vector<RdKafka::TopicPartition *> expected;

  map<Toppar, int64_t> pos; /* Expected consume position per partition */
  pos[Toppar(toppars1[0]->topic(), toppars1[0]->partition())] = 0;
  pos[Toppar(toppars2[0]->topic(), toppars2[0]->partition())] = 0;
  pos[Toppar(toppars3[0]->topic(), toppars3[0]->partition())] = 0;

  /* To make sure offset commits are fetched in proper assign sequence
   * we commit an offset that should not be used in the final consume loop.
   * This commit will be overwritten below with another commit. */
  vector<RdKafka::TopicPartition *> offsets;
  offsets.push_back(RdKafka::TopicPartition::create(
      toppars1[0]->topic(), toppars1[0]->partition(), 11));
  /* This partition should start at this position even though
   * there will be a sub-sequent commit to overwrite it, that should not
   * be used since this partition is never unassigned. */
  offsets.push_back(RdKafka::TopicPartition::create(
      toppars2[0]->topic(), toppars2[0]->partition(), 22));
  pos[Toppar(toppars2[0]->topic(), toppars2[0]->partition())] = 22;

  Test::print_TopicPartitions("pre-commit", offsets);

  RdKafka::ErrorCode err;
  err = consumer->commitSync(offsets);
  if (err)
    Test::Fail(tostr() << __FUNCTION__ << ": pre-commit failed: "
                       << RdKafka::err2str(err) << "\n");

  /* Add coordinator delay so that the OffsetFetchRequest originating
   * from the coming incremental_assign() will not finish before
   * we call incremental_unassign() and incremental_assign() again, resulting
   * in a situation where the initial OffsetFetchResponse will contain
   * an older offset for a previous assignment of one partition. */
  rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 5000);


  /* Assign 1,2 == 1,2 */
  toppars.push_back(toppars1[0]);
  toppars.push_back(toppars2[0]);
  expected.push_back(toppars1[0]);
  expected.push_back(toppars2[0]);
  Test::incremental_assign(consumer, toppars);
  expect_assignment(consumer, expected);

  /* Unassign -1 == 2 */
  toppars.clear();
  toppars.push_back(toppars1[0]);
  vector<RdKafka::TopicPartition *>::iterator it =
      find(expected.begin(), expected.end(), toppars1[0]);
  expected.erase(it);

  Test::incremental_unassign(consumer, toppars);
  expect_assignment(consumer, expected);


  /* Commit offset for the removed partition and the partition that is
   * unchanged in the assignment. */
  RdKafka::TopicPartition::destroy(offsets);
  offsets.push_back(RdKafka::TopicPartition::create(
      toppars1[0]->topic(), toppars1[0]->partition(), 55));
  offsets.push_back(RdKafka::TopicPartition::create(
      toppars2[0]->topic(), toppars2[0]->partition(), 33)); /* should not be
                                                             * used. */
  pos[Toppar(toppars1[0]->topic(), toppars1[0]->partition())] = 55;
  Test::print_TopicPartitions("commit", offsets);

  err = consumer->commitAsync(offsets);
  if (err)
    Test::Fail(tostr() << __FUNCTION__
                       << ": commit failed: " << RdKafka::err2str(err) << "\n");

  /* Assign +3 == 2,3 */
  toppars.clear();
  toppars.push_back(toppars3[0]);
  expected.push_back(toppars3[0]);
  Test::incremental_assign(consumer, toppars);
  expect_assignment(consumer, expected);

  /* Now remove the latency */
  Test::Say(_C_MAG "Clearing rtt\n");
  rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 0);

  /* Assign +1 == 1,2,3 */
  toppars.clear();
  toppars.push_back(toppars1[0]);
  expected.push_back(toppars1[0]);
  Test::incremental_assign(consumer, toppars);
  expect_assignment(consumer, expected);

  /*
   * Verify consumed messages
   */
  int wait_end = (int)expected.size();
  while (wait_end > 0) {
    RdKafka::Message *msg = consumer->consume(10 * 1000);
    if (msg->err() == RdKafka::ERR__TIMED_OUT)
      Test::Fail(tostr() << __FUNCTION__
                         << ": Consume timed out waiting "
                            "for "
                         << wait_end << " more partitions");

    Toppar tp        = Toppar(msg->topic_name(), msg->partition());
    int64_t *exp_pos = &pos[tp];

    Test::Say(3, tostr() << __FUNCTION__ << ": Received " << tp.topic << " ["
                         << tp.partition << "] at offset " << msg->offset()
                         << " (expected offset " << *exp_pos << ")\n");

    if (*exp_pos != msg->offset())
      Test::Fail(tostr() << __FUNCTION__ << ": expected message offset "
                         << *exp_pos << " for " << msg->topic_name() << " ["
                         << msg->partition() << "], not " << msg->offset()
                         << "\n");
    (*exp_pos)++;
    if (*exp_pos == msgs_per_partition) {
      TEST_ASSERT(wait_end > 0, "");
      wait_end--;
    } else if (msg->offset() > msgs_per_partition)
      Test::Fail(tostr() << __FUNCTION__ << ": unexpected message with "
                         << "offset " << msg->offset() << " on " << tp.topic
                         << " [" << tp.partition << "]\n");

    delete msg;
  }

  RdKafka::TopicPartition::destroy(offsets);
  RdKafka::TopicPartition::destroy(toppars1);
  RdKafka::TopicPartition::destroy(toppars2);
  RdKafka::TopicPartition::destroy(toppars3);

  delete consumer;

  test_mock_cluster_destroy(mcluster);

  SUB_TEST_PASS();
}


/* Check behavior when:
 *   1. single topic with 2 partitions.
 *   2. consumer 1 (with rebalance_cb) subscribes to it.
 *   3. consumer 2 (with rebalance_cb) subscribes to it.
 *   4. close.
 */

static void b_subscribe_with_cb_test(rd_bool_t close_consumer) {
  SUB_TEST();

  std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");
  test_create_topic(NULL, topic_name.c_str(), 2, 1);

  DefaultRebalanceCb rebalance_cb1;
  RdKafka::KafkaConsumer *c1 = make_consumer(
      "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb1, 25);
  DefaultRebalanceCb rebalance_cb2;
  RdKafka::KafkaConsumer *c2 = make_consumer(
      "C_2", group_name, "cooperative-sticky", NULL, &rebalance_cb2, 25);
  test_wait_topic_exists(c1->c_ptr(), topic_name.c_str(), 10 * 1000);

  Test::subscribe(c1, topic_name);

  bool c2_subscribed = false;
  while (true) {
    Test::poll_once(c1, 500);
    Test::poll_once(c2, 500);

    /* Start c2 after c1 has received initial assignment */
    if (!c2_subscribed && rebalance_cb1.assign_call_cnt > 0) {
      Test::subscribe(c2, topic_name);
      c2_subscribed = true;
    }

    /* Failure case: test will time out. */
    if (rebalance_cb1.assign_call_cnt == 3 &&
        rebalance_cb2.assign_call_cnt == 2) {
      break;
    }
  }

  /* Sequence of events:
   *
   * 1. c1 joins group.
   * 2. c1 gets assigned 2 partitions.
   *     - there isn't a follow-on rebalance because there aren't any revoked
   * partitions.
   * 3. c2 joins group.
   * 4. This results in a rebalance with one partition being revoked from c1,
   * and no partitions assigned to either c1 or c2 (however the rebalance
   * callback will be called in each case with an empty set).
   * 5. c1 then re-joins the group since it had a partition revoked.
   * 6. c2 is now assigned a single partition, and c1's incremental assignment
   * is empty.
   * 7. Since there were no revoked partitions, no further rebalance is
   * triggered.
   */

  /* The rebalance cb is always called on assign, even if empty. */
  if (rebalance_cb1.assign_call_cnt != 3)
    Test::Fail(tostr() << "Expecting 3 assign calls on consumer 1, not "
                       << rebalance_cb1.assign_call_cnt);
  if (rebalance_cb2.assign_call_cnt != 2)
    Test::Fail(tostr() << "Expecting 2 assign calls on consumer 2, not: "
                       << rebalance_cb2.assign_call_cnt);

  /* The rebalance cb is not called on and empty revoke (unless partitions lost,
   * which is not the case here) */
  if (rebalance_cb1.revoke_call_cnt != 1)
    Test::Fail(tostr() << "Expecting 1 revoke call on consumer 1, not: "
                       << rebalance_cb1.revoke_call_cnt);
  if (rebalance_cb2.revoke_call_cnt != 0)
    Test::Fail(tostr() << "Expecting 0 revoke calls on consumer 2, not: "
                       << rebalance_cb2.revoke_call_cnt);

  /* Final state */

  /* Expect both consumers to have 1 assigned partition (via net calculation in
   * rebalance_cb) */
  if (rebalance_cb1.partitions_assigned_net != 1)
    Test::Fail(tostr()
               << "Expecting consumer 1 to have net 1 assigned partition, not: "
               << rebalance_cb1.partitions_assigned_net);
  if (rebalance_cb2.partitions_assigned_net != 1)
    Test::Fail(tostr()
               << "Expecting consumer 2 to have net 1 assigned partition, not: "
               << rebalance_cb2.partitions_assigned_net);

  /* Expect both consumers to have 1 assigned partition (via ->assignment()
   * query) */
  expect_assignment(c1, 1);
  expect_assignment(c2, 1);

  /* Make sure the fetchers are running */
  int msgcnt         = 100;
  const int msgsize1 = 100;
  test_produce_msgs_easy_size(topic_name.c_str(), 0, 0, msgcnt, msgsize1);
  test_produce_msgs_easy_size(topic_name.c_str(), 0, 1, msgcnt, msgsize1);

  bool consumed_from_c1 = false;
  bool consumed_from_c2 = false;
  while (true) {
    RdKafka::Message *msg1 = c1->consume(100);
    RdKafka::Message *msg2 = c2->consume(100);

    if (msg1->err() == RdKafka::ERR_NO_ERROR)
      consumed_from_c1 = true;
    if (msg1->err() == RdKafka::ERR_NO_ERROR)
      consumed_from_c2 = true;

    delete msg1;
    delete msg2;

    /* Failure case: test will timeout. */
    if (consumed_from_c1 && consumed_from_c2)
      break;
  }

  if (!close_consumer) {
    delete c1;
    delete c2;
    return;
  }

  c1->close();
  c2->close();

  /* Closing the consumer should trigger rebalance_cb (revoke): */
  if (rebalance_cb1.revoke_call_cnt != 2)
    Test::Fail(tostr() << "Expecting 2 revoke calls on consumer 1, not: "
                       << rebalance_cb1.revoke_call_cnt);
  if (rebalance_cb2.revoke_call_cnt != 1)
    Test::Fail(tostr() << "Expecting 1 revoke call on consumer 2, not: "
                       << rebalance_cb2.revoke_call_cnt);

  /* ..and net assigned partitions should drop to 0 in both cases: */
  if (rebalance_cb1.partitions_assigned_net != 0)
    Test::Fail(
        tostr()
        << "Expecting consumer 1 to have net 0 assigned partitions, not: "
        << rebalance_cb1.partitions_assigned_net);
  if (rebalance_cb2.partitions_assigned_net != 0)
    Test::Fail(
        tostr()
        << "Expecting consumer 2 to have net 0 assigned partitions, not: "
        << rebalance_cb2.partitions_assigned_net);

  /* Nothing in this test should result in lost partitions */
  if (rebalance_cb1.lost_call_cnt > 0)
    Test::Fail(
        tostr() << "Expecting consumer 1 to have 0 lost partition events, not: "
                << rebalance_cb1.lost_call_cnt);
  if (rebalance_cb2.lost_call_cnt > 0)
    Test::Fail(
        tostr() << "Expecting consumer 2 to have 0 lost partition events, not: "
                << rebalance_cb2.lost_call_cnt);

  delete c1;
  delete c2;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. Single topic with 2 partitions.
 *   2. Consumer 1 (no rebalance_cb) subscribes to it.
 *   3. Consumer 2 (no rebalance_cb) subscribes to it.
 *   4. Close.
 */

static void c_subscribe_no_cb_test(rd_bool_t close_consumer) {
  SUB_TEST();

  std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");
  test_create_topic(NULL, topic_name.c_str(), 2, 1);

  RdKafka::KafkaConsumer *c1 =
      make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 20);
  RdKafka::KafkaConsumer *c2 =
      make_consumer("C_2", group_name, "cooperative-sticky", NULL, NULL, 20);
  test_wait_topic_exists(c1->c_ptr(), topic_name.c_str(), 10 * 1000);

  Test::subscribe(c1, topic_name);

  bool c2_subscribed = false;
  bool done          = false;
  while (!done) {
    Test::poll_once(c1, 500);
    Test::poll_once(c2, 500);

    if (Test::assignment_partition_count(c1, NULL) == 2 && !c2_subscribed) {
      Test::subscribe(c2, topic_name);
      c2_subscribed = true;
    }

    if (Test::assignment_partition_count(c1, NULL) == 1 &&
        Test::assignment_partition_count(c2, NULL) == 1) {
      Test::Say("Consumer 1 and 2 are both assigned to single partition.\n");
      done = true;
    }
  }

  if (close_consumer) {
    Test::Say("Closing consumer 1\n");
    c1->close();
    Test::Say("Closing consumer 2\n");
    c2->close();
  } else {
    Test::Say("Skipping close() of consumer 1 and 2.\n");
  }

  delete c1;
  delete c2;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. Single consumer (no rebalance_cb) subscribes to topic.
 *   2. Subscription is changed (topic added).
 *   3. Consumer is closed.
 */

static void d_change_subscription_add_topic(rd_bool_t close_consumer) {
  SUB_TEST();

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
  std::string topic_name_2 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name_2.c_str(), 2, 1);

  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");

  RdKafka::KafkaConsumer *c =
      make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
  test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name_1);

  bool subscribed_to_one_topic = false;
  bool done                    = false;
  while (!done) {
    Test::poll_once(c, 500);

    if (Test::assignment_partition_count(c, NULL) == 2 &&
        !subscribed_to_one_topic) {
      subscribed_to_one_topic = true;
      Test::subscribe(c, topic_name_1, topic_name_2);
    }

    if (Test::assignment_partition_count(c, NULL) == 4) {
      Test::Say("Consumer is assigned to two topics.\n");
      done = true;
    }
  }

  if (close_consumer) {
    Test::Say("Closing consumer\n");
    c->close();
  } else
    Test::Say("Skipping close() of consumer\n");

  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. Single consumer (no rebalance_cb) subscribes to topic.
 *   2. Subscription is changed (topic added).
 *   3. Consumer is closed.
 */

static void e_change_subscription_remove_topic(rd_bool_t close_consumer) {
  SUB_TEST();

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
  std::string topic_name_2 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name_2.c_str(), 2, 1);

  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");

  RdKafka::KafkaConsumer *c =
      make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
  test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name_1, topic_name_2);

  bool subscribed_to_two_topics = false;
  bool done                     = false;
  while (!done) {
    Test::poll_once(c, 500);

    if (Test::assignment_partition_count(c, NULL) == 4 &&
        !subscribed_to_two_topics) {
      subscribed_to_two_topics = true;
      Test::subscribe(c, topic_name_1);
    }

    if (Test::assignment_partition_count(c, NULL) == 2) {
      Test::Say("Consumer is assigned to one topic\n");
      done = true;
    }
  }

  if (!close_consumer) {
    Test::Say("Closing consumer\n");
    c->close();
  } else
    Test::Say("Skipping close() of consumer\n");

  delete c;

  SUB_TEST_PASS();
}



/* Check that use of consumer->assign() and consumer->unassign() is disallowed
 * when a COOPERATIVE assignor is in use.
 *
 * Except when the consumer is closing, where all forms of unassign are
 * allowed and treated as a full unassign.
 */

class FTestRebalanceCb : public RdKafka::RebalanceCb {
 public:
  bool assigned;
  bool closing;

  FTestRebalanceCb() : assigned(false), closing(false) {
  }

  void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition *> &partitions) {
    Test::Say(tostr() << "RebalanceCb: " << consumer->name() << " "
                      << RdKafka::err2str(err) << (closing ? " (closing)" : "")
                      << "\n");

    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      RdKafka::ErrorCode err_resp = consumer->assign(partitions);
      Test::Say(tostr() << "consumer->assign() response code: " << err_resp
                        << "\n");
      if (err_resp != RdKafka::ERR__STATE)
        Test::Fail(tostr() << "Expected assign to fail with error code: "
                           << RdKafka::ERR__STATE << "(ERR__STATE)");

      RdKafka::Error *error = consumer->incremental_assign(partitions);
      if (error)
        Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
                           << error->str());

      assigned = true;

    } else {
      RdKafka::ErrorCode err_resp = consumer->unassign();
      Test::Say(tostr() << "consumer->unassign() response code: " << err_resp
                        << "\n");

      if (!closing) {
        if (err_resp != RdKafka::ERR__STATE)
          Test::Fail(tostr() << "Expected assign to fail with error code: "
                             << RdKafka::ERR__STATE << "(ERR__STATE)");

        RdKafka::Error *error = consumer->incremental_unassign(partitions);
        if (error)
          Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
                             << error->str());

      } else {
        /* During termination (close()) any type of unassign*() is allowed. */
        if (err_resp)
          Test::Fail(tostr() << "Expected unassign to succeed during close, "
                                "but got: "
                             << RdKafka::ERR__STATE << "(ERR__STATE)");
      }
    }
  }
};


static void f_assign_call_cooperative() {
  SUB_TEST();

  std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name.c_str(), 1, 1);

  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");

  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
  FTestRebalanceCb rebalance_cb;
  RdKafka::KafkaConsumer *c =
      make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name);

  while (!rebalance_cb.assigned)
    Test::poll_once(c, 500);

  rebalance_cb.closing = true;
  c->close();
  delete c;

  SUB_TEST_PASS();
}



/* Check that use of consumer->incremental_assign() and
 * consumer->incremental_unassign() is disallowed when an EAGER assignor is in
 * use.
 */
class GTestRebalanceCb : public RdKafka::RebalanceCb {
 public:
  bool assigned;
  bool closing;

  GTestRebalanceCb() : assigned(false), closing(false) {
  }

  void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition *> &partitions) {
    Test::Say(tostr() << "RebalanceCb: " << consumer->name() << " "
                      << RdKafka::err2str(err) << "\n");

    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      RdKafka::Error *error = consumer->incremental_assign(partitions);
      Test::Say(tostr() << "consumer->incremental_assign() response: "
                        << (!error ? "NULL" : error->str()) << "\n");
      if (!error)
        Test::Fail("Expected consumer->incremental_assign() to fail");
      if (error->code() != RdKafka::ERR__STATE)
        Test::Fail(tostr() << "Expected consumer->incremental_assign() to fail "
                              "with error code "
                           << RdKafka::ERR__STATE);
      delete error;

      RdKafka::ErrorCode err_resp = consumer->assign(partitions);
      if (err_resp)
        Test::Fail(tostr() << "consumer->assign() failed: " << err_resp);

      assigned = true;

    } else {
      RdKafka::Error *error = consumer->incremental_unassign(partitions);
      Test::Say(tostr() << "consumer->incremental_unassign() response: "
                        << (!error ? "NULL" : error->str()) << "\n");

      if (!closing) {
        if (!error)
          Test::Fail("Expected consumer->incremental_unassign() to fail");
        if (error->code() != RdKafka::ERR__STATE)
          Test::Fail(tostr() << "Expected consumer->incremental_unassign() to "
                                "fail with error code "
                             << RdKafka::ERR__STATE);
        delete error;

        RdKafka::ErrorCode err_resp = consumer->unassign();
        if (err_resp)
          Test::Fail(tostr() << "consumer->unassign() failed: " << err_resp);

      } else {
        /* During termination (close()) any type of unassign*() is allowed. */
        if (error)
          Test::Fail(
              tostr()
              << "Expected incremental_unassign to succeed during close, "
                 "but got: "
              << RdKafka::ERR__STATE << "(ERR__STATE)");
      }
    }
  }
};

static void g_incremental_assign_call_eager() {
  SUB_TEST();

  std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name.c_str(), 1, 1);

  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");

  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
  GTestRebalanceCb rebalance_cb;
  RdKafka::KafkaConsumer *c = make_consumer(
      "C_1", group_name, "roundrobin", &additional_conf, &rebalance_cb, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name);

  while (!rebalance_cb.assigned)
    Test::poll_once(c, 500);

  rebalance_cb.closing = true;
  c->close();
  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. Single consumer (rebalance_cb) subscribes to two topics.
 *   2. One of the topics is deleted.
 *   3. Consumer is closed.
 */

static void h_delete_topic() {
  SUB_TEST();

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
  std::string topic_name_2 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name_2.c_str(), 1, 1);

  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");

  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
  DefaultRebalanceCb rebalance_cb;
  RdKafka::KafkaConsumer *c =
      make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
  test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name_1, topic_name_2);

  bool deleted = false;
  bool done    = false;
  while (!done) {
    Test::poll_once(c, 500);

    std::vector<RdKafka::TopicPartition *> partitions;
    c->assignment(partitions);

    if (partitions.size() == 2 && !deleted) {
      if (rebalance_cb.assign_call_cnt != 1)
        Test::Fail(tostr() << "Expected 1 assign call, saw "
                           << rebalance_cb.assign_call_cnt << "\n");
      Test::delete_topic(c, topic_name_2.c_str());
      deleted = true;
    }

    if (partitions.size() == 1 && deleted) {
      if (partitions[0]->topic() != topic_name_1)
        Test::Fail(tostr() << "Expecting subscribed topic to be '"
                           << topic_name_1 << "' not '"
                           << partitions[0]->topic() << "'");
      Test::Say(tostr() << "Assignment no longer includes deleted topic '"
                        << topic_name_2 << "'\n");
      done = true;
    }

    RdKafka::TopicPartition::destroy(partitions);
  }

  Test::Say("Closing consumer\n");
  c->close();

  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. Single consumer (rebalance_cb) subscribes to a single topic.
 *   2. That topic is deleted leaving no topics.
 *   3. Consumer is closed.
 */

static void i_delete_topic_2() {
  SUB_TEST();

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");

  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
  DefaultRebalanceCb rebalance_cb;
  RdKafka::KafkaConsumer *c =
      make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name_1);

  bool deleted = false;
  bool done    = false;
  while (!done) {
    Test::poll_once(c, 500);

    if (Test::assignment_partition_count(c, NULL) == 1 && !deleted) {
      if (rebalance_cb.assign_call_cnt != 1)
        Test::Fail(tostr() << "Expected one assign call, saw "
                           << rebalance_cb.assign_call_cnt << "\n");
      Test::delete_topic(c, topic_name_1.c_str());
      deleted = true;
    }

    if (Test::assignment_partition_count(c, NULL) == 0 && deleted) {
      Test::Say(tostr() << "Assignment is empty following deletion of topic\n");
      done = true;
    }
  }

  Test::Say("Closing consumer\n");
  c->close();

  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. single consumer (without rebalance_cb) subscribes to a single topic.
 *   2. that topic is deleted leaving no topics.
 *   3. consumer is closed.
 */

static void j_delete_topic_no_rb_callback() {
  SUB_TEST();

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name_1.c_str(), 1, 1);

  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");

  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
  RdKafka::KafkaConsumer *c = make_consumer(
      "C_1", group_name, "cooperative-sticky", &additional_conf, NULL, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name_1);

  bool deleted = false;
  bool done    = false;
  while (!done) {
    Test::poll_once(c, 500);

    if (Test::assignment_partition_count(c, NULL) == 1 && !deleted) {
      Test::delete_topic(c, topic_name_1.c_str());
      deleted = true;
    }

    if (Test::assignment_partition_count(c, NULL) == 0 && deleted) {
      Test::Say(tostr() << "Assignment is empty following deletion of topic\n");
      done = true;
    }
  }

  Test::Say("Closing consumer\n");
  c->close();

  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. Single consumer (rebalance_cb) subscribes to a 1 partition topic.
 *   2. Number of partitions is increased to 2.
 *   3. Consumer is closed.
 */

static void k_add_partition() {
  SUB_TEST();

  std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
  test_create_topic(NULL, topic_name.c_str(), 1, 1);

  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");

  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
  DefaultRebalanceCb rebalance_cb;
  RdKafka::KafkaConsumer *c =
      make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name);

  bool subscribed = false;
  bool done       = false;
  while (!done) {
    Test::poll_once(c, 500);

    if (Test::assignment_partition_count(c, NULL) == 1 && !subscribed) {
      if (rebalance_cb.assign_call_cnt != 1)
        Test::Fail(tostr() << "Expected 1 assign call, saw "
                           << rebalance_cb.assign_call_cnt);
      if (rebalance_cb.revoke_call_cnt != 0)
        Test::Fail(tostr() << "Expected 0 revoke calls, saw "
                           << rebalance_cb.revoke_call_cnt);
      Test::create_partitions(c, topic_name.c_str(), 2);
      subscribed = true;
    }

    if (Test::assignment_partition_count(c, NULL) == 2 && subscribed) {
      if (rebalance_cb.assign_call_cnt != 2)
        Test::Fail(tostr() << "Expected 2 assign calls, saw "
                           << rebalance_cb.assign_call_cnt);
      if (rebalance_cb.revoke_call_cnt != 0)
        Test::Fail(tostr() << "Expected 0 revoke calls, saw "
                           << rebalance_cb.revoke_call_cnt);
      done = true;
    }
  }

  Test::Say("Closing consumer\n");
  c->close();
  delete c;

  if (rebalance_cb.assign_call_cnt != 2)
    Test::Fail(tostr() << "Expected 2 assign calls, saw "
                       << rebalance_cb.assign_call_cnt);
  if (rebalance_cb.revoke_call_cnt != 1)
    Test::Fail(tostr() << "Expected 1 revoke call, saw "
                       << rebalance_cb.revoke_call_cnt);

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. two consumers (with rebalance_cb's) subscribe to two topics.
 *   2. one of the consumers calls unsubscribe.
 *   3. consumers closed.
 */

static void l_unsubscribe() {
  SUB_TEST();

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string topic_name_2 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");
  test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
  test_create_topic(NULL, topic_name_2.c_str(), 2, 1);

  DefaultRebalanceCb rebalance_cb1;
  RdKafka::KafkaConsumer *c1 = make_consumer(
      "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb1, 30);
  test_wait_topic_exists(c1->c_ptr(), topic_name_1.c_str(), 10 * 1000);
  test_wait_topic_exists(c1->c_ptr(), topic_name_2.c_str(), 10 * 1000);

  Test::subscribe(c1, topic_name_1, topic_name_2);

  DefaultRebalanceCb rebalance_cb2;
  RdKafka::KafkaConsumer *c2 = make_consumer(
      "C_2", group_name, "cooperative-sticky", NULL, &rebalance_cb2, 30);
  Test::subscribe(c2, topic_name_1, topic_name_2);

  bool done         = false;
  bool unsubscribed = false;
  while (!done) {
    Test::poll_once(c1, 500);
    Test::poll_once(c2, 500);

    if (Test::assignment_partition_count(c1, NULL) == 2 &&
        Test::assignment_partition_count(c2, NULL) == 2) {
      if (rebalance_cb1.assign_call_cnt != 1)
        Test::Fail(
            tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
                    << rebalance_cb1.assign_call_cnt);
      if (rebalance_cb2.assign_call_cnt != 1)
        Test::Fail(
            tostr() << "Expecting consumer 2's assign_call_cnt to be 1 not: "
                    << rebalance_cb2.assign_call_cnt);
      Test::Say("Unsubscribing consumer 1 from both topics\n");
      c1->unsubscribe();
      unsubscribed = true;
    }

    if (unsubscribed && Test::assignment_partition_count(c1, NULL) == 0 &&
        Test::assignment_partition_count(c2, NULL) == 4) {
      if (rebalance_cb1.assign_call_cnt !=
          1) /* is now unsubscribed, so rebalance_cb will no longer be called.
              */
        Test::Fail(
            tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
                    << rebalance_cb1.assign_call_cnt);
      if (rebalance_cb2.assign_call_cnt != 2)
        Test::Fail(
            tostr() << "Expecting consumer 2's assign_call_cnt to be 2 not: "
                    << rebalance_cb2.assign_call_cnt);
      if (rebalance_cb1.revoke_call_cnt != 1)
        Test::Fail(
            tostr() << "Expecting consumer 1's revoke_call_cnt to be 1 not: "
                    << rebalance_cb1.revoke_call_cnt);
      if (rebalance_cb2.revoke_call_cnt !=
          0) /* the rebalance_cb should not be called if the revoked partition
                list is empty */
        Test::Fail(
            tostr() << "Expecting consumer 2's revoke_call_cnt to be 0 not: "
                    << rebalance_cb2.revoke_call_cnt);
      Test::Say("Unsubscribe completed");
      done = true;
    }
  }

  Test::Say("Closing consumer 1\n");
  c1->close();
  Test::Say("Closing consumer 2\n");
  c2->close();

  /* there should be no assign rebalance_cb calls on close */
  if (rebalance_cb1.assign_call_cnt != 1)
    Test::Fail(tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
                       << rebalance_cb1.assign_call_cnt);
  if (rebalance_cb2.assign_call_cnt != 2)
    Test::Fail(tostr() << "Expecting consumer 2's assign_call_cnt to be 2 not: "
                       << rebalance_cb2.assign_call_cnt);

  if (rebalance_cb1.revoke_call_cnt !=
      1) /* should not be called a second revoke rebalance_cb */
    Test::Fail(tostr() << "Expecting consumer 1's revoke_call_cnt to be 1 not: "
                       << rebalance_cb1.revoke_call_cnt);
  if (rebalance_cb2.revoke_call_cnt != 1)
    Test::Fail(tostr() << "Expecting consumer 2's revoke_call_cnt to be 1 not: "
                       << rebalance_cb2.revoke_call_cnt);

  if (rebalance_cb1.lost_call_cnt != 0)
    Test::Fail(tostr() << "Expecting consumer 1's lost_call_cnt to be 0, not: "
                       << rebalance_cb1.lost_call_cnt);
  if (rebalance_cb2.lost_call_cnt != 0)
    Test::Fail(tostr() << "Expecting consumer 2's lost_call_cnt to be 0, not: "
                       << rebalance_cb2.lost_call_cnt);

  delete c1;
  delete c2;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. A consumers (with no rebalance_cb) subscribes to a topic.
 *   2. The consumer calls unsubscribe.
 *   3. Consumers closed.
 */

static void m_unsubscribe_2() {
  SUB_TEST();

  std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");
  test_create_topic(NULL, topic_name.c_str(), 2, 1);

  RdKafka::KafkaConsumer *c =
      make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name);

  bool done         = false;
  bool unsubscribed = false;
  while (!done) {
    Test::poll_once(c, 500);

    if (Test::assignment_partition_count(c, NULL) == 2) {
      Test::unsubscribe(c);
      unsubscribed = true;
    }

    if (unsubscribed && Test::assignment_partition_count(c, NULL) == 0) {
      Test::Say("Unsubscribe completed");
      done = true;
    }
  }

  Test::Say("Closing consumer\n");
  c->close();

  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. Two consumers (with rebalance_cb) subscribe to a regex (no matching
 * topics exist)
 *   2. Create two topics.
 *   3. Remove one of the topics.
 *   3. Consumers closed.
 */

static void n_wildcard() {
  SUB_TEST();

  const string topic_base_name = Test::mk_topic_name("0113-n_wildcard", 1);
  const string topic_name_1    = topic_base_name + "_1";
  const string topic_name_2    = topic_base_name + "_2";
  const string topic_regex     = "^" + topic_base_name + "_.";
  const string group_name      = Test::mk_unique_group_name("0113-n_wildcard");

  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));

  DefaultRebalanceCb rebalance_cb1;
  RdKafka::KafkaConsumer *c1 =
      make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb1, 30);
  Test::subscribe(c1, topic_regex);

  DefaultRebalanceCb rebalance_cb2;
  RdKafka::KafkaConsumer *c2 =
      make_consumer("C_2", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb2, 30);
  Test::subscribe(c2, topic_regex);

  /* There are no matching topics, so the consumers should not join the group
   * initially */
  Test::poll_once(c1, 500);
  Test::poll_once(c2, 500);

  if (rebalance_cb1.assign_call_cnt != 0)
    Test::Fail(tostr() << "Expecting consumer 1's assign_call_cnt to be 0 not: "
                       << rebalance_cb1.assign_call_cnt);
  if (rebalance_cb2.assign_call_cnt != 0)
    Test::Fail(tostr() << "Expecting consumer 2's assign_call_cnt to be 0 not: "
                       << rebalance_cb2.assign_call_cnt);

  bool done                    = false;
  bool created_topics          = false;
  bool deleted_topic           = false;
  int last_cb1_assign_call_cnt = 0;
  int last_cb2_assign_call_cnt = 0;
  while (!done) {
    Test::poll_once(c1, 500);
    Test::poll_once(c2, 500);

    if (Test::assignment_partition_count(c1, NULL) == 0 &&
        Test::assignment_partition_count(c2, NULL) == 0 && !created_topics) {
      Test::Say(
          "Creating two topics with 2 partitions each that match regex\n");
      test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
      test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
      /* The consumers should autonomously discover these topics and start
       * consuming from them. This happens in the background - is not
       * influenced by whether we wait for the topics to be created before
       * continuing the main loop. It is possible that both topics are
       * discovered simultaneously, requiring a single rebalance OR that
       * topic 1 is discovered first (it was created first), a rebalance
       * initiated, then topic 2 discovered, then another rebalance
       * initiated to include it.
       */
      created_topics = true;
    }

    if (Test::assignment_partition_count(c1, NULL) == 2 &&
        Test::assignment_partition_count(c2, NULL) == 2 && !deleted_topic) {
      if (rebalance_cb1.nonempty_assign_call_cnt == 1) {
        /* just one rebalance was required */
        TEST_ASSERT(rebalance_cb1.nonempty_assign_call_cnt == 1,
                    "Expecting C_1's nonempty_assign_call_cnt to be 1 not %d ",
                    rebalance_cb1.nonempty_assign_call_cnt);
        TEST_ASSERT(rebalance_cb2.nonempty_assign_call_cnt == 1,
                    "Expecting C_2's nonempty_assign_call_cnt to be 1 not %d ",
                    rebalance_cb2.nonempty_assign_call_cnt);
      } else {
        /* two rebalances were required (occurs infrequently) */
        TEST_ASSERT(rebalance_cb1.nonempty_assign_call_cnt == 2,
                    "Expecting C_1's nonempty_assign_call_cnt to be 2 not %d ",
                    rebalance_cb1.nonempty_assign_call_cnt);
        TEST_ASSERT(rebalance_cb2.nonempty_assign_call_cnt == 2,
                    "Expecting C_2's nonempty_assign_call_cnt to be 2 not %d ",
                    rebalance_cb2.nonempty_assign_call_cnt);
      }

      TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 0,
                  "Expecting C_1's revoke_call_cnt to be 0 not %d ",
                  rebalance_cb1.revoke_call_cnt);
      TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 0,
                  "Expecting C_2's revoke_call_cnt to be 0 not %d ",
                  rebalance_cb2.revoke_call_cnt);

      last_cb1_assign_call_cnt = rebalance_cb1.assign_call_cnt;
      last_cb2_assign_call_cnt = rebalance_cb2.assign_call_cnt;

      Test::Say("Deleting topic 1\n");
      Test::delete_topic(c1, topic_name_1.c_str());
      deleted_topic = true;
    }

    if (Test::assignment_partition_count(c1, NULL) == 1 &&
        Test::assignment_partition_count(c2, NULL) == 1 && deleted_topic) {
      /* accumulated in lost case as well */
      TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 1,
                  "Expecting C_1's revoke_call_cnt to be 1 not %d",
                  rebalance_cb1.revoke_call_cnt);
      TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 1,
                  "Expecting C_2's revoke_call_cnt to be 1 not %d",
                  rebalance_cb2.revoke_call_cnt);
      TEST_ASSERT(rebalance_cb1.lost_call_cnt == 1,
                  "Expecting C_1's lost_call_cnt to be 1 not %d",
                  rebalance_cb1.lost_call_cnt);
      TEST_ASSERT(rebalance_cb2.lost_call_cnt == 1,
                  "Expecting C_2's lost_call_cnt to be 1 not %d",
                  rebalance_cb2.lost_call_cnt);

      /* Consumers will rejoin group after revoking the lost partitions.
       * this will result in an rebalance_cb assign (empty partitions).
       * it follows the revoke, which has already been confirmed to have
       * happened. */
      Test::Say("Waiting for rebalance_cb assigns\n");
      while (rebalance_cb1.assign_call_cnt == last_cb1_assign_call_cnt ||
             rebalance_cb2.assign_call_cnt == last_cb2_assign_call_cnt) {
        Test::poll_once(c1, 500);
        Test::poll_once(c2, 500);
      }

      Test::Say("Consumers are subscribed to one partition each\n");
      done = true;
    }
  }

  Test::Say("Closing consumer 1\n");
  last_cb1_assign_call_cnt = rebalance_cb1.assign_call_cnt;
  c1->close();

  /* There should be no assign rebalance_cb calls on close */
  TEST_ASSERT(rebalance_cb1.assign_call_cnt == last_cb1_assign_call_cnt,
              "Expecting C_1's assign_call_cnt to be %d not %d",
              last_cb1_assign_call_cnt, rebalance_cb1.assign_call_cnt);

  /* Let C_2 catch up on the rebalance and get assigned C_1's partitions. */
  last_cb2_assign_call_cnt = rebalance_cb2.nonempty_assign_call_cnt;
  while (rebalance_cb2.nonempty_assign_call_cnt == last_cb2_assign_call_cnt)
    Test::poll_once(c2, 500);

  Test::Say("Closing consumer 2\n");
  last_cb2_assign_call_cnt = rebalance_cb2.assign_call_cnt;
  c2->close();

  /* There should be no assign rebalance_cb calls on close */
  TEST_ASSERT(rebalance_cb2.assign_call_cnt == last_cb2_assign_call_cnt,
              "Expecting C_2's assign_call_cnt to be %d not %d",
              last_cb2_assign_call_cnt, rebalance_cb2.assign_call_cnt);

  TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 2,
              "Expecting C_1's revoke_call_cnt to be 2 not %d",
              rebalance_cb1.revoke_call_cnt);
  TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 2,
              "Expecting C_2's revoke_call_cnt to be 2 not %d",
              rebalance_cb2.revoke_call_cnt);

  TEST_ASSERT(rebalance_cb1.lost_call_cnt == 1,
              "Expecting C_1's lost_call_cnt to be 1, not %d",
              rebalance_cb1.lost_call_cnt);
  TEST_ASSERT(rebalance_cb2.lost_call_cnt == 1,
              "Expecting C_2's lost_call_cnt to be 1, not %d",
              rebalance_cb2.lost_call_cnt);

  delete c1;
  delete c2;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *   1. Consumer (librdkafka) subscribes to two topics (2 and 6 partitions).
 *   2. Consumer (java) subscribes to the same two topics.
 *   3. Consumer (librdkafka) unsubscribes from the two partition topic.
 *   4. Consumer (java) process closes upon detecting the above unsubscribe.
 *   5. Consumer (librdkafka) will now be subscribed to 6 partitions.
 *   6. Close librdkafka consumer.
 */

static void o_java_interop() {
  SUB_TEST();

  if (*test_conf_get(NULL, "sasl.mechanism") != '\0')
    SUB_TEST_SKIP(
        "Cluster is set up for SASL: we won't bother with that "
        "for the Java client\n");

  std::string topic_name_1 = Test::mk_topic_name("0113_o_2", 1);
  std::string topic_name_2 = Test::mk_topic_name("0113_o_6", 1);
  std::string group_name   = Test::mk_unique_group_name("0113_o");
  test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
  test_create_topic(NULL, topic_name_2.c_str(), 6, 1);

  DefaultRebalanceCb rebalance_cb;
  RdKafka::KafkaConsumer *c = make_consumer(
      "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb, 25);
  test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
  test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name_1, topic_name_2);

  bool done                      = false;
  bool changed_subscription      = false;
  bool changed_subscription_done = false;
  int java_pid                   = 0;
  while (!done) {
    Test::poll_once(c, 500);

    if (1)  // FIXME: Remove after debugging
      Test::Say(tostr() << "Assignment partition count: "
                        << Test::assignment_partition_count(c, NULL)
                        << ", changed_sub " << changed_subscription
                        << ", changed_sub_done " << changed_subscription_done
                        << ", assign_call_cnt " << rebalance_cb.assign_call_cnt
                        << "\n");
    if (Test::assignment_partition_count(c, NULL) == 8 && !java_pid) {
      Test::Say(_C_GRN "librdkafka consumer assigned to 8 partitions\n");
      string bootstrapServers = get_bootstrap_servers();
      const char *argv[1 + 1 + 1 + 1 + 1 + 1];
      size_t i  = 0;
      argv[i++] = "test1";
      argv[i++] = bootstrapServers.c_str();
      argv[i++] = topic_name_1.c_str();
      argv[i++] = topic_name_2.c_str();
      argv[i++] = group_name.c_str();
      argv[i]   = NULL;
      java_pid  = test_run_java("IncrementalRebalanceCli", argv);
      if (java_pid <= 0)
        Test::Fail(tostr() << "Unexpected pid: " << java_pid);
    }

    if (Test::assignment_partition_count(c, NULL) == 4 && java_pid != 0 &&
        !changed_subscription) {
      if (rebalance_cb.assign_call_cnt != 2)
        Test::Fail(tostr() << "Expecting consumer's assign_call_cnt to be 2, "
                              "not "
                           << rebalance_cb.assign_call_cnt);
      Test::Say(_C_GRN "Java consumer is now part of the group\n");
      Test::subscribe(c, topic_name_1);
      changed_subscription = true;
    }

    /* Depending on the timing of resubscribe rebalancing and the
     * Java consumer terminating we might have one or two rebalances,
     * hence the fuzzy <=5 and >=5 checks. */
    if (Test::assignment_partition_count(c, NULL) == 2 &&
        changed_subscription && rebalance_cb.assign_call_cnt <= 5 &&
        !changed_subscription_done) {
      /* All topic 1 partitions will be allocated to this consumer whether or
       * not the Java consumer has unsubscribed yet because the sticky algorithm
       * attempts to ensure partition counts are even. */
      Test::Say(_C_GRN "Consumer 1 has unsubscribed from topic 2\n");
      changed_subscription_done = true;
    }

    if (Test::assignment_partition_count(c, NULL) == 2 &&
        changed_subscription && rebalance_cb.assign_call_cnt >= 5 &&
        changed_subscription_done) {
      /* When the java consumer closes, this will cause an empty assign
       * rebalance_cb event, allowing detection of when this has happened. */
      Test::Say(_C_GRN "Java consumer has left the group\n");
      done = true;
    }
  }

  Test::Say("Closing consumer\n");
  c->close();

  /* Expected behavior is IncrementalRebalanceCli will exit cleanly, timeout
   * otherwise. */
  test_waitpid(java_pid);

  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *  - Single consumer subscribes to topic.
 *  - Soon after (timing such that rebalance is probably in progress) it
 * subscribes to a different topic.
 */

static void s_subscribe_when_rebalancing(int variation) {
  SUB_TEST("variation %d", variation);

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string topic_name_2 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string topic_name_3 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");
  test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
  test_create_topic(NULL, topic_name_2.c_str(), 1, 1);
  test_create_topic(NULL, topic_name_3.c_str(), 1, 1);

  DefaultRebalanceCb rebalance_cb;
  RdKafka::KafkaConsumer *c = make_consumer(
      "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb, 25);
  test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
  test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
  test_wait_topic_exists(c->c_ptr(), topic_name_3.c_str(), 10 * 1000);

  if (variation == 2 || variation == 4 || variation == 6) {
    /* Pre-cache metadata for all topics. */
    class RdKafka::Metadata *metadata;
    c->metadata(true, NULL, &metadata, 5000);
    delete metadata;
  }

  Test::subscribe(c, topic_name_1);
  Test::wait_for_assignment(c, 1, &topic_name_1);

  Test::subscribe(c, topic_name_2);

  if (variation == 3 || variation == 5)
    Test::poll_once(c, 500);

  if (variation < 5) {
    // Very quickly after subscribing to topic 2, subscribe to topic 3.
    Test::subscribe(c, topic_name_3);
    Test::wait_for_assignment(c, 1, &topic_name_3);
  } else {
    // ..or unsubscribe.
    Test::unsubscribe(c);
    Test::wait_for_assignment(c, 0, NULL);
  }

  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 *  - Two consumer subscribe to a topic.
 *  - Max poll interval is exceeded on the first consumer.
 */

static void t_max_poll_interval_exceeded(int variation) {
  SUB_TEST("variation %d", variation);

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");
  test_create_topic(NULL, topic_name_1.c_str(), 2, 1);

  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("session.timeout.ms"), std::string("6000")));
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("max.poll.interval.ms"), std::string("7000")));

  DefaultRebalanceCb rebalance_cb1;
  RdKafka::KafkaConsumer *c1 =
      make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb1, 30);
  DefaultRebalanceCb rebalance_cb2;
  RdKafka::KafkaConsumer *c2 =
      make_consumer("C_2", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb2, 30);

  test_wait_topic_exists(c1->c_ptr(), topic_name_1.c_str(), 10 * 1000);
  test_wait_topic_exists(c2->c_ptr(), topic_name_1.c_str(), 10 * 1000);

  Test::subscribe(c1, topic_name_1);
  Test::subscribe(c2, topic_name_1);

  bool done                    = false;
  bool both_have_been_assigned = false;
  while (!done) {
    if (!both_have_been_assigned)
      Test::poll_once(c1, 500);
    Test::poll_once(c2, 500);

    if (Test::assignment_partition_count(c1, NULL) == 1 &&
        Test::assignment_partition_count(c2, NULL) == 1 &&
        !both_have_been_assigned) {
      Test::Say(
          tostr()
          << "Both consumers are assigned to topic " << topic_name_1
          << ". WAITING 7 seconds for max.poll.interval.ms to be exceeded\n");
      both_have_been_assigned = true;
    }

    if (Test::assignment_partition_count(c2, NULL) == 2 &&
        both_have_been_assigned) {
      Test::Say("Consumer 1 is no longer assigned any partitions, done\n");
      done = true;
    }
  }

  if (variation == 1) {
    if (rebalance_cb1.lost_call_cnt != 0)
      Test::Fail(
          tostr() << "Expected consumer 1 lost revoke count to be 0, not: "
                  << rebalance_cb1.lost_call_cnt);
    Test::poll_once(c1,
                    500); /* Eat the max poll interval exceeded error message */
    Test::poll_once(c1,
                    500); /* Trigger the rebalance_cb with lost partitions */
    if (rebalance_cb1.lost_call_cnt != 1)
      Test::Fail(
          tostr() << "Expected consumer 1 lost revoke count to be 1, not: "
                  << rebalance_cb1.lost_call_cnt);
  }

  c1->close();
  c2->close();

  if (rebalance_cb1.lost_call_cnt != 1)
    Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be 1, not: "
                       << rebalance_cb1.lost_call_cnt);

  if (rebalance_cb1.assign_call_cnt != 1)
    Test::Fail(tostr() << "Expected consumer 1 assign count to be 1, not: "
                       << rebalance_cb1.assign_call_cnt);
  if (rebalance_cb2.assign_call_cnt != 2)
    Test::Fail(tostr() << "Expected consumer 1 assign count to be 2, not: "
                       << rebalance_cb1.assign_call_cnt);

  if (rebalance_cb1.revoke_call_cnt != 1)
    Test::Fail(tostr() << "Expected consumer 1 revoke count to be 1, not: "
                       << rebalance_cb1.revoke_call_cnt);
  if (rebalance_cb2.revoke_call_cnt != 1)
    Test::Fail(tostr() << "Expected consumer 2 revoke count to be 1, not: "
                       << rebalance_cb1.revoke_call_cnt);

  delete c1;
  delete c2;

  SUB_TEST_PASS();
}


/**
 * @brief Poll all consumers until there are no more events or messages
 *        and the timeout has expired.
 */
static void poll_all_consumers(RdKafka::KafkaConsumer **consumers,
                               DefaultRebalanceCb *rebalance_cbs,
                               size_t num,
                               int timeout_ms) {
  int64_t ts_end = test_clock() + (timeout_ms * 1000);

  /* Poll all consumers until no more events are seen,
   * this makes sure we exhaust the current state events before returning. */
  bool evented;
  do {
    evented = false;
    for (size_t i = 0; i < num; i++) {
      int block_ms = min(10, (int)((ts_end - test_clock()) / 1000));
      while (rebalance_cbs[i].poll_once(consumers[i], max(block_ms, 0)))
        evented = true;
    }
  } while (evented || test_clock() < ts_end);
}


/**
 * @brief Stress test with 8 consumers subscribing, fetching and committing.
 *
 * @param subscription_variation 0..2
 *
 * TODO: incorporate committing offsets.
 */

static void u_multiple_subscription_changes(bool use_rebalance_cb,
                                            int subscription_variation) {
  const int N_CONSUMERS          = 8;
  const int N_TOPICS             = 2;
  const int N_PARTS_PER_TOPIC    = N_CONSUMERS * N_TOPICS;
  const int N_PARTITIONS         = N_PARTS_PER_TOPIC * N_TOPICS;
  const int N_MSGS_PER_PARTITION = 1000;

  SUB_TEST("use_rebalance_cb: %d, subscription_variation: %d",
           (int)use_rebalance_cb, subscription_variation);

  string topic_name_1 = Test::mk_topic_name("0113u_1", 1);
  string topic_name_2 = Test::mk_topic_name("0113u_2", 1);
  string group_name   = Test::mk_unique_group_name("0113u");

  test_create_topic(NULL, topic_name_1.c_str(), N_PARTS_PER_TOPIC, 1);
  test_create_topic(NULL, topic_name_2.c_str(), N_PARTS_PER_TOPIC, 1);

  Test::Say("Creating consumers\n");
  DefaultRebalanceCb rebalance_cbs[N_CONSUMERS];
  RdKafka::KafkaConsumer *consumers[N_CONSUMERS];

  for (int i = 0; i < N_CONSUMERS; i++) {
    std::string name = tostr() << "C_" << i;
    consumers[i] =
        make_consumer(name.c_str(), group_name, "cooperative-sticky", NULL,
                      use_rebalance_cb ? &rebalance_cbs[i] : NULL, 120);
  }

  test_wait_topic_exists(consumers[0]->c_ptr(), topic_name_1.c_str(),
                         10 * 1000);
  test_wait_topic_exists(consumers[0]->c_ptr(), topic_name_2.c_str(),
                         10 * 1000);


  /*
   * Seed all partitions with the same number of messages so we later can
   * verify that consumption is working.
   */
  vector<pair<Toppar, int> > ptopics;
  ptopics.push_back(pair<Toppar, int>(Toppar(topic_name_1, N_PARTS_PER_TOPIC),
                                      N_MSGS_PER_PARTITION));
  ptopics.push_back(pair<Toppar, int>(Toppar(topic_name_2, N_PARTS_PER_TOPIC),
                                      N_MSGS_PER_PARTITION));
  produce_msgs(ptopics);


  /*
   * Track what topics a consumer should be subscribed to and use this to
   * verify both its subscription and assignment throughout the test.
   */

  /* consumer -> currently subscribed topics */
  map<int, vector<string> > consumer_topics;

  /* topic -> consumers subscribed to topic */
  map<string, set<int> > topic_consumers;

  /* The subscription alternatives that consumers
   * alter between in the playbook. */
  vector<string> SUBSCRIPTION_1;
  vector<string> SUBSCRIPTION_2;

  SUBSCRIPTION_1.push_back(topic_name_1);

  switch (subscription_variation) {
  case 0:
    SUBSCRIPTION_2.push_back(topic_name_1);
    SUBSCRIPTION_2.push_back(topic_name_2);
    break;

  case 1:
    SUBSCRIPTION_2.push_back(topic_name_2);
    break;

  case 2:
    /* No subscription */
    break;
  }

  sort(SUBSCRIPTION_1.begin(), SUBSCRIPTION_1.end());
  sort(SUBSCRIPTION_2.begin(), SUBSCRIPTION_2.end());


  /*
   * Define playbook
   */
  const struct {
    int timestamp_ms;
    int consumer;
    const vector<string> *topics;
  } playbook[] = {/* timestamp_ms, consumer_number, subscribe-to-topics */
                  {0, 0, &SUBSCRIPTION_1}, /* Cmd 0 */
                  {4000, 1, &SUBSCRIPTION_1},  {4000, 1, &SUBSCRIPTION_1},
                  {4000, 1, &SUBSCRIPTION_1},  {4000, 2, &SUBSCRIPTION_1},
                  {6000, 3, &SUBSCRIPTION_1}, /* Cmd 5 */
                  {6000, 4, &SUBSCRIPTION_1},  {6000, 5, &SUBSCRIPTION_1},
                  {6000, 6, &SUBSCRIPTION_1},  {6000, 7, &SUBSCRIPTION_2},
                  {6000, 1, &SUBSCRIPTION_1}, /* Cmd 10 */
                  {6000, 1, &SUBSCRIPTION_2},  {6000, 1, &SUBSCRIPTION_1},
                  {6000, 2, &SUBSCRIPTION_2},  {7000, 2, &SUBSCRIPTION_1},
                  {7000, 1, &SUBSCRIPTION_2}, /* Cmd 15 */
                  {8000, 0, &SUBSCRIPTION_2},  {8000, 1, &SUBSCRIPTION_1},
                  {8000, 0, &SUBSCRIPTION_1},  {13000, 2, &SUBSCRIPTION_1},
                  {13000, 1, &SUBSCRIPTION_2}, /* Cmd 20 */
                  {13000, 5, &SUBSCRIPTION_2}, {14000, 6, &SUBSCRIPTION_2},
                  {15000, 7, &SUBSCRIPTION_1}, {15000, 1, &SUBSCRIPTION_1},
                  {15000, 5, &SUBSCRIPTION_1}, /* Cmd 25 */
                  {15000, 6, &SUBSCRIPTION_1}, {INT_MAX, 0, 0}};

  /*
   * Run the playbook
   */
  int cmd_number    = 0;
  uint64_t ts_start = test_clock();

  while (playbook[cmd_number].timestamp_ms != INT_MAX) {
    TEST_ASSERT(playbook[cmd_number].consumer < N_CONSUMERS);

    Test::Say(tostr() << "Cmd #" << cmd_number << ": wait "
                      << playbook[cmd_number].timestamp_ms << "ms\n");

    poll_all_consumers(consumers, rebalance_cbs, N_CONSUMERS,
                       playbook[cmd_number].timestamp_ms -
                           (int)((test_clock() - ts_start) / 1000));

    /* Verify consumer assignments match subscribed topics */
    map<Toppar, RdKafka::KafkaConsumer *> all_assignments;
    for (int i = 0; i < N_CONSUMERS; i++)
      verify_consumer_assignment(
          consumers[i], rebalance_cbs[i], consumer_topics[i],
          /* Allow empty assignment */
          true,
          /* Allow mismatch between subscribed topics
           * and actual assignment since we can't
           * synchronize the last subscription
           * to the current assignment due to
           * an unknown number of rebalances required
           * for the final assignment to settle.
           * This is instead checked at the end of
           * this test case. */
          true, &all_assignments, -1 /* no msgcnt check*/);

    int cid                          = playbook[cmd_number].consumer;
    RdKafka::KafkaConsumer *consumer = consumers[playbook[cmd_number].consumer];
    const vector<string> *topics     = playbook[cmd_number].topics;

    /*
     * Update our view of the consumer's subscribed topics and vice versa.
     */
    for (vector<string>::const_iterator it = consumer_topics[cid].begin();
         it != consumer_topics[cid].end(); it++) {
      topic_consumers[*it].erase(cid);
    }

    consumer_topics[cid].clear();

    for (vector<string>::const_iterator it = topics->begin();
         it != topics->end(); it++) {
      consumer_topics[cid].push_back(*it);
      topic_consumers[*it].insert(cid);
    }

    RdKafka::ErrorCode err;

    /*
     * Change subscription
     */
    if (!topics->empty()) {
      Test::Say(tostr() << "Consumer: " << consumer->name()
                        << " is subscribing to topics "
                        << string_vec_to_str(*topics) << " after "
                        << ((test_clock() - ts_start) / 1000) << "ms\n");
      err = consumer->subscribe(*topics);
      TEST_ASSERT(!err, "Expected subscribe() to succeed, got %s",
                  RdKafka::err2str(err).c_str());
    } else {
      Test::Say(tostr() << "Consumer: " << consumer->name()
                        << " is unsubscribing after "
                        << ((test_clock() - ts_start) / 1000) << "ms\n");
      Test::unsubscribe(consumer);
    }

    /* Mark this consumer as waiting for rebalance so that
     * verify_consumer_assignment() allows assigned partitions that
     * (no longer) match the subscription. */
    rebalance_cbs[cid].wait_rebalance = true;


    /*
     * Verify subscription matches what we think it should be.
     */
    vector<string> subscription;
    err = consumer->subscription(subscription);
    TEST_ASSERT(!err, "consumer %s subscription() failed: %s",
                consumer->name().c_str(), RdKafka::err2str(err).c_str());

    sort(subscription.begin(), subscription.end());

    Test::Say(tostr() << "Consumer " << consumer->name()
                      << " subscription is now "
                      << string_vec_to_str(subscription) << "\n");

    if (subscription != *topics)
      Test::Fail(tostr() << "Expected consumer " << consumer->name()
                         << " subscription: " << string_vec_to_str(*topics)
                         << " but got: " << string_vec_to_str(subscription));

    cmd_number++;
  }


  /*
   * Wait for final rebalances and all consumers to settle,
   * then verify assignments and received message counts.
   */
  Test::Say(_C_YEL "Waiting for final assignment state\n");
  int done_count = 0;
  /* Allow at least 20 seconds for group to stabilize. */
  int64_t stabilize_until = test_clock() + (20 * 1000 * 1000); /* 20s */

  while (done_count < 2) {
    bool stabilized = test_clock() > stabilize_until;

    poll_all_consumers(consumers, rebalance_cbs, N_CONSUMERS, 5000);

    /* Verify consumer assignments */
    int counts[N_CONSUMERS];
    map<Toppar, RdKafka::KafkaConsumer *> all_assignments;
    Test::Say(tostr() << "Consumer assignments "
                      << "(subscription_variation " << subscription_variation
                      << ")" << (stabilized ? " (stabilized)" : "")
                      << (use_rebalance_cb ? " (use_rebalance_cb)"
                                           : " (no rebalance cb)")
                      << ":\n");
    for (int i = 0; i < N_CONSUMERS; i++) {
      bool last_rebalance_stabilized =
          stabilized &&
          (!use_rebalance_cb ||
           /* session.timeout.ms * 2 + 1 */
           test_clock() > rebalance_cbs[i].ts_last_assign + (13 * 1000 * 1000));

      counts[i] = verify_consumer_assignment(
          consumers[i], rebalance_cbs[i], consumer_topics[i],
          /* allow empty */
          true,
          /* if we're waiting for a
           * rebalance it is okay for the
           * current assignment to contain
           * topics that this consumer
           * (no longer) subscribes to. */
          !last_rebalance_stabilized || !use_rebalance_cb ||
              rebalance_cbs[i].wait_rebalance,
          /* do not allow assignments for
           * topics that are not subscribed*/
          &all_assignments,
          /* Verify received message counts
           * once the assignments have
           * stabilized.
           * Requires the rebalance cb.*/
          done_count > 0 && use_rebalance_cb ? N_MSGS_PER_PARTITION : -1);
    }

    Test::Say(tostr() << all_assignments.size() << "/" << N_PARTITIONS
                      << " partitions assigned\n");

    bool done = true;
    for (int i = 0; i < N_CONSUMERS; i++) {
      /* For each topic the consumer subscribes to it should
       * be assigned its share of partitions. */
      int exp_parts = 0;
      for (vector<string>::const_iterator it = consumer_topics[i].begin();
           it != consumer_topics[i].end(); it++)
        exp_parts += N_PARTS_PER_TOPIC / (int)topic_consumers[*it].size();

      Test::Say(tostr() << (counts[i] == exp_parts ? "" : _C_YEL) << "Consumer "
                        << consumers[i]->name() << " has " << counts[i]
                        << " assigned partitions (" << consumer_topics[i].size()
                        << " subscribed topic(s))"
                        << ", expecting " << exp_parts
                        << " assigned partitions\n");

      if (counts[i] != exp_parts)
        done = false;
    }

    if (done && stabilized) {
      done_count++;
      Test::Say(tostr() << "All assignments verified, done count is "
                        << done_count << "\n");
    }
  }

  Test::Say("Disposing consumers\n");
  for (int i = 0; i < N_CONSUMERS; i++) {
    TEST_ASSERT(!use_rebalance_cb || !rebalance_cbs[i].wait_rebalance,
                "Consumer %d still waiting for rebalance", i);
    if (i & 1)
      consumers[i]->close();
    delete consumers[i];
  }

  SUB_TEST_PASS();
}



extern "C" {

static int rebalance_cnt;
static rd_kafka_resp_err_t rebalance_exp_event;
static rd_bool_t rebalance_exp_lost;

extern void test_print_partition_list(
    const rd_kafka_topic_partition_list_t *partitions);


static void rebalance_cb(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *parts,
                         void *opaque) {
  rebalance_cnt++;
  TEST_SAY("Rebalance #%d: %s: %d partition(s)\n", rebalance_cnt,
           rd_kafka_err2name(err), parts->cnt);

  test_print_partition_list(parts);

  TEST_ASSERT(err == rebalance_exp_event ||
                  rebalance_exp_event == RD_KAFKA_RESP_ERR_NO_ERROR,
              "Expected rebalance event %s, not %s",
              rd_kafka_err2name(rebalance_exp_event), rd_kafka_err2name(err));

  if (rebalance_exp_lost) {
    TEST_ASSERT(rd_kafka_assignment_lost(rk), "Expected partitions lost");
    TEST_SAY("Partitions were lost\n");
  }

  if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
    test_consumer_incremental_assign("assign", rk, parts);
  } else {
    test_consumer_incremental_unassign("unassign", rk, parts);
  }
}

/**
 * @brief Wait for an expected rebalance event, or fail.
 */
static void expect_rebalance0(const char *func,
                              int line,
                              const char *what,
                              rd_kafka_t *c,
                              rd_kafka_resp_err_t exp_event,
                              rd_bool_t exp_lost,
                              int timeout_s) {
  int64_t tmout = test_clock() + (timeout_s * 1000000);
  int start_cnt = rebalance_cnt;

  TEST_SAY("%s:%d: Waiting for %s (%s) for %ds\n", func, line, what,
           rd_kafka_err2name(exp_event), timeout_s);

  rebalance_exp_lost  = exp_lost;
  rebalance_exp_event = exp_event;

  while (tmout > test_clock() && rebalance_cnt == start_cnt) {
    test_consumer_poll_once(c, NULL, 1000);
  }

  if (rebalance_cnt == start_cnt + 1) {
    rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
    rebalance_exp_lost = exp_lost = rd_false;
    return;
  }

  TEST_FAIL("%s:%d: Timed out waiting for %s (%s)", func, line, what,
            rd_kafka_err2name(exp_event));
}

#define expect_rebalance(WHAT, C, EXP_EVENT, EXP_LOST, TIMEOUT_S)              \
  expect_rebalance0(__FUNCTION__, __LINE__, WHAT, C, EXP_EVENT, EXP_LOST,      \
                    TIMEOUT_S)


/* Check lost partitions revoke occurs on ILLEGAL_GENERATION heartbeat error.
 */

static void p_lost_partitions_heartbeat_illegal_generation_test() {
  const char *bootstraps;
  rd_kafka_mock_cluster_t *mcluster;
  const char *groupid = "mygroup";
  const char *topic   = "test";
  rd_kafka_t *c;
  rd_kafka_conf_t *conf;

  SUB_TEST_QUICK();

  mcluster = test_mock_cluster_new(3, &bootstraps);

  rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);

  /* Seed the topic with messages */
  test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10, "bootstrap.servers",
                           bootstraps, "batch.num.messages", "10",
                           "security.protocol", "plaintext", NULL);

  test_conf_init(&conf, NULL, 30);
  test_conf_set(conf, "bootstrap.servers", bootstraps);
  test_conf_set(conf, "security.protocol", "PLAINTEXT");
  test_conf_set(conf, "group.id", groupid);
  test_conf_set(conf, "session.timeout.ms", "5000");
  test_conf_set(conf, "heartbeat.interval.ms", "1000");
  test_conf_set(conf, "auto.offset.reset", "earliest");
  test_conf_set(conf, "enable.auto.commit", "false");
  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");

  c = test_create_consumer(groupid, rebalance_cb, conf, NULL);

  test_consumer_subscribe(c, topic);

  expect_rebalance("initial assignment", c,
                   RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                   rd_false /*don't expect lost*/, 5 + 2);

  /* Fail heartbeats */
  rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_Heartbeat, 5,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);

  expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                   rd_true /*expect lost*/, 10 + 2);

  rd_kafka_mock_clear_request_errors(mcluster, RD_KAFKAP_Heartbeat);

  expect_rebalance("rejoin after lost", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                   rd_false /*don't expect lost*/, 10 + 2);

  TEST_SAY("Closing consumer\n");
  test_consumer_close(c);

  TEST_SAY("Destroying consumer\n");
  rd_kafka_destroy(c);

  TEST_SAY("Destroying mock cluster\n");
  test_mock_cluster_destroy(mcluster);

  SUB_TEST_PASS();
}



/* Check lost partitions revoke occurs on ILLEGAL_GENERATION JoinGroup
 * or SyncGroup error.
 */

static void q_lost_partitions_illegal_generation_test(
    rd_bool_t test_joingroup_fail) {
  const char *bootstraps;
  rd_kafka_mock_cluster_t *mcluster;
  const char *groupid = "mygroup";
  const char *topic1  = "test1";
  const char *topic2  = "test2";
  rd_kafka_t *c;
  rd_kafka_conf_t *conf;
  rd_kafka_resp_err_t err;
  rd_kafka_topic_partition_list_t *topics;

  SUB_TEST0(!test_joingroup_fail /*quick*/, "test_joingroup_fail=%d",
            test_joingroup_fail);

  mcluster = test_mock_cluster_new(3, &bootstraps);

  rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);

  /* Seed the topic1 with messages */
  test_produce_msgs_easy_v(topic1, 0, 0, 0, 100, 10, "bootstrap.servers",
                           bootstraps, "batch.num.messages", "10",
                           "security.protocol", "plaintext", NULL);

  /* Seed the topic2 with messages */
  test_produce_msgs_easy_v(topic2, 0, 0, 0, 100, 10, "bootstrap.servers",
                           bootstraps, "batch.num.messages", "10",
                           "security.protocol", "plaintext", NULL);

  test_conf_init(&conf, NULL, 30);
  test_conf_set(conf, "bootstrap.servers", bootstraps);
  test_conf_set(conf, "security.protocol", "PLAINTEXT");
  test_conf_set(conf, "group.id", groupid);
  test_conf_set(conf, "session.timeout.ms", "5000");
  test_conf_set(conf, "heartbeat.interval.ms", "1000");
  test_conf_set(conf, "auto.offset.reset", "earliest");
  test_conf_set(conf, "enable.auto.commit", "false");
  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");

  c = test_create_consumer(groupid, rebalance_cb, conf, NULL);

  test_consumer_subscribe(c, topic1);

  expect_rebalance("initial assignment", c,
                   RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                   rd_false /*don't expect lost*/, 5 + 2);

  /* Fail JoinGroups or SyncGroups */
  rd_kafka_mock_push_request_errors(
      mcluster, test_joingroup_fail ? RD_KAFKAP_JoinGroup : RD_KAFKAP_SyncGroup,
      5, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
      RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
      RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
      RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
      RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);

  topics = rd_kafka_topic_partition_list_new(2);
  rd_kafka_topic_partition_list_add(topics, topic1, RD_KAFKA_PARTITION_UA);
  rd_kafka_topic_partition_list_add(topics, topic2, RD_KAFKA_PARTITION_UA);
  err = rd_kafka_subscribe(c, topics);
  if (err)
    TEST_FAIL("%s: Failed to subscribe to topics: %s\n", rd_kafka_name(c),
              rd_kafka_err2str(err));
  rd_kafka_topic_partition_list_destroy(topics);

  expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                   rd_true /*expect lost*/, 10 + 2);

  rd_kafka_mock_clear_request_errors(mcluster, test_joingroup_fail
                                                   ? RD_KAFKAP_JoinGroup
                                                   : RD_KAFKAP_SyncGroup);

  expect_rebalance("rejoin group", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                   rd_false /*expect lost*/, 10 + 2);

  TEST_SAY("Closing consumer\n");
  test_consumer_close(c);

  TEST_SAY("Destroying consumer\n");
  rd_kafka_destroy(c);

  TEST_SAY("Destroying mock cluster\n");
  test_mock_cluster_destroy(mcluster);

  SUB_TEST_PASS();
}



/* Check lost partitions revoke occurs on ILLEGAL_GENERATION Commit
 * error.
 */

static void r_lost_partitions_commit_illegal_generation_test_local() {
  const char *bootstraps;
  rd_kafka_mock_cluster_t *mcluster;
  const char *groupid = "mygroup";
  const char *topic   = "test";
  const int msgcnt    = 100;
  rd_kafka_t *c;
  rd_kafka_conf_t *conf;

  SUB_TEST();

  mcluster = test_mock_cluster_new(3, &bootstraps);

  rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);

  /* Seed the topic with messages */
  test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 10, "bootstrap.servers",
                           bootstraps, "batch.num.messages", "10",
                           "security.protocol", "plaintext", NULL);

  test_conf_init(&conf, NULL, 30);
  test_conf_set(conf, "bootstrap.servers", bootstraps);
  test_conf_set(conf, "security.protocol", "PLAINTEXT");
  test_conf_set(conf, "group.id", groupid);
  test_conf_set(conf, "auto.offset.reset", "earliest");
  test_conf_set(conf, "enable.auto.commit", "false");
  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");

  c = test_create_consumer(groupid, rebalance_cb, conf, NULL);

  test_consumer_subscribe(c, topic);

  expect_rebalance("initial assignment", c,
                   RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                   rd_false /*don't expect lost*/, 5 + 2);


  /* Consume some messages so that the commit has something to commit. */
  test_consumer_poll("consume", c, -1, -1, -1, msgcnt / 2, NULL);

  /* Fail Commit */
  rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_OffsetCommit, 5,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);

  rd_kafka_commit(c, NULL, rd_false);

  expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                   rd_true /*expect lost*/, 10 + 2);

  expect_rebalance("rejoin group", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                   rd_false /*expect lost*/, 20 + 2);

  TEST_SAY("Closing consumer\n");
  test_consumer_close(c);

  TEST_SAY("Destroying consumer\n");
  rd_kafka_destroy(c);

  TEST_SAY("Destroying mock cluster\n");
  test_mock_cluster_destroy(mcluster);
}

/**
 * @brief Test that the consumer is destroyed without segfault if
 *        it happens before first rebalance and there is no assignor
 *        state. See #4312
 */
static void s_no_segfault_before_first_rebalance(void) {
  rd_kafka_t *c;
  rd_kafka_conf_t *conf;
  rd_kafka_mock_cluster_t *mcluster;
  const char *topic;
  const char *bootstraps;

  SUB_TEST_QUICK();

  TEST_SAY("Creating mock cluster\n");
  mcluster = test_mock_cluster_new(1, &bootstraps);

  topic = test_mk_topic_name("0113_s", 1);

  test_conf_init(&conf, NULL, 60);
  test_conf_set(conf, "bootstrap.servers", bootstraps);
  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");

  TEST_SAY("Creating topic %s\n", topic);
  TEST_CALL_ERR__(rd_kafka_mock_topic_create(
      mcluster, topic, 2 /* partition_cnt */, 1 /* replication_factor */));

  c = test_create_consumer(topic, NULL, conf, NULL);

  /* Add a 1s delay to the SyncGroup response so next condition can happen. */
  rd_kafka_mock_broker_push_request_error_rtts(
      mcluster, 1 /*Broker 1*/, RD_KAFKAP_SyncGroup /*FetchRequest*/, 1,
      RD_KAFKA_RESP_ERR_NOT_COORDINATOR, 1000);

  test_consumer_subscribe(c, topic);

  /* Wait for initial rebalance 3000 ms (default) + 500 ms for processing
   * the JoinGroup response. Consumer close must come between the JoinGroup
   * response and the SyncGroup response, so that rkcg_assignor is set,
   * but rkcg_assignor_state isn't. */
  TEST_ASSERT(!test_consumer_poll_once(c, NULL, 3500), "poll should timeout");

  rd_kafka_consumer_close(c);

  rd_kafka_destroy(c);

  TEST_SAY("Destroying mock cluster\n");
  test_mock_cluster_destroy(mcluster);

  SUB_TEST_PASS();
}

/**
 * @brief Rebalance callback for the v_.. test below.
 */
static void v_rebalance_cb(rd_kafka_t *rk,
                           rd_kafka_resp_err_t err,
                           rd_kafka_topic_partition_list_t *parts,
                           void *opaque) {
  bool *auto_commitp = (bool *)opaque;

  TEST_SAY("%s: %s: %d partition(s)%s\n", rd_kafka_name(rk),
           rd_kafka_err2name(err), parts->cnt,
           rd_kafka_assignment_lost(rk) ? " - assignment lost" : "");

  test_print_partition_list(parts);

  if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
    test_consumer_incremental_assign("assign", rk, parts);
  } else {
    test_consumer_incremental_unassign("unassign", rk, parts);

    if (!*auto_commitp) {
      rd_kafka_resp_err_t commit_err;

      TEST_SAY("Attempting manual commit after unassign, in 2 seconds..\n");
      /* Sleep enough to have the generation-id bumped by rejoin. */
      rd_sleep(2);
      commit_err = rd_kafka_commit(rk, NULL, 0 /*sync*/);
      TEST_ASSERT(!commit_err || commit_err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
                      commit_err == RD_KAFKA_RESP_ERR__DESTROY,
                  "%s: manual commit failed: %s", rd_kafka_name(rk),
                  rd_kafka_err2str(commit_err));
    }
  }
}

/**
 * @brief Commit callback for the v_.. test.
 */
static void v_commit_cb(rd_kafka_t *rk,
                        rd_kafka_resp_err_t err,
                        rd_kafka_topic_partition_list_t *offsets,
                        void *opaque) {
  TEST_SAY("%s offset commit for %d offsets: %s\n", rd_kafka_name(rk),
           offsets ? offsets->cnt : -1, rd_kafka_err2name(err));
  TEST_ASSERT(!err || err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
                  err == RD_KAFKA_RESP_ERR__DESTROY /* consumer was closed */,
              "%s offset commit failed: %s", rd_kafka_name(rk),
              rd_kafka_err2str(err));
}


static void v_commit_during_rebalance(bool with_rebalance_cb,
                                      bool auto_commit) {
  rd_kafka_t *p, *c1, *c2;
  rd_kafka_conf_t *conf;
  const char *topic              = test_mk_topic_name("0113_v", 1);
  const int partition_cnt        = 6;
  const int msgcnt_per_partition = 100;
  const int msgcnt               = partition_cnt * msgcnt_per_partition;
  uint64_t testid;
  int i;


  SUB_TEST("With%s rebalance callback and %s-commit",
           with_rebalance_cb ? "" : "out", auto_commit ? "auto" : "manual");

  test_conf_init(&conf, NULL, 30);
  testid = test_id_generate();

  /*
   * Produce messages to topic
   */
  p = test_create_producer();

  test_create_topic(p, topic, partition_cnt, 1);

  for (i = 0; i < partition_cnt; i++) {
    test_produce_msgs2(p, topic, testid, i, i * msgcnt_per_partition,
                       msgcnt_per_partition, NULL, 0);
  }

  test_flush(p, -1);

  rd_kafka_destroy(p);


  test_conf_set(conf, "auto.offset.reset", "earliest");
  test_conf_set(conf, "enable.auto.commit", auto_commit ? "true" : "false");
  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
  rd_kafka_conf_set_offset_commit_cb(conf, v_commit_cb);
  rd_kafka_conf_set_opaque(conf, (void *)&auto_commit);

  TEST_SAY("Create and subscribe first consumer\n");
  c1 = test_create_consumer(topic, with_rebalance_cb ? v_rebalance_cb : NULL,
                            rd_kafka_conf_dup(conf), NULL);
  TEST_ASSERT(rd_kafka_opaque(c1) == (void *)&auto_commit,
              "c1 opaque mismatch");
  test_consumer_subscribe(c1, topic);

  /* Consume some messages so that we know we have an assignment
   * and something to commit. */
  test_consumer_poll("C1.PRECONSUME", c1, testid, -1, 0,
                     msgcnt / partition_cnt / 2, NULL);

  TEST_SAY("Create and subscribe second consumer\n");
  c2 = test_create_consumer(topic, with_rebalance_cb ? v_rebalance_cb : NULL,
                            conf, NULL);
  TEST_ASSERT(rd_kafka_opaque(c2) == (void *)&auto_commit,
              "c2 opaque mismatch");
  test_consumer_subscribe(c2, topic);

  /* Poll both consumers */
  for (i = 0; i < 10; i++) {
    test_consumer_poll_once(c1, NULL, 1000);
    test_consumer_poll_once(c2, NULL, 1000);
  }

  TEST_SAY("Closing consumers\n");
  test_consumer_close(c1);
  test_consumer_close(c2);

  rd_kafka_destroy(c1);
  rd_kafka_destroy(c2);

  SUB_TEST_PASS();
}


/**
 * @brief Verify that incremental rebalances retain stickyness.
 */
static void x_incremental_rebalances(void) {
#define _NUM_CONS 3
  rd_kafka_t *c[_NUM_CONS];
  rd_kafka_conf_t *conf;
  const char *topic = test_mk_topic_name("0113_x", 1);
  int i;

  SUB_TEST();
  test_conf_init(&conf, NULL, 60);

  test_create_topic(NULL, topic, 6, 1);

  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
  for (i = 0; i < _NUM_CONS; i++) {
    char clientid[32];
    rd_snprintf(clientid, sizeof(clientid), "consumer%d", i);
    test_conf_set(conf, "client.id", clientid);

    c[i] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
  }
  rd_kafka_conf_destroy(conf);

  /* First consumer joins group */
  TEST_SAY("%s: joining\n", rd_kafka_name(c[0]));
  test_consumer_subscribe(c[0], topic);
  test_consumer_wait_assignment(c[0], rd_true /*poll*/);
  test_consumer_verify_assignment(c[0], rd_true /*fail immediately*/, topic, 0,
                                  topic, 1, topic, 2, topic, 3, topic, 4, topic,
                                  5, NULL);


  /* Second consumer joins group */
  TEST_SAY("%s: joining\n", rd_kafka_name(c[1]));
  test_consumer_subscribe(c[1], topic);
  test_consumer_wait_assignment(c[1], rd_true /*poll*/);
  rd_sleep(3);
  test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 3,
                                  topic, 4, topic, 5, NULL);
  test_consumer_verify_assignment(c[1], rd_false /*fail later*/, topic, 0,
                                  topic, 1, topic, 2, NULL);

  /* Third consumer joins group */
  TEST_SAY("%s: joining\n", rd_kafka_name(c[2]));
  test_consumer_subscribe(c[2], topic);
  test_consumer_wait_assignment(c[2], rd_true /*poll*/);
  rd_sleep(3);
  test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 4,
                                  topic, 5, NULL);
  test_consumer_verify_assignment(c[1], rd_false /*fail later*/, topic, 1,
                                  topic, 2, NULL);
  test_consumer_verify_assignment(c[2], rd_false /*fail later*/, topic, 3,
                                  topic, 0, NULL);

  /* Raise any previously failed verify_assignment calls and fail the test */
  TEST_LATER_CHECK();

  for (i = 0; i < _NUM_CONS; i++)
    rd_kafka_destroy(c[i]);

  SUB_TEST_PASS();

#undef _NUM_CONS
}

/* Local tests not needing a cluster */
int main_0113_cooperative_rebalance_local(int argc, char **argv) {
  a_assign_rapid();
  p_lost_partitions_heartbeat_illegal_generation_test();
  q_lost_partitions_illegal_generation_test(rd_false /*joingroup*/);
  q_lost_partitions_illegal_generation_test(rd_true /*syncgroup*/);
  r_lost_partitions_commit_illegal_generation_test_local();
  s_no_segfault_before_first_rebalance();
  return 0;
}

int main_0113_cooperative_rebalance(int argc, char **argv) {
  int i;

  a_assign_tests();
  b_subscribe_with_cb_test(true /*close consumer*/);
  b_subscribe_with_cb_test(false /*don't close consumer*/);
  c_subscribe_no_cb_test(true /*close consumer*/);

  if (test_quick) {
    Test::Say("Skipping tests >= c_ .. due to quick mode\n");
    return 0;
  }

  c_subscribe_no_cb_test(false /*don't close consumer*/);
  d_change_subscription_add_topic(true /*close consumer*/);
  d_change_subscription_add_topic(false /*don't close consumer*/);
  e_change_subscription_remove_topic(true /*close consumer*/);
  e_change_subscription_remove_topic(false /*don't close consumer*/);
  f_assign_call_cooperative();
  g_incremental_assign_call_eager();
  h_delete_topic();
  i_delete_topic_2();
  j_delete_topic_no_rb_callback();
  k_add_partition();
  l_unsubscribe();
  m_unsubscribe_2();
  n_wildcard();
  o_java_interop();
  for (i = 1; i <= 6; i++) /* iterate over 6 different test variations */
    s_subscribe_when_rebalancing(i);
  for (i = 1; i <= 2; i++)
    t_max_poll_interval_exceeded(i);
  /* Run all 2*3 variations of the u_.. test */
  for (i = 0; i < 3; i++) {
    u_multiple_subscription_changes(true /*with rebalance_cb*/, i);
    u_multiple_subscription_changes(false /*without rebalance_cb*/, i);
  }
  v_commit_during_rebalance(true /*with rebalance callback*/,
                            true /*auto commit*/);
  v_commit_during_rebalance(false /*without rebalance callback*/,
                            true /*auto commit*/);
  v_commit_during_rebalance(true /*with rebalance callback*/,
                            false /*manual commit*/);
  x_incremental_rebalances();

  return 0;
}
}
