1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientKafka = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
7const constants_1 = require("../constants");
8const kafka_response_deserializer_1 = require("../deserializers/kafka-response.deserializer");
9const enums_1 = require("../enums");
10const invalid_kafka_client_topic_exception_1 = require("../errors/invalid-kafka-client-topic.exception");
11const helpers_1 = require("../helpers");
12const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
13const client_proxy_1 = require("./client-proxy");
14let kafkaPackage = {};
15/**
16 * @publicApi
17 */
class ClientKafka extends client_proxy_1.ClientProxy {
    constructor(options) {
        super();
        this.options = options;
        this.logger = new logger_service_1.Logger(ClientKafka.name);
        this.client = null;
        this.consumer = null;
        this.producer = null;
        this.parser = null;
        this.initialized = null;
        this.responsePatterns = [];
        this.consumerAssignments = {};
        const clientOptions = this.getOptionsProp(this.options, 'client', {});
        const consumerOptions = this.getOptionsProp(this.options, 'consumer', {});
        const postfixId = this.getOptionsProp(this.options, 'postfixId', '-client');
        this.producerOnlyMode = this.getOptionsProp(this.options, 'producerOnlyMode', false);
        this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
        // Append a unique id to the clientId and groupId
        // so they don't collide with a microservices client
        this.clientId =
            (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
        this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
        kafkaPackage = (0, load_package_util_1.loadPackage)('kafkajs', ClientKafka.name, () => require('kafkajs'));
        this.parser = new helpers_1.KafkaParser((options && options.parser) || undefined);
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
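    /**
     * Registers the `<pattern>.reply` topic that responses for the given
     * request pattern are consumed from. Call this before `connect()` so the
     * topic is included when the consumer subscribes.
     */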
    subscribeToResponseOf(pattern) {
        const request = this.normalizePattern(pattern);
        this.responsePatterns.push(this.getResponsePatternName(request));
    }
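    /**
     * Disconnects the producer and consumer (when present) and resets the
     * cached client, producer, consumer, and initialization promise.
     */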
    async close() {
        this.producer && (await this.producer.disconnect());
        this.consumer && (await this.consumer.disconnect());
        this.producer = null;
        this.consumer = null;
        this.initialized = null;
        this.client = null;
    }
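    /**
     * Lazily creates the kafkajs client. Unless `producerOnlyMode` is enabled,
     * a consumer is created with the reply-partition assigner, wired to the
     * GROUP_JOIN event, connected, and bound to the reply topics; the producer
     * is then connected. Subsequent calls reuse the same initialization
     * promise and resolve with the producer.
     */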
    async connect() {
        if (this.initialized) {
            return this.initialized.then(() => this.producer);
        }
        this.initialized = new Promise(async (resolve, reject) => {
            try {
                this.client = this.createClient();
                if (!this.producerOnlyMode) {
                    const partitionAssigners = [
                        (config) => new helpers_1.KafkaReplyPartitionAssigner(this, config),
                    ];
                    const consumerOptions = Object.assign({
                        partitionAssigners,
                    }, this.options.consumer || {}, {
                        groupId: this.groupId,
                    });
                    this.consumer = this.client.consumer(consumerOptions);
                    // set member assignments on join and rebalance
                    this.consumer.on(this.consumer.events.GROUP_JOIN, this.setConsumerAssignments.bind(this));
                    await this.consumer.connect();
                    await this.bindTopics();
                }
                this.producer = this.client.producer(this.options.producer || {});
                await this.producer.connect();
                resolve();
            }
            catch (err) {
                reject(err);
            }
        });
        return this.initialized.then(() => this.producer);
    }
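    /**
     * Subscribes the consumer to every registered reply topic (if any) and
     * starts the run loop with the response callback as the message handler.
     */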
    async bindTopics() {
        if (!this.consumer) {
            throw Error('No consumer initialized');
        }
        const consumerSubscribeOptions = this.options.subscribe || {};
        if (this.responsePatterns.length > 0) {
            await this.consumer.subscribe({
                ...consumerSubscribeOptions,
                topics: this.responsePatterns,
            });
        }
        await this.consumer.run(Object.assign(this.options.run || {}, {
            eachMessage: this.createResponseCallback(),
        }));
    }
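    /**
     * Instantiates the kafkajs `Kafka` client, merging user-provided client
     * options with the Nest logger, brokers, and suffixed client id.
     */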
    createClient() {
        const kafkaConfig = Object.assign({ logCreator: helpers_1.KafkaLogger.bind(null, this.logger) }, this.options.client, { brokers: this.brokers, clientId: this.clientId });
        return new kafkaPackage.Kafka(kafkaConfig);
    }
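    /**
     * Builds the `eachMessage` handler: parses the incoming message, ignores
     * it when no correlation id header is present, deserializes it, and
     * invokes the callback registered in the routing map for that id.
     */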
    createResponseCallback() {
        return async (payload) => {
            const rawMessage = this.parser.parse(Object.assign(payload.message, {
                topic: payload.topic,
                partition: payload.partition,
            }));
            if ((0, shared_utils_1.isUndefined)(rawMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID])) {
                return;
            }
            const { err, response, isDisposed, id } = await this.deserializer.deserialize(rawMessage);
            const callback = this.routingMap.get(id);
            if (!callback) {
                return;
            }
            if (err || isDisposed) {
                return callback({
                    err,
                    response,
                    isDisposed,
                });
            }
            callback({
                err,
                response,
            });
        };
    }
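    /**
     * Returns the topic-to-partition assignments cached from the last
     * GROUP_JOIN event.
     */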
    getConsumerAssignments() {
        return this.consumerAssignments;
    }
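    /**
     * Serializes an event packet and publishes it to the topic derived from
     * the pattern (fire-and-forget, no reply headers).
     */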
    async dispatchEvent(packet) {
        const pattern = this.normalizePattern(packet.pattern);
        const outgoingEvent = await this.serializer.serialize(packet.data, {
            pattern,
        });
        const message = Object.assign({
            topic: pattern,
            messages: [outgoingEvent],
        }, this.options.send || {});
        return this.producer.send(message);
    }
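    /**
     * Resolves the partition this client listens on for the given reply
     * topic, throwing `InvalidKafkaClientTopicException` when the topic was
     * never assigned to the consumer.
     */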
    getReplyTopicPartition(topic) {
        const minimumPartition = this.consumerAssignments[topic];
        if ((0, shared_utils_1.isUndefined)(minimumPartition)) {
            throw new invalid_kafka_client_topic_exception_1.InvalidKafkaClientTopicException(topic);
        }
        // get the minimum partition
        return minimumPartition.toString();
    }
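    /**
     * Sends a request message stamped with correlation id, reply topic, and
     * reply partition headers, registers the response callback in the routing
     * map, and returns a cleanup function that deregisters it.
     */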
    publish(partialPacket, callback) {
        const packet = this.assignPacketId(partialPacket);
        this.routingMap.set(packet.id, callback);
        const cleanup = () => this.routingMap.delete(packet.id);
        const errorCallback = (err) => {
            cleanup();
            callback({ err });
        };
        try {
            const pattern = this.normalizePattern(partialPacket.pattern);
            const replyTopic = this.getResponsePatternName(pattern);
            const replyPartition = this.getReplyTopicPartition(replyTopic);
            Promise.resolve(this.serializer.serialize(packet.data, { pattern }))
                .then((serializedPacket) => {
                serializedPacket.headers[enums_1.KafkaHeaders.CORRELATION_ID] = packet.id;
                serializedPacket.headers[enums_1.KafkaHeaders.REPLY_TOPIC] = replyTopic;
                serializedPacket.headers[enums_1.KafkaHeaders.REPLY_PARTITION] =
                    replyPartition;
                const message = Object.assign({
                    topic: pattern,
                    messages: [serializedPacket],
                }, this.options.send || {});
                return this.producer.send(message);
            })
                .catch(err => errorCallback(err));
            return cleanup;
        }
        catch (err) {
            errorCallback(err);
        }
    }
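    /**
     * Derives the reply topic name for a request pattern by appending `.reply`.
     */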
    getResponsePatternName(pattern) {
        return `${pattern}.reply`;
    }
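    /**
     * Caches, for each assigned topic, the lowest partition number received
     * in the GROUP_JOIN payload; used later to route replies back to this
     * client instance.
     */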
    setConsumerAssignments(data) {
        const consumerAssignments = {};
        // only need to set the minimum
        Object.keys(data.payload.memberAssignment).forEach(topic => {
            const memberPartitions = data.payload.memberAssignment[topic];
            if (memberPartitions.length) {
                consumerAssignments[topic] = Math.min(...memberPartitions);
            }
        });
        this.consumerAssignments = consumerAssignments;
    }
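    /**
     * Uses the serializer from the options or falls back to the default
     * `KafkaRequestSerializer`.
     */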
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
    }
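    /**
     * Uses the deserializer from the options or falls back to the default
     * `KafkaResponseDeserializer`.
     */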
    initializeDeserializer(options) {
        this.deserializer =
            (options && options.deserializer) || new kafka_response_deserializer_1.KafkaResponseDeserializer();
    }
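    /**
     * Delegates offset commits to the underlying kafkajs consumer; throws if
     * no consumer has been initialized (e.g. in producer-only mode).
     */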
    commitOffsets(topicPartitions) {
        if (this.consumer) {
            return this.consumer.commitOffsets(topicPartitions);
        }
        else {
            throw new Error('No consumer initialized');
        }
    }
}
exports.ClientKafka = ClientKafka;