// UNPKG
//
// 32.4 kB · TypeScript · View Raw
1/// <reference types="node" />
2
3import * as tls from 'tls'
4import * as net from 'net'
5
/** Maps every key present on `T` but absent from `U` to an optional `never`, which forbids those keys. */
type Without<T, U> = { [K in Exclude<keyof T, keyof U>]?: never }

/**
 * Exclusive-or of two types. When `T | U` is object-like, a value must satisfy
 * one side while the keys unique to the other side are forbidden; otherwise
 * the result is the plain union `T | U`.
 */
type XOR<T, U> = T | U extends object
  ? (Without<T, U> & U) | (Without<U, T> & T)
  : T | U
8
/**
 * Client entry point. Constructed from a `KafkaConfig`; exposes accessors
 * returning a producer, a consumer, an admin client and a logger.
 */
export class Kafka {
  constructor(config: KafkaConfig)

  /** Returns a `Producer`; the configuration argument may be omitted. */
  producer(config?: ProducerConfig): Producer

  /** Returns a `Consumer`; a configuration argument is required. */
  consumer(config: ConsumerConfig): Consumer

  /** Returns an `Admin`; the configuration argument may be omitted. */
  admin(config?: AdminConfig): Admin

  /** Returns a `Logger`. */
  logger(): Logger
}
16
/** Zero-argument callback that yields the broker list, either directly or through a promise. */
export type BrokersFunction = () => string[] | Promise<string[]>

/** Options accepted by the `Kafka` constructor. Only `brokers` is required. */
export interface KafkaConfig {
  /** Either a fixed list of brokers or a function producing one. */
  brokers: string[] | BrokersFunction
  /** TLS connection options, or a boolean. */
  ssl?: tls.ConnectionOptions | boolean
  sasl?: SASLOptions
  clientId?: string
  connectionTimeout?: number
  authenticationTimeout?: number
  reauthenticationThreshold?: number
  requestTimeout?: number
  enforceRequestTimeout?: boolean
  retry?: RetryOptions
  socketFactory?: ISocketFactory
  logLevel?: logLevel
  logCreator?: logCreator
}
34
/** Argument object handed to an `ISocketFactory`. */
export interface ISocketFactoryArgs {
  host: string
  port: number
  ssl: tls.ConnectionOptions
  /** Zero-argument callback supplied to the factory. */
  onConnect: () => void
}

/** Function that receives `ISocketFactoryArgs` and returns a `net.Socket`. */
export type ISocketFactory = (args: ISocketFactoryArgs) => net.Socket
43
/** Value resolved by the `oauthBearerProvider` callback of the `oauthbearer` mechanism. */
export interface OauthbearerProviderResponse {
  value: string
}

/** Option shape required by each SASL mechanism, keyed by mechanism name. */
type SASLMechanismOptionsMap = {
  plain: { username: string; password: string }
  'scram-sha-256': { username: string; password: string }
  'scram-sha-512': { username: string; password: string }
  aws: {
    authorizationIdentity: string
    accessKeyId: string
    secretAccessKey: string
    sessionToken?: string
  }
  oauthbearer: { oauthBearerProvider: () => Promise<OauthbearerProviderResponse> }
}

/** Union of the supported mechanism names (the keys of the options map). */
export type SASLMechanism = keyof SASLMechanismOptionsMap

/**
 * Pairs a mechanism name with its own option shape. The conditional
 * distributes over a union `T`, so each mechanism yields a separate member.
 */
type SASLMechanismOptions<T> = T extends SASLMechanism
  ? { mechanism: T } & SASLMechanismOptionsMap[T]
  : never

/** Union of `{ mechanism, ...options }` objects, one member per supported mechanism. */
export type SASLOptions = SASLMechanismOptions<SASLMechanism>
66
/** Options accepted by `Kafka#producer`; every field is optional. */
export interface ProducerConfig {
  createPartitioner?: ICustomPartitioner
  retry?: RetryOptions
  metadataMaxAge?: number
  allowAutoTopicCreation?: boolean
  idempotent?: boolean
  transactionalId?: string
  transactionTimeout?: number
  maxInFlightRequests?: number
}
77
/** Outgoing message shape. Only `value` is required, and it may be `null`. */
export interface Message {
  key?: Buffer | string | null
  value: Buffer | string | null
  partition?: number
  headers?: IHeaders
  timestamp?: string
}
85
/** Input passed to a partitioner function: the topic, its partition metadata and the message. */
export interface PartitionerArgs {
  topic: string
  partitionMetadata: PartitionMetadata[]
  message: Message
}

/** Zero-argument creator returning a function that maps `PartitionerArgs` to a number. */
export type ICustomPartitioner = () => (args: PartitionerArgs) => number

/** Alias of `ICustomPartitioner`. */
export type DefaultPartitioner = ICustomPartitioner

/** Alias of `ICustomPartitioner`. */
export type LegacyPartitioner = ICustomPartitioner
95
/** Exported object holding the named partitioner creators. */
export const Partitioners: {
  DefaultPartitioner: DefaultPartitioner
  LegacyPartitioner: LegacyPartitioner
  /**
   * @deprecated Use DefaultPartitioner instead
   *
   * The JavaCompatiblePartitioner was renamed DefaultPartitioner
   * and made to be the default in 2.0.0.
   */
  JavaCompatiblePartitioner: DefaultPartitioner
}
107
/** Metadata record for a single partition; `offlineReplicas` may be absent. */
export type PartitionMetadata = {
  partitionErrorCode: number
  partitionId: number
  leader: number
  replicas: number[]
  isr: number[]
  offlineReplicas?: number[]
}
116
/**
 * Header bag keyed by string. Each entry is a buffer, a string, an array
 * mixing the two, or `undefined`.
 */
export interface IHeaders {
  [key: string]: Buffer | string | (Buffer | string)[] | undefined
}
120
/** Options accepted by `Kafka#consumer`. Only `groupId` is required. */
export interface ConsumerConfig {
  groupId: string
  partitionAssigners?: PartitionAssigner[]
  metadataMaxAge?: number
  sessionTimeout?: number
  rebalanceTimeout?: number
  heartbeatInterval?: number
  maxBytesPerPartition?: number
  minBytes?: number
  maxBytes?: number
  maxWaitTimeInMs?: number
  /** Retry options, intersected with an optional `restartOnFailure` callback. */
  retry?: RetryOptions & { restartOnFailure?: (err: Error) => Promise<boolean> }
  allowAutoTopicCreation?: boolean
  maxInFlightRequests?: number
  readUncommitted?: boolean
  rackId?: string
}
138
/** Function that builds an `Assigner` from a cluster, a group id and a logger. */
export type PartitionAssigner = (config: {
  cluster: Cluster
  groupId: string
  logger: Logger
}) => Assigner
144
/** Result shape of `Cluster#findGroupCoordinatorMetadata`: an error code plus the coordinator's node id, host and port. */
export interface CoordinatorMetadata {
  errorCode: number
  coordinator: {
    nodeId: number
    host: string
    port: number
  }
}
153
/**
 * Cluster handle passed to partition assigners: connection lifecycle,
 * metadata refresh, broker/coordinator lookup and topic offset queries.
 */
export type Cluster = {
  getNodeIds(): number[]
  metadata(): Promise<BrokerMetadata>
  removeBroker(options: { host: string; port: number }): void
  addMultipleTargetTopics(topics: string[]): Promise<void>
  isConnected(): boolean
  connect(): Promise<void>
  disconnect(): Promise<void>
  refreshMetadata(): Promise<void>
  refreshMetadataIfNecessary(): Promise<void>
  addTargetTopic(topic: string): Promise<void>
  // NOTE(review): nodeId is typed as string here although getNodeIds() returns number[] — confirm.
  findBroker(node: { nodeId: string }): Promise<Broker>
  findControllerBroker(): Promise<Broker>
  findTopicPartitionMetadata(topic: string): PartitionMetadata[]
  /** Returns partition numbers grouped under string leader keys. */
  findLeaderForPartitions(topic: string, partitions: number[]): { [leader: string]: number[] }
  findGroupCoordinator(group: { groupId: string }): Promise<Broker>
  findGroupCoordinatorMetadata(group: { groupId: string }): Promise<CoordinatorMetadata>
  defaultOffset(config: { fromBeginning: boolean }): number
  /** Each entry must carry exactly one of `fromBeginning` or `fromTimestamp`. */
  fetchTopicsOffset(
    topics: ({
      topic: string
      partitions: { partition: number }[]
    } & XOR<{ fromBeginning: boolean }, { fromTimestamp: number }>)[]
  ): Promise<TopicOffsets[]>
}
181
/** Topic name mapped to a list of numbers. */
export type Assignment = { [topic: string]: number[] }

/** A member id together with its metadata as a buffer. */
export type GroupMember = { memberId: string; memberMetadata: Buffer }

/** A member id together with its assignment as a buffer. */
export type GroupMemberAssignment = { memberId: string; memberAssignment: Buffer }

/** A name paired with metadata as a buffer; returned by `Assigner#protocol`. */
export type GroupState = { name: string; metadata: Buffer }
189
/** Object produced by a `PartitionAssigner`: a named, versioned assignment strategy. */
export type Assigner = {
  name: string
  version: number
  /** Resolves to one assignment per member for the given members and topics. */
  assign(group: { members: GroupMember[]; topics: string[] }): Promise<GroupMemberAssignment[]>
  /** Builds the `GroupState` for a subscription. */
  protocol(subscription: { topics: string[] }): GroupState
}
196
/** Retry tuning knobs; every field is optional. */
export interface RetryOptions {
  maxRetryTime?: number
  initialRetryTime?: number
  factor?: number
  multiplier?: number
  retries?: number
  /** Async callback receiving the error and resolving to a boolean. */
  restartOnFailure?: (e: Error) => Promise<boolean>
}
205
/** Options accepted by `Kafka#admin`; currently only optional retry settings. */
export interface AdminConfig {
  retry?: RetryOptions
}
209
/** One topic entry for `Admin#createTopics`; only `topic` is required. */
export interface ITopicConfig {
  topic: string
  numPartitions?: number
  replicationFactor?: number
  replicaAssignment?: object[]
  configEntries?: IResourceConfigEntry[]
}

/** One topic entry for `Admin#createPartitions`. */
export interface ITopicPartitionConfig {
  topic: string
  count: number
  assignments?: number[][]
}

/** A topic name with the metadata of its partitions. */
export interface ITopicMetadata {
  name: string
  partitions: PartitionMetadata[]
}
228
/** Numeric resource-type codes used by ACL entries and filters. */
export enum AclResourceTypes {
  UNKNOWN = 0,
  ANY = 1,
  TOPIC = 2,
  GROUP = 3,
  CLUSTER = 4,
  TRANSACTIONAL_ID = 5,
  DELEGATION_TOKEN = 6
}
238
/** Numeric resource-type codes used by config queries. The values are not contiguous. */
export enum ConfigResourceTypes {
  UNKNOWN = 0,
  TOPIC = 2,
  BROKER = 4,
  BROKER_LOGGER = 8
}
245
/** Numeric codes identifying the source of a config value. */
export enum ConfigSource {
  UNKNOWN = 0,
  TOPIC_CONFIG = 1,
  DYNAMIC_BROKER_CONFIG = 2,
  DYNAMIC_DEFAULT_BROKER_CONFIG = 3,
  STATIC_BROKER_CONFIG = 4,
  DEFAULT_CONFIG = 5,
  DYNAMIC_BROKER_LOGGER_CONFIG = 6
}
255
/** Numeric permission codes for ACL entries. */
export enum AclPermissionTypes {
  UNKNOWN = 0,
  ANY = 1,
  DENY = 2,
  ALLOW = 3
}
262
/** Numeric operation codes for ACL entries. */
export enum AclOperationTypes {
  UNKNOWN = 0,
  ANY = 1,
  ALL = 2,
  READ = 3,
  WRITE = 4,
  CREATE = 5,
  DELETE = 6,
  ALTER = 7,
  DESCRIBE = 8,
  CLUSTER_ACTION = 9,
  DESCRIBE_CONFIGS = 10,
  ALTER_CONFIGS = 11,
  IDEMPOTENT_WRITE = 12
}
278
/** Numeric pattern-type codes describing how an ACL resource name is matched. */
export enum ResourcePatternTypes {
  UNKNOWN = 0,
  ANY = 1,
  MATCH = 2,
  LITERAL = 3,
  PREFIXED = 4
}
286
/** One resource entry for `Admin#describeConfigs`; `configNames` may be omitted. */
export interface ResourceConfigQuery {
  type: ConfigResourceTypes
  name: string
  configNames?: string[]
}
292
/** A single described config entry, including its source, flags and synonyms. */
export interface ConfigEntries {
  configName: string
  configValue: string
  isDefault: boolean
  configSource: ConfigSource
  isSensitive: boolean
  readOnly: boolean
  configSynonyms: ConfigSynonyms[]
}

/** A synonym of a config entry: name, value and source. */
export interface ConfigSynonyms {
  configName: string
  configValue: string
  configSource: ConfigSource
}
308
/** Result of `Admin#describeConfigs`: one record per resource plus a throttle time. */
export interface DescribeConfigResponse {
  resources: {
    configEntries: ConfigEntries[]
    errorCode: number
    errorMessage: string
    resourceName: string
    resourceType: ConfigResourceTypes
  }[]
  throttleTime: number
}
319
/** A name/value pair for a config entry. */
export interface IResourceConfigEntry {
  name: string
  value: string
}

/** One resource entry for `Admin#alterConfigs`: the resource type and name plus its config entries. */
export interface IResourceConfig {
  type: ConfigResourceTypes
  name: string
  configEntries: IResourceConfigEntry[]
}
330
/** Union of the property value types of `T`. */
type ValueOf<T> = T[keyof T]
332
/** Event-name constants exposed on `Admin#events`; each key maps to its literal event name. */
export type AdminEvents = {
  CONNECT: 'admin.connect'
  DISCONNECT: 'admin.disconnect'
  REQUEST: 'admin.network.request'
  REQUEST_TIMEOUT: 'admin.network.request_timeout'
  REQUEST_QUEUE_SIZE: 'admin.network.request_queue_size'
}
340
/** Envelope shared by all instrumentation events; `T` is the payload type. */
export interface InstrumentationEvent<T> {
  id: string
  type: string
  timestamp: number
  payload: T
}

/** Zero-argument function returned by the `on(...)` overloads. The type parameter is not used in the signature. */
export type RemoveInstrumentationEventListener<T> = () => void

/** Connect event; the payload is `null`. */
export type ConnectEvent = InstrumentationEvent<null>

/** Disconnect event; the payload is `null`. */
export type DisconnectEvent = InstrumentationEvent<null>

/** Network request event payload. */
export type RequestEvent = InstrumentationEvent<{
  apiKey: number
  apiName: string
  apiVersion: number
  broker: string
  clientId: string
  correlationId: number
  createdAt: number
  duration: number
  pendingDuration: number
  sentAt: number
  size: number
}>

/** Network request-timeout event payload. */
export type RequestTimeoutEvent = InstrumentationEvent<{
  apiKey: number
  apiName: string
  apiVersion: number
  broker: string
  clientId: string
  correlationId: number
  createdAt: number
  pendingDuration: number
  sentAt: number
}>

/** Network request-queue-size event payload. */
export type RequestQueueSizeEvent = InstrumentationEvent<{
  broker: string
  clientId: string
  queueSize: number
}>
381
/** Alias of `PartitionOffset`. */
export type SeekEntry = PartitionOffset

/** A `PartitionOffset` extended with a `metadata` string that may be `null`. */
export type FetchOffsetsPartition = PartitionOffset & { metadata: string | null }
/** The principal, host, operation and permission of an ACL. */
export interface Acl {
  principal: string
  host: string
  operation: AclOperationTypes
  permissionType: AclPermissionTypes
}

/** The resource an ACL refers to: its type, name and pattern type. */
export interface AclResource {
  resourceType: AclResourceTypes
  resourceName: string
  resourcePatternType: ResourcePatternTypes
}

/** A complete ACL entry: the ACL fields combined with the resource fields. */
export type AclEntry = Acl & AclResource

/** A resource together with the list of ACLs attached to it. */
export type DescribeAclResource = AclResource & {
  acls: Acl[]
}
403
/** Result of `Admin#describeAcls`; `errorMessage` may be absent. */
export interface DescribeAclResponse {
  throttleTime: number
  errorCode: number
  errorMessage?: string
  resources: DescribeAclResource[]
}
410
/** Filter used to describe or delete ACLs; `resourceName`, `principal` and `host` may be omitted. */
export interface AclFilter {
  resourceType: AclResourceTypes
  resourceName?: string
  resourcePatternType: ResourcePatternTypes
  principal?: string
  host?: string
  operation: AclOperationTypes
  permissionType: AclPermissionTypes
}
420
/** One ACL reported under a delete filter, carrying its own error code and optional message. */
export interface MatchingAcl {
  errorCode: number
  errorMessage?: string
  resourceType: AclResourceTypes
  resourceName: string
  resourcePatternType: ResourcePatternTypes
  principal: string
  host: string
  operation: AclOperationTypes
  permissionType: AclPermissionTypes
}

/** Per-filter result of an ACL deletion: an error code, optional message and the matching ACLs. */
export interface DeleteAclFilterResponses {
  errorCode: number
  errorMessage?: string
  matchingAcls: MatchingAcl[]
}

/** Result of `Admin#deleteAcls`: a throttle time and one response per filter. */
export interface DeleteAclResponse {
  throttleTime: number
  filterResponses: DeleteAclFilterResponses[]
}
443
/**
 * Admin client surface returned by `Kafka#admin`: connection lifecycle,
 * topic/partition/offset/config/group/ACL operations, a logger accessor and
 * typed `on(...)` overloads for the events listed in `AdminEvents`.
 */
export type Admin = {
  connect(): Promise<void>
  disconnect(): Promise<void>
  listTopics(): Promise<string[]>
  createTopics(options: {
    validateOnly?: boolean
    waitForLeaders?: boolean
    timeout?: number
    topics: ITopicConfig[]
  }): Promise<boolean>
  deleteTopics(options: { topics: string[]; timeout?: number }): Promise<void>
  createPartitions(options: {
    validateOnly?: boolean
    timeout?: number
    topicPartitions: ITopicPartitionConfig[]
  }): Promise<boolean>
  fetchTopicMetadata(options?: { topics: string[] }): Promise<{ topics: ITopicMetadata[] }>
  fetchOffsets(options: {
    groupId: string
    topics?: string[]
    resolveOffsets?: boolean
  }): Promise<{ topic: string; partitions: FetchOffsetsPartition[] }[]>
  fetchTopicOffsets(topic: string): Promise<(SeekEntry & { high: string; low: string })[]>
  fetchTopicOffsetsByTimestamp(topic: string, timestamp?: number): Promise<SeekEntry[]>
  describeCluster(): Promise<{
    brokers: { nodeId: number; host: string; port: number }[]
    controller: number | null
    clusterId: string
  }>
  setOffsets(options: { groupId: string; topic: string; partitions: SeekEntry[] }): Promise<void>
  resetOffsets(options: { groupId: string; topic: string; earliest: boolean }): Promise<void>
  describeConfigs(configs: {
    resources: ResourceConfigQuery[]
    includeSynonyms: boolean
  }): Promise<DescribeConfigResponse>
  alterConfigs(configs: { validateOnly: boolean; resources: IResourceConfig[] }): Promise<any>
  listGroups(): Promise<{ groups: GroupOverview[] }>
  deleteGroups(groupIds: string[]): Promise<DeleteGroupsResult[]>
  describeGroups(groupIds: string[]): Promise<GroupDescriptions>
  describeAcls(options: AclFilter): Promise<DescribeAclResponse>
  deleteAcls(options: { filters: AclFilter[] }): Promise<DeleteAclResponse>
  createAcls(options: { acl: AclEntry[] }): Promise<boolean>
  deleteTopicRecords(options: { topic: string; partitions: SeekEntry[] }): Promise<void>
  logger(): Logger

  // Event subscription overloads: specific event names first, the catch-all last.
  on(
    eventName: AdminEvents['CONNECT'],
    listener: (event: ConnectEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: AdminEvents['DISCONNECT'],
    listener: (event: DisconnectEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: AdminEvents['REQUEST'],
    listener: (event: RequestEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: AdminEvents['REQUEST_QUEUE_SIZE'],
    listener: (event: RequestQueueSizeEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: AdminEvents['REQUEST_TIMEOUT'],
    listener: (event: RequestTimeoutEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ValueOf<AdminEvents>,
    listener: (event: InstrumentationEvent<any>) => void
  ): RemoveInstrumentationEventListener<typeof eventName>

  readonly events: AdminEvents
}
514
/** Exported object holding the `roundRobin` partition assigner. */
export const PartitionAssigners: {
  roundRobin: PartitionAssigner
}
516
/** Converts values of type `T` to a buffer and back; `decode` may yield `null`. */
export interface ISerializer<T> {
  encode(value: T): Buffer
  decode(buffer: Buffer): T | null
}
521
/** Decoded member metadata: a version, a topic list and a user-data buffer. */
export type MemberMetadata = {
  version: number
  topics: string[]
  userData: Buffer
}

/** Decoded member assignment: a version, an `Assignment` and a user-data buffer. */
export type MemberAssignment = {
  version: number
  assignment: Assignment
  userData: Buffer
}

/** Exported serializers for `MemberMetadata` and `MemberAssignment`. */
export const AssignerProtocol: {
  MemberMetadata: ISerializer<MemberMetadata>
  MemberAssignment: ISerializer<MemberAssignment>
}
538
/** Numeric log levels. The value 3 is not assigned to any member. */
export enum logLevel {
  NOTHING = 0,
  ERROR = 1,
  WARN = 2,
  INFO = 4,
  DEBUG = 5
}
546
/** Record handed to the function returned by a `logCreator`. */
export interface LogEntry {
  namespace: string
  level: logLevel
  label: string
  log: LoggerEntryContent
}

/** Log content: a read-only timestamp and message, plus arbitrary additional keys. */
export interface LoggerEntryContent {
  readonly timestamp: string
  readonly message: string
  [key: string]: any
}

/** Takes a log level and returns a function that receives each `LogEntry`. */
export type logCreator = (logLevel: logLevel) => (entry: LogEntry) => void

/** Logger surface: four level methods, a namespacing function returning another `Logger`, and a level setter. */
export type Logger = {
  info: (message: string, extra?: object) => void
  error: (message: string, extra?: object) => void
  warn: (message: string, extra?: object) => void
  debug: (message: string, extra?: object) => void

  namespace: (namespace: string, logLevel?: logLevel) => Logger
  setLogLevel: (logLevel: logLevel) => void
}
571
/** Metadata result: the known brokers (with optional rack) and per-topic partition metadata. */
export interface BrokerMetadata {
  brokers: { nodeId: number; host: string; port: number; rack?: string }[]
  topicMetadata: {
    topicErrorCode: number
    topic: string
    partitionMetadata: PartitionMetadata[]
  }[]
}
580
/** Version range (`minVersion`..`maxVersion`) indexed by numeric API key. */
export interface ApiVersions {
  [apiKey: number]: {
    minVersion: number
    maxVersion: number
  }
}
587
/**
 * Handle to a single broker: connection lifecycle, API version and metadata
 * lookup, group description, offset commit/fetch, fetch and produce requests.
 */
export type Broker = {
  isConnected(): boolean
  connect(): Promise<void>
  disconnect(): Promise<void>
  apiVersions(): Promise<ApiVersions>
  metadata(topics: string[]): Promise<BrokerMetadata>
  // Declared as a function-typed property, unlike the method signatures around it.
  describeGroups: (options: { groupIds: string[] }) => Promise<any>
  offsetCommit(request: {
    groupId: string
    groupGenerationId: number
    memberId: string
    retentionTime?: number
    topics: TopicOffsets[]
  }): Promise<any>
  offsetFetch(request: {
    groupId: string
    topics: TopicOffsets[]
  }): Promise<{
    responses: TopicOffsets[]
  }>
  fetch(request: {
    replicaId?: number
    isolationLevel?: number
    maxWaitTime?: number
    minBytes?: number
    maxBytes?: number
    topics: {
      topic: string
      partitions: { partition: number; fetchOffset: string; maxBytes: number }[]
    }[]
    rackId?: string
  }): Promise<any>
  produce(request: {
    topicData: {
      topic: string
      partitions: { partition: number; firstSequence?: number; messages: Message[] }[]
    }[]
    transactionalId?: string
    producerId?: number
    producerEpoch?: number
    acks?: number
    timeout?: number
    compression?: CompressionTypes
  }): Promise<any>
}
633
/** Message shape found in `Batch#messages` and `EachMessagePayload#message`. `key` and `value` are buffers or `null`; `offset` and `timestamp` are strings. */
export type KafkaMessage = {
  key: Buffer | null
  value: Buffer | null
  timestamp: string
  size: number
  attributes: number
  offset: string
  headers?: IHeaders
}
643
/** Argument of `Sender#send`: a topic, its messages and optional acks/timeout/compression. */
export interface ProducerRecord {
  topic: string
  messages: Message[]
  acks?: number
  timeout?: number
  compression?: CompressionTypes
}
651
/** Per-partition result element of `send`/`sendBatch`; the offset and time fields are optional strings. */
export type RecordMetadata = {
  topicName: string
  partition: number
  errorCode: number
  offset?: string
  timestamp?: string
  baseOffset?: string
  logAppendTime?: string
  logStartOffset?: string
}
662
/** A topic together with the messages destined for it. */
export interface TopicMessages {
  topic: string
  messages: Message[]
}

/** Argument of `Sender#sendBatch`; every field is optional. */
export interface ProducerBatch {
  acks?: number
  timeout?: number
  compression?: CompressionTypes
  topicMessages?: TopicMessages[]
}
674
/** A partition number paired with an offset expressed as a string. */
export interface PartitionOffset {
  partition: number
  offset: string
}

/** A topic with the offsets of its partitions. */
export interface TopicOffsets {
  topic: string
  partitions: PartitionOffset[]
}

/** A collection of per-topic offsets. */
export interface Offsets {
  topics: TopicOffsets[]
}
688
/** Sending surface shared by `Producer` and `Transaction`. */
type Sender = {
  /** Sends one record; resolves to a list of `RecordMetadata`. */
  send(record: ProducerRecord): Promise<RecordMetadata[]>
  /** Sends a batch; resolves to a list of `RecordMetadata`. */
  sendBatch(batch: ProducerBatch): Promise<RecordMetadata[]>
}
693
/** Event-name constants exposed on `Producer#events`; each key maps to its literal event name. */
export type ProducerEvents = {
  CONNECT: 'producer.connect'
  DISCONNECT: 'producer.disconnect'
  REQUEST: 'producer.network.request'
  REQUEST_TIMEOUT: 'producer.network.request_timeout'
  REQUEST_QUEUE_SIZE: 'producer.network.request_queue_size'
}
701
/**
 * Producer surface returned by `Kafka#producer`: the `Sender` methods plus
 * connection lifecycle, typed `on(...)` overloads for `ProducerEvents`,
 * transaction creation and a logger accessor.
 */
export type Producer = Sender & {
  connect(): Promise<void>
  disconnect(): Promise<void>
  isIdempotent(): boolean
  readonly events: ProducerEvents

  // Event subscription overloads: specific event names first, the catch-all last.
  on(
    eventName: ProducerEvents['CONNECT'],
    listener: (event: ConnectEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ProducerEvents['DISCONNECT'],
    listener: (event: DisconnectEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ProducerEvents['REQUEST'],
    listener: (event: RequestEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ProducerEvents['REQUEST_QUEUE_SIZE'],
    listener: (event: RequestQueueSizeEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ProducerEvents['REQUEST_TIMEOUT'],
    listener: (event: RequestTimeoutEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ValueOf<ProducerEvents>,
    listener: (event: InstrumentationEvent<any>) => void
  ): RemoveInstrumentationEventListener<typeof eventName>

  /** Resolves to a `Transaction`. */
  transaction(): Promise<Transaction>
  logger(): Logger
}
734
/** Transaction surface resolved by `Producer#transaction`: the `Sender` methods plus offset sending, commit, abort and an activity check. */
export type Transaction = Sender & {
  /** Takes `Offsets` together with the consumer group id they belong to. */
  sendOffsets(offsets: Offsets & { consumerGroupId: string }): Promise<void>
  commit(): Promise<void>
  abort(): Promise<void>
  isActive(): boolean
}
741
/** A consumer group's id, generation id and member id, together with its coordinator broker. */
export type ConsumerGroup = {
  groupId: string
  generationId: number
  memberId: string
  coordinator: Broker
}
748
/** Description of one group member; assignment and metadata are raw buffers. */
export type MemberDescription = {
  clientHost: string
  clientId: string
  memberId: string
  memberAssignment: Buffer
  memberMetadata: Buffer
}

// See https://github.com/apache/kafka/blob/2.4.0/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java#L25
/** The possible consumer group state names. */
export type ConsumerGroupState =
  | 'Unknown'
  | 'PreparingRebalance'
  | 'CompletingRebalance'
  | 'Stable'
  | 'Dead'
  | 'Empty'

/** Description of one group: its members, protocol, protocol type and state. */
export type GroupDescription = {
  groupId: string
  members: MemberDescription[]
  protocol: string
  protocolType: string
  state: ConsumerGroupState
}

/** Wrapper around a list of group descriptions; resolved by `Admin#describeGroups`. */
export type GroupDescriptions = {
  groups: GroupDescription[]
}
777
/** A topic with a list of partition numbers. */
export type TopicPartitions = { topic: string; partitions: number[] }

/** A single topic/partition pair. */
export type TopicPartition = {
  topic: string
  partition: number
}

/** A topic/partition pair with an offset expressed as a string. */
export type TopicPartitionOffset = TopicPartition & {
  offset: string
}

/** A topic/partition/offset triple with optional, nullable metadata. */
export type TopicPartitionOffsetAndMetadata = TopicPartitionOffset & {
  metadata?: string | null
}
790
/** The `batch` member of `EachBatchPayload`: topic, partition, high watermark, messages and offset helper methods. */
export type Batch = {
  topic: string
  partition: number
  highWatermark: string
  messages: KafkaMessage[]
  isEmpty(): boolean
  /** May return `null`. */
  firstOffset(): string | null
  lastOffset(): string
  offsetLag(): string
  offsetLagLow(): string
}
802
/** Summary of a group: its id and protocol type. */
export type GroupOverview = {
  groupId: string
  protocolType: string
}

/** Per-group result of a group deletion; the error code and error are optional. */
export type DeleteGroupsResult = {
  groupId: string
  errorCode?: number
  error?: KafkaJSProtocolError
}
813
/** Event-name constants exposed on `Consumer#events`; each key maps to its literal event name. */
export type ConsumerEvents = {
  HEARTBEAT: 'consumer.heartbeat'
  COMMIT_OFFSETS: 'consumer.commit_offsets'
  GROUP_JOIN: 'consumer.group_join'
  FETCH_START: 'consumer.fetch_start'
  FETCH: 'consumer.fetch'
  START_BATCH_PROCESS: 'consumer.start_batch_process'
  END_BATCH_PROCESS: 'consumer.end_batch_process'
  CONNECT: 'consumer.connect'
  DISCONNECT: 'consumer.disconnect'
  STOP: 'consumer.stop'
  CRASH: 'consumer.crash'
  REBALANCING: 'consumer.rebalancing'
  RECEIVED_UNSUBSCRIBED_TOPICS: 'consumer.received_unsubscribed_topics'
  REQUEST: 'consumer.network.request'
  REQUEST_TIMEOUT: 'consumer.network.request_timeout'
  REQUEST_QUEUE_SIZE: 'consumer.network.request_queue_size'
}
/** Heartbeat event payload. */
export type ConsumerHeartbeatEvent = InstrumentationEvent<{
  groupId: string
  memberId: string
  groupGenerationId: number
}>

/** Commit-offsets event payload, including the per-topic offsets. */
export type ConsumerCommitOffsetsEvent = InstrumentationEvent<{
  groupId: string
  memberId: string
  groupGenerationId: number
  topics: TopicOffsets[]
}>

/** String key mapped to a list of numbers; used as `memberAssignment` in the group-join event. */
export interface IMemberAssignment {
  [key: string]: number[]
}

/** Group-join event payload. */
export type ConsumerGroupJoinEvent = InstrumentationEvent<{
  duration: number
  groupId: string
  isLeader: boolean
  leaderId: string
  groupProtocol: string
  memberId: string
  memberAssignment: IMemberAssignment
}>

/** Fetch-start event payload. */
export type ConsumerFetchStartEvent = InstrumentationEvent<{ nodeId: number }>

/** Fetch event payload. */
export type ConsumerFetchEvent = InstrumentationEvent<{
  numberOfBatches: number
  duration: number
  nodeId: number
}>

/** Fields shared by the start and end batch-process events. */
interface IBatchProcessEvent {
  topic: string
  partition: number
  highWatermark: string
  offsetLag: string
  offsetLagLow: string
  batchSize: number
  firstOffset: string
  lastOffset: string
}

/** Start-batch-process event payload. */
export type ConsumerStartBatchProcessEvent = InstrumentationEvent<IBatchProcessEvent>

/** End-batch-process event payload: the shared fields plus a duration. */
export type ConsumerEndBatchProcessEvent = InstrumentationEvent<
  IBatchProcessEvent & { duration: number }
>

/** Crash event payload: the error, the group id and a `restart` flag. */
export type ConsumerCrashEvent = InstrumentationEvent<{
  error: Error
  groupId: string
  restart: boolean
}>

/** Rebalancing event payload. */
export type ConsumerRebalancingEvent = InstrumentationEvent<{
  groupId: string
  memberId: string
}>

/** Received-unsubscribed-topics event payload (the exported name is spelled "Unsubcribed"). */
export type ConsumerReceivedUnsubcribedTopicsEvent = InstrumentationEvent<{
  groupId: string
  generationId: number
  memberId: string
  assignedTopics: string[]
  topicsSubscribed: string[]
  topicsNotSubscribed: string[]
}>
892
/** Return shape of `EachBatchPayload#uncommittedOffsets`: a list of per-topic offsets. */
export interface OffsetsByTopicPartition {
  topics: TopicOffsets[]
}
896
/** Argument passed to an `EachMessageHandler`. */
export interface EachMessagePayload {
  topic: string
  partition: number
  message: KafkaMessage
  heartbeat(): Promise<void>
}

/** Argument passed to an `EachBatchHandler`: the batch plus offset, heartbeat and state helpers. */
export interface EachBatchPayload {
  batch: Batch
  resolveOffset(offset: string): void
  heartbeat(): Promise<void>
  /** The `offsets` argument may be omitted. */
  commitOffsetsIfNecessary(offsets?: Offsets): Promise<void>
  uncommittedOffsets(): OffsetsByTopicPartition
  isRunning(): boolean
  isStale(): boolean
}
913
/**
 * Type alias to keep compatibility with @types/kafkajs
 * @see https://github.com/DefinitelyTyped/DefinitelyTyped/blob/712ad9d59ccca6a3cc92f347fea0d1c7b02f5eeb/types/kafkajs/index.d.ts#L321-L325
 */
export type ConsumerEachMessagePayload = EachMessagePayload

/**
 * Type alias to keep compatibility with @types/kafkajs
 * @see https://github.com/DefinitelyTyped/DefinitelyTyped/blob/712ad9d59ccca6a3cc92f347fea0d1c7b02f5eeb/types/kafkajs/index.d.ts#L327-L336
 */
export type ConsumerEachBatchPayload = EachBatchPayload

/** Async handler invoked with an `EachBatchPayload`. */
export type EachBatchHandler = (payload: EachBatchPayload) => Promise<void>

/** Async handler invoked with an `EachMessagePayload`. */
export type EachMessageHandler = (payload: EachMessagePayload) => Promise<void>
928
/** Options accepted by `Consumer#run`; every field is optional. */
export type ConsumerRunConfig = {
  autoCommit?: boolean
  /** A number or `null`. */
  autoCommitInterval?: number | null
  /** A number or `null`. */
  autoCommitThreshold?: number | null
  eachBatchAutoResolve?: boolean
  partitionsConsumedConcurrently?: number
  eachBatch?: EachBatchHandler
  eachMessage?: EachMessageHandler
}
938
/**
 * @deprecated Replaced by ConsumerSubscribeTopics
 */
export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean }

/** Subscription to several topics, each given as a string or a regular expression. */
export type ConsumerSubscribeTopics = { topics: (string | RegExp)[]; fromBeginning?: boolean }
944
/**
 * Consumer surface returned by `Kafka#consumer`: connection lifecycle,
 * subscription, run/stop, offset commit and seek, pause/resume, group
 * description, typed `on(...)` overloads for `ConsumerEvents` and a logger
 * accessor.
 */
export type Consumer = {
  connect(): Promise<void>
  disconnect(): Promise<void>
  /** Accepts either the multi-topic form or the deprecated single-topic form. */
  subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void>
  stop(): Promise<void>
  run(config?: ConsumerRunConfig): Promise<void>
  commitOffsets(topicPartitions: TopicPartitionOffsetAndMetadata[]): Promise<void>
  seek(topicPartitionOffset: TopicPartitionOffset): void
  describeGroup(): Promise<GroupDescription>
  pause(topics: { topic: string; partitions?: number[] }[]): void
  paused(): TopicPartitions[]
  resume(topics: { topic: string; partitions?: number[] }[]): void

  // Event subscription overloads: specific event names first, the catch-all last.
  on(
    eventName: ConsumerEvents['HEARTBEAT'],
    listener: (event: ConsumerHeartbeatEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['COMMIT_OFFSETS'],
    listener: (event: ConsumerCommitOffsetsEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['GROUP_JOIN'],
    listener: (event: ConsumerGroupJoinEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['FETCH_START'],
    listener: (event: ConsumerFetchStartEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['FETCH'],
    listener: (event: ConsumerFetchEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['START_BATCH_PROCESS'],
    listener: (event: ConsumerStartBatchProcessEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['END_BATCH_PROCESS'],
    listener: (event: ConsumerEndBatchProcessEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['CONNECT'],
    listener: (event: ConnectEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['DISCONNECT'],
    listener: (event: DisconnectEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['STOP'],
    listener: (event: InstrumentationEvent<null>) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['CRASH'],
    listener: (event: ConsumerCrashEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['REBALANCING'],
    listener: (event: ConsumerRebalancingEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['RECEIVED_UNSUBSCRIBED_TOPICS'],
    listener: (event: ConsumerReceivedUnsubcribedTopicsEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['REQUEST'],
    listener: (event: RequestEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['REQUEST_TIMEOUT'],
    listener: (event: RequestTimeoutEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ConsumerEvents['REQUEST_QUEUE_SIZE'],
    listener: (event: RequestQueueSizeEvent) => void
  ): RemoveInstrumentationEventListener<typeof eventName>
  on(
    eventName: ValueOf<ConsumerEvents>,
    listener: (event: InstrumentationEvent<any>) => void
  ): RemoveInstrumentationEventListener<typeof eventName>

  logger(): Logger
  readonly events: ConsumerEvents
}
1028
/** Numeric compression type codes. */
export enum CompressionTypes {
  None = 0,
  GZIP = 1,
  Snappy = 2,
  LZ4 = 3,
  ZSTD = 4
}

/**
 * Object keyed by compression type; each entry is a zero-argument function
 * returning `any`. There is no entry for `CompressionTypes.None`.
 */
export var CompressionCodecs: {
  [CompressionTypes.GZIP]: () => any
  [CompressionTypes.Snappy]: () => any
  [CompressionTypes.LZ4]: () => any
  [CompressionTypes.ZSTD]: () => any
}
1043
/** Base class of the library's errors; extends the built-in `Error`. */
export class KafkaJSError extends Error {
  readonly message: Error['message']
  readonly name: string
  readonly retriable: boolean
  readonly helpUrl?: string
  readonly cause?: Error

  /** Accepts an error or a message, plus optional `KafkaJSErrorMetadata`. */
  constructor(e: Error | string, metadata?: KafkaJSErrorMetadata)
}
1053
/** `KafkaJSError` subclass constructed from an error or a message only. */
export class KafkaJSNonRetriableError extends KafkaJSError {
  constructor(e: Error | string)
}

/** `KafkaJSError` subclass exposing a numeric `code` and a string `type`. */
export class KafkaJSProtocolError extends KafkaJSError {
  readonly code: number
  readonly type: string
  constructor(e: Error | string)
}

/** `KafkaJSProtocolError` subclass exposing the affected topic and partition. */
export class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError {
  readonly topic: string
  readonly partition: number
  constructor(e: Error | string, metadata?: KafkaJSOffsetOutOfRangeMetadata)
}
1069
/** `KafkaJSNonRetriableError` subclass exposing the retry count and retry time, with a non-optional `stack`. */
export class KafkaJSNumberOfRetriesExceeded extends KafkaJSNonRetriableError {
  readonly stack: string
  readonly retryCount: number
  readonly retryTime: number
  constructor(e: Error | string, metadata?: KafkaJSNumberOfRetriesExceededMetadata)
}

/** `KafkaJSError` subclass exposing the broker involved. */
export class KafkaJSConnectionError extends KafkaJSError {
  readonly broker: string
  constructor(e: Error | string, metadata?: KafkaJSConnectionErrorMetadata)
}

/** `KafkaJSError` subclass exposing the broker, correlation id and timing fields of the request. */
export class KafkaJSRequestTimeoutError extends KafkaJSError {
  readonly broker: string
  readonly correlationId: number
  readonly createdAt: number
  readonly sentAt: number
  readonly pendingDuration: number
  constructor(e: Error | string, metadata?: KafkaJSRequestTimeoutErrorMetadata)
}
1090
/** `KafkaJSError` subclass declared with a zero-argument constructor. */
export class KafkaJSMetadataNotLoaded extends KafkaJSError {
  constructor()
}

/** `KafkaJSMetadataNotLoaded` subclass exposing the topic concerned. */
export class KafkaJSTopicMetadataNotLoaded extends KafkaJSMetadataNotLoaded {
  readonly topic: string
  constructor(e: Error | string, metadata?: KafkaJSTopicMetadataNotLoadedMetadata)
}

/** `KafkaJSError` subclass exposing the topic and its unknown partitions. */
export class KafkaJSStaleTopicMetadataAssignment extends KafkaJSError {
  readonly topic: string
  // NOTE(review): typed as number here, while
  // KafkaJSStaleTopicMetadataAssignmentMetadata.unknownPartitions is
  // PartitionMetadata[] — confirm which one is intended.
  readonly unknownPartitions: number
  constructor(e: Error | string, metadata?: KafkaJSStaleTopicMetadataAssignmentMetadata)
}

/** `KafkaJSNonRetriableError` subclass exposing the API key and API name. */
export class KafkaJSServerDoesNotSupportApiKey extends KafkaJSNonRetriableError {
  readonly apiKey: number
  readonly apiName: string
  constructor(e: Error | string, metadata?: KafkaJSServerDoesNotSupportApiKeyMetadata)
}
1111
// The classes below all extend KafkaJSError, add no members, and are declared
// with a zero-argument constructor.

export class KafkaJSBrokerNotFound extends KafkaJSError {
  constructor()
}

export class KafkaJSPartialMessageError extends KafkaJSError {
  constructor()
}

export class KafkaJSSASLAuthenticationError extends KafkaJSError {
  constructor()
}

export class KafkaJSGroupCoordinatorNotFound extends KafkaJSError {
  constructor()
}

export class KafkaJSNotImplemented extends KafkaJSError {
  constructor()
}

export class KafkaJSTimeout extends KafkaJSError {
  constructor()
}

export class KafkaJSLockTimeout extends KafkaJSError {
  constructor()
}

export class KafkaJSUnsupportedMagicByteInMessageSet extends KafkaJSError {
  constructor()
}
1143
/** `KafkaJSError` subclass exposing the per-group deletion results. */
export class KafkaJSDeleteGroupsError extends KafkaJSError {
  readonly groups: DeleteGroupsResult[]
  constructor(e: Error | string, groups?: KafkaJSDeleteGroupsErrorGroups[])
}

/** `KafkaJSError` subclass constructed from a topic and its per-partition failures. */
export class KafkaJSDeleteTopicRecordsError extends KafkaJSError {
  constructor(metadata: KafkaJSDeleteTopicRecordsErrorTopic)
}
1152
/** Element type of the optional `groups` argument of the `KafkaJSDeleteGroupsError` constructor. */
export interface KafkaJSDeleteGroupsErrorGroups {
  groupId: string
  errorCode: number
  error: KafkaJSError
}

/** Constructor argument of `KafkaJSDeleteTopicRecordsError`: a topic and its per-partition entries. */
export interface KafkaJSDeleteTopicRecordsErrorTopic {
  topic: string
  partitions: KafkaJSDeleteTopicRecordsErrorPartition[]
}

/** One partition entry of a `KafkaJSDeleteTopicRecordsErrorTopic`: partition, offset and error. */
export interface KafkaJSDeleteTopicRecordsErrorPartition {
  partition: number
  offset: string
  error: KafkaJSError
}
1169
/** Optional second argument of the `KafkaJSError` constructor; every field is optional. */
export interface KafkaJSErrorMetadata {
  retriable?: boolean
  topic?: string
  partitionId?: number
  metadata?: PartitionMetadata
}
1176
/** Metadata argument of the `KafkaJSOffsetOutOfRange` constructor. */
export interface KafkaJSOffsetOutOfRangeMetadata {
  topic: string
  partition: number
}

/** Metadata argument of the `KafkaJSNumberOfRetriesExceeded` constructor. */
export interface KafkaJSNumberOfRetriesExceededMetadata {
  retryCount: number
  retryTime: number
}

/** Metadata argument of the `KafkaJSConnectionError` constructor; both fields are optional. */
export interface KafkaJSConnectionErrorMetadata {
  broker?: string
  code?: string
}

/** Metadata argument of the `KafkaJSRequestTimeoutError` constructor. */
export interface KafkaJSRequestTimeoutErrorMetadata {
  broker: string
  clientId: string
  correlationId: number
  createdAt: number
  sentAt: number
  pendingDuration: number
}

/** Metadata argument of the `KafkaJSTopicMetadataNotLoaded` constructor. */
export interface KafkaJSTopicMetadataNotLoadedMetadata {
  topic: string
}
1204
/** Metadata argument of the `KafkaJSStaleTopicMetadataAssignment` constructor. */
export interface KafkaJSStaleTopicMetadataAssignmentMetadata {
  topic: string
  unknownPartitions: PartitionMetadata[]
}

/** Metadata argument of the `KafkaJSServerDoesNotSupportApiKey` constructor. */
export interface KafkaJSServerDoesNotSupportApiKeyMetadata {
  apiKey: number
  apiName: string
}