// UNPKG file viewer — 8.2 kB · TypeScript · View Raw
import { EventEmitter } from 'events';
import { Readable, Writable } from 'stream';
3
/**
 * Type declaration for the `KafkaClient` class, a subclass of Node's
 * `EventEmitter`.
 *
 * Only signatures are declared here; the runtime behaviour lives in the
 * implementation, which is not part of this file.
 */
export declare class KafkaClient extends EventEmitter {
  /**
   * @param options Optional client settings.
   */
  constructor (options?: KafkaClientOptions);

  /**
   * @param cb Optional zero-argument callback (presumably run once the client
   *   has closed — confirm against the implementation).
   */
  close (cb?: () => void): void;

  /**
   * @param topics Topic names to check.
   * @param cb Callback receiving an optional error, typed as
   *   `TopicsNotExistError | any`.
   */
  topicExists (topics: string[], cb: (error?: TopicsNotExistError | any) => any): void;

  /**
   * @param topics Topic names whose metadata should be refreshed.
   * @param cb Optional callback receiving an optional error.
   */
  refreshMetadata (topics: string[], cb?: (error?: any) => any): void;

  /**
   * @param group Group name.
   * @param generationId Numeric generation id.
   * @param memberId Member id string.
   * @param commits Offsets to commit.
   * @param cb Node-style callback `(error, data)`; both arguments are untyped.
   */
  sendOffsetCommitV2Request (group: string, generationId: number, memberId: string, commits: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;

  // Note: socket_error is currently KafkaClient only, and zkReconnect is currently Client only.
  /** Overload for events whose listener takes no arguments. */
  on (eventName: 'brokersChanged' | 'close' | 'connect' | 'ready' | 'reconnect' | 'zkReconnect', cb: () => any): this;
  /** Overload for events whose listener receives an error value. */
  on (eventName: 'error' | 'socket_error', cb: (error: any) => any): this;

  /** Takes no arguments and returns nothing. */
  connect (): void;
}
21
/**
 * Type declaration for the `Producer` class.
 *
 * Signatures only. This declaration does not extend `EventEmitter`; it
 * declares its own `on` overloads, which return `void`.
 */
export declare class Producer {
  /**
   * @param client Client instance the producer is built on.
   * @param options Optional producer settings.
   * @param customPartitioner Optional partition-selection function.
   */
  constructor (client: KafkaClient, options?: ProducerOptions, customPartitioner?: CustomPartitioner);

  /** Overload for the `'ready'` event; the listener takes no arguments. */
  on (eventName: 'ready', cb: () => any): void;
  /** Overload for the `'error'` event; the listener receives an error value. */
  on (eventName: 'error', cb: (error: any) => any): void;

  /**
   * @param payloads Produce requests to send.
   * @param cb Node-style callback `(error, data)`; both arguments are untyped.
   */
  send (payloads: ProduceRequest[], cb: (error: any, data: any) => any): void;

  /**
   * @param topics Topic names to create.
   * @param async Boolean flag; its exact effect is not visible in this
   *   declaration — confirm against the implementation.
   * @param cb Node-style callback `(error, data)`.
   */
  createTopics (topics: string[], async: boolean, cb: (error: any, data: any) => any): void;
  /** Overload without the `async` flag. */
  createTopics (topics: string[], cb: (error: any, data: any) => any): void;

  /**
   * @param cb Optional zero-argument callback.
   */
  close (cb?: () => any): void;
}
35
/**
 * Type declaration for `HighLevelProducer`. It adds no members of its own,
 * so its declared surface is exactly that of `Producer`.
 */
export declare class HighLevelProducer extends Producer {
}
38
/**
 * Type declaration for the `Consumer` class.
 *
 * Signatures only. This declaration does not extend `EventEmitter`; it
 * declares its own `on` overloads, which return `void`.
 */
export declare class Consumer {
  /** Client instance exposed by the consumer. */
  client: KafkaClient;

  /**
   * @param client Client instance the consumer is built on.
   * @param fetchRequests Fetch requests, each either an `OffsetFetchRequest`
   *   object or a plain string.
   * @param options Consumer settings (required argument).
   */
  constructor (client: KafkaClient, fetchRequests: Array<OffsetFetchRequest | string>, options: ConsumerOptions);

  /** Overload for the `'message'` event; the listener receives a `Message`. */
  on (eventName: 'message', cb: (message: Message) => any): void;
  /** Overload for events whose listener receives an error value. */
  on (eventName: 'error' | 'offsetOutOfRange', cb: (error: any) => any): void;

  /**
   * @param topics Either an array of strings or an array of `Topic` objects.
   * @param cb Callback `(error, added)`; `added` has the same type as `topics`.
   * @param fromOffset Optional boolean flag (semantics not visible here).
   */
  addTopics<T extends string[] | Topic[]> (topics: T, cb: (error: any, added: T) => any, fromOffset?: boolean): void;

  /**
   * @param topics A single topic name or an array of names.
   * @param cb Callback `(error, removed)`, where `removed` is a number.
   */
  removeTopics (topics: string | string[], cb: (error: any, removed: number) => any): void;

  /** @param cb Node-style callback `(error, data)`. */
  commit (cb: (error: any, data: any) => any): void;
  /** Overload taking a leading `force` flag. */
  commit (force: boolean, cb: (error: any, data: any) => any): void;

  /**
   * @param topic Topic name.
   * @param partition Partition number.
   * @param offset Offset value to set.
   */
  setOffset (topic: string, partition: number, offset: number): void;

  pause (): void;

  resume (): void;

  /** @param topics Untyped array; the inline note says `Array<string|Topic>`. */
  pauseTopics (topics: any[] /* Array<string|Topic> */): void;

  /** @param topics Untyped array; the inline note says `Array<string|Topic>`. */
  resumeTopics (topics: any[] /* Array<string|Topic> */): void;

  /**
   * @param force Boolean flag (semantics not visible here).
   * @param cb Callback receiving an `Error`.
   */
  close (force: boolean, cb: (error: Error) => any): void;
  /** Overload without the `force` flag. */
  close (cb: (error: Error) => any): void;
}
67
/**
 * Type declaration for `ConsumerGroupStream`, a subclass of Node's
 * `Readable` stream. Signatures only.
 */
export declare class ConsumerGroupStream extends Readable {
  /** Client instance exposed by the stream. */
  client: KafkaClient;
  /** `ConsumerGroup` instance exposed by the stream. */
  consumerGroup: ConsumerGroup;

  /**
   * @param options Stream and consumer-group settings.
   * @param topics A single topic name or an array of names.
   */
  constructor (options: ConsumerGroupStreamOptions, topics: string | string[]);

  /**
   * @param message Message to commit.
   * @param force Optional boolean flag (semantics not visible here).
   * @param cb Optional Node-style callback `(error, data)`.
   */
  commit (message: Message, force?: boolean, cb?: (error: any, data: any) => any): void;

  transmitMessages (): void;

  /** @param cb Required zero-argument callback. */
  close (cb: () => any): void;
}
80
/**
 * Type declaration for the `ConsumerGroup` class.
 *
 * Signatures only. This declaration does not extend `EventEmitter`; it
 * declares its own `on` overloads, which return `void`.
 */
export declare class ConsumerGroup {
  /** Numeric generation id. */
  generationId: number;
  /** Member id string. */
  memberId: string;
  /** Client instance exposed by the consumer group. */
  client: KafkaClient;

  /**
   * @param options Consumer-group settings.
   * @param topics An array of topic names or a single name.
   */
  constructor (options: ConsumerGroupOptions, topics: string[] | string);

  /**
   * @param force Boolean flag (semantics not visible here).
   * @param cb Callback receiving an `Error`.
   */
  close (force: boolean, cb: (error: Error) => any): void;
  /** Overload without the `force` flag. */
  close (cb: (error: Error) => any): void;

  /** Overload for the `'message'` event; the listener receives a `Message`. */
  on (eventName: 'message', cb: (message: Message) => any): void;
  /** Overload for events whose listener receives an error value. */
  on (eventName: 'error' | 'offsetOutOfRange', cb: (error: any) => any): void;
  /** Overload for the rebalance events; the listener takes no arguments. */
  on (eventName: 'rebalancing' | 'rebalanced', cb: () => any): void;

  /**
   * @param topics An array of strings or an array of `Topic` objects.
   * @param cb Optional callback `(error, added)`.
   */
  addTopics (topics: string[] | Topic[], cb?: (error: any, added: string[] | Topic[]) => any): void;

  /**
   * @param topics A single topic name or an array of names.
   * @param cb Callback `(error, removed)`, where `removed` is a number.
   */
  removeTopics (topics: string | string[], cb: (error: any, removed: number) => any): void;

  /** @param cb Node-style callback `(error, data)`. */
  commit (cb: (error: any, data: any) => any): void;
  /** Overload taking a leading `force` flag. */
  commit (force: boolean, cb: (error: any, data: any) => any): void;

  /**
   * @param commits Offsets to commit.
   * @param cb Node-style callback `(error, data)`.
   */
  sendOffsetCommitRequest (commits: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;

  /**
   * @param topic Topic name.
   * @param partition Partition number.
   * @param offset Offset value to set.
   */
  setOffset (topic: string, partition: number, offset: number): void;

  pause (): void;

  resume (): void;
}
110
/**
 * Type declaration for the `Offset` class.
 *
 * Signatures only. Every operation reports its result through a Node-style
 * `(error, data)` callback whose arguments are untyped.
 */
export declare class Offset {
  /** @param client Client instance the object is built on. */
  constructor (client: KafkaClient);

  /** Overload for events whose listener takes no arguments. */
  on (eventName: 'ready' | 'connect', cb: () => any): void;
  /** Overload for the `'error'` event; the listener receives an error value. */
  on (eventName: 'error', cb: (error: any) => any): void;

  /** @param payloads Offset requests to fetch. */
  fetch (payloads: OffsetRequest[], cb: (error: any, data: any) => any): void;

  /**
   * @param groupId Group id string.
   * @param payloads Offsets to commit.
   */
  commit (groupId: string, payloads: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;

  /**
   * @param groupId Group id string.
   * @param payloads Fetch requests describing which commits to read.
   */
  fetchCommits (groupId: string, payloads: OffsetFetchRequest[], cb: (error: any, data: any) => any): void;

  /** @param topics Topic names. */
  fetchLatestOffsets (topics: string[], cb: (error: any, data: any) => any): void;

  /** @param topics Topic names. */
  fetchEarliestOffsets (topics: string[], cb: (error: any, data: any) => any): void;
}
127
/**
 * Type declaration for the `KeyedMessage` class. Only the constructor is
 * declared on the class itself.
 */
export declare class KeyedMessage {
  /**
   * @param key Message key, as a string or a `Buffer`.
   * @param value Message value, as a string or a `Buffer`.
   */
  constructor (key: string | Buffer, value: string | Buffer);
}
131
/**
 * Type declaration for `ProducerStream`, a subclass of Node's `Writable`
 * stream. Signatures only.
 */
export declare class ProducerStream extends Writable {
  /** @param options Optional stream settings. */
  constructor (options?: ProducerStreamOptions);

  /**
   * @param payloads Produce requests to send.
   * @param cb Node-style callback `(error, data)`.
   */
  sendPayload (payloads: ProduceRequest[], cb: (error: any, data: any) => any): void;

  /** @param cb Optional zero-argument callback. */
  close (cb?: () => any): void;

  /**
   * Declared with a `ProduceRequest` as the chunk type.
   *
   * @param message Produce request being written.
   * @param encoding Either `'buffer'` or `'utf8'`.
   * @param cb Node-style callback `(error, data)`.
   */
  _write (message: ProduceRequest, encoding: 'buffer' | 'utf8', cb: (error: any, data: any) => any): void;

  /**
   * @param chunks Wrapped produce requests.
   * @param cb Node-style callback `(error, data)`.
   */
  _writev (chunks: Chunk[], cb: (error: any, data: any) => any): void;
}
143
// # Interfaces
145
/**
 * Shape of a message. Only `topic` and `value` are required.
 */
export interface Message {
  /** Topic name. */
  topic: string;
  /** Payload, as a string or a `Buffer`. */
  value: string | Buffer;
  offset?: number;
  partition?: number;
  highWaterOffset?: number;
  /** Optional key, as a string or a `Buffer`. */
  key?: string | Buffer;
}
154
/**
 * Instance shape of a keyed message: a key and a value, each a string or a
 * `Buffer`. A class with the same name is also declared in this file, so the
 * two declarations merge.
 */
export interface KeyedMessage {
  key: string | Buffer;
  value: string | Buffer;
}
159
/**
 * Producer settings. Every field is an optional number; accepted values and
 * defaults are not visible in this declaration.
 */
export interface ProducerOptions {
  requireAcks?: number;
  ackTimeoutMs?: number;
  partitionerType?: number;
}
165
/**
 * Client settings. Every field is optional; defaults are not visible in this
 * declaration.
 */
export interface KafkaClientOptions {
  kafkaHost?: string;
  connectTimeout?: number;
  requestTimeout?: number;
  autoConnect?: boolean;
  /** Retry settings applied to connecting. */
  connectRetryOptions?: RetryOptions;
  /** Untyped SSL settings. */
  sslOptions?: any;
  clientId?: string;
}
175
/**
 * Settings for a producer stream. Every field is optional.
 */
export interface ProducerStreamOptions {
  /** Client settings. */
  kafkaClient?: KafkaClientOptions;
  /** Producer settings. */
  producer?: ProducerOptions;
  highWaterMark?: number;
}
181
/**
 * Retry settings. Every field is optional; units and defaults are not
 * visible in this declaration.
 */
export interface RetryOptions {
  retries?: number;
  factor?: number;
  minTimeout?: number;
  maxTimeout?: number;
  randomize?: boolean;
}
189
/**
 * Batch settings. Both fields are required, and each accepts either a number
 * or `null`.
 */
export interface AckBatchOptions {
  noAckBatchSize: number | null;
  noAckBatchAge: number | null;
}
194
/**
 * Shape of a produce request. Only `topic` and `messages` are required.
 */
export interface ProduceRequest {
  /** Topic name. */
  topic: string;
  /** Untyped; the inline note lists the intended shapes. */
  messages: any; // string[] | Array<KeyedMessage> | string | KeyedMessage
  key?: string | Buffer;
  partition?: number;
  attributes?: number;
}
202
/**
 * Consumer settings. Every field is optional; defaults are not visible in
 * this declaration.
 */
export interface ConsumerOptions {
  groupId?: string;
  autoCommit?: boolean;
  autoCommitIntervalMs?: number;
  fetchMaxWaitMs?: number;
  fetchMinBytes?: number;
  fetchMaxBytes?: number;
  fromOffset?: boolean;
  /** Either `'buffer'` or `'utf8'`. */
  encoding?: 'buffer' | 'utf8';
  /** Either `'buffer'` or `'utf8'`. */
  keyEncoding?: 'buffer' | 'utf8';
}
214
/**
 * Shape of a user-supplied partition assignment protocol: a name, a version
 * number, a `userData` value and an `assign` method.
 */
export interface CustomPartitionAssignmentProtocol {
  name: string;
  version: number;
  /** Typed as `{}`, which accepts any non-nullish value. */
  userData: {};

  /**
   * @param topicPattern Untyped.
   * @param groupMembers Untyped.
   * @param cb Node-style callback `(error, result)`; both arguments are untyped.
   */
  assign (topicPattern: any, groupMembers: any, cb: (error: any, result: any) => void): void;
}
222
/**
 * Consumer-group settings. `groupId` is the only required field; defaults
 * and units for the rest are not visible in this declaration.
 */
export interface ConsumerGroupOptions {
  kafkaHost?: string;
  batch?: AckBatchOptions;
  ssl?: boolean;
  /** Untyped SSL settings. */
  sslOptions?: any;
  id?: string;
  /** Required group id. */
  groupId: string;
  sessionTimeout?: number;
  /** Either `'buffer'` or `'utf8'`. */
  encoding?: 'buffer' | 'utf8';
  /** Either `'buffer'` or `'utf8'`. */
  keyEncoding?: 'buffer' | 'utf8';
  /** Each entry is `'roundrobin'`, `'range'` or a custom protocol object. */
  protocol?: Array<'roundrobin' | 'range' | CustomPartitionAssignmentProtocol>;
  fromOffset?: 'earliest' | 'latest' | 'none';
  outOfRangeOffset?: 'earliest' | 'latest' | 'none';
  migrateHLC?: boolean;
  migrateRolling?: boolean;
  autoCommit?: boolean;
  autoCommitIntervalMs?: number;
  fetchMaxWaitMs?: number;
  maxNumSegments?: number;
  maxTickMessages?: number;
  fetchMinBytes?: number;
  fetchMaxBytes?: number;
  retries?: number;
  retryFactor?: number;
  retryMinTimeout?: number;
  connectOnReady?: boolean;
  heartbeatInterval?: number;
  /** Optional zero-argument hook returning a promise. */
  onRebalance?: () => Promise<void>;
}
252
/**
 * Settings for a consumer-group stream: everything in `ConsumerGroupOptions`
 * plus an optional `highWaterMark`.
 */
export interface ConsumerGroupStreamOptions extends ConsumerGroupOptions {
  highWaterMark?: number;
}
256
/**
 * Shape of a topic descriptor. Only `topic` is required.
 */
export interface Topic {
  /** Topic name. */
  topic: string;
  offset?: number;
  /** Plain string; the allowed values are not constrained by this type. */
  encoding?: string;
  autoCommit?: boolean;
}
263
/**
 * Shape of an offset request. Only `topic` is required.
 */
export interface OffsetRequest {
  /** Topic name. */
  topic: string;
  partition?: number;
  time?: number;
  maxNum?: number;
}
270
/**
 * Shape of an offset commit request. `topic` and `offset` are required.
 */
export interface OffsetCommitRequest {
  /** Topic name. */
  topic: string;
  partition?: number;
  /** Offset to commit. */
  offset: number;
  metadata?: string;
}
277
/**
 * Shape of an offset fetch request. Only `topic` is required.
 */
export interface OffsetFetchRequest {
  /** Topic name. */
  topic: string;
  partition?: number;
  offset?: number;
}
283
/**
 * Wrapper object holding a single `ProduceRequest` under the `chunk` key.
 */
export interface Chunk {
  chunk: ProduceRequest;
}
287
/**
 * Type declaration for `TopicsNotExistError`, an `Error` subclass carrying
 * the topic name or names concerned.
 */
export declare class TopicsNotExistError extends Error {
  /** A single topic name or an array of names. */
  topics: string | string[];
}
291
/**
 * Signature of a user-supplied partition-selection function.
 *
 * @param partitions Array of partition numbers.
 * @param key Key as a string or a `Buffer`.
 * @returns A number (presumably the chosen partition — confirm against the
 *   implementation).
 */
export type CustomPartitioner = (partitions: number[], key: string | Buffer) => number;