UNPKG

8.73 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientKafka = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
7const constants_1 = require("../constants");
8const kafka_response_deserializer_1 = require("../deserializers/kafka-response.deserializer");
9const enums_1 = require("../enums");
10const invalid_kafka_client_topic_exception_1 = require("../errors/invalid-kafka-client-topic.exception");
11const helpers_1 = require("../helpers");
12const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
13const client_proxy_1 = require("./client-proxy");
14let kafkaPackage = {};
/**
 * Client proxy that sends requests/events through a Kafka producer and, unless
 * running in producer-only mode, listens for replies on `<pattern>.reply`
 * topics through a Kafka consumer.
 *
 * Helpers such as `getOptionsProp`, `normalizePattern`, `assignPacketId` and
 * `routingMap` are not defined here; they are expected to come from the
 * `ClientProxy` base class.
 */
class ClientKafka extends client_proxy_1.ClientProxy {
    /**
     * @param options Client configuration. Keys read by this class: `client`,
     *   `consumer`, `producer`, `subscribe`, `run`, `send`, `postfixId`,
     *   `producerOnlyMode`, `parser`, `serializer`, `deserializer`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.logger = new logger_service_1.Logger(ClientKafka.name);
        this.client = null;
        this.consumer = null;
        this.producer = null;
        this.parser = null;
        // In-flight/completed connection promise shared by all connect() callers.
        this.initialized = null;
        this.responsePatterns = [];
        this.consumerAssignments = {};
        const clientOptions = this.getOptionsProp(this.options, 'client') || {};
        const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
        const postfixId = this.getOptionsProp(this.options, 'postfixId') || '-client';
        this.producerOnlyMode =
            this.getOptionsProp(this.options, 'producerOnlyMode') || false;
        this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
        // Append a unique id to the clientId and groupId
        // so they don't collide with a microservices client
        this.clientId =
            (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
        this.groupId =
            (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
        kafkaPackage = (0, load_package_util_1.loadPackage)('kafkajs', ClientKafka.name, () => require('kafkajs'));
        this.parser = new helpers_1.KafkaParser((options && options.parser) || undefined);
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * Registers the reply topic of `pattern` so that `bindTopics()` subscribes
     * to it. Only patterns registered before `bindTopics()` runs are subscribed
     * by that run.
     *
     * @param pattern Request pattern whose reply topic should be consumed.
     */
    subscribeToResponseOf(pattern) {
        const request = this.normalizePattern(pattern);
        this.responsePatterns.push(this.getResponsePatternName(request));
    }
    /**
     * Disconnects the producer and the consumer (when present) and clears all
     * connection state.
     *
     * The consumer disconnect is attempted and the state is reset even when the
     * producer disconnect rejects, so a failed shutdown never leaves the
     * instance half-connected. A disconnect error still propagates to the
     * caller.
     */
    async close() {
        try {
            if (this.producer) {
                await this.producer.disconnect();
            }
        }
        finally {
            try {
                if (this.consumer) {
                    await this.consumer.disconnect();
                }
            }
            finally {
                this.producer = null;
                this.consumer = null;
                this.client = null;
                this.initialized = null;
            }
        }
    }
    /**
     * Connects to Kafka once and resolves with the connected producer.
     *
     * Concurrent callers share the same in-flight promise, so nobody observes
     * a producer that has not been created yet. A failed attempt is forgotten
     * so that a later call can retry.
     *
     * @returns Promise resolving to the connected producer.
     */
    async connect() {
        if (this.initialized) {
            return this.initialized;
        }
        const pending = this.establishConnections().catch(err => {
            // Only forget this attempt; never clobber a newer one.
            if (this.initialized === pending) {
                this.initialized = null;
            }
            throw err;
        });
        this.initialized = pending;
        return pending;
    }
    /**
     * Creates the Kafka client, then (unless in producer-only mode) the reply
     * consumer, and finally the producer. On failure, whatever was already set
     * up is torn down via `close()` before the original error is rethrown.
     *
     * @returns Promise resolving to the connected producer.
     */
    async establishConnections() {
        try {
            this.client = this.createClient();
            if (!this.producerOnlyMode) {
                const partitionAssigners = [
                    (config) => new helpers_1.KafkaReplyPartitionAssigner(this, config),
                ];
                const consumerOptions = Object.assign({ partitionAssigners }, this.getOptionsProp(this.options, 'consumer') || {}, { groupId: this.groupId });
                this.consumer = this.client.consumer(consumerOptions);
                // set member assignments on join and rebalance
                this.consumer.on(this.consumer.events.GROUP_JOIN, this.setConsumerAssignments.bind(this));
                await this.consumer.connect();
                await this.bindTopics();
            }
            this.producer = this.client.producer(this.getOptionsProp(this.options, 'producer') || {});
            await this.producer.connect();
            return this.producer;
        }
        catch (err) {
            try {
                await this.close();
            }
            catch (closeErr) {
                // The connection error is the one the caller needs; just record this one.
                this.logger.error(`Cleanup after failed connect also failed: ${closeErr}`);
            }
            throw err;
        }
    }
    /**
     * Subscribes the consumer to every registered reply topic and starts the
     * consumer with the response callback as `eachMessage`.
     *
     * @throws {Error} When no consumer has been created yet.
     */
    async bindTopics() {
        if (!this.consumer) {
            throw new Error('No consumer initialized');
        }
        const consumerSubscribeOptions = this.getOptionsProp(this.options, 'subscribe') || {};
        const subscribeTo = async (responsePattern) => this.consumer.subscribe(Object.assign({ topic: responsePattern }, consumerSubscribeOptions));
        await Promise.all(this.responsePatterns.map(subscribeTo));
        // Merge into a fresh object so the caller-supplied `run` options are not mutated.
        const runOptions = Object.assign({}, this.getOptionsProp(this.options, 'run') || {}, {
            eachMessage: this.createResponseCallback(),
        });
        await this.consumer.run(runOptions);
    }
    /**
     * Builds the Kafka client. User-supplied `client` options are applied over
     * the default log creator; `brokers` and `clientId` computed in the
     * constructor always win.
     */
    createClient() {
        const kafkaConfig = Object.assign({ logCreator: helpers_1.KafkaLogger.bind(null, this.logger) }, this.getOptionsProp(this.options, 'client'), { brokers: this.brokers, clientId: this.clientId });
        return new kafkaPackage.Kafka(kafkaConfig);
    }
    /**
     * Creates the `eachMessage` handler that routes a reply to the callback
     * registered under its correlation id.
     *
     * Messages without a correlation-id header, or whose id has no registered
     * callback, are ignored.
     */
    createResponseCallback() {
        return async (payload) => {
            // Parse a shallow copy so the message object handed over by the consumer is left untouched.
            const rawMessage = this.parser.parse(Object.assign({}, payload.message, {
                topic: payload.topic,
                partition: payload.partition,
            }));
            const headers = rawMessage.headers;
            if (!headers ||
                (0, shared_utils_1.isUndefined)(headers[enums_1.KafkaHeaders.CORRELATION_ID])) {
                return;
            }
            const { err, response, isDisposed, id } = await this.deserializer.deserialize(rawMessage);
            const callback = this.routingMap.get(id);
            if (!callback) {
                return;
            }
            if (err || isDisposed) {
                return callback({
                    err,
                    response,
                    isDisposed,
                });
            }
            callback({
                err,
                response,
            });
        };
    }
    /**
     * @returns Map of topic name to the lowest partition assigned to this consumer.
     */
    getConsumerAssignments() {
        return this.consumerAssignments;
    }
    /**
     * Serializes the packet data and sends it to the topic named by the
     * packet's pattern. Keys in the `send` options override the defaults.
     *
     * @param packet Object with `pattern` and `data`.
     * @returns The promise returned by `producer.send`.
     */
    async dispatchEvent(packet) {
        const pattern = this.normalizePattern(packet.pattern);
        const outgoingEvent = await this.serializer.serialize(packet.data, {
            pattern,
        });
        const message = Object.assign({
            topic: pattern,
            messages: [outgoingEvent],
        }, this.getOptionsProp(this.options, 'send') || {});
        return this.producer.send(message);
    }
    /**
     * Returns, as a string, the lowest partition assigned to this consumer for
     * the given reply topic.
     *
     * @param topic Reply topic name.
     * @throws {InvalidKafkaClientTopicException} When no assignment is known for `topic`.
     */
    getReplyTopicPartition(topic) {
        const minimumPartition = this.consumerAssignments[topic];
        if ((0, shared_utils_1.isUndefined)(minimumPartition)) {
            throw new invalid_kafka_client_topic_exception_1.InvalidKafkaClientTopicException(topic);
        }
        // get the minimum partition
        return minimumPartition.toString();
    }
    /**
     * Sends a request and registers `callback` to receive its reply.
     *
     * @param partialPacket Packet with `pattern` and `data`; an id is assigned here.
     * @param callback Invoked with `{ err }` on failure or with the reply payload.
     * @returns Teardown function that unregisters the callback. A teardown
     *   function is returned on every path, including synchronous failure.
     */
    publish(partialPacket, callback) {
        const packet = this.assignPacketId(partialPacket);
        this.routingMap.set(packet.id, callback);
        const cleanup = () => this.routingMap.delete(packet.id);
        const errorCallback = (err) => {
            cleanup();
            callback({ err });
        };
        try {
            const pattern = this.normalizePattern(partialPacket.pattern);
            const replyTopic = this.getResponsePatternName(pattern);
            const replyPartition = this.getReplyTopicPartition(replyTopic);
            Promise.resolve(this.serializer.serialize(packet.data, { pattern }))
                .then((serializedPacket) => {
                    serializedPacket.headers[enums_1.KafkaHeaders.CORRELATION_ID] = packet.id;
                    serializedPacket.headers[enums_1.KafkaHeaders.REPLY_TOPIC] = replyTopic;
                    serializedPacket.headers[enums_1.KafkaHeaders.REPLY_PARTITION] =
                        replyPartition;
                    const message = Object.assign({
                        topic: pattern,
                        messages: [serializedPacket],
                    }, this.getOptionsProp(this.options, 'send') || {});
                    return this.producer.send(message);
                })
                .catch(err => errorCallback(err));
            return cleanup;
        }
        catch (err) {
            errorCallback(err);
            // errorCallback already unregistered the callback; hand back a no-op teardown.
            return () => undefined;
        }
    }
    /**
     * @param pattern Request pattern (topic name).
     * @returns Name of the reply topic for `pattern`.
     */
    getResponsePatternName(pattern) {
        return `${pattern}.reply`;
    }
    /**
     * GROUP_JOIN handler: records, per topic, the lowest partition assigned to
     * this consumer.
     *
     * Topics with an empty partition list are skipped. `Math.min()` of nothing
     * is `Infinity`, which would otherwise be advertised as the reply
     * partition instead of triggering the "invalid topic" error.
     *
     * @param data Event whose `payload.memberAssignment` maps topic to partition numbers.
     */
    setConsumerAssignments(data) {
        const consumerAssignments = {};
        const memberAssignment = data.payload.memberAssignment;
        // only need to set the minimum
        Object.keys(memberAssignment).forEach(topic => {
            const partitions = memberAssignment[topic];
            if (partitions.length > 0) {
                consumerAssignments[topic] = Math.min(...partitions);
            }
        });
        this.consumerAssignments = consumerAssignments;
    }
    /**
     * Uses `options.serializer` when provided, otherwise a `KafkaRequestSerializer`.
     */
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
    }
    /**
     * Uses `options.deserializer` when provided, otherwise a `KafkaResponseDeserializer`.
     */
    initializeDeserializer(options) {
        this.deserializer =
            (options && options.deserializer) || new kafka_response_deserializer_1.KafkaResponseDeserializer();
    }
    /**
     * Commits the given offsets through the consumer.
     *
     * @param topicPartitions Passed through unchanged to `consumer.commitOffsets`.
     * @throws {Error} When no consumer has been created.
     */
    commitOffsets(topicPartitions) {
        if (!this.consumer) {
            throw new Error('No consumer initialized');
        }
        return this.consumer.commitOffsets(topicPartitions);
    }
}
201exports.ClientKafka = ClientKafka;