/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <iostream>
#include "testcpp.h"


static RdKafka::Producer *producer;
static RdKafka::KafkaConsumer *consumer;
static std::string topic;

static void assert_all_headers_match(RdKafka::Headers *actual,
                                     const RdKafka::Headers *expected) {
  if (!actual) {
    Test::Fail("Expected RdKafka::Message to contain headers");
  }
  if (actual->size() != expected->size()) {
    Test::Fail(tostr() << "Expected headers length to equal "
               << expected->size() << " instead equals " << actual->size() << "\n");
  }

  std::vector<RdKafka::Headers::Header> actual_headers = actual->get_all();
  std::vector<RdKafka::Headers::Header> expected_headers = expected->get_all();
  Test::Say(3, tostr() << "Header size " << actual_headers.size() << "\n");
  for(size_t i = 0; i < actual_headers.size(); i++) {
    RdKafka::Headers::Header actual_header = actual_headers[i];
    const RdKafka::Headers::Header expected_header = expected_headers[i];
    std::string actual_key = actual_header.key();
    std::string actual_value = std::string(
      actual_header.value_string(),
      actual_header.value_size()
      );
    std::string expected_key = expected_header.key();
    std::string expected_value = std::string(
      actual_header.value_string(),
      expected_header.value_size()
      );

    Test::Say(3,
              tostr() <<
              "Expected Key " << expected_key <<
              ", Expected val " << expected_value <<
              ", Actual key " << actual_key <<
              ", Actual val " << actual_value << "\n");

    if (actual_key != expected_key) {
      Test::Fail(tostr() << "Header key does not match, expected '"
                 << actual_key << "' but got '" << expected_key << "'\n");
    }
    if (actual_value != expected_value) {
      Test::Fail(tostr() << "Header value does not match, expected '"
                 << actual_value << "' but got '" << expected_value << "'\n");
    }
  }
}

static void test_headers (RdKafka::Headers *produce_headers,
                          const RdKafka::Headers *compare_headers) {

  RdKafka::ErrorCode err;

  err = producer->produce(topic, 0,
                          RdKafka::Producer::RK_MSG_COPY,
                          (void *)"message", 7,
                          (void *)"key", 3, 0, produce_headers, NULL);
  if (err)
    Test::Fail("produce() failed: " + RdKafka::err2str(err));

  producer->flush(tmout_multip(10*1000));

  if (producer->outq_len() > 0)
    Test::Fail(tostr() << "Expected producer to be flushed, " <<
               producer->outq_len() << " messages remain");

  int cnt = 0;
  bool running = true;

  while (running) {
    RdKafka::Message *msg = consumer->consume(10*1000);

    if (msg->err() == RdKafka::ERR_NO_ERROR) {
      cnt++;
      RdKafka::Headers *headers = msg->headers();
      if (compare_headers->size() > 0) {
        assert_all_headers_match(headers, compare_headers);
      } else {
        if (headers != 0) {
          Test::Fail("Expected headers to return a NULL pointer");
        }
      }
      running = false;
    } else {
      Test::Fail("consume() failed: " + msg->errstr());
    }
    delete msg;
  }
}

static void test_headers (int num_hdrs) {
  Test::Say(tostr() << "Test " << num_hdrs <<
            " headers in consumed message.\n");
  RdKafka::Headers *produce_headers = RdKafka::Headers::create();
  RdKafka::Headers *compare_headers = RdKafka::Headers::create();
  for (int i = 0; i < num_hdrs; ++i) {
    std::stringstream key_s;
    key_s << "header_" << i;
    std::string key = key_s.str();

    if ((i % 4) == 0) {
      /* NULL value */
      produce_headers->add(key, NULL, 0);
      compare_headers->add(key, NULL, 0);
    } else if ((i % 5) == 0) {
      /* Empty value, use different methods for produce
       * and compare to make sure they behave the same way. */
      std::string val = "";
      produce_headers->add(key, val);
      compare_headers->add(key, "", 0);
    } else if ((i % 6) == 0) {
      /* Binary value (no nul-term) */
      produce_headers->add(key, "binary", 6);
      compare_headers->add(key, "binary"); /* auto-nul-terminated */
    } else {
      /* Standard string value */
      std::stringstream val_s;
      val_s << "value_" << i;
      std::string val = val_s.str();
      produce_headers->add(key, val);
      compare_headers->add(key, val);
    }
  }
  test_headers(produce_headers, compare_headers);
  delete compare_headers;
}

static void  test_duplicate_keys () {
  Test::Say("Test multiple headers with duplicate keys.\n");
  int num_hdrs = 4;
  RdKafka::Headers *produce_headers = RdKafka::Headers::create();
  RdKafka::Headers *compare_headers = RdKafka::Headers::create();
  for (int i = 0; i < num_hdrs; ++i) {
    std::string dup_key = "dup_key";
    std::stringstream val_s;
    val_s << "value_" << i;
    std::string val = val_s.str();
    produce_headers->add(dup_key, val);
    compare_headers->add(dup_key, val);
  }
  test_headers(produce_headers, compare_headers);
  delete compare_headers;
}

static void test_remove_after_add () {
  Test::Say("Test removing after adding headers.\n");
  RdKafka::Headers *headers = RdKafka::Headers::create();

  // Add one unique key
  std::string key_one = "key1";
  std::string val_one = "val_one";
  headers->add(key_one, val_one);

  // Add a second unique key
  std::string key_two = "key2";
  std::string val_two = "val_two";
  headers->add(key_two, val_one);

  // Assert header length is 2
  size_t expected_size = 2;
  if (headers->size() != expected_size) {
    Test::Fail(tostr() << "Expected header->size() to equal "
                       << expected_size << ", instead got "
                       << headers->size() << "\n");
  }

  // Remove key_one and assert headers == 1
  headers->remove(key_one);
  size_t expected_remove_size = 1;
  if (headers->size() != expected_remove_size) {
    Test::Fail(tostr() << "Expected header->size() to equal "
                       << expected_remove_size << ", instead got "
                       << headers->size() << "\n");
  }

  delete headers;
}

static void test_remove_all_duplicate_keys () {
  Test::Say("Test removing duplicate keys removes all headers.\n");
  RdKafka::Headers *headers = RdKafka::Headers::create();

  // Add one unique key
  std::string key_one = "key1";
  std::string val_one = "val_one";
  headers->add(key_one, val_one);

  // Add 2 duplicate keys
  std::string dup_key = "dup_key";
  std::string val_two = "val_two";
  headers->add(dup_key, val_one);
  headers->add(dup_key, val_two);

  // Assert header length is 3
  size_t expected_size = 3;
  if (headers->size() != expected_size) {
    Test::Fail(tostr() << "Expected header->size() to equal "
                       << expected_size << ", instead got "
                       << headers->size() << "\n");
  }

  // Remove key_one and assert headers == 1
  headers->remove(dup_key);
  size_t expected_size_remove = 1;
  if (headers->size() != expected_size_remove) {
    Test::Fail(tostr() << "Expected header->size() to equal "
                       << expected_size_remove << ", instead got "
                       << headers->size() << "\n");
  }

  delete headers;
}

static void test_get_last_gives_last_added_val () {
  Test::Say("Test get_last returns the last added value of duplicate keys.\n");
  RdKafka::Headers *headers = RdKafka::Headers::create();

  // Add two duplicate keys
  std::string dup_key = "dup_key";
  std::string val_one = "val_one";
  std::string val_two = "val_two";
  std::string val_three = "val_three";
  headers->add(dup_key, val_one);
  headers->add(dup_key, val_two);
  headers->add(dup_key, val_three);

  // Assert header length is 3
  size_t expected_size = 3;
  if (headers->size() != expected_size) {
    Test::Fail(tostr() << "Expected header->size() to equal "
                       << expected_size << ", instead got "
                       << headers->size() << "\n");
  }

  // Get last of duplicate key and assert it equals val_two
  RdKafka::Headers::Header last = headers->get_last(dup_key);
  std::string value = std::string(last.value_string());
  if (value != val_three) {
    Test::Fail(tostr() << "Expected get_last to return " << val_two
                       << " as the value of the header instead got "
                       << value << "\n");
  }

  delete headers;
}

static void test_get_of_key_returns_all () {
  Test::Say("Test get returns all the headers of a duplicate key.\n");
  RdKafka::Headers *headers = RdKafka::Headers::create();

  // Add two duplicate keys
  std::string unique_key = "unique";
  std::string dup_key = "dup_key";
  std::string val_one = "val_one";
  std::string val_two = "val_two";
  std::string val_three = "val_three";
  headers->add(unique_key, val_one);
  headers->add(dup_key, val_one);
  headers->add(dup_key, val_two);
  headers->add(dup_key, val_three);

  // Assert header length is 4
  size_t expected_size = 4;
  if (headers->size() != expected_size) {
    Test::Fail(tostr() << "Expected header->size() to equal "
                       << expected_size << ", instead got "
                       << headers->size() << "\n");
  }

  // Get all of the duplicate key
  std::vector<RdKafka::Headers::Header> get = headers->get(dup_key);
  size_t expected_get_size = 3;
  if (get.size() != expected_get_size) {
    Test::Fail(tostr() << "Expected header->size() to equal "
                       << expected_get_size << ", instead got "
                       << headers->size() << "\n");
  }

  delete headers;
}

static void test_failed_produce () {

  RdKafka::Headers *headers = RdKafka::Headers::create();
  headers->add("my", "header");

  RdKafka::ErrorCode err;

  err = producer->produce(topic, 999 /* invalid partition */,
                          RdKafka::Producer::RK_MSG_COPY,
                          (void *)"message", 7,
                          (void *)"key", 3, 0, headers, NULL);
  if (!err)
    Test::Fail("Expected produce() to fail");

  delete headers;
}

static void test_assignment_op () {
  Test::Say("Test Header assignment operator\n");

  RdKafka::Headers *headers = RdKafka::Headers::create();

  headers->add("abc", "123");
  headers->add("def", "456");

  RdKafka::Headers::Header h = headers->get_last("abc");
  h = headers->get_last("def");
  RdKafka::Headers::Header h2 = h;
  h = headers->get_last("nope");
  RdKafka::Headers::Header h3 = h;
  h = headers->get_last("def");

  delete headers;
}


extern "C" {
  int main_0085_headers (int argc, char **argv) {
    topic = Test::mk_topic_name("0085-headers", 1);

    RdKafka::Conf *conf;
    std::string errstr;

    Test::conf_init(&conf, NULL, 0);

    RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
    if (!p)
      Test::Fail("Failed to create Producer: " + errstr);

    Test::conf_set(conf, "group.id", topic);

    RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!c)
      Test::Fail("Failed to create KafkaConsumer: " + errstr);

    delete conf;

    std::vector<RdKafka::TopicPartition*> parts;
    parts.push_back(RdKafka::TopicPartition::create(topic, 0,
                                                    RdKafka::Topic::
                                                    OFFSET_BEGINNING));
    RdKafka::ErrorCode err = c->assign(parts);
    if (err != RdKafka::ERR_NO_ERROR)
      Test::Fail("assign() failed: " + RdKafka::err2str(err));
    RdKafka::TopicPartition::destroy(parts);

    producer = p;
    consumer = c;

    test_headers(0);
    test_headers(1);
    test_headers(261);
    test_duplicate_keys();
    test_remove_after_add();
    test_remove_all_duplicate_keys();
    test_get_last_gives_last_added_val();
    test_get_of_key_returns_all();
    test_failed_produce();
    test_assignment_op();

    c->close();
    delete c;
    delete p;

    return 0;
  }
}
