UNPKG

7.86 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientKafka = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
7const constants_1 = require("../constants");
8const kafka_response_deserializer_1 = require("../deserializers/kafka-response.deserializer");
9const enums_1 = require("../enums");
10const invalid_kafka_client_topic_exception_1 = require("../errors/invalid-kafka-client-topic.exception");
11const helpers_1 = require("../helpers");
12const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
13const client_proxy_1 = require("./client-proxy");
14let kafkaPackage = {};
15class ClientKafka extends client_proxy_1.ClientProxy {
16 constructor(options) {
17 super();
18 this.options = options;
19 this.logger = new logger_service_1.Logger(ClientKafka.name);
20 this.client = null;
21 this.consumer = null;
22 this.producer = null;
23 this.parser = null;
24 this.responsePatterns = [];
25 this.consumerAssignments = {};
26 const clientOptions = this.getOptionsProp(this.options, 'client') || {};
27 const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
28 const postfixId = this.getOptionsProp(this.options, 'postfixId') || '-client';
29 this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
30 // Append a unique id to the clientId and groupId
31 // so they don't collide with a microservices client
32 this.clientId =
33 (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
34 this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
35 kafkaPackage = load_package_util_1.loadPackage('kafkajs', ClientKafka.name, () => require('kafkajs'));
36 this.parser = new helpers_1.KafkaParser((options && options.parser) || undefined);
37 this.initializeSerializer(options);
38 this.initializeDeserializer(options);
39 }
40 subscribeToResponseOf(pattern) {
41 const request = this.normalizePattern(pattern);
42 this.responsePatterns.push(this.getResponsePatternName(request));
43 }
44 async close() {
45 this.producer && (await this.producer.disconnect());
46 this.consumer && (await this.consumer.disconnect());
47 this.producer = null;
48 this.consumer = null;
49 this.client = null;
50 }
51 async connect() {
52 if (this.client) {
53 return this.producer;
54 }
55 this.client = this.createClient();
56 const partitionAssigners = [
57 (config) => new helpers_1.KafkaReplyPartitionAssigner(this, config),
58 ];
59 const consumerOptions = Object.assign({
60 partitionAssigners,
61 }, this.options.consumer || {}, {
62 groupId: this.groupId,
63 });
64 this.producer = this.client.producer(this.options.producer || {});
65 this.consumer = this.client.consumer(consumerOptions);
66 // set member assignments on join and rebalance
67 this.consumer.on(this.consumer.events.GROUP_JOIN, this.setConsumerAssignments.bind(this));
68 await this.producer.connect();
69 await this.consumer.connect();
70 await this.bindTopics();
71 return this.producer;
72 }
73 async bindTopics() {
74 const consumerSubscribeOptions = this.options.subscribe || {};
75 const subscribeTo = async (responsePattern) => this.consumer.subscribe(Object.assign({ topic: responsePattern }, consumerSubscribeOptions));
76 await Promise.all(this.responsePatterns.map(subscribeTo));
77 await this.consumer.run(Object.assign(this.options.run || {}, {
78 eachMessage: this.createResponseCallback(),
79 }));
80 }
81 createClient() {
82 const kafkaConfig = Object.assign({ logCreator: helpers_1.KafkaLogger.bind(null, this.logger) }, this.options.client, { brokers: this.brokers, clientId: this.clientId });
83 return new kafkaPackage.Kafka(kafkaConfig);
84 }
85 createResponseCallback() {
86 return async (payload) => {
87 const rawMessage = this.parser.parse(Object.assign(payload.message, {
88 topic: payload.topic,
89 partition: payload.partition,
90 }));
91 if (shared_utils_1.isUndefined(rawMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID])) {
92 return;
93 }
94 const { err, response, isDisposed, id } = await this.deserializer.deserialize(rawMessage);
95 const callback = this.routingMap.get(id);
96 if (!callback) {
97 return;
98 }
99 if (err || isDisposed) {
100 return callback({
101 err,
102 response,
103 isDisposed,
104 });
105 }
106 callback({
107 err,
108 response,
109 });
110 };
111 }
112 getConsumerAssignments() {
113 return this.consumerAssignments;
114 }
115 dispatchEvent(packet) {
116 const pattern = this.normalizePattern(packet.pattern);
117 const outgoingEvent = this.serializer.serialize(packet.data);
118 const message = Object.assign({
119 topic: pattern,
120 messages: [outgoingEvent],
121 }, this.options.send || {});
122 return this.producer.send(message);
123 }
124 getReplyTopicPartition(topic) {
125 const minimumPartition = this.consumerAssignments[topic];
126 if (shared_utils_1.isUndefined(minimumPartition)) {
127 throw new invalid_kafka_client_topic_exception_1.InvalidKafkaClientTopicException(topic);
128 }
129 // get the minimum partition
130 return minimumPartition.toString();
131 }
132 publish(partialPacket, callback) {
133 try {
134 const packet = this.assignPacketId(partialPacket);
135 const pattern = this.normalizePattern(partialPacket.pattern);
136 const replyTopic = this.getResponsePatternName(pattern);
137 const replyPartition = this.getReplyTopicPartition(replyTopic);
138 const serializedPacket = this.serializer.serialize(packet.data);
139 serializedPacket.headers[enums_1.KafkaHeaders.CORRELATION_ID] = packet.id;
140 serializedPacket.headers[enums_1.KafkaHeaders.REPLY_TOPIC] = replyTopic;
141 serializedPacket.headers[enums_1.KafkaHeaders.REPLY_PARTITION] = replyPartition;
142 this.routingMap.set(packet.id, callback);
143 const message = Object.assign({
144 topic: pattern,
145 messages: [serializedPacket],
146 }, this.options.send || {});
147 this.producer.send(message).catch(err => callback({ err }));
148 return () => this.routingMap.delete(packet.id);
149 }
150 catch (err) {
151 callback({ err });
152 }
153 }
154 getResponsePatternName(pattern) {
155 return `${pattern}.reply`;
156 }
157 setConsumerAssignments(data) {
158 const consumerAssignments = {};
159 // only need to set the minimum
160 Object.keys(data.payload.memberAssignment).forEach(memberId => {
161 const minimumPartition = Math.min(...data.payload.memberAssignment[memberId]);
162 consumerAssignments[memberId] = minimumPartition;
163 });
164 this.consumerAssignments = consumerAssignments;
165 }
166 initializeSerializer(options) {
167 this.serializer =
168 (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
169 }
170 initializeDeserializer(options) {
171 this.deserializer =
172 (options && options.deserializer) || new kafka_response_deserializer_1.KafkaResponseDeserializer();
173 }
174}
175exports.ClientKafka = ClientKafka;