/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_PROTO_H_
#define _RDKAFKA_PROTO_H_


#include "rdendian.h"
#include "rdvarint.h"

/* Protocol defines */
#include "rdkafka_protocol.h"



/** Default generic retry count for failed requests.
 *  This may be overriden for specific request types. */
#define RD_KAFKA_REQUEST_DEFAULT_RETRIES 2

/** Max (practically infinite) retry count */
#define RD_KAFKA_REQUEST_MAX_RETRIES  INT_MAX

/** Do not retry request */
#define RD_KAFKA_REQUEST_NO_RETRIES 0


/**
 * Request types
 */
struct rd_kafkap_reqhdr {
        int32_t  Size;
        int16_t  ApiKey;
        int16_t  ApiVersion;
        int32_t  CorrId;
        /* ClientId follows */
};

#define RD_KAFKAP_REQHDR_SIZE (4+2+2+4)
#define RD_KAFKAP_RESHDR_SIZE (4+4)

/**
 * Response header
 */
struct rd_kafkap_reshdr {
	int32_t  Size;
	int32_t  CorrId;
};


/**
 * Request type v1 (flexible version)
 *
 * i32            Size
 * i16            ApiKey
 * i16            ApiVersion
 * i32            CorrId
 * string         ClientId   (2-byte encoding, not compact string)
 * uvarint        Tags
 * <Request payload>
 * uvarint        EndTags
 *
 * Any struct-type (non-primitive or array type) field in the request payload
 * must also have a trailing tags list, this goes for structs in arrays as well.
 */

/**
 * @brief Protocol request type (ApiKey) to name/string.
 *
 * Generate updates to this list with generate_proto.sh.
 */
static RD_UNUSED
const char *rd_kafka_ApiKey2str (int16_t ApiKey) {
        static const char *names[] = {
                [RD_KAFKAP_Produce] = "Produce",
                [RD_KAFKAP_Fetch] = "Fetch",
                [RD_KAFKAP_Offset] = "Offset",
                [RD_KAFKAP_Metadata] = "Metadata",
                [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr",
                [RD_KAFKAP_StopReplica] = "StopReplica",
                [RD_KAFKAP_UpdateMetadata] = "UpdateMetadata",
                [RD_KAFKAP_ControlledShutdown] = "ControlledShutdown",
                [RD_KAFKAP_OffsetCommit] = "OffsetCommit",
                [RD_KAFKAP_OffsetFetch] = "OffsetFetch",
                [RD_KAFKAP_FindCoordinator] = "FindCoordinator",
                [RD_KAFKAP_JoinGroup] = "JoinGroup",
                [RD_KAFKAP_Heartbeat] = "Heartbeat",
                [RD_KAFKAP_LeaveGroup] = "LeaveGroup",
                [RD_KAFKAP_SyncGroup] = "SyncGroup",
                [RD_KAFKAP_DescribeGroups] = "DescribeGroups",
                [RD_KAFKAP_ListGroups] = "ListGroups",
                [RD_KAFKAP_SaslHandshake] = "SaslHandshake",
                [RD_KAFKAP_ApiVersion] = "ApiVersion",
                [RD_KAFKAP_CreateTopics] = "CreateTopics",
                [RD_KAFKAP_DeleteTopics] = "DeleteTopics",
                [RD_KAFKAP_DeleteRecords] = "DeleteRecords",
                [RD_KAFKAP_InitProducerId] = "InitProducerId",
                [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch",
                [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn",
                [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn",
                [RD_KAFKAP_EndTxn] = "EndTxn",
                [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers",
                [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit",
                [RD_KAFKAP_DescribeAcls] = "DescribeAcls",
                [RD_KAFKAP_CreateAcls] = "CreateAcls",
                [RD_KAFKAP_DeleteAcls] = "DeleteAcls",
                [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs",
                [RD_KAFKAP_AlterConfigs] = "AlterConfigs",
                [RD_KAFKAP_AlterReplicaLogDirs] = "AlterReplicaLogDirs",
                [RD_KAFKAP_DescribeLogDirs] = "DescribeLogDirs",
                [RD_KAFKAP_SaslAuthenticate] = "SaslAuthenticate",
                [RD_KAFKAP_CreatePartitions] = "CreatePartitions",
                [RD_KAFKAP_CreateDelegationToken] = "CreateDelegationToken",
                [RD_KAFKAP_RenewDelegationToken] = "RenewDelegationToken",
                [RD_KAFKAP_ExpireDelegationToken] = "ExpireDelegationToken",
                [RD_KAFKAP_DescribeDelegationToken] = "DescribeDelegationToken",
                [RD_KAFKAP_DeleteGroups] = "DeleteGroups",
                [RD_KAFKAP_ElectLeaders] = "ElectLeadersRequest",
                [RD_KAFKAP_IncrementalAlterConfigs] =
                "IncrementalAlterConfigsRequest",
                [RD_KAFKAP_AlterPartitionReassignments] =
                "AlterPartitionReassignmentsRequest",
                [RD_KAFKAP_ListPartitionReassignments] =
                "ListPartitionReassignmentsRequest",
                [RD_KAFKAP_OffsetDelete] = "OffsetDeleteRequest",
                [RD_KAFKAP_DescribeClientQuotas] =
                "DescribeClientQuotasRequest",
                [RD_KAFKAP_AlterClientQuotas] =
                "AlterClientQuotasRequest",
                [RD_KAFKAP_DescribeUserScramCredentials] =
                "DescribeUserScramCredentialsRequest",
                [RD_KAFKAP_AlterUserScramCredentials] =
                "AlterUserScramCredentialsRequest",
                [RD_KAFKAP_Vote] = "VoteRequest",
                [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest",
                [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest",
                [RD_KAFKAP_DescribeQuorum] = "DescribeQuorumRequest",
                [RD_KAFKAP_AlterIsr] = "AlterIsrRequest",
                [RD_KAFKAP_UpdateFeatures] = "UpdateFeaturesRequest",
                [RD_KAFKAP_Envelope] = "EnvelopeRequest",
        };
        static RD_TLS char ret[64];

        if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) ||
            !names[ApiKey]) {
                rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey);
                return ret;
        }

        return names[ApiKey];
}








/**
 * @brief ApiKey version support tuple.
 */
struct rd_kafka_ApiVersion {
	int16_t ApiKey;
	int16_t MinVer;
	int16_t MaxVer;
};

/**
 * @brief ApiVersion.ApiKey comparator.
 */
static RD_UNUSED
int rd_kafka_ApiVersion_key_cmp (const void *_a, const void *_b) {
        const struct rd_kafka_ApiVersion *a =
                (const struct rd_kafka_ApiVersion *)_a;
        const struct rd_kafka_ApiVersion *b =
                (const struct rd_kafka_ApiVersion *)_b;
        return RD_CMP(a->ApiKey, b->ApiKey);
}



typedef enum {
        RD_KAFKA_READ_UNCOMMITTED = 0,
        RD_KAFKA_READ_COMMITTED = 1
} rd_kafka_isolation_level_t;



#define RD_KAFKA_CTRL_MSG_ABORT 0
#define RD_KAFKA_CTRL_MSG_COMMIT 1


/**
 * @enum Coordinator type, used with FindCoordinatorRequest
 */
typedef enum rd_kafka_coordtype_t {
        RD_KAFKA_COORD_GROUP = 0,
        RD_KAFKA_COORD_TXN = 1
} rd_kafka_coordtype_t;


/**
 *
 * Kafka protocol string representation prefixed with a convenience header
 *
 * Serialized format:
 *  { uint16, data.. }
 *
 */
typedef struct rd_kafkap_str_s {
	/* convenience header (aligned access, host endian) */
	int         len; /* Kafka string length (-1=NULL, 0=empty, >0=string) */
	const char *str; /* points into data[] or other memory,
			  * not NULL-terminated */
} rd_kafkap_str_t;


#define RD_KAFKAP_STR_LEN_NULL -1
#define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)

/* Returns the length of the string of a kafka protocol string representation */
#define RD_KAFKAP_STR_LEN0(len) ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len))
#define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)

/* Returns the actual size of a kafka protocol string representation. */
#define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len))
#define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)


/** @returns true if kstr is pre-serialized through .._new() */
#define RD_KAFKAP_STR_IS_SERIALIZED(kstr)                               \
        (((const char *)((kstr)+1))+2 == (const char *)((kstr)->str))

/* Serialized Kafka string: only works for _new() kstrs.
 * Check with RD_KAFKAP_STR_IS_SERIALIZED */
#define RD_KAFKAP_STR_SER(kstr)  ((kstr)+1)

/* Macro suitable for "%.*s" printing. */
#define RD_KAFKAP_STR_PR(kstr)						\
	(int)((kstr)->len == RD_KAFKAP_STR_LEN_NULL ? 0 : (kstr)->len), \
		(kstr)->str

/* strndupa() a Kafka string */
#define RD_KAFKAP_STR_DUPA(destptr,kstr) \
	rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))

/* strndup() a Kafka string */
#define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))

#define RD_KAFKAP_STR_INITIALIZER { .len = RD_KAFKAP_STR_LEN_NULL, .str = NULL }

/**
 * Frees a Kafka string previously allocated with `rd_kafkap_str_new()`
 */
static RD_UNUSED void rd_kafkap_str_destroy (rd_kafkap_str_t *kstr) {
	rd_free(kstr);
}



/**
 * Allocate a new Kafka string and make a copy of 'str'.
 * If 'len' is -1 the length will be calculated.
 * Supports Kafka NULL strings.
 * Nul-terminates the string, but the trailing \0 is not part of
 * the serialized string.
 */
static RD_INLINE RD_UNUSED
rd_kafkap_str_t *rd_kafkap_str_new (const char *str, int len) {
	rd_kafkap_str_t *kstr;
	int16_t klen;

	if (!str)
		len = RD_KAFKAP_STR_LEN_NULL;
	else if (len == -1)
		len = (int)strlen(str);

	kstr = (rd_kafkap_str_t *)rd_malloc(sizeof(*kstr) + 2 +
			(len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1));
	kstr->len = len;

	/* Serialised format: 16-bit string length */
	klen = htobe16(len);
	memcpy(kstr+1, &klen, 2);

	/* Pre-Serialised format: non null-terminated string */
	if (len == RD_KAFKAP_STR_LEN_NULL)
		kstr->str = NULL;
	else {
		kstr->str = ((const char *)(kstr+1))+2;
		memcpy((void *)kstr->str, str, len);
		((char *)kstr->str)[len] = '\0';
	}

	return kstr;
}


/**
 * Makes a copy of `src`. The copy will be fully allocated and should
 * be freed with rd_kafka_pstr_destroy()
 */
static RD_INLINE RD_UNUSED
rd_kafkap_str_t *rd_kafkap_str_copy (const rd_kafkap_str_t *src) {
        return rd_kafkap_str_new(src->str, src->len);
}

static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
						 const rd_kafkap_str_t *b) {
	int minlen = RD_MIN(a->len, b->len);
	int r = memcmp(a->str, b->str, minlen);
	if (r)
		return r;
	else
                return RD_CMP(a->len, b->len);
}

static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str (const rd_kafkap_str_t *a,
						     const char *str) {
	int len = (int)strlen(str);
	int minlen = RD_MIN(a->len, len);
	int r = memcmp(a->str, str, minlen);
	if (r)
		return r;
	else
                return RD_CMP(a->len, len);
}

static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2 (const char *str,
						      const rd_kafkap_str_t *b){
	int len = (int)strlen(str);
	int minlen = RD_MIN(b->len, len);
	int r = memcmp(str, b->str, minlen);
	if (r)
		return r;
	else
                return RD_CMP(len, b->len);
}



/**
 *
 * Kafka protocol bytes array representation prefixed with a convenience header
 *
 * Serialized format:
 *  { uint32, data.. }
 *
 */
typedef struct rd_kafkap_bytes_s {
	/* convenience header (aligned access, host endian) */
	int32_t     len;   /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */
	const void *data;  /* points just past the struct, or other memory,
			    * not NULL-terminated */
	const char _data[1]; /* Bytes following struct when new()ed */
} rd_kafkap_bytes_t;


#define RD_KAFKAP_BYTES_LEN_NULL -1
#define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
	((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)

/* Returns the length of the bytes of a kafka protocol bytes representation */
#define RD_KAFKAP_BYTES_LEN0(len) ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0:(len))
#define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)

/* Returns the actual size of a kafka protocol bytes representation. */
#define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len))
#define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)

/** @returns true if kbyes is pre-serialized through .._new() */
#define RD_KAFKAP_BYTES_IS_SERIALIZED(kstr)                             \
        (((const char *)((kbytes)+1))+2 == (const char *)((kbytes)->data))

/* Serialized Kafka bytes: only works for _new() kbytes */
#define RD_KAFKAP_BYTES_SER(kbytes)  ((kbytes)+1)


/**
 * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()`
 */
static RD_UNUSED void rd_kafkap_bytes_destroy (rd_kafkap_bytes_t *kbytes) {
	rd_free(kbytes);
}


/**
 * @brief Allocate a new Kafka bytes and make a copy of 'bytes'.
 * If \p len > 0 but \p bytes is NULL no copying is performed by
 * the bytes structure will be allocated to fit \p size bytes.
 *
 * Supports:
 *  - Kafka NULL bytes (bytes==NULL,len==0),
 *  - Empty bytes (bytes!=NULL,len==0)
 *  - Copy data (bytes!=NULL,len>0)
 *  - No-copy, just alloc (bytes==NULL,len>0)
 */
static RD_INLINE RD_UNUSED
rd_kafkap_bytes_t *rd_kafkap_bytes_new (const char *bytes, int32_t len) {
	rd_kafkap_bytes_t *kbytes;
	int32_t klen;

	if (!bytes && !len)
		len = RD_KAFKAP_BYTES_LEN_NULL;

	kbytes = (rd_kafkap_bytes_t *)rd_malloc(sizeof(*kbytes) + 4 +
			(len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len));
	kbytes->len = len;

	klen = htobe32(len);
	memcpy((void *)(kbytes+1), &klen, 4);

	if (len == RD_KAFKAP_BYTES_LEN_NULL)
		kbytes->data = NULL;
	else {
		kbytes->data = ((const char *)(kbytes+1))+4;
                if (bytes)
                        memcpy((void *)kbytes->data, bytes, len);
	}

	return kbytes;
}


/**
 * Makes a copy of `src`. The copy will be fully allocated and should
 * be freed with rd_kafkap_bytes_destroy()
 */
static RD_INLINE RD_UNUSED
rd_kafkap_bytes_t *rd_kafkap_bytes_copy (const rd_kafkap_bytes_t *src) {
        return rd_kafkap_bytes_new(
                (const char *)src->data, src->len);
}


static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
						   const rd_kafkap_bytes_t *b) {
	int minlen = RD_MIN(a->len, b->len);
	int r = memcmp(a->data, b->data, minlen);
	if (r)
		return r;
	else
                return RD_CMP(a->len, b->len);
}

static RD_INLINE RD_UNUSED
int rd_kafkap_bytes_cmp_data (const rd_kafkap_bytes_t *a,
			      const char *data, int len) {
	int minlen = RD_MIN(a->len, len);
	int r = memcmp(a->data, data, minlen);
	if (r)
		return r;
	else
                return RD_CMP(a->len, len);
}




typedef struct rd_kafka_buf_s rd_kafka_buf_t;


#define RD_KAFKA_NODENAME_SIZE  256




/**
 * @brief Message overheads (worst-case)
 */

/**
 * MsgVersion v0..v1
 */
/* Offset + MessageSize */
#define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8+4)
/* CRC + Magic + Attr + KeyLen + ValueLen */
#define RD_KAFKAP_MESSAGE_V0_HDR_SIZE    (4+1+1+4+4)
/* CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */
#define RD_KAFKAP_MESSAGE_V1_HDR_SIZE    (4+1+1+8+4+4)
/* Maximum per-message overhead */
#define RD_KAFKAP_MESSAGE_V0_OVERHEAD                                   \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
#define RD_KAFKAP_MESSAGE_V1_OVERHEAD                                   \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)

/**
 * MsgVersion v2
 */
#define RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD                              \
        (                                                              \
        /* Length (varint) */                                          \
        RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
        /* Attributes */                                               \
        1 +                                                            \
        /* TimestampDelta (varint) */                                  \
        RD_UVARINT_ENC_SIZEOF(int64_t) +                               \
        /* OffsetDelta (varint) */                                     \
        RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
        /* KeyLen (varint) */                                          \
        RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
        /* ValueLen (varint) */                                        \
        RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
        /* HeaderCnt (varint): */                                      \
        RD_UVARINT_ENC_SIZEOF(int32_t)                                 \
        )

#define RD_KAFKAP_MESSAGE_V2_MIN_OVERHEAD                              \
        (                                                              \
        /* Length (varint) */                                          \
        RD_UVARINT_ENC_SIZE_0() +                                      \
        /* Attributes */                                               \
        1 +                                                            \
        /* TimestampDelta (varint) */                                  \
        RD_UVARINT_ENC_SIZE_0() +                                      \
        /* OffsetDelta (varint) */                                     \
        RD_UVARINT_ENC_SIZE_0() +                                      \
        /* KeyLen (varint) */                                          \
        RD_UVARINT_ENC_SIZE_0() +                                      \
        /* ValueLen (varint) */                                        \
        RD_UVARINT_ENC_SIZE_0() +                                      \
        /* HeaderCnt (varint): */                                      \
        RD_UVARINT_ENC_SIZE_0()                                        \
        )


/**
 * @brief MessageSets are not explicitly versioned but depends on the
 *        Produce/Fetch API version and the encompassed Message versions.
 *        We use the Message version (MsgVersion, aka MagicByte) to describe
 *        the MessageSet version, that is, MsgVersion <= 1 uses the old
 *        MessageSet version (v0?) while MsgVersion 2 uses MessageSet version v2
 */

/* Old MessageSet header: none */
#define RD_KAFKAP_MSGSET_V0_SIZE                0

/* MessageSet v2 header */
#define RD_KAFKAP_MSGSET_V2_SIZE                (8+4+4+1+4+2+4+8+8+8+2+4+4)

/* Byte offsets for MessageSet fields */
#define RD_KAFKAP_MSGSET_V2_OF_Length           (8)
#define RD_KAFKAP_MSGSET_V2_OF_CRC              (8+4+4+1)
#define RD_KAFKAP_MSGSET_V2_OF_Attributes       (8+4+4+1+4)
#define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta  (8+4+4+1+4+2)
#define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp    (8+4+4+1+4+2+4)
#define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp     (8+4+4+1+4+2+4+8)
#define RD_KAFKAP_MSGSET_V2_OF_BaseSequence     (8+4+4+1+4+2+4+8+8+8+2)
#define RD_KAFKAP_MSGSET_V2_OF_RecordCount      (8+4+4+1+4+2+4+8+8+8+2+4)




/**
 * @name Producer ID and Epoch for the Idempotent Producer
 * @{
 *
 */

/**
 * @brief Producer ID and Epoch
 */
typedef struct rd_kafka_pid_s {
        int64_t id;     /**< Producer Id */
        int16_t epoch;  /**< Producer Epoch */
} rd_kafka_pid_t;

#define RD_KAFKA_PID_INITIALIZER {-1,-1}

/**
 * @returns true if \p PID is valid
 */
#define rd_kafka_pid_valid(PID) ((PID).id != -1)

/**
 * @brief Check two pids for equality
 */
static RD_UNUSED RD_INLINE int rd_kafka_pid_eq (const rd_kafka_pid_t a,
                                                const rd_kafka_pid_t b) {
        return a.id == b.id && a.epoch == b.epoch;
}

/**
 * @brief Pid+epoch comparator
 */
static RD_UNUSED int rd_kafka_pid_cmp (const void *_a, const void *_b) {
        const rd_kafka_pid_t *a = _a, *b = _b;

        if (a->id < b->id)
                return -1;
        else if (a->id > b->id)
                return 1;

        return (int)a->epoch - (int)b->epoch;
}


/**
 * @brief Pid (not epoch) comparator
 */
static RD_UNUSED int rd_kafka_pid_cmp_pid (const void *_a, const void *_b) {
        const rd_kafka_pid_t *a = _a, *b = _b;

        if (a->id < b->id)
                return -1;
        else if (a->id > b->id)
                return 1;

        return 0;
}


/**
 * @returns the string representation of a PID in a thread-safe
 *          static buffer.
 */
static RD_UNUSED const char *
rd_kafka_pid2str (const rd_kafka_pid_t pid) {
        static RD_TLS char buf[2][64];
        static RD_TLS int i;

        if (!rd_kafka_pid_valid(pid))
                return "PID{Invalid}";

        i = (i + 1) % 2;

        rd_snprintf(buf[i], sizeof(buf[i]),
                    "PID{Id:%" PRId64",Epoch:%hd}", pid.id, pid.epoch);

        return buf[i];
}

/**
 * @brief Reset the PID to invalid/init state
 */
static RD_UNUSED RD_INLINE void rd_kafka_pid_reset (rd_kafka_pid_t *pid) {
        pid->id = -1;
        pid->epoch = -1;
}


/**
 * @brief Bump the epoch of a valid PID
 */
static RD_UNUSED RD_INLINE rd_kafka_pid_t
rd_kafka_pid_bump (const rd_kafka_pid_t old) {
        rd_kafka_pid_t new_pid = {
                old.id,
                (int16_t)(((int)old.epoch + 1) & (int)INT16_MAX) };
        return new_pid;
}

/**@}*/


#endif /* _RDKAFKA_PROTO_H_ */
