1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ClientKafka = void 0;
|
4 | const logger_service_1 = require("@nestjs/common/services/logger.service");
|
5 | const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
|
6 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
7 | const constants_1 = require("../constants");
|
8 | const kafka_response_deserializer_1 = require("../deserializers/kafka-response.deserializer");
|
9 | const enums_1 = require("../enums");
|
10 | const invalid_kafka_client_topic_exception_1 = require("../errors/invalid-kafka-client-topic.exception");
|
11 | const helpers_1 = require("../helpers");
|
12 | const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
|
13 | const client_proxy_1 = require("./client-proxy");
|
14 | let kafkaPackage = {};
|
15 |
|
16 |
|
17 |
|
18 | class ClientKafka extends client_proxy_1.ClientProxy {
|
19 | constructor(options) {
|
20 | super();
|
21 | this.options = options;
|
22 | this.logger = new logger_service_1.Logger(ClientKafka.name);
|
23 | this.client = null;
|
24 | this.consumer = null;
|
25 | this.producer = null;
|
26 | this.parser = null;
|
27 | this.initialized = null;
|
28 | this.responsePatterns = [];
|
29 | this.consumerAssignments = {};
|
30 | const clientOptions = this.getOptionsProp(this.options, 'client', {});
|
31 | const consumerOptions = this.getOptionsProp(this.options, 'consumer', {});
|
32 | const postfixId = this.getOptionsProp(this.options, 'postfixId', '-client');
|
33 | this.producerOnlyMode = this.getOptionsProp(this.options, 'producerOnlyMode', false);
|
34 | this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
|
35 |
|
36 |
|
37 | this.clientId =
|
38 | (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
|
39 | this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
|
40 | kafkaPackage = (0, load_package_util_1.loadPackage)('kafkajs', ClientKafka.name, () => require('kafkajs'));
|
41 | this.parser = new helpers_1.KafkaParser((options && options.parser) || undefined);
|
42 | this.initializeSerializer(options);
|
43 | this.initializeDeserializer(options);
|
44 | }
|
45 | subscribeToResponseOf(pattern) {
|
46 | const request = this.normalizePattern(pattern);
|
47 | this.responsePatterns.push(this.getResponsePatternName(request));
|
48 | }
|
49 | async close() {
|
50 | this.producer && (await this.producer.disconnect());
|
51 | this.consumer && (await this.consumer.disconnect());
|
52 | this.producer = null;
|
53 | this.consumer = null;
|
54 | this.initialized = null;
|
55 | this.client = null;
|
56 | }
|
57 | async connect() {
|
58 | if (this.initialized) {
|
59 | return this.initialized.then(() => this.producer);
|
60 | }
|
61 | this.initialized = new Promise(async (resolve, reject) => {
|
62 | try {
|
63 | this.client = this.createClient();
|
64 | if (!this.producerOnlyMode) {
|
65 | const partitionAssigners = [
|
66 | (config) => new helpers_1.KafkaReplyPartitionAssigner(this, config),
|
67 | ];
|
68 | const consumerOptions = Object.assign({
|
69 | partitionAssigners,
|
70 | }, this.options.consumer || {}, {
|
71 | groupId: this.groupId,
|
72 | });
|
73 | this.consumer = this.client.consumer(consumerOptions);
|
74 |
|
75 | this.consumer.on(this.consumer.events.GROUP_JOIN, this.setConsumerAssignments.bind(this));
|
76 | await this.consumer.connect();
|
77 | await this.bindTopics();
|
78 | }
|
79 | this.producer = this.client.producer(this.options.producer || {});
|
80 | await this.producer.connect();
|
81 | resolve();
|
82 | }
|
83 | catch (err) {
|
84 | reject(err);
|
85 | }
|
86 | });
|
87 | return this.initialized.then(() => this.producer);
|
88 | }
|
89 | async bindTopics() {
|
90 | if (!this.consumer) {
|
91 | throw Error('No consumer initialized');
|
92 | }
|
93 | const consumerSubscribeOptions = this.options.subscribe || {};
|
94 | if (this.responsePatterns.length > 0) {
|
95 | await this.consumer.subscribe({
|
96 | ...consumerSubscribeOptions,
|
97 | topics: this.responsePatterns,
|
98 | });
|
99 | }
|
100 | await this.consumer.run(Object.assign(this.options.run || {}, {
|
101 | eachMessage: this.createResponseCallback(),
|
102 | }));
|
103 | }
|
104 | createClient() {
|
105 | const kafkaConfig = Object.assign({ logCreator: helpers_1.KafkaLogger.bind(null, this.logger) }, this.options.client, { brokers: this.brokers, clientId: this.clientId });
|
106 | return new kafkaPackage.Kafka(kafkaConfig);
|
107 | }
|
108 | createResponseCallback() {
|
109 | return async (payload) => {
|
110 | const rawMessage = this.parser.parse(Object.assign(payload.message, {
|
111 | topic: payload.topic,
|
112 | partition: payload.partition,
|
113 | }));
|
114 | if ((0, shared_utils_1.isUndefined)(rawMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID])) {
|
115 | return;
|
116 | }
|
117 | const { err, response, isDisposed, id } = await this.deserializer.deserialize(rawMessage);
|
118 | const callback = this.routingMap.get(id);
|
119 | if (!callback) {
|
120 | return;
|
121 | }
|
122 | if (err || isDisposed) {
|
123 | return callback({
|
124 | err,
|
125 | response,
|
126 | isDisposed,
|
127 | });
|
128 | }
|
129 | callback({
|
130 | err,
|
131 | response,
|
132 | });
|
133 | };
|
134 | }
|
135 | getConsumerAssignments() {
|
136 | return this.consumerAssignments;
|
137 | }
|
138 | async dispatchEvent(packet) {
|
139 | const pattern = this.normalizePattern(packet.pattern);
|
140 | const outgoingEvent = await this.serializer.serialize(packet.data, {
|
141 | pattern,
|
142 | });
|
143 | const message = Object.assign({
|
144 | topic: pattern,
|
145 | messages: [outgoingEvent],
|
146 | }, this.options.send || {});
|
147 | return this.producer.send(message);
|
148 | }
|
149 | getReplyTopicPartition(topic) {
|
150 | const minimumPartition = this.consumerAssignments[topic];
|
151 | if ((0, shared_utils_1.isUndefined)(minimumPartition)) {
|
152 | throw new invalid_kafka_client_topic_exception_1.InvalidKafkaClientTopicException(topic);
|
153 | }
|
154 |
|
155 | return minimumPartition.toString();
|
156 | }
|
157 | publish(partialPacket, callback) {
|
158 | const packet = this.assignPacketId(partialPacket);
|
159 | this.routingMap.set(packet.id, callback);
|
160 | const cleanup = () => this.routingMap.delete(packet.id);
|
161 | const errorCallback = (err) => {
|
162 | cleanup();
|
163 | callback({ err });
|
164 | };
|
165 | try {
|
166 | const pattern = this.normalizePattern(partialPacket.pattern);
|
167 | const replyTopic = this.getResponsePatternName(pattern);
|
168 | const replyPartition = this.getReplyTopicPartition(replyTopic);
|
169 | Promise.resolve(this.serializer.serialize(packet.data, { pattern }))
|
170 | .then((serializedPacket) => {
|
171 | serializedPacket.headers[enums_1.KafkaHeaders.CORRELATION_ID] = packet.id;
|
172 | serializedPacket.headers[enums_1.KafkaHeaders.REPLY_TOPIC] = replyTopic;
|
173 | serializedPacket.headers[enums_1.KafkaHeaders.REPLY_PARTITION] =
|
174 | replyPartition;
|
175 | const message = Object.assign({
|
176 | topic: pattern,
|
177 | messages: [serializedPacket],
|
178 | }, this.options.send || {});
|
179 | return this.producer.send(message);
|
180 | })
|
181 | .catch(err => errorCallback(err));
|
182 | return cleanup;
|
183 | }
|
184 | catch (err) {
|
185 | errorCallback(err);
|
186 | }
|
187 | }
|
188 | getResponsePatternName(pattern) {
|
189 | return `${pattern}.reply`;
|
190 | }
|
191 | setConsumerAssignments(data) {
|
192 | const consumerAssignments = {};
|
193 |
|
194 | Object.keys(data.payload.memberAssignment).forEach(topic => {
|
195 | const memberPartitions = data.payload.memberAssignment[topic];
|
196 | if (memberPartitions.length) {
|
197 | consumerAssignments[topic] = Math.min(...memberPartitions);
|
198 | }
|
199 | });
|
200 | this.consumerAssignments = consumerAssignments;
|
201 | }
|
202 | initializeSerializer(options) {
|
203 | this.serializer =
|
204 | (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
|
205 | }
|
206 | initializeDeserializer(options) {
|
207 | this.deserializer =
|
208 | (options && options.deserializer) || new kafka_response_deserializer_1.KafkaResponseDeserializer();
|
209 | }
|
210 | commitOffsets(topicPartitions) {
|
211 | if (this.consumer) {
|
212 | return this.consumer.commitOffsets(topicPartitions);
|
213 | }
|
214 | else {
|
215 | throw new Error('No consumer initialized');
|
216 | }
|
217 | }
|
218 | }
|
219 | exports.ClientKafka = ClientKafka;
|