// kafka-node type-usage tests (raw file retrieved via UNPKG; viewer chrome removed).
1import * as kafka from '..';
2
3/**
4 * KAFKA CLIENT
5 */
6
7const basicKafkaClient = new kafka.KafkaClient();
8
9const optionsKafkaClient = new kafka.KafkaClient({
10 kafkaHost: 'localhost:9092',
11 connectTimeout: 1000,
12 requestTimeout: 1000,
13 autoConnect: true,
14 sslOptions: {},
15 clientId: 'client id',
16 connectRetryOptions: {
17 retries: 5, factor: 0, minTimeout: 1000, maxTimeout: 1000, randomize: true
18 }
19});
20
21optionsKafkaClient.connect();
22
23/**
24 * KAFKA PRODUCER
25 */
26const optionsProducer = new kafka.Producer(basicKafkaClient, { requireAcks: 0, ackTimeoutMs: 0, partitionerType: 0 });
27
28const producer = new kafka.Producer(basicKafkaClient);
29producer.on('error', (error: Error) => { });
30producer.on('ready', () => {
31 const messages = [
32 { topic: 'topicName', messages: ['message body'], partition: 0, attributes: 2 },
33 { topic: 'topicName', messages: ['message body'], partition: 0 },
34 { topic: 'topicName', messages: ['message body'], attributes: 0 },
35 { topic: 'topicName', messages: ['message body'] },
36 { topic: 'topicName', messages: [new kafka.KeyedMessage('key', 'message')] }
37 ];
38
39 producer.send(messages, (err: Error) => { });
40 producer.send(messages, (err: Error, data: any) => { });
41
42 producer.createTopics(['t'], true, (err: Error, data: any) => { });
43 producer.createTopics(['t'], (err: Error, data: any) => { });
44 producer.createTopics(['t'], false, () => { });
45 producer.close();
46});
47
48/**
49 * KAFKA HIGH LEVEL PRODUCER
50 */
51const highLevelProducer = new kafka.HighLevelProducer(basicKafkaClient);
52
53highLevelProducer.on('error', (error: Error) => { });
54highLevelProducer.on('ready', () => {
55 const messages = [
56 { topic: 'topicName', messages: ['message body'], attributes: 2 },
57 { topic: 'topicName', messages: ['message body'], partition: 0 },
58 { topic: 'topicName', messages: ['message body'], attributes: 0 },
59 { topic: 'topicName', messages: ['message body'] },
60 { topic: 'topicName', messages: [new kafka.KeyedMessage('key', 'message')] }
61 ];
62
63 highLevelProducer.send(messages, (err: Error) => { });
64 highLevelProducer.send(messages, (err: Error, data: any) => { });
65
66 producer.createTopics(['t'], true, (err: Error, data: any) => { });
67 producer.createTopics(['t'], (err: Error, data: any) => { });
68 producer.createTopics(['t'], false, () => { });
69 producer.close();
70});
71
72/**
73 * KAFKA CONSUMER
74 */
75const fetchRequests = [{ topic: 'awesome' }];
76const consumer = new kafka.Consumer(basicKafkaClient, fetchRequests, { groupId: 'abcde', autoCommit: true });
77
78consumer.on('error', (error: Error) => { });
79consumer.on('offsetOutOfRange', (error: Error) => { });
80consumer.on('message', (message: kafka.Message) => {
81 const topic = message.topic;
82 const value = message.value;
83 const offset = message.offset;
84 const partition = message.partition;
85 const highWaterOffset = message.highWaterOffset;
86 const key = message.key;
87});
88
89consumer.addTopics(['t1', 't2'], (err: any, added: any) => { });
90consumer.addTopics([{ topic: 't1', offset: 10 }], (err: any, added: any) => { }, true);
91
92consumer.removeTopics(['t1', 't2'], (err: any, removed: number) => { });
93consumer.removeTopics('t2', (err: any, removed: number) => { });
94
95consumer.commit((err: any, data: any) => { });
96consumer.commit(true, (err: any, data: any) => { });
97
98consumer.setOffset('topic', 0, 0);
99
100consumer.pause();
101consumer.resume();
102consumer.pauseTopics(['topic1', { topic: 'topic2', partition: 0 }]);
103consumer.resumeTopics(['topic1', { topic: 'topic2', partition: 0 }]);
104
105consumer.close(true, () => { });
106consumer.close((err: any) => { });
107
108/**
109 * KAFKA CONSUMER GROUP
110 */
111const ackBatchOptions = { noAckBatchSize: 1024, noAckBatchAge: 10 };
112const cgOptions: kafka.ConsumerGroupOptions = {
113 kafkaHost: 'localhost:9092',
114 batch: ackBatchOptions,
115 groupId: 'groupID',
116 id: 'consumerID',
117 encoding: 'buffer',
118 keyEncoding: 'buffer',
119 sessionTimeout: 15000,
120 protocol: ['roundrobin'],
121 fromOffset: 'latest',
122 migrateHLC: false,
123 migrateRolling: true
124};
125
126const consumerGroup = new kafka.ConsumerGroup(cgOptions, ['topic1']);
127consumerGroup.on('error', (err) => { });
128consumerGroup.on('message', (msg) => { });
129consumerGroup.close(true, (err: Error) => { });
130
131const offset = new kafka.Offset(basicKafkaClient);
132
133offset.on('ready', () => { });
134
135offset.fetch([{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }, { topic: 't' }], (err: any, data: any) => { });
136
137offset.commit('groupId', [{ topic: 't', partition: 0, offset: 10 }], (err, data) => { });
138
139offset.fetchCommits('groupId', [{ topic: 't', partition: 0 }], (err, data) => { });
140
141offset.fetchLatestOffsets(['t'], (err, offsets) => { });
142offset.fetchEarliestOffsets(['t'], (err, offsets) => { });