UNPKG

8.71 kBTypeScriptView Raw
1import { Readable, Writable } from 'stream';
2
3export class Client {
4 constructor (connectionString: string, clientId?: string, options?: ZKOptions, noBatchOptions?: AckBatchOptions, sslOptions?: any);
5
6 close (cb?: () => void): void;
7
8 topicExists (topics: string[], cb: (error?: TopicsNotExistError | any) => any): void;
9
10 refreshMetadata (topics: string[], cb?: (error?: any) => any): void;
11
12 sendOffsetCommitV2Request (group: string, generationId: number, memberId: string, commits: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
13
14 // Note: socket_error is currently KafkaClient only, and zkReconnect is currently Client only.
15 on (eventName: 'brokersChanged' | 'close' | 'connect' | 'ready' | 'reconnect' | 'zkReconnect', cb: () => any): this;
16 on (eventName: 'error' | 'socket_error', cb: (error: any) => any): this;
17}
18
19export class KafkaClient extends Client {
20 constructor (options?: KafkaClientOptions);
21
22 connect (): void;
23}
24
25export class Producer {
26 constructor (client: Client, options?: ProducerOptions, customPartitioner?: CustomPartitioner);
27
28 on (eventName: 'ready', cb: () => any): void;
29 on (eventName: 'error', cb: (error: any) => any): void;
30
31 send (payloads: ProduceRequest[], cb: (error: any, data: any) => any): void;
32
33 createTopics (topics: string[], async: boolean, cb: (error: any, data: any) => any): void;
34 createTopics (topics: string[], cb: (error: any, data: any) => any): void;
35
36 close (cb?: () => any): void;
37}
38
39export class HighLevelProducer extends Producer {
40}
41
42export class Consumer {
43 client: Client;
44
45 constructor (client: Client, fetchRequests: Array<OffsetFetchRequest | string>, options: ConsumerOptions);
46
47 on (eventName: 'message', cb: (message: Message) => any): void;
48 on (eventName: 'error' | 'offsetOutOfRange', cb: (error: any) => any): void;
49
50 addTopics<T extends string[] | Topic[]> (topics: T, cb: (error: any, added: T) => any, fromOffset?: boolean): void;
51
52 removeTopics (topics: string | string[], cb: (error: any, removed: number) => any): void;
53
54 commit (cb: (error: any, data: any) => any): void;
55 commit (force: boolean, cb: (error: any, data: any) => any): void;
56
57 setOffset (topic: string, partition: number, offset: number): void;
58
59 pause (): void;
60
61 resume (): void;
62
63 pauseTopics (topics: any[] /* Array<string|Topic> */): void;
64
65 resumeTopics (topics: any[] /* Array<string|Topic> */): void;
66
67 close (force: boolean, cb: (error: Error) => any): void;
68 close (cb: (error: Error) => any): void;
69}
70
71export class ConsumerGroupStream extends Readable {
72 client: Client;
73 consumerGroup: ConsumerGroup;
74
75 constructor (options: ConsumerGroupStreamOptions, topics: string | string[]);
76
77 commit (message: Message, force?: boolean, cb?: (error: any, data: any) => any): void;
78
79 transmitMessages(): void;
80
81 close (cb: () => any): void;
82}
83
84export class HighLevelConsumer {
85 client: Client;
86
87 constructor (client: Client, payloads: Topic[], options: HighLevelConsumerOptions);
88
89 on (eventName: 'message', cb: (message: Message) => any): void;
90 on (eventName: 'error' | 'offsetOutOfRange', cb: (error: any) => any): void;
91 on (eventName: 'rebalancing' | 'rebalanced', cb: () => any): void;
92
93 addTopics (topics: string[] | Topic[], cb?: (error: any, added: string[] | Topic[]) => any): void;
94
95 removeTopics (topics: string | string[], cb: (error: any, removed: number) => any): void;
96
97 commit (cb: (error: any, data: any) => any): void;
98 commit (force: boolean, cb: (error: any, data: any) => any): void;
99
100 sendOffsetCommitRequest (commits: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
101
102 setOffset (topic: string, partition: number, offset: number): void;
103
104 pause (): void;
105
106 resume (): void;
107
108 close (force: boolean, cb: () => any): void;
109 close (cb: () => any): void;
110}
111
112export class ConsumerGroup extends HighLevelConsumer {
113 generationId: number;
114 memberId: string;
115 client: KafkaClient & Client;
116
117 constructor (options: ConsumerGroupOptions, topics: string[] | string);
118
119 close (force: boolean, cb: (error: Error) => any): void;
120 close (cb: (error: Error) => any): void;
121}
122
123export class Offset {
124 constructor (client: Client);
125
126 on (eventName: 'ready' | 'connect', cb: () => any): void;
127 on (eventName: 'error', cb: (error: any) => any): void;
128
129 fetch (payloads: OffsetRequest[], cb: (error: any, data: any) => any): void;
130
131 commit (groupId: string, payloads: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
132
133 fetchCommits (groupId: string, payloads: OffsetFetchRequest[], cb: (error: any, data: any) => any): void;
134
135 fetchLatestOffsets (topics: string[], cb: (error: any, data: any) => any): void;
136
137 fetchEarliestOffsets (topics: string[], cb: (error: any, data: any) => any): void;
138}
139
140export class KeyedMessage {
141 constructor (key: string, value: string | Buffer);
142}
143
144export class ProducerStream extends Writable {
145 constructor (options?: ProducerStreamOptions);
146
147 sendPayload (payloads: ProduceRequest[], cb: (error: any, data: any) => any): void;
148
149 close (cb?: () => any): void;
150
151 _write (message: ProduceRequest, encoding: 'buffer' | 'utf8', cb: (error: any, data: any) => any): void;
152
153 _writev (chunks: Chunk[], cb: (error: any, data: any) => any): void;
154}
155
156// # Interfaces
157
158export interface Message {
159 topic: string;
160 value: string | Buffer;
161 offset?: number;
162 partition?: number;
163 highWaterOffset?: number;
164 key?: string;
165}
166
167export interface ProducerOptions {
168 requireAcks?: number;
169 ackTimeoutMs?: number;
170 partitionerType?: number;
171}
172
173export interface KafkaClientOptions {
174 kafkaHost?: string;
175 connectTimeout?: number;
176 requestTimeout?: number;
177 autoConnect?: boolean;
178 connectRetryOptions?: RetryOptions;
179 sslOptions?: any;
180 clientId?: string;
181}
182
183export interface ProducerStreamOptions {
184 kafkaClient?: KafkaClientOptions;
185 producer?: ProducerOptions;
186 highWaterMark?: number;
187}
188
189export interface RetryOptions {
190 retries?: number;
191 factor?: number;
192 minTimeout?: number;
193 maxTimeout?: number;
194 randomize?: boolean;
195}
196
197export interface AckBatchOptions {
198 noAckBatchSize: number | null;
199 noAckBatchAge: number | null;
200}
201
202export interface ZKOptions {
203 sessionTimeout?: number;
204 spinDelay?: number;
205 retries?: number;
206}
207
208export interface ProduceRequest {
209 topic: string;
210 messages: any; // string[] | Array<KeyedMessage> | string | KeyedMessage
211 key?: string;
212 partition?: number;
213 attributes?: number;
214}
215
216export interface ConsumerOptions {
217 groupId?: string;
218 autoCommit?: boolean;
219 autoCommitIntervalMs?: number;
220 fetchMaxWaitMs?: number;
221 fetchMinBytes?: number;
222 fetchMaxBytes?: number;
223 fromOffset?: boolean;
224 encoding?: 'buffer' | 'utf8';
225 keyEncoding?: 'buffer' | 'utf8';
226}
227
228export interface HighLevelConsumerOptions extends ConsumerOptions {
229 id?: string;
230 maxNumSegments?: number;
231 maxTickMessages?: number;
232 rebalanceRetry?: RetryOptions;
233}
234
235export interface CustomPartitionAssignmentProtocol {
236 name: string;
237 version: number;
238 userData: {};
239
240 assign (topicPattern: any, groupMembers: any, cb: (error: any, result: any) => void): void;
241}
242
243export interface ConsumerGroupOptions {
244 kafkaHost?: string;
245 host?: string;
246 zk?: ZKOptions;
247 batch?: AckBatchOptions;
248 ssl?: boolean;
249 id?: string;
250 groupId: string;
251 sessionTimeout?: number;
252 encoding?: 'buffer' | 'utf8';
253 keyEncoding?: 'buffer' | 'utf8';
254 protocol?: Array<'roundrobin' | 'range' | CustomPartitionAssignmentProtocol>;
255 fromOffset?: 'earliest' | 'latest' | 'none';
256 outOfRangeOffset?: 'earliest' | 'latest' | 'none';
257 migrateHLC?: boolean;
258 migrateRolling?: boolean;
259 autoCommit?: boolean;
260 autoCommitIntervalMs?: number;
261 fetchMaxWaitMs?: number;
262 maxNumSegments?: number;
263 maxTickMessages?: number;
264 fetchMinBytes?: number;
265 fetchMaxBytes?: number;
266 retries?: number;
267 retryFactor?: number;
268 retryMinTimeout?: number;
269 connectOnReady?: boolean;
270 heartbeatInterval?: number;
271 onRebalance?: () => Promise<void>;
272}
273
274export interface ConsumerGroupStreamOptions extends ConsumerGroupOptions {
275 highWaterMark?: number;
276}
277
278export interface Topic {
279 topic: string;
280 offset?: number;
281 encoding?: string;
282 autoCommit?: boolean;
283}
284
285export interface OffsetRequest {
286 topic: string;
287 partition?: number;
288 time?: number;
289 maxNum?: number;
290}
291
292export interface OffsetCommitRequest {
293 topic: string;
294 partition?: number;
295 offset: number;
296 metadata?: string;
297}
298
299export interface OffsetFetchRequest {
300 topic: string;
301 partition?: number;
302 offset?: number;
303}
304
305export interface Chunk {
306 chunk: ProduceRequest;
307}
308
309export class TopicsNotExistError extends Error {
310 topics: string | string[];
311}
312
313export type CustomPartitioner = (partitions: number[], key: any) => number;