UNPKG

9.41 kBTypeScriptView Raw
1import { Readable, Writable } from 'stream';
2import { EventEmitter } from 'events';
3
4export class KafkaClient extends EventEmitter {
5 constructor (options?: KafkaClientOptions);
6
7 close (cb?: () => void): void;
8
9 topicExists (topics: string[], cb: (error?: TopicsNotExistError | any) => any): void;
10
11 refreshMetadata (topics: string[], cb?: (error?: any) => any): void;
12
13 sendOffsetCommitV2Request (group: string, generationId: number, memberId: string, commits: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
14
15 // Note: socket_error is currently KafkaClient only, and zkReconnect is currently Client only.
16 on (eventName: 'brokersChanged' | 'close' | 'connect' | 'ready' | 'reconnect' | 'zkReconnect', cb: () => any): this;
17 on (eventName: 'error' | 'socket_error', cb: (error: any) => any): this;
18
19 connect (): void;
20
21 createTopics (topics: CreateTopicRequest[], callback: (error: any, result: CreateTopicResponse[]) => any): void;
22
23 loadMetadataForTopics (topics: string[], callback: (error: any, result: MetadataResponse) => any): void;
24}
25
26export class Producer {
27 constructor (client: KafkaClient, options?: ProducerOptions, customPartitioner?: CustomPartitioner);
28
29 on (eventName: 'ready', cb: () => any): void;
30 on (eventName: 'error', cb: (error: any) => any): void;
31
32 send (payloads: ProduceRequest[], cb: (error: any, data: any) => any): void;
33
34 createTopics (topics: string[], async: boolean, cb: (error: any, data: any) => any): void;
35 createTopics (topics: string[], cb: (error: any, data: any) => any): void;
36
37 close (cb?: () => any): void;
38}
39
40export class HighLevelProducer extends Producer {
41}
42
43export class Consumer {
44 client: KafkaClient;
45
46 constructor (client: KafkaClient, fetchRequests: Array<OffsetFetchRequest | string>, options: ConsumerOptions);
47
48 on (eventName: 'message', cb: (message: Message) => any): void;
49 on (eventName: 'error' | 'offsetOutOfRange', cb: (error: any) => any): void;
50
51 addTopics<T extends string[] | Topic[]> (topics: T, cb: (error: any, added: T) => any, fromOffset?: boolean): void;
52
53 removeTopics (topics: string | string[], cb: (error: any, removed: number) => any): void;
54
55 commit (cb: (error: any, data: any) => any): void;
56 commit (force: boolean, cb: (error: any, data: any) => any): void;
57
58 setOffset (topic: string, partition: number, offset: number): void;
59
60 pause (): void;
61
62 resume (): void;
63
64 pauseTopics (topics: any[] /* Array<string|Topic> */): void;
65
66 resumeTopics (topics: any[] /* Array<string|Topic> */): void;
67
68 close (force: boolean, cb: (error: Error) => any): void;
69 close (cb: (error: Error) => any): void;
70}
71
72export class ConsumerGroupStream extends Readable {
73 client: KafkaClient;
74 consumerGroup: ConsumerGroup;
75
76 constructor (options: ConsumerGroupStreamOptions, topics: string | string[]);
77
78 commit (message: Message, force?: boolean, cb?: (error: any, data: any) => any): void;
79
80 transmitMessages (): void;
81
82 close (cb: () => any): void;
83}
84
85export class ConsumerGroup {
86 generationId: number;
87 memberId: string;
88 client: KafkaClient;
89
90 constructor (options: ConsumerGroupOptions, topics: string[] | string);
91
92 close (force: boolean, cb: (error: Error) => any): void;
93 close (cb: (error: Error) => any): void;
94
95 on (eventName: 'message', cb: (message: Message) => any): void;
96 on (eventName: 'error' | 'offsetOutOfRange', cb: (error: any) => any): void;
97 on (eventName: 'rebalancing' | 'rebalanced', cb: () => any): void;
98
99 addTopics (topics: string[] | Topic[], cb?: (error: any, added: string[] | Topic[]) => any): void;
100
101 removeTopics (topics: string | string[], cb: (error: any, removed: number) => any): void;
102
103 commit (cb: (error: any, data: any) => any): void;
104 commit (force: boolean, cb: (error: any, data: any) => any): void;
105
106 sendOffsetCommitRequest (commits: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
107
108 setOffset (topic: string, partition: number, offset: number): void;
109
110 pause (): void;
111
112 resume (): void;
113}
114
115export class Offset {
116 constructor (client: KafkaClient);
117
118 on (eventName: 'ready' | 'connect', cb: () => any): void;
119 on (eventName: 'error', cb: (error: any) => any): void;
120
121 fetch (payloads: OffsetRequest[], cb: (error: any, data: any) => any): void;
122
123 commit (groupId: string, payloads: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
124
125 fetchCommits (groupId: string, payloads: OffsetFetchRequest[], cb: (error: any, data: any) => any): void;
126
127 fetchLatestOffsets (topics: string[], cb: (error: any, data: any) => any): void;
128
129 fetchEarliestOffsets (topics: string[], cb: (error: any, data: any) => any): void;
130}
131
132export class KeyedMessage {
133 constructor (key: string | Buffer, value: string | Buffer);
134}
135
136export class ProducerStream extends Writable {
137 constructor (options?: ProducerStreamOptions);
138
139 sendPayload (payloads: ProduceRequest[], cb: (error: any, data: any) => any): void;
140
141 close (cb?: () => any): void;
142
143 _write (message: ProduceRequest, encoding: 'buffer' | 'utf8', cb: (error: any, data: any) => any): void;
144
145 _writev (chunks: Chunk[], cb: (error: any, data: any) => any): void;
146}
147
148// # Interfaces
149
150export interface Message {
151 topic: string;
152 value: string | Buffer;
153 offset?: number;
154 partition?: number;
155 highWaterOffset?: number;
156 key?: string | Buffer;
157}
158
159export interface KeyedMessage {
160 key: string | Buffer;
161 value: string | Buffer;
162}
163
164export interface ProducerOptions {
165 requireAcks?: number;
166 ackTimeoutMs?: number;
167 partitionerType?: number;
168}
169
170export interface KafkaClientOptions {
171 kafkaHost?: string;
172 connectTimeout?: number;
173 requestTimeout?: number;
174 autoConnect?: boolean;
175 connectRetryOptions?: RetryOptions;
176 sslOptions?: any;
177 clientId?: string;
178 idleConnection?: number;
179 maxAsyncRequests?: number;
180 sasl?: any;
181}
182
183export interface ProducerStreamOptions {
184 kafkaClient?: KafkaClientOptions;
185 producer?: ProducerOptions;
186 highWaterMark?: number;
187}
188
189export interface RetryOptions {
190 retries?: number;
191 factor?: number;
192 minTimeout?: number;
193 maxTimeout?: number;
194 randomize?: boolean;
195}
196
197export interface AckBatchOptions {
198 noAckBatchSize: number | null;
199 noAckBatchAge: number | null;
200}
201
202export interface ProduceRequest {
203 topic: string;
204 messages: any; // string[] | Array<KeyedMessage> | string | KeyedMessage
205 key?: string | Buffer;
206 partition?: number;
207 attributes?: number;
208}
209
210export interface ConsumerOptions {
211 groupId?: string;
212 autoCommit?: boolean;
213 autoCommitIntervalMs?: number;
214 fetchMaxWaitMs?: number;
215 fetchMinBytes?: number;
216 fetchMaxBytes?: number;
217 fromOffset?: boolean;
218 encoding?: 'buffer' | 'utf8';
219 keyEncoding?: 'buffer' | 'utf8';
220}
221
222export interface CustomPartitionAssignmentProtocol {
223 name: string;
224 version: number;
225 userData: {};
226
227 assign (topicPattern: any, groupMembers: any, cb: (error: any, result: any) => void): void;
228}
229
230export interface ConsumerGroupOptions {
231 kafkaHost?: string;
232 batch?: AckBatchOptions;
233 ssl?: boolean;
234 sslOptions?: any;
235 id?: string;
236 groupId: string;
237 sessionTimeout?: number;
238 encoding?: 'buffer' | 'utf8';
239 keyEncoding?: 'buffer' | 'utf8';
240 protocol?: Array<'roundrobin' | 'range' | CustomPartitionAssignmentProtocol>;
241 fromOffset?: 'earliest' | 'latest' | 'none';
242 outOfRangeOffset?: 'earliest' | 'latest' | 'none';
243 migrateHLC?: boolean;
244 migrateRolling?: boolean;
245 autoCommit?: boolean;
246 autoCommitIntervalMs?: number;
247 fetchMaxWaitMs?: number;
248 maxNumSegments?: number;
249 maxTickMessages?: number;
250 fetchMinBytes?: number;
251 fetchMaxBytes?: number;
252 retries?: number;
253 retryFactor?: number;
254 retryMinTimeout?: number;
255 connectOnReady?: boolean;
256 heartbeatInterval?: number;
257 onRebalance?: () => Promise<void>;
258}
259
260export interface ConsumerGroupStreamOptions extends ConsumerGroupOptions {
261 highWaterMark?: number;
262}
263
264export interface Topic {
265 topic: string;
266 offset?: number;
267 encoding?: string;
268 autoCommit?: boolean;
269}
270
271export interface OffsetRequest {
272 topic: string;
273 partition?: number;
274 time?: number;
275 maxNum?: number;
276}
277
278export interface OffsetCommitRequest {
279 topic: string;
280 partition?: number;
281 offset: number;
282 metadata?: string;
283}
284
285export interface OffsetFetchRequest {
286 topic: string;
287 partition?: number;
288 offset?: number;
289}
290
291export interface Chunk {
292 chunk: ProduceRequest;
293}
294
295export class TopicsNotExistError extends Error {
296 topics: string | string[];
297}
298
299export type CustomPartitioner = (partitions: number[], key: string | Buffer) => number;
300
301export interface CreateTopicRequest {
302 topic: string;
303 partitions: number;
304 replicationFactor: number;
305 configEntries?: {
306 name: string;
307 value: string;
308 }[];
309 replicaAssignment?: {
310 partition: number;
311 replicas: number[];
312 }[];
313}
314
315export interface CreateTopicResponse {
316 topic: string;
317 error: string;
318}
319
320export interface BrokerMetadataResponse {
321 [id: number]: {
322 host: string;
323 nodeId: number;
324 port: number;
325 };
326}
327
328export interface ClusterMetadataResponse {
329 clusterMetadata: {
330 controllerId: number;
331 };
332 metadata: {
333 [topic: string]: {
334 [partition: number]: {
335 leader: number;
336 partition: number;
337 topic: string;
338 replicas: number[];
339 isr: number[];
340 };
341 };
342 };
343}
344
345export interface MetadataResponse extends Array<BrokerMetadataResponse|ClusterMetadataResponse> {
346 0: BrokerMetadataResponse;
347 1: ClusterMetadataResponse;
348}