1 | const {
|
2 | createLogger,
|
3 | LEVELS: { INFO },
|
4 | } = require('./loggers')
|
5 |
|
6 | const InstrumentationEventEmitter = require('./instrumentation/emitter')
|
7 | const LoggerConsole = require('./loggers/console')
|
8 | const Cluster = require('./cluster')
|
9 | const createProducer = require('./producer')
|
10 | const createConsumer = require('./consumer')
|
11 | const createAdmin = require('./admin')
|
12 | const ISOLATION_LEVEL = require('./protocol/isolationLevel')
|
13 | const defaultSocketFactory = require('./network/socketFactory')
|
14 | const once = require('./utils/once')
|
15 | const websiteUrl = require('./utils/websiteUrl')
|
16 |
|
17 | const PRIVATE = {
|
18 | CREATE_CLUSTER: Symbol('private:Kafka:createCluster'),
|
19 | CLUSTER_RETRY: Symbol('private:Kafka:clusterRetry'),
|
20 | LOGGER: Symbol('private:Kafka:logger'),
|
21 | OFFSETS: Symbol('private:Kafka:offsets'),
|
22 | }
|
23 |
|
24 | const DEFAULT_METADATA_MAX_AGE = 300000
|
25 | const warnOfDefaultPartitioner = once(logger => {
|
26 | if (process.env.KAFKAJS_NO_PARTITIONER_WARNING == null) {
|
27 | logger.warn(
|
28 | `KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at ${websiteUrl(
|
29 | 'docs/migration-guide-v2.0.0',
|
30 | 'producer-new-default-partitioner'
|
31 | )} for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1"`
|
32 | )
|
33 | }
|
34 | })
|
35 |
|
36 | module.exports = class Client {
|
37 | |
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
50 |
|
51 | constructor({
|
52 | brokers,
|
53 | ssl,
|
54 | sasl,
|
55 | clientId,
|
56 | connectionTimeout = 1000,
|
57 | authenticationTimeout,
|
58 | reauthenticationThreshold,
|
59 | requestTimeout,
|
60 | enforceRequestTimeout = true,
|
61 | retry,
|
62 | socketFactory = defaultSocketFactory(),
|
63 | logLevel = INFO,
|
64 | logCreator = LoggerConsole,
|
65 | }) {
|
66 | this[PRIVATE.OFFSETS] = new Map()
|
67 | this[PRIVATE.LOGGER] = createLogger({ level: logLevel, logCreator })
|
68 | this[PRIVATE.CLUSTER_RETRY] = retry
|
69 | this[PRIVATE.CREATE_CLUSTER] = ({
|
70 | metadataMaxAge,
|
71 | allowAutoTopicCreation = true,
|
72 | maxInFlightRequests = null,
|
73 | instrumentationEmitter = null,
|
74 | isolationLevel,
|
75 | }) =>
|
76 | new Cluster({
|
77 | logger: this[PRIVATE.LOGGER],
|
78 | retry: this[PRIVATE.CLUSTER_RETRY],
|
79 | offsets: this[PRIVATE.OFFSETS],
|
80 | socketFactory,
|
81 | brokers,
|
82 | ssl,
|
83 | sasl,
|
84 | clientId,
|
85 | connectionTimeout,
|
86 | authenticationTimeout,
|
87 | reauthenticationThreshold,
|
88 | requestTimeout,
|
89 | enforceRequestTimeout,
|
90 | metadataMaxAge,
|
91 | instrumentationEmitter,
|
92 | allowAutoTopicCreation,
|
93 | maxInFlightRequests,
|
94 | isolationLevel,
|
95 | })
|
96 | }
|
97 |
|
98 | |
99 |
|
100 |
|
101 | producer({
|
102 | createPartitioner,
|
103 | retry,
|
104 | metadataMaxAge = DEFAULT_METADATA_MAX_AGE,
|
105 | allowAutoTopicCreation,
|
106 | idempotent,
|
107 | transactionalId,
|
108 | transactionTimeout,
|
109 | maxInFlightRequests,
|
110 | } = {}) {
|
111 | const instrumentationEmitter = new InstrumentationEventEmitter()
|
112 | const cluster = this[PRIVATE.CREATE_CLUSTER]({
|
113 | metadataMaxAge,
|
114 | allowAutoTopicCreation,
|
115 | maxInFlightRequests,
|
116 | instrumentationEmitter,
|
117 | })
|
118 |
|
119 | if (createPartitioner == null) {
|
120 | warnOfDefaultPartitioner(this[PRIVATE.LOGGER])
|
121 | }
|
122 |
|
123 | return createProducer({
|
124 | retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
|
125 | logger: this[PRIVATE.LOGGER],
|
126 | cluster,
|
127 | createPartitioner,
|
128 | idempotent,
|
129 | transactionalId,
|
130 | transactionTimeout,
|
131 | instrumentationEmitter,
|
132 | })
|
133 | }
|
134 |
|
135 | |
136 |
|
137 |
|
138 | consumer({
|
139 | groupId,
|
140 | partitionAssigners,
|
141 | metadataMaxAge = DEFAULT_METADATA_MAX_AGE,
|
142 | sessionTimeout,
|
143 | rebalanceTimeout,
|
144 | heartbeatInterval,
|
145 | maxBytesPerPartition,
|
146 | minBytes,
|
147 | maxBytes,
|
148 | maxWaitTimeInMs,
|
149 | retry = { retries: 5 },
|
150 | allowAutoTopicCreation,
|
151 | maxInFlightRequests,
|
152 | readUncommitted = false,
|
153 | rackId = '',
|
154 | } = {}) {
|
155 | const isolationLevel = readUncommitted
|
156 | ? ISOLATION_LEVEL.READ_UNCOMMITTED
|
157 | : ISOLATION_LEVEL.READ_COMMITTED
|
158 |
|
159 | const instrumentationEmitter = new InstrumentationEventEmitter()
|
160 | const cluster = this[PRIVATE.CREATE_CLUSTER]({
|
161 | metadataMaxAge,
|
162 | allowAutoTopicCreation,
|
163 | maxInFlightRequests,
|
164 | isolationLevel,
|
165 | instrumentationEmitter,
|
166 | })
|
167 |
|
168 | return createConsumer({
|
169 | retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
|
170 | logger: this[PRIVATE.LOGGER],
|
171 | cluster,
|
172 | groupId,
|
173 | partitionAssigners,
|
174 | sessionTimeout,
|
175 | rebalanceTimeout,
|
176 | heartbeatInterval,
|
177 | maxBytesPerPartition,
|
178 | minBytes,
|
179 | maxBytes,
|
180 | maxWaitTimeInMs,
|
181 | isolationLevel,
|
182 | instrumentationEmitter,
|
183 | rackId,
|
184 | metadataMaxAge,
|
185 | })
|
186 | }
|
187 |
|
188 | |
189 |
|
190 |
|
191 | admin({ retry } = {}) {
|
192 | const instrumentationEmitter = new InstrumentationEventEmitter()
|
193 | const cluster = this[PRIVATE.CREATE_CLUSTER]({
|
194 | allowAutoTopicCreation: false,
|
195 | instrumentationEmitter,
|
196 | })
|
197 |
|
198 | return createAdmin({
|
199 | retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
|
200 | logger: this[PRIVATE.LOGGER],
|
201 | instrumentationEmitter,
|
202 | cluster,
|
203 | })
|
204 | }
|
205 |
|
206 | |
207 |
|
208 |
|
209 | logger() {
|
210 | return this[PRIVATE.LOGGER]
|
211 | }
|
212 | }
|