import { Readable, ReadableOptions, Writable, WritableOptions } from 'stream';
import { EventEmitter } from 'events';
import {
    GlobalConfig,
    TopicConfig,
    ConsumerGlobalConfig,
    ConsumerTopicConfig,
    ProducerGlobalConfig,
    ProducerTopicConfig,
} from './config';

export * from './config';
export * from './errors';

/**
 * Error shape handed to the callbacks and error-carrying event listeners
 * declared in this file.
 *
 * NOTE(review): the per-field descriptions are inferred from the field names;
 * confirm them against the runtime implementation.
 */
export interface LibrdKafkaError {
    /** Human-readable description of the failure. */
    message: string;
    /** Stack trace text, when one is available. */
    stack?: string;
    /** Numeric error code. */
    code: number;
    /** Numeric errno value — presumably mirrors `code`; TODO confirm. */
    errno: number;
    /** Label for where the error originated. */
    origin: string;
    /** Presumably set when the error is fatal for the client; confirm. */
    isFatal?: boolean;
    /** Presumably set when the failed operation may be retried; confirm. */
    isRetriable?: boolean;
    /** Presumably set when the current transaction has to be aborted; confirm. */
    isTxnRequiresAbort?: boolean;
}

/**
 * First argument passed to `'ready'` event listeners (see `EventListenerMap`).
 */
export interface ReadyInfo {
    /** Name reported with the event — presumably the client instance name; TODO confirm. */
    name: string;
}

/**
 * Metrics object delivered to `'disconnected'` and `'connection.failure'`
 * listeners and to the `Client.disconnect()` callback.
 */
export interface ClientMetrics {
    /** NOTE(review): presumably a timestamp of when the connection was opened; unit not visible here — confirm. */
    connectionOpened: number;
}

/**
 * Options accepted by `Client.connect()`, `Client.getMetadata()` and
 * `ProducerStream.connect()`. Every field is optional.
 */
export interface MetadataOptions {
    /** Time limit for the request — presumably milliseconds; TODO confirm. */
    timeout?: number;
    /** Topic to request metadata for. */
    topic?: string;
    /** Presumably requests metadata for every topic instead of only `topic`; confirm. */
    allTopics?: boolean;
}

/** One entry of `Metadata.brokers`. */
export interface BrokerMetadata {
    /** Host the broker is reachable at. */
    host: string;
    /** Port the broker is reachable at. */
    port: number;
    /** Numeric broker identifier. */
    id: number;
}

/** One entry of `TopicMetadata.partitions`. */
export interface PartitionMetadata {
    /** Numeric partition identifier. */
    id: number;
    /** NOTE(review): presumably the broker id of the partition leader; confirm. */
    leader: number;
    /** NOTE(review): presumably broker ids of the in-sync replicas; confirm. */
    isrs: number[];
    /** NOTE(review): presumably broker ids of all replicas; confirm. */
    replicas: number[];
}

/** One entry of `Metadata.topics`. */
export interface TopicMetadata {
    /** Topic name. */
    name: string;
    /** Per-partition metadata for this topic. */
    partitions: PartitionMetadata[];
}

/**
 * Metadata object handed to `connect()` / `getMetadata()` callbacks, to
 * `'ready'` listeners, and to the function form of `ReadStreamOptions.topics`.
 */
export interface Metadata {
    /** Topics described by this metadata object. */
    topics: TopicMetadata[];
    /** Brokers described by this metadata object. */
    brokers: BrokerMetadata[];
    /** NOTE(review): presumably the id of the broker that served the request; confirm. */
    orig_broker_id: number;
    /** NOTE(review): presumably the name of that same broker; confirm. */
    orig_broker_name: string;
}

/**
 * Low/high offset pair returned by `KafkaConsumer.getWatermarkOffsets()` and
 * delivered to the `Client.queryWatermarkOffsets()` callback.
 */
export interface WatermarkOffsets {
    /** Low watermark offset. */
    lowOffset: number;
    /** High watermark offset. */
    highOffset: number;
}

/** Identifies one partition of one topic. */
export interface TopicPartition {
    /** Topic name. */
    topic: string;
    /** Partition number within `topic`. */
    partition: number;
}

/** A `TopicPartition` together with an offset inside that partition. */
export interface TopicPartitionOffset extends TopicPartition {
    /** Offset within the partition. */
    offset: number;
}

/**
 * Input element type of `KafkaConsumer.offsetsForTimes()`; structurally the
 * same as `TopicPartitionOffset`.
 * NOTE(review): presumably `offset` carries a timestamp in this role — confirm.
 */
export type TopicPartitionTime = TopicPartitionOffset;

/** Argument of `'partition.eof'` listeners; structurally the same as `TopicPartitionOffset`. */
export type EofEvent = TopicPartitionOffset;

/** Element type of `KafkaConsumer.assign()` / `assignments()`: a topic-partition with or without an offset. */
export type Assignment = TopicPartition | TopicPartitionOffset;

/**
 * Second argument of `'delivery-report'` listeners. Carries the topic,
 * partition and offset inherited from `TopicPartitionOffset`.
 */
export interface DeliveryReport extends TopicPartitionOffset {
    /** Message key, when reported. */
    key?: MessageKey;
    /** Message payload, when reported. */
    value?: MessageValue;
    /** Size of the message — presumably in bytes; TODO confirm. */
    size: number;
    /** Message timestamp, when reported — unit not visible here. */
    timestamp?: number;
    /** Untyped value — presumably whatever was passed as `opaque` to `Producer.produce()`; confirm. */
    opaque?: any;
}

/** A number that may also be given as `null` or left `undefined`. */
export type NumberNullUndefined = undefined | null | number;

/** Message key: text, raw bytes, or absent (`null` / `undefined`). */
export type MessageKey = string | Buffer | undefined | null;
/** One header entry: string keys mapped to string or `Buffer` values. */
export type MessageHeader = Record<string, string | Buffer>;
/** Message payload: raw bytes, or `null`. */
export type MessageValue = null | Buffer;
/** A topic to subscribe to, given as a string or a regular expression. */
export type SubscribeTopic = RegExp | string;
/** List form of `SubscribeTopic`. */
export type SubscribeTopicList = Array<SubscribeTopic>;

/**
 * A consumed message: argument of `'data'` listeners and element type of the
 * array handed to `KafkaConsumer.consume()` callbacks.
 */
export interface Message extends TopicPartitionOffset {
    /** Topic name (re-declared here; also inherited through `TopicPartitionOffset`). */
    topic: string;
    /** Message key, when present. */
    key?: MessageKey;
    /** Message payload. */
    value: MessageValue;
    /** Size of the message — presumably in bytes; TODO confirm. */
    size: number;
    /** Message headers, when present. */
    headers?: MessageHeader[];
    /** Message timestamp, when present — unit not visible here. */
    timestamp?: number;
    /** Untyped extra value attached to the message. */
    opaque?: any;
}

/**
 * Options for `createReadStream()` / `KafkaConsumer.createReadStream()`,
 * layered on top of Node's `ReadableOptions`.
 */
export interface ReadStreamOptions extends ReadableOptions {
    /**
     * What to subscribe to: a single topic, a list of topics, or a function
     * that receives the metadata and returns the list.
     */
    topics:
        | SubscribeTopicList
        | SubscribeTopic
        | ((metadata: Metadata) => SubscribeTopicList);
    /** Untyped options — presumably forwarded to the consumer's connect call; confirm. */
    connectOptions?: any;
    /** NOTE(review): presumably closes the underlying consumer when the stream ends; confirm. */
    autoClose?: boolean;
    /** NOTE(review): presumably emits arrays of messages instead of single messages; confirm. */
    streamAsBatch?: boolean;
    /** NOTE(review): presumably the number of messages requested per fetch; confirm. */
    fetchSize?: number;
    /** NOTE(review): presumably a delay between fetch attempts; unit not visible here — confirm. */
    waitInterval?: number;
    /** Same-named option as on Node streams, declared again here. */
    objectMode?: boolean;
    /** Same-named option as on Node streams, declared again here. */
    highWaterMark?: number;
}

/**
 * Options for `createWriteStream()` / `Producer.createWriteStream()`,
 * layered on top of Node's `WritableOptions`.
 */
export interface WriteStreamOptions extends WritableOptions {
    /** NOTE(review): presumably the topic written to when a chunk names none; confirm. */
    topic?: string;
    /** Untyped options — presumably forwarded to the producer's connect call; confirm. */
    connectOptions?: any;
    /** NOTE(review): presumably closes the underlying producer when the stream ends; confirm. */
    autoClose?: boolean;
    /** NOTE(review): presumably how often the producer is polled; unit not visible here — confirm. */
    pollInterval?: number;
    /** Encoding name for written data. */
    encoding?: string;
    /** Same-named option as on Node streams, declared again here. */
    objectMode?: boolean;
}

/**
 * Writable stream returned by `createWriteStream()` /
 * `Producer.createWriteStream()`.
 */
export interface ProducerStream extends Writable {
    /** The `Producer` instance associated with this stream. */
    producer: Producer;
    /**
     * Connects the stream.
     * @param metadataOptions Optional metadata request options.
     */
    connect(metadataOptions?: MetadataOptions): void;
    /**
     * Closes the stream.
     * @param cb Optional callback taking no arguments — presumably invoked once closing has finished; confirm.
     */
    close(cb?: () => void): void;
}

/**
 * Readable stream returned by `createReadStream()` /
 * `KafkaConsumer.createReadStream()`.
 */
export interface ConsumerStream extends Readable {
    /** The `KafkaConsumer` instance associated with this stream. */
    consumer: KafkaConsumer;
    /**
     * Connects the stream.
     * @param options Typed as the consumer's global configuration.
     */
    connect(options: ConsumerGlobalConfig): void;
    /**
     * Closes the stream.
     * @param cb Optional callback taking no arguments — presumably invoked once closing has finished; confirm.
     */
    close(cb?: () => void): void;
}

/** Event names accepted by every client type. */
type KafkaClientEvents =
    | 'disconnected'
    | 'ready'
    | 'connection.failure'
    | 'event.error'
    | 'event.stats'
    | 'event.log'
    | 'event.event'
    | 'event.throttle';
/** Event names accepted by `KafkaConsumer`: its own plus the shared client events. */
type KafkaConsumerEvents =
    | 'data'
    | 'partition.eof'
    | 'rebalance'
    | 'rebalance.error'
    | 'subscribed'
    | 'unsubscribed'
    | 'unsubscribe'
    | 'offset.commit'
    | KafkaClientEvents;
/** Event names accepted by `Producer`: `'delivery-report'` plus the shared client events. */
type KafkaProducerEvents =
    | 'delivery-report'
    | KafkaClientEvents;

/**
 * Lookup table from event name to the listener signature for that event.
 * `EventListener` indexes into it to type `Client.on()` / `Client.once()`.
 */
type EventListenerMap = {
    // ----- All clients: connectivity -----
    'disconnected': (metrics: ClientMetrics) => void;
    'ready': (info: ReadyInfo, metadata: Metadata) => void;
    'connection.failure': (error: LibrdKafkaError, metrics: ClientMetrics) => void;

    // ----- All clients: event messages -----
    'event.error': (error: LibrdKafkaError) => void;
    'event.stats': (eventData: any) => void;
    'event.log': (eventData: any) => void;
    'event.event': (eventData: any) => void;
    'event.throttle': (eventData: any) => void;

    // ----- Consumer only: domain events -----
    'data': (arg: Message) => void;
    'partition.eof': (arg: EofEvent) => void;
    'rebalance': (err: LibrdKafkaError, assignments: TopicPartition[]) => void;
    'rebalance.error': (err: Error) => void;

    // ----- Consumer only: connectivity -----
    'subscribed': (topics: SubscribeTopicList) => void;
    'unsubscribe': () => void;
    'unsubscribed': () => void;

    // ----- Consumer only: offsets -----
    'offset.commit': (error: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void;

    // ----- Producer only: delivery -----
    'delivery-report': (error: LibrdKafkaError, report: DeliveryReport) => void;
};

/**
 * Resolves an event name to its listener signature from `EventListenerMap`;
 * names that are not keys of the map resolve to `never`.
 */
type EventListener<K extends string> = K extends keyof EventListenerMap ? EventListenerMap[K] : never;

/**
 * Base class declaration shared by `KafkaConsumer` and `Producer`.
 *
 * `Events` is the union of event names the concrete client accepts in
 * `on()` / `once()`; the listener type for each name is resolved through
 * `EventListener`.
 */
export abstract class Client<Events extends string> extends EventEmitter {
    /**
     * @param globalConf Global configuration.
     * @param SubClientType Untyped — NOTE(review): presumably the underlying client constructor to wrap; confirm.
     * @param topicConf Topic-level configuration.
     */
    constructor(globalConf: GlobalConfig, SubClientType: any, topicConf: TopicConfig);

    /**
     * Connects the client.
     * @param metadataOptions Optional metadata request options.
     * @param cb Optional callback receiving an error and the metadata.
     * @returns This client, for chaining.
     */
    connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;

    /** Returns an untyped client object — NOTE(review): presumably the underlying native handle; confirm. */
    getClient(): any;

    /** Returns a number describing the connected time — NOTE(review): unit and reference point are not visible here; confirm. */
    connectedTime(): number;

    /** Returns the last error recorded by the client. */
    getLastError(): LibrdKafkaError;

    /**
     * Disconnects the client. The second overload takes a leading `timeout`
     * (unit not visible here).
     * @param cb Optional callback receiving an error and the client metrics.
     * @returns This client, for chaining.
     */
    disconnect(cb?: (err: any, data: ClientMetrics) => any): this;
    disconnect(timeout: number, cb?: (err: any, data: ClientMetrics) => any): this;

    /** Reports whether the client is connected. */
    isConnected(): boolean;

    /**
     * Requests metadata.
     * @param metadataOptions Optional metadata request options.
     * @param cb Optional callback receiving an error and the metadata.
     */
    getMetadata(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): any;

    /**
     * Queries the watermark offsets of one partition. `timeout` may be given
     * before the callback or left out entirely (unit not visible here).
     * @param topic Topic name.
     * @param partition Partition number.
     * @param cb Optional callback receiving an error and the offsets.
     */
    queryWatermarkOffsets(topic: string, partition: number, timeout: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;
    queryWatermarkOffsets(topic: string, partition: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;

    /** Typed `EventEmitter.on`: `event` must be one of `Events`, and `listener` must match that event's signature. */
    on<E extends Events>(event: E, listener: EventListener<E>): this;
    /** Typed `EventEmitter.once`, with the same constraints as `on`. */
    once<E extends Events>(event: E, listener: EventListener<E>): this;
}

/**
 * Consumer client declaration. Accepts the event names in
 * `KafkaConsumerEvents` through the inherited `on()` / `once()`.
 */
export class KafkaConsumer extends Client<KafkaConsumerEvents> {
    /**
     * @param conf Consumer global configuration.
     * @param topicConf Consumer topic configuration.
     */
    constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);

    /**
     * Sets the consumer's assignments.
     * @returns This consumer, for chaining.
     */
    assign(assignments: Assignment[]): this;

    /** Returns the consumer's current assignments. */
    assignments(): Assignment[];

    /**
     * Commits offsets. With an argument, commits the given topic-partition
     * offset(s).
     * NOTE(review): the zero-argument form presumably commits everything
     * consumed so far — confirm.
     * @returns This consumer, for chaining.
     */
    commit(topicPartition: TopicPartitionOffset | TopicPartitionOffset[]): this;
    commit(): this;

    /**
     * Commits based on a single message-like object (anything with topic,
     * partition and offset).
     * @returns This consumer, for chaining.
     */
    commitMessage(msg: TopicPartitionOffset): this;

    /**
     * Synchronous-named counterpart of `commitMessage()`.
     * @returns This consumer, for chaining.
     */
    commitMessageSync(msg: TopicPartitionOffset): this;

    /**
     * Synchronous-named counterpart of `commit()`.
     *
     * The zero-argument overload mirrors `commit()`, so both commit flavours
     * can be invoked the same way.
     * @returns This consumer, for chaining.
     */
    commitSync(topicPartition: TopicPartitionOffset | TopicPartitionOffset[]): this;
    commitSync(): this;

    /**
     * Fetches committed offsets. `toppars` may be omitted, in which case
     * `timeout` is the first argument (unit not visible here).
     * @param cb Callback receiving an error and the committed offsets.
     * @returns This consumer, for chaining.
     */
    committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
    committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;

    /**
     * Consumes messages. Callable with a message count and optional
     * callback, with only a callback, or with no arguments.
     * NOTE(review): the no-argument / callback-only forms presumably start a
     * continuous consume loop — confirm.
     */
    consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
    consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
    consume(): void;

    /** Returns the watermark offsets for one partition, without a callback. */
    getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;

    /** Stores the given offsets; return value is untyped. */
    offsetsStore(topicPartitions: TopicPartitionOffset[]): any;

    /** Pauses the given topic-partitions; return value is untyped. */
    pause(topicPartitions: TopicPartition[]): any;

    /** Returns positions (as offsets) for the given topic-partitions, or without an argument. */
    position(toppars?: TopicPartition[]): TopicPartitionOffset[];

    /** Resumes the given topic-partitions; return value is untyped. */
    resume(topicPartitions: TopicPartition[]): any;

    /**
     * Seeks to the offset carried by `toppar`.
     * @param timeout A number or `null` (unit not visible here).
     * @param cb Callback receiving an error.
     * @returns This consumer, for chaining.
     */
    seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this;

    /** Sets the default consume timeout, in milliseconds per the parameter name. */
    setDefaultConsumeTimeout(timeoutMs: number): void;

    /** Sets the default consume-loop timeout delay, in milliseconds per the parameter name. */
    setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;

    /**
     * Subscribes to the given topics (strings and/or regular expressions).
     * @returns This consumer, for chaining.
     */
    subscribe(topics: SubscribeTopicList): this;

    /** Returns the current subscription as a list of strings. */
    subscription(): string[];

    /**
     * Removes the consumer's assignments.
     * @returns This consumer, for chaining.
     */
    unassign(): this;

    /**
     * Cancels the current subscription.
     * @returns This consumer, for chaining.
     */
    unsubscribe(): this;

    /**
     * Looks up offsets for the given `TopicPartitionTime` entries. `timeout`
     * may be given before the callback or left out (unit not visible here).
     * @param cb Optional callback receiving an error and the resolved offsets.
     */
    offsetsForTimes(topicPartitions: TopicPartitionTime[], timeout: number, cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
    offsetsForTimes(topicPartitions: TopicPartitionTime[], cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;

    /**
     * Creates a `ConsumerStream`.
     * @param streamOptions Stream options object, or a bare number — NOTE(review): the meaning of the numeric form is not visible here; confirm.
     */
    static createReadStream(conf: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
}

/**
 * Producer client declaration. Accepts the event names in
 * `KafkaProducerEvents` through the inherited `on()` / `once()`.
 */
export class Producer extends Client<KafkaProducerEvents> {
    /**
     * @param conf Producer global configuration.
     * @param topicConf Optional producer topic configuration.
     */
    constructor(conf: ProducerGlobalConfig, topicConf?: ProducerTopicConfig);

    /**
     * Flushes the producer.
     * @param timeout Optional; a number, `null` or `undefined` (unit not visible here).
     * @param cb Optional callback receiving an error.
     * @returns This producer, for chaining.
     */
    flush(timeout?: NumberNullUndefined, cb?: (err: LibrdKafkaError) => void): this;

    /**
     * Polls the producer.
     * @returns This producer, for chaining.
     */
    poll(): this;

    /**
     * Produces one message.
     * @param topic Topic name.
     * @param partition Partition number, or `null` / `undefined`.
     * @param message Payload bytes, or `null`.
     * @param key Optional message key.
     * @param timestamp Optional timestamp (unit not visible here).
     * @param opaque Optional untyped value — presumably surfaced again as `DeliveryReport.opaque`; confirm.
     * @param headers Optional message headers.
     */
    produce(topic: string, partition: NumberNullUndefined, message: MessageValue, key?: MessageKey, timestamp?: NumberNullUndefined, opaque?: any, headers?: MessageHeader[]): any;

    /**
     * Sets the poll interval (unit not visible here).
     * @returns This producer, for chaining.
     */
    setPollInterval(interval: number): this;

    /** Creates a `ProducerStream` from producer configuration plus stream options. */
    static createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;

    // Transaction API. Every method reports its outcome through `cb`; where a
    // second overload exists it takes a leading `timeout` (unit not visible here).
    /** Initializes transactions. */
    initTransactions(cb: (err: LibrdKafkaError) => void): void;
    initTransactions(timeout: number, cb: (err: LibrdKafkaError) => void): void;
    /** Begins a transaction. */
    beginTransaction(cb: (err: LibrdKafkaError) => void): void;
    /** Commits the transaction. */
    commitTransaction(cb: (err: LibrdKafkaError) => void): void;
    commitTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
    /** Aborts the transaction. */
    abortTransaction(cb: (err: LibrdKafkaError) => void): void;
    abortTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
    /** Sends the given offsets, together with the consumer they belong to, to the transaction. */
    sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, cb: (err: LibrdKafkaError) => void): void;
    sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, timeout: number, cb: (err: LibrdKafkaError) => void): void;
}

/**
 * `Producer` variant whose `produce()` takes a per-message callback and whose
 * keys and values pass through configurable serializers.
 */
export class HighLevelProducer extends Producer {
  /**
   * Produces one message and reports the outcome through `callback`
   * (an error and, optionally, an offset). The second overload additionally
   * takes `headers` before the callback. `message` and `key` are untyped —
   * presumably because they are run through the serializers set below; confirm.
   */
  produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, callback: (err: any, offset?: NumberNullUndefined) => void): any;
  produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, headers: MessageHeader[], callback: (err: any, offset?: NumberNullUndefined) => void): any;

  /**
   * Sets the key serializer. It may be callback-style (receives the key and
   * a node-style callback) or return the serialized key directly or as a promise.
   */
  setKeySerializer(serializer: (key: any, cb: (err: any, key: MessageKey) => void) => void): void;
  setKeySerializer(serializer: (key: any) => MessageKey | Promise<MessageKey>): void;
  /**
   * Sets the value serializer. It may be callback-style (receives the value
   * and a node-style callback) or return the serialized value directly or as a promise.
   */
  setValueSerializer(serializer: (value: any, cb: (err: any, value: MessageValue) => void) => void): void;
  setValueSerializer(serializer: (value: any) => MessageValue | Promise<MessageValue>): void;
}

/** List of feature names — NOTE(review): presumably the features the underlying native library was built with; confirm. */
export const features: string[];

/** Version string — per the name, that of the underlying librdkafka library. */
export const librdkafkaVersion: string;

/**
 * Module-level factory for a `ConsumerStream`; same parameter types as
 * `KafkaConsumer.createReadStream()`.
 * @param streamOptions Stream options object, or a bare number — NOTE(review): the meaning of the numeric form is not visible here; confirm.
 */
export function createReadStream(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;

/**
 * Module-level factory for a `ProducerStream`; same parameter types as
 * `Producer.createWriteStream()`.
 */
export function createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;

/**
 * Description of a topic to create, as accepted by `IAdminClient.createTopic()`.
 */
export interface NewTopic {
    /** Name of the topic. */
    topic: string;
    /** Number of partitions. */
    num_partitions: number;
    /** Replication factor. */
    replication_factor: number;
    /**
     * Optional topic-level settings. Either the well-known keys listed below
     * (all optional, all string-valued apart from the two enumerated ones) or
     * any string-to-string map.
     */
    config?: {
        'cleanup.policy'?: 'delete' | 'compact' | 'delete,compact' | 'compact,delete';
        'compression.type'?: 'gzip' | 'snappy' | 'lz4' | 'zstd' | 'uncompressed' | 'producer';
        'delete.retention.ms'?: string;
        'file.delete.delay.ms'?: string;
        'flush.messages'?: string;
        'flush.ms'?: string;
        'follower.replication.throttled.replicas'?: string;
        'index.interval.bytes'?: string;
        'leader.replication.throttled.replicas'?: string;
        'max.compaction.lag.ms'?: string;
        'max.message.bytes'?: string;
        'message.downconversion.enable'?: string;
        'message.format.version'?: string;
        'message.timestamp.difference.max.ms'?: string;
        'message.timestamp.type'?: string;
        'min.cleanable.dirty.ratio'?: string;
        'min.compaction.lag.ms'?: string;
        'min.insync.replicas'?: string;
        'preallocate'?: string;
        'retention.bytes'?: string;
        'retention.ms'?: string;
        'segment.bytes'?: string;
        'segment.index.bytes'?: string;
        'segment.jitter.ms'?: string;
        'segment.ms'?: string;
        'unclean.leader.election.enable'?: string;
    } | { [cfg: string]: string; };
}

/**
 * Administrative operations exposed by the object that
 * `AdminClient.create()` returns.
 *
 * Each operation has two overloads: one without `timeout` and one with an
 * optional `timeout` before the optional callback (unit not visible here).
 */
export interface IAdminClient {
    /**
     * Creates a topic.
     * @param topic Description of the topic to create.
     * @param cb Optional callback receiving an error.
     */
    createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
    createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

    /**
     * Deletes a topic by name.
     * @param cb Optional callback receiving an error.
     */
    deleteTopic(topic: string, cb?: (err: LibrdKafkaError) => void): void;
    deleteTopic(topic: string, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

    /**
     * Creates partitions for a topic.
     * @param desiredPartitions NOTE(review): presumably the desired total partition count rather than an increment; confirm.
     * @param cb Optional callback receiving an error.
     */
    createPartitions(topic: string, desiredPartitions: number, cb?: (err: LibrdKafkaError) => void): void;
    createPartitions(topic: string, desiredPartitions: number, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

    /** Disconnects the admin client. */
    disconnect(): void;
}

/**
 * Factory holder for admin clients. Declared `abstract`, so it is used only
 * through the static `create()`.
 */
export abstract class AdminClient {
    /**
     * Creates an admin client.
     * @param conf Global configuration.
     * @returns An object implementing `IAdminClient`.
     */
    static create(conf: GlobalConfig): IAdminClient;
}
