1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ServerKafka = void 0;
|
4 | const logger_service_1 = require("@nestjs/common/services/logger.service");
|
5 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
6 | const constants_1 = require("../constants");
|
7 | const ctx_host_1 = require("../ctx-host");
|
8 | const enums_1 = require("../enums");
|
9 | const helpers_1 = require("../helpers");
|
10 | const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
|
11 | const server_1 = require("./server");
|
// Lazily-loaded `kafkajs` module. Populated in the ServerKafka constructor
// via `loadPackage` so the dependency remains optional at install time.
let kafkaPackage = {};
|
/**
 * Kafka transport server for NestJS microservices.
 *
 * Wraps a kafkajs client: subscribes a consumer to every registered message
 * pattern (one topic per pattern), dispatches inbound messages to the matching
 * handler, and — for request/response messages that carry correlation-id and
 * reply-topic headers — publishes the handler's result back through a producer.
 */
class ServerKafka extends server_1.Server {
    /**
     * @param {object} options - Kafka transport options (`client`, `consumer`,
     *   `producer`, `run`, `subscribe`, `send`, `parser`, `serializer`,
     *   `deserializer`, `postfixId`).
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.KAFKA;
        this.logger = new logger_service_1.Logger(ServerKafka.name);
        this.client = null;
        this.consumer = null;
        this.producer = null;
        this.parser = null;
        const clientOptions = this.getOptionsProp(this.options, 'client') || {};
        const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
        const postfixId = this.getOptionsProp(this.options, 'postfixId') || '-server';
        this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
        // Append a postfix to the client/group ids so that server-side and
        // client-side Kafka instances in the same app do not collide.
        this.clientId =
            (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
        this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
        kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => require('kafkajs'));
        this.parser = new helpers_1.KafkaParser((options && options.parser) || undefined);
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * Creates the kafkajs client and starts consuming.
     * @param {Function} callback - Invoked once listening, or with the startup error.
     */
    async listen(callback) {
        try {
            this.client = this.createClient();
            await this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }
    /** Disconnects consumer and producer (if any) and clears all references. */
    async close() {
        this.consumer && (await this.consumer.disconnect());
        this.producer && (await this.producer.disconnect());
        this.consumer = null;
        this.producer = null;
        this.client = null;
    }
    /**
     * Connects consumer/producer and binds message handlers.
     * @param {Function} callback - Invoked once the consumer is running.
     */
    async start(callback) {
        // Copy the user-supplied consumer options into a fresh object instead of
        // mutating `this.options.consumer` (the previous code wrote the
        // postfixed groupId back into the caller's configuration object).
        const consumerOptions = Object.assign({}, this.options.consumer, {
            groupId: this.groupId,
        });
        this.consumer = this.client.consumer(consumerOptions);
        this.producer = this.client.producer(this.options.producer);
        await this.consumer.connect();
        await this.producer.connect();
        await this.bindEvents(this.consumer);
        callback();
    }
    /** Instantiates the kafkajs client using the configured broker list and client id. */
    createClient() {
        return new kafkaPackage.Kafka(Object.assign({ logCreator: helpers_1.KafkaLogger.bind(null, this.logger) }, this.options.client, { clientId: this.clientId, brokers: this.brokers }));
    }
    /**
     * Subscribes the consumer to every registered pattern (topic) and starts
     * the run loop with `handleMessage` as the per-message callback.
     * @param {object} consumer - Connected kafkajs consumer.
     */
    async bindEvents(consumer) {
        const registeredPatterns = [...this.messageHandlers.keys()];
        const consumerSubscribeOptions = this.options.subscribe || {};
        const subscribeToPattern = async (pattern) => consumer.subscribe(Object.assign({ topic: pattern }, consumerSubscribeOptions));
        await Promise.all(registeredPatterns.map(subscribeToPattern));
        // Copy rather than mutate `this.options.run` (see `start`).
        const consumerRunOptions = Object.assign({}, this.options.run, {
            eachMessage: this.getMessageHandler(),
        });
        await consumer.run(consumerRunOptions);
    }
    /** Returns the kafkajs `eachMessage` callback bound to this server. */
    getMessageHandler() {
        return async (payload) => this.handleMessage(payload);
    }
    /**
     * Returns a publish function that routes a response back to the requester's
     * reply topic/partition with the given correlation id.
     */
    getPublisher(replyTopic, replyPartition, correlationId) {
        return (data) => this.sendMessage(data, replyTopic, replyPartition, correlationId);
    }
    /**
     * Handles one inbound Kafka message: parses it, deserializes the packet,
     * and either dispatches it as an event (no reply expected) or invokes the
     * request handler and publishes its response.
     * @param {object} payload - kafkajs `EachMessagePayload` (topic, partition, message).
     */
    async handleMessage(payload) {
        const channel = payload.topic;
        const rawMessage = this.parser.parse(Object.assign(payload.message, {
            topic: payload.topic,
            partition: payload.partition,
        }));
        const headers = rawMessage.headers;
        const correlationId = headers[enums_1.KafkaHeaders.CORRELATION_ID];
        const replyTopic = headers[enums_1.KafkaHeaders.REPLY_TOPIC];
        const replyPartition = headers[enums_1.KafkaHeaders.REPLY_PARTITION];
        const packet = await this.deserializer.deserialize(rawMessage, { channel });
        const kafkaContext = new ctx_host_1.KafkaContext([
            rawMessage,
            payload.partition,
            payload.topic,
        ]);
        const handler = this.getHandlerByPattern(packet.pattern);
        // Event messages (or messages without reply metadata) get fire-and-forget
        // handling: no response is produced.
        if ((handler === null || handler === void 0 ? void 0 : handler.isEventHandler) || !correlationId || !replyTopic) {
            return this.handleEvent(packet.pattern, packet, kafkaContext);
        }
        const publish = this.getPublisher(replyTopic, replyPartition, correlationId);
        if (!handler) {
            // Request expects a reply but no handler is registered: report the
            // error back to the requester instead of dropping the message.
            return publish({
                id: correlationId,
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }
        const response$ = this.transformToObservable(await handler(packet.data, kafkaContext));
        response$ && this.send(response$, publish);
    }
    /**
     * Serializes an outgoing response, attaches reply metadata headers, and
     * produces it to the reply topic.
     * @returns {Promise<object>} kafkajs producer send result.
     */
    async sendMessage(message, replyTopic, replyPartition, correlationId) {
        const outgoingMessage = await this.serializer.serialize(message.response);
        this.assignReplyPartition(replyPartition, outgoingMessage);
        this.assignCorrelationIdHeader(correlationId, outgoingMessage);
        this.assignErrorHeader(message, outgoingMessage);
        this.assignIsDisposedHeader(message, outgoingMessage);
        const replyMessage = Object.assign({
            topic: replyTopic,
            messages: [outgoingMessage],
        }, this.options.send || {});
        return this.producer.send(replyMessage);
    }
    /** Marks the outgoing message as the final one of an observable stream. */
    assignIsDisposedHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.isDisposed) {
            return;
        }
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
    }
    /** Copies a handler error (if any) into the NEST_ERR header. */
    assignErrorHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.err) {
            return;
        }
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_ERR] = Buffer.from(outgoingResponse.err);
    }
    /** Echoes the request's correlation id so the requester can match the reply. */
    assignCorrelationIdHeader(correlationId, outgoingMessage) {
        outgoingMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID] =
            Buffer.from(correlationId);
    }
    /**
     * Targets the reply at the requester's partition, when one was provided.
     * Kafka partitions are integers, so parse with an explicit integer radix
     * (previously `parseFloat`, which would accept fractional input).
     */
    assignReplyPartition(replyPartition, outgoingMessage) {
        if ((0, shared_utils_1.isNil)(replyPartition)) {
            return;
        }
        outgoingMessage.partition = parseInt(replyPartition, 10);
    }
    /** Installs the configured serializer, falling back to the Kafka default. */
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
    }
}
|
// Public CommonJS export of the Kafka transport server.
exports.ServerKafka = ServerKafka;
|