// Retrieved via UNPKG (raw view, 6.34 kB): type-check exercise file for the kafka-node typings.
1import * as kafka from '..';
2
3/**
4 * KAFKA CLIENT
5 */
6const basicClient = new kafka.Client('localhost:2181/', 'sendMessage');
7
8const optionsClient = new kafka.Client(
9 'localhost:2181/',
10 'sendMessage',
11 { sessionTimeout: 30000, spinDelay: 1000, retries: 0 },
12 { noAckBatchSize: 1000, noAckBatchAge: 1000 * 10 },
13 { rejectUnauthorized: false }
14);
15
16optionsClient.topicExists(['topic'], (error: any) => { });
17optionsClient.refreshMetadata(['topic'], (error: any) => { });
18optionsClient.close();
19optionsClient.sendOffsetCommitV2Request('group', 0, 'memberId', [], () => { });
20optionsClient.close(() => { });
21
22const basicKafkaClient = new kafka.KafkaClient();
23
24const optionsKafkaClient = new kafka.KafkaClient({
25 kafkaHost: 'localhost:2181',
26 connectTimeout: 1000,
27 requestTimeout: 1000,
28 autoConnect: true,
29 sslOptions: {},
30 clientId: 'client id',
31 connectRetryOptions: {
32 retries: 5, factor: 0, minTimeout: 1000, maxTimeout: 1000, randomize: true
33 }
34});
35
36optionsKafkaClient.connect();
37
38/**
39 * KAFKA PRODUCER
40 */
41const optionsProducer = new kafka.Producer(basicClient, { requireAcks: 0, ackTimeoutMs: 0, partitionerType: 0 });
42
43const producer = new kafka.Producer(basicClient);
44producer.on('error', (error: Error) => { });
45producer.on('ready', () => {
46 const messages = [
47 { topic: 'topicName', messages: ['message body'], partition: 0, attributes: 2 },
48 { topic: 'topicName', messages: ['message body'], partition: 0 },
49 { topic: 'topicName', messages: ['message body'], attributes: 0 },
50 { topic: 'topicName', messages: ['message body'] },
51 { topic: 'topicName', messages: [new kafka.KeyedMessage('key', 'message')] }
52 ];
53
54 producer.send(messages, (err: Error) => { });
55 producer.send(messages, (err: Error, data: any) => { });
56
57 producer.createTopics(['t'], true, (err: Error, data: any) => { });
58 producer.createTopics(['t'], (err: Error, data: any) => { });
59 producer.createTopics(['t'], false, () => { });
60 producer.close();
61});
62
63/**
64 * KAFKA HIGH LEVEL PRODUCER
65 */
66const highLevelProducer = new kafka.HighLevelProducer(basicClient);
67
68highLevelProducer.on('error', (error: Error) => { });
69highLevelProducer.on('ready', () => {
70 const messages = [
71 { topic: 'topicName', messages: ['message body'], attributes: 2 },
72 { topic: 'topicName', messages: ['message body'], partition: 0 },
73 { topic: 'topicName', messages: ['message body'], attributes: 0 },
74 { topic: 'topicName', messages: ['message body'] },
75 { topic: 'topicName', messages: [new kafka.KeyedMessage('key', 'message')] }
76 ];
77
78 highLevelProducer.send(messages, (err: Error) => { });
79 highLevelProducer.send(messages, (err: Error, data: any) => { });
80
81 producer.createTopics(['t'], true, (err: Error, data: any) => { });
82 producer.createTopics(['t'], (err: Error, data: any) => { });
83 producer.createTopics(['t'], false, () => { });
84 producer.close();
85});
86
87/**
88 * KAFKA CONSUMER
89 */
90const fetchRequests = [{ topic: 'awesome' }];
91const consumer = new kafka.Consumer(basicClient, fetchRequests, { groupId: 'abcde', autoCommit: true });
92
93consumer.on('error', (error: Error) => { });
94consumer.on('offsetOutOfRange', (error: Error) => { });
95consumer.on('message', (message: kafka.Message) => {
96 const topic = message.topic;
97 const value = message.value;
98 const offset = message.offset;
99 const partition = message.partition;
100 const highWaterOffset = message.highWaterOffset;
101 const key = message.key;
102});
103
104consumer.addTopics(['t1', 't2'], (err: any, added: any) => { });
105consumer.addTopics([{ topic: 't1', offset: 10 }], (err: any, added: any) => { }, true);
106
107consumer.removeTopics(['t1', 't2'], (err: any, removed: number) => { });
108consumer.removeTopics('t2', (err: any, removed: number) => { });
109
110consumer.commit((err: any, data: any) => { });
111consumer.commit(true, (err: any, data: any) => { });
112
113consumer.setOffset('topic', 0, 0);
114
115consumer.pause();
116consumer.resume();
117consumer.pauseTopics(['topic1', { topic: 'topic2', partition: 0 }]);
118consumer.resumeTopics(['topic1', { topic: 'topic2', partition: 0 }]);
119
120consumer.close(true, () => { });
121consumer.close((err: any) => { });
122
123/**
124 * KAFKA HIGH LEVEL CONSUMER
125 */
126const fetchRequests1 = [{ topic: 'awesome' }];
127const hlConsumer = new kafka.HighLevelConsumer(basicClient, fetchRequests1, { groupId: 'abcde', autoCommit: true });
128
129hlConsumer.on('error', (error: Error) => { });
130hlConsumer.on('offsetOutOfRange', (error: Error) => { });
131hlConsumer.on('message', (message: kafka.Message) => {
132 const topic = message.topic;
133 const value = message.value;
134 const offset = message.offset;
135 const partition = message.partition;
136 const highWaterOffset = message.highWaterOffset;
137 const key = message.key;
138});
139
140hlConsumer.addTopics(['t1', 't2'], (err: any, added: any) => { });
141hlConsumer.addTopics([{ topic: 't1', offset: 10 }], (err: any, added: any) => { });
142
143hlConsumer.removeTopics(['t1', 't2'], (err: any, removed: number) => { });
144hlConsumer.removeTopics('t2', (err: any, removed: number) => { });
145
146hlConsumer.commit((err: any, data: any) => { });
147hlConsumer.commit(true, (err: any, data: any) => { });
148
149hlConsumer.setOffset('topic', 0, 0);
150
151hlConsumer.pause();
152hlConsumer.resume();
153
154hlConsumer.close(true, () => { });
155hlConsumer.close(() => { });
156
157/**
158 * KAFKA CONSUMER GROUP
159 */
160const ackBatchOptions = { noAckBatchSize: 1024, noAckBatchAge: 10 };
161const cgOptions: kafka.ConsumerGroupOptions = {
162 host: 'localhost:2181',
163 batch: ackBatchOptions,
164 groupId: 'groupID',
165 id: 'consumerID',
166 encoding: 'buffer',
167 keyEncoding: 'buffer',
168 sessionTimeout: 15000,
169 protocol: ['roundrobin'],
170 fromOffset: 'latest',
171 migrateHLC: false,
172 migrateRolling: true
173};
174
175const consumerGroup = new kafka.ConsumerGroup(cgOptions, ['topic1']);
176consumerGroup.on('error', (err) => { });
177consumerGroup.on('message', (msg) => { });
178consumerGroup.close(true, (err: Error) => { });
179
180const offset = new kafka.Offset(basicClient);
181
182offset.on('ready', () => { });
183
184offset.fetch([{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }, { topic: 't' }], (err: any, data: any) => { });
185
186offset.commit('groupId', [{ topic: 't', partition: 0, offset: 10 }], (err, data) => { });
187
188offset.fetchCommits('groupId', [{ topic: 't', partition: 0 }], (err, data) => { });
189
190offset.fetchLatestOffsets(['t'], (err, offsets) => { });
191offset.fetchEarliestOffsets(['t'], (err, offsets) => { });