import * as kafka from '..';

/**
 * KAFKA CLIENT
 *
 * Constructs a KafkaClient once with no arguments and once with an explicit
 * options object, then calls connect() on the configured instance.
 */

// No-argument construction. This instance is the client handed to the
// producers, the consumer and the offset object further down in this file.
const basicKafkaClient = new kafka.KafkaClient();

// Construction with an options object, including a nested
// connectRetryOptions object.
const optionsKafkaClient = new kafka.KafkaClient({
  kafkaHost: 'localhost:9092',
  connectTimeout: 1000,
  requestTimeout: 1000,
  autoConnect: true,
  sslOptions: {},
  clientId: 'client id',
  connectRetryOptions: {
    retries: 5, factor: 0, minTimeout: 1000, maxTimeout: 1000, randomize: true
  }
});

// Explicit connect() call on the configured client.
optionsKafkaClient.connect();

/**
 * KAFKA PRODUCER
 *
 * Builds a Producer with and without an options object. Once 'ready' fires,
 * the handler calls send(), createTopics() and close() using several
 * different argument shapes.
 */
const optionsProducer = new kafka.Producer(basicKafkaClient, {
  requireAcks: 0,
  ackTimeoutMs: 0,
  partitionerType: 0
});

const producer = new kafka.Producer(basicKafkaClient);
producer.on('error', (producerError: Error) => { });
producer.on('ready', () => {
  // One payload per combination of the optional partition / attributes
  // fields, plus one whose message is a KeyedMessage instead of a string.
  const payloads = [
    { topic: 'topicName', messages: ['message body'], partition: 0, attributes: 2 },
    { topic: 'topicName', messages: ['message body'], partition: 0 },
    { topic: 'topicName', messages: ['message body'], attributes: 0 },
    { topic: 'topicName', messages: ['message body'] },
    { topic: 'topicName', messages: [new kafka.KeyedMessage('key', 'message')] }
  ];

  // send(): callback declared with one parameter, then with two.
  producer.send(payloads, (sendError: Error) => { });
  producer.send(payloads, (sendError: Error, result: any) => { });

  // createTopics(): with the boolean second argument, without it, and with
  // a callback that declares no parameters.
  producer.createTopics(['t'], true, (createError: Error, result: any) => { });
  producer.createTopics(['t'], (createError: Error, result: any) => { });
  producer.createTopics(['t'], false, () => { });
  producer.close();
});

/**
 * KAFKA HIGH LEVEL PRODUCER
 *
 * Builds a HighLevelProducer and, once 'ready' fires, calls send(),
 * createTopics() and close() on it using several different argument shapes.
 */
const highLevelProducer = new kafka.HighLevelProducer(basicKafkaClient);

highLevelProducer.on('error', (error: Error) => { });
highLevelProducer.on('ready', () => {
  // Payloads covering the optional partition / attributes fields, plus one
  // whose message is a KeyedMessage instead of a plain string.
  const messages = [
    { topic: 'topicName', messages: ['message body'], attributes: 2 },
    { topic: 'topicName', messages: ['message body'], partition: 0 },
    { topic: 'topicName', messages: ['message body'], attributes: 0 },
    { topic: 'topicName', messages: ['message body'] },
    { topic: 'topicName', messages: [new kafka.KeyedMessage('key', 'message')] }
  ];

  // send(): callback declared with one parameter, then with two.
  highLevelProducer.send(messages, (err: Error) => { });
  highLevelProducer.send(messages, (err: Error, data: any) => { });

  // createTopics() and close() are invoked on highLevelProducer itself so
  // that this section covers the HighLevelProducer methods rather than
  // those of the plain Producer instance.
  highLevelProducer.createTopics(['t'], true, (err: Error, data: any) => { });
  highLevelProducer.createTopics(['t'], (err: Error, data: any) => { });
  highLevelProducer.createTopics(['t'], false, () => { });
  highLevelProducer.close();
});

/**
 * KAFKA CONSUMER
 *
 * Builds a Consumer from a client, a list of fetch requests and an options
 * object. It then registers event handlers and calls the topic, commit,
 * offset, pause/resume and close methods in their various argument shapes.
 */
const fetchRequests = [{ topic: 'awesome' }];
const consumer = new kafka.Consumer(basicKafkaClient, fetchRequests, {
  groupId: 'abcde',
  autoCommit: true
});

consumer.on('error', (consumerError: Error) => { });
consumer.on('offsetOutOfRange', (rangeError: Error) => { });
consumer.on('message', (message: kafka.Message) => {
  // Read each of the six fields of the delivered message.
  const { topic, value, offset, partition, highWaterOffset, key } = message;
});

// addTopics(): plain topic names, then topic/offset objects followed by a
// trailing boolean argument.
consumer.addTopics(['t1', 't2'], (addError: any, addedTopics: any) => { });
consumer.addTopics([{ topic: 't1', offset: 10 }], (addError: any, addedTopics: any) => { }, true);

// removeTopics(): an array of names, then a single name.
consumer.removeTopics(['t1', 't2'], (removeError: any, removedCount: number) => { });
consumer.removeTopics('t2', (removeError: any, removedCount: number) => { });

// commit(): callback only, then a leading boolean plus callback.
consumer.commit((commitError: any, result: any) => { });
consumer.commit(true, (commitError: any, result: any) => { });

consumer.setOffset('topic', 0, 0);

consumer.pause();
consumer.resume();
// pauseTopics() / resumeTopics(): a mix of topic names and
// topic/partition objects in the same array.
consumer.pauseTopics(['topic1', { topic: 'topic2', partition: 0 }]);
consumer.resumeTopics(['topic1', { topic: 'topic2', partition: 0 }]);

// close(): leading boolean plus callback, then callback only.
consumer.close(true, () => { });
consumer.close((closeError: any) => { });

/**
 * KAFKA CONSUMER GROUP
 *
 * Builds a ConsumerGroup from an options object and a list of topic names,
 * registers 'error' and 'message' handlers, and closes it.
 */

// Declared separately and passed as the `batch` option below.
const ackBatchOptions = { noAckBatchSize: 1024, noAckBatchAge: 10 };

// Annotated with kafka.ConsumerGroupOptions so the literal is checked
// against that type.
const cgOptions: kafka.ConsumerGroupOptions = {
  kafkaHost: 'localhost:9092',
  batch: ackBatchOptions,
  groupId: 'groupID',
  id: 'consumerID',
  encoding: 'buffer',
  keyEncoding: 'buffer',
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
  migrateHLC: false,
  migrateRolling: true
};

const consumerGroup = new kafka.ConsumerGroup(cgOptions, ['topic1']);
// Handler parameters are left unannotated here, so their types come from
// the declared signatures of on().
consumerGroup.on('error', (err) => { });
consumerGroup.on('message', (msg) => { });
// close(): leading boolean plus a callback taking an Error.
consumerGroup.close(true, (err: Error) => { });

/**
 * KAFKA OFFSET
 *
 * Builds an Offset from the shared client and calls its fetch, commit,
 * fetchCommits, fetchLatestOffsets and fetchEarliestOffsets methods.
 */
const offset = new kafka.Offset(basicKafkaClient);

offset.on('ready', () => { });

// fetch(): one request with topic, partition, time and maxNum set, and one
// request with only a topic.
offset.fetch([{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }, { topic: 't' }], (err: any, data: any) => { });

// commit(): group id, then topic/partition/offset entries. The callback
// parameters are unannotated from here on, so their types come from the
// declared method signatures.
offset.commit('groupId', [{ topic: 't', partition: 0, offset: 10 }], (err, data) => { });

// fetchCommits(): group id, then topic/partition entries.
offset.fetchCommits('groupId', [{ topic: 't', partition: 0 }], (err, data) => { });

// fetchLatestOffsets() / fetchEarliestOffsets(): an array of topic names.
offset.fetchLatestOffsets(['t'], (err, offsets) => { });
offset.fetchEarliestOffsets(['t'], (err, offsets) => { });
