UNPKG

5.92 kBJavaScriptView Raw
1const {
2 createLogger,
3 LEVELS: { INFO },
4} = require('./loggers')
5
6const InstrumentationEventEmitter = require('./instrumentation/emitter')
7const LoggerConsole = require('./loggers/console')
8const Cluster = require('./cluster')
9const createProducer = require('./producer')
10const createConsumer = require('./consumer')
11const createAdmin = require('./admin')
12const ISOLATION_LEVEL = require('./protocol/isolationLevel')
13const defaultSocketFactory = require('./network/socketFactory')
14const once = require('./utils/once')
15const websiteUrl = require('./utils/websiteUrl')
16
/**
 * Symbol keys under which the client stores its internal state, so that this
 * state is not reachable through ordinary string property names.
 */
const PRIVATE = {
  // Factory function that builds a new Cluster from the client-level options
  CREATE_CLUSTER: Symbol('private:Kafka:createCluster'),
  // Client-level retry options, used as the base for per-component retry options
  CLUSTER_RETRY: Symbol('private:Kafka:clusterRetry'),
  // Logger instance created in the constructor
  LOGGER: Symbol('private:Kafka:logger'),
  // Map handed to every Cluster created by the client
  OFFSETS: Symbol('private:Kafka:offsets'),
}
23
// Default `metadataMaxAge` applied by producer() and consumer() when the caller
// does not pass one. NOTE(review): presumably milliseconds (300000 ms = 5 minutes);
// confirm against how Cluster interprets metadataMaxAge.
const DEFAULT_METADATA_MAX_AGE = 300000
/**
 * Warns that KafkaJS v2.0.0 changed the default partitioner and points the
 * user at the migration guide.
 *
 * The warning is skipped whenever the environment variable
 * KAFKAJS_NO_PARTITIONER_WARNING is set; the `== null` check only matches an
 * unset variable, so any value (even an empty string) silences it.
 *
 * NOTE(review): wrapped in `once`, presumably so the callback body runs at most
 * one time per process; confirm against ./utils/once.
 *
 * @param {Object} logger - object exposing a `warn(message)` method
 */
const warnOfDefaultPartitioner = once(logger => {
  if (process.env.KAFKAJS_NO_PARTITIONER_WARNING == null) {
    logger.warn(
      `KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at ${websiteUrl(
        'docs/migration-guide-v2.0.0',
        'producer-new-default-partitioner'
      )} for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1"`
    )
  }
})
35
36module.exports = class Client {
37 /**
38 * @param {Object} options
39 * @param {Array<string>} options.brokers example: ['127.0.0.1:9092', '127.0.0.1:9094']
40 * @param {Object} options.ssl
41 * @param {Object} options.sasl
42 * @param {string} options.clientId
43 * @param {number} [options.connectionTimeout=1000] - in milliseconds
44 * @param {number} options.authenticationTimeout - in milliseconds
45 * @param {number} options.reauthenticationThreshold - in milliseconds
46 * @param {number} [options.requestTimeout=30000] - in milliseconds
47 * @param {boolean} [options.enforceRequestTimeout]
48 * @param {import("../types").RetryOptions} [options.retry]
49 * @param {import("../types").ISocketFactory} [options.socketFactory]
50 */
51 constructor({
52 brokers,
53 ssl,
54 sasl,
55 clientId,
56 connectionTimeout = 1000,
57 authenticationTimeout,
58 reauthenticationThreshold,
59 requestTimeout,
60 enforceRequestTimeout = true,
61 retry,
62 socketFactory = defaultSocketFactory(),
63 logLevel = INFO,
64 logCreator = LoggerConsole,
65 }) {
66 this[PRIVATE.OFFSETS] = new Map()
67 this[PRIVATE.LOGGER] = createLogger({ level: logLevel, logCreator })
68 this[PRIVATE.CLUSTER_RETRY] = retry
69 this[PRIVATE.CREATE_CLUSTER] = ({
70 metadataMaxAge,
71 allowAutoTopicCreation = true,
72 maxInFlightRequests = null,
73 instrumentationEmitter = null,
74 isolationLevel,
75 }) =>
76 new Cluster({
77 logger: this[PRIVATE.LOGGER],
78 retry: this[PRIVATE.CLUSTER_RETRY],
79 offsets: this[PRIVATE.OFFSETS],
80 socketFactory,
81 brokers,
82 ssl,
83 sasl,
84 clientId,
85 connectionTimeout,
86 authenticationTimeout,
87 reauthenticationThreshold,
88 requestTimeout,
89 enforceRequestTimeout,
90 metadataMaxAge,
91 instrumentationEmitter,
92 allowAutoTopicCreation,
93 maxInFlightRequests,
94 isolationLevel,
95 })
96 }
97
98 /**
99 * @public
100 */
101 producer({
102 createPartitioner,
103 retry,
104 metadataMaxAge = DEFAULT_METADATA_MAX_AGE,
105 allowAutoTopicCreation,
106 idempotent,
107 transactionalId,
108 transactionTimeout,
109 maxInFlightRequests,
110 } = {}) {
111 const instrumentationEmitter = new InstrumentationEventEmitter()
112 const cluster = this[PRIVATE.CREATE_CLUSTER]({
113 metadataMaxAge,
114 allowAutoTopicCreation,
115 maxInFlightRequests,
116 instrumentationEmitter,
117 })
118
119 if (createPartitioner == null) {
120 warnOfDefaultPartitioner(this[PRIVATE.LOGGER])
121 }
122
123 return createProducer({
124 retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
125 logger: this[PRIVATE.LOGGER],
126 cluster,
127 createPartitioner,
128 idempotent,
129 transactionalId,
130 transactionTimeout,
131 instrumentationEmitter,
132 })
133 }
134
135 /**
136 * @public
137 */
138 consumer({
139 groupId,
140 partitionAssigners,
141 metadataMaxAge = DEFAULT_METADATA_MAX_AGE,
142 sessionTimeout,
143 rebalanceTimeout,
144 heartbeatInterval,
145 maxBytesPerPartition,
146 minBytes,
147 maxBytes,
148 maxWaitTimeInMs,
149 retry = { retries: 5 },
150 allowAutoTopicCreation,
151 maxInFlightRequests,
152 readUncommitted = false,
153 rackId = '',
154 } = {}) {
155 const isolationLevel = readUncommitted
156 ? ISOLATION_LEVEL.READ_UNCOMMITTED
157 : ISOLATION_LEVEL.READ_COMMITTED
158
159 const instrumentationEmitter = new InstrumentationEventEmitter()
160 const cluster = this[PRIVATE.CREATE_CLUSTER]({
161 metadataMaxAge,
162 allowAutoTopicCreation,
163 maxInFlightRequests,
164 isolationLevel,
165 instrumentationEmitter,
166 })
167
168 return createConsumer({
169 retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
170 logger: this[PRIVATE.LOGGER],
171 cluster,
172 groupId,
173 partitionAssigners,
174 sessionTimeout,
175 rebalanceTimeout,
176 heartbeatInterval,
177 maxBytesPerPartition,
178 minBytes,
179 maxBytes,
180 maxWaitTimeInMs,
181 isolationLevel,
182 instrumentationEmitter,
183 rackId,
184 metadataMaxAge,
185 })
186 }
187
188 /**
189 * @public
190 */
191 admin({ retry } = {}) {
192 const instrumentationEmitter = new InstrumentationEventEmitter()
193 const cluster = this[PRIVATE.CREATE_CLUSTER]({
194 allowAutoTopicCreation: false,
195 instrumentationEmitter,
196 })
197
198 return createAdmin({
199 retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
200 logger: this[PRIVATE.LOGGER],
201 instrumentationEmitter,
202 cluster,
203 })
204 }
205
206 /**
207 * @public
208 */
209 logger() {
210 return this[PRIVATE.LOGGER]
211 }
212}