1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ServerKafka = void 0;
|
4 | const logger_service_1 = require("@nestjs/common/services/logger.service");
|
5 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
6 | const constants_1 = require("../constants");
|
7 | const ctx_host_1 = require("../ctx-host");
|
8 | const enums_1 = require("../enums");
|
9 | const helpers_1 = require("../helpers");
|
10 | const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
|
11 | const server_1 = require("./server");
|
12 | let kafkaPackage = {};
|
/**
 * Kafka transport server.
 *
 * Subscribes a kafkajs consumer to every pattern registered in
 * `this.messageHandlers`, routes each incoming message either to an event
 * handler or to a request handler, and publishes request replies through a
 * kafkajs producer.
 */
class ServerKafka extends server_1.Server {
    /**
     * @param {object} [options] Transport options. The keys read by this class
     *   are `client`, `consumer`, `producer`, `subscribe`, `run`, `send`,
     *   `postfixId` and `serializer`; the object is also handed to
     *   `initializeDeserializer`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.KAFKA;
        this.logger = new logger_service_1.Logger(ServerKafka.name);
        this.client = null;
        this.consumer = null;
        this.producer = null;
        const clientOptions = this.getOptionsProp(this.options, 'client') || {};
        const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
        const postfixId = this.getOptionsProp(this.options, 'postfixId') || '-server';
        this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
        // The postfix is appended to both identifiers (default '-server').
        this.clientId =
            (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
        this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
        // kafkajs is resolved lazily and stored in the module-level holder.
        kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => require('kafkajs'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * Creates the Kafka client and starts consuming.
     *
     * @param {Function} callback Invoked once the consumer is running.
     * @returns {Promise<void>}
     */
    async listen(callback) {
        this.client = this.createClient();
        await this.start(callback);
    }
    /**
     * Disconnects the consumer, then the producer, and clears all references.
     *
     * The producer is disconnected and the fields are reset even when an
     * earlier disconnect rejects; the rejection still propagates to the caller.
     *
     * @returns {Promise<void>}
     */
    async close() {
        try {
            if (this.consumer) {
                await this.consumer.disconnect();
            }
        }
        finally {
            try {
                if (this.producer) {
                    await this.producer.disconnect();
                }
            }
            finally {
                this.consumer = null;
                this.producer = null;
                this.client = null;
            }
        }
    }
    /**
     * Creates and connects the consumer and producer, binds the message
     * handlers and finally invokes `callback`.
     *
     * @param {Function} callback Invoked after `bindEvents` has resolved.
     * @returns {Promise<void>}
     */
    async start(callback) {
        // Merge into a fresh object so the caller's `consumer` options are not
        // modified; the computed groupId always wins.
        const consumerOptions = Object.assign({}, this.getOptionsProp(this.options, 'consumer'), {
            groupId: this.groupId,
        });
        this.consumer = this.client.consumer(consumerOptions);
        this.producer = this.client.producer(this.getOptionsProp(this.options, 'producer'));
        await this.consumer.connect();
        await this.producer.connect();
        await this.bindEvents(this.consumer);
        callback();
    }
    /**
     * Builds the kafkajs client from the `client` options, overriding
     * `clientId`, `brokers` and `logCreator` with the values computed here.
     *
     * @returns {object} A new `kafkaPackage.Kafka` instance.
     */
    createClient() {
        // Fresh target object: the caller's `client` options stay untouched.
        const clientConfig = Object.assign({}, this.getOptionsProp(this.options, 'client'), {
            clientId: this.clientId,
            brokers: this.brokers,
            logCreator: helpers_1.KafkaLogger.bind(null, this.logger),
        });
        return new kafkaPackage.Kafka(clientConfig);
    }
    /**
     * Subscribes the consumer to every registered pattern and starts it.
     *
     * @param {object} consumer Consumer exposing `subscribe` and `run`.
     * @returns {Promise<void>}
     */
    async bindEvents(consumer) {
        const registeredPatterns = [...this.messageHandlers.keys()];
        const consumerSubscribeOptions = this.getOptionsProp(this.options, 'subscribe') || {};
        // Subscribe options are applied after `topic`, so they may override it.
        const subscribeToPattern = async (pattern) => consumer.subscribe(Object.assign({ topic: pattern }, consumerSubscribeOptions));
        await Promise.all(registeredPatterns.map(subscribeToPattern));
        // Fresh target object: the caller's `run` options stay untouched and
        // `eachMessage` is always this server's handler.
        const consumerRunOptions = Object.assign({}, this.getOptionsProp(this.options, 'run'), {
            eachMessage: this.getMessageHandler(),
        });
        await consumer.run(consumerRunOptions);
    }
    /**
     * @returns {Function} Async callback that forwards a payload to
     *   `handleMessage`.
     */
    getMessageHandler() {
        return async (payload) => this.handleMessage(payload);
    }
    /**
     * Returns a function that publishes a response to the given reply
     * destination.
     *
     * @param {*} replyTopic Topic the reply is sent to.
     * @param {*} replyPartition Partition for the reply; may be nil.
     * @param {*} correlationId Correlation id attached to the reply headers.
     * @returns {Function} `(data) => this.sendMessage(...)`.
     */
    getPublisher(replyTopic, replyPartition, correlationId) {
        return (data) => this.sendMessage(data, replyTopic, replyPartition, correlationId);
    }
    /**
     * Handles one consumed message.
     *
     * Messages lacking a correlation id or a reply topic header are dispatched
     * as events. Otherwise the matching handler is invoked and its result is
     * published back; when no handler matches, a `NO_MESSAGE_HANDLER` error is
     * published instead.
     *
     * @param {object} payload Object with `topic`, `partition` and `message`.
     * @returns {Promise<*>}
     */
    async handleMessage(payload) {
        const channel = payload.topic;
        // NOTE: `payload.message` is extended in place with topic/partition
        // before it is handed to the parser.
        const rawMessage = helpers_1.KafkaParser.parse(Object.assign(payload.message, {
            topic: payload.topic,
            partition: payload.partition,
        }));
        const headers = rawMessage.headers;
        const correlationId = headers[enums_1.KafkaHeaders.CORRELATION_ID];
        const replyTopic = headers[enums_1.KafkaHeaders.REPLY_TOPIC];
        const replyPartition = headers[enums_1.KafkaHeaders.REPLY_PARTITION];
        const packet = this.deserializer.deserialize(rawMessage, { channel });
        const kafkaContext = new ctx_host_1.KafkaContext([
            rawMessage,
            payload.partition,
            payload.topic,
        ]);
        // Without both a correlation id and a reply topic no reply can be
        // addressed, so the message is treated as an event.
        if (!correlationId || !replyTopic) {
            return this.handleEvent(packet.pattern, packet, kafkaContext);
        }
        const publish = this.getPublisher(replyTopic, replyPartition, correlationId);
        const handler = this.getHandlerByPattern(packet.pattern);
        if (!handler) {
            return publish({
                id: correlationId,
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }
        const response$ = this.transformToObservable(await handler(packet.data, kafkaContext));
        response$ && this.send(response$, publish);
    }
    /**
     * Serializes `message.response`, attaches the reply headers and sends the
     * result through the producer.
     *
     * @param {object} message Outgoing response; `response`, `err` and
     *   `isDisposed` are read from it.
     * @param {*} replyTopic Topic the reply is sent to.
     * @param {*} replyPartition Partition for the reply; may be nil.
     * @param {*} correlationId Correlation id attached to the reply headers.
     * @returns {*} Whatever `this.producer.send` returns.
     */
    sendMessage(message, replyTopic, replyPartition, correlationId) {
        const outgoingMessage = this.serializer.serialize(message.response);
        this.assignReplyPartition(replyPartition, outgoingMessage);
        this.assignCorrelationIdHeader(correlationId, outgoingMessage);
        this.assignErrorHeader(message, outgoingMessage);
        this.assignIsDisposedHeader(message, outgoingMessage);
        // `send` options are merged last, so they may override topic/messages.
        const replyMessage = Object.assign({
            topic: replyTopic,
            messages: [outgoingMessage],
        }, this.getOptionsProp(this.options, 'send') || {});
        return this.producer.send(replyMessage);
    }
    /**
     * Sets the `NEST_IS_DISPOSED` header when the response is flagged as
     * disposed.
     *
     * @param {object} outgoingResponse Response whose `isDisposed` is checked.
     * @param {object} outgoingMessage Message whose `headers` are modified.
     */
    assignIsDisposedHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.isDisposed) {
            return;
        }
        // The header value is a single zero-filled byte.
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
    }
    /**
     * Sets the `NEST_ERR` header when the response carries a truthy `err`.
     *
     * Strings and Buffers are written as they are. Other objects are
     * JSON-encoded and remaining primitives are converted with `String`,
     * because `Buffer.from` rejects plain objects and numbers.
     *
     * @param {object} outgoingResponse Response whose `err` is read.
     * @param {object} outgoingMessage Message whose `headers` are modified.
     */
    assignErrorHeader(outgoingResponse, outgoingMessage) {
        const err = outgoingResponse.err;
        if (!err) {
            return;
        }
        let serializedError;
        if (typeof err === 'string' || Buffer.isBuffer(err)) {
            serializedError = err;
        }
        else if (typeof err === 'object') {
            serializedError = JSON.stringify(err);
        }
        else {
            serializedError = String(err);
        }
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_ERR] = Buffer.from(serializedError);
    }
    /**
     * Writes the correlation id into the message headers as a Buffer.
     *
     * @param {*} correlationId Value passed to `Buffer.from`.
     * @param {object} outgoingMessage Message whose `headers` are modified.
     */
    assignCorrelationIdHeader(correlationId, outgoingMessage) {
        outgoingMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID] = Buffer.from(correlationId);
    }
    /**
     * Sets `outgoingMessage.partition` from the reply partition unless that
     * value is nil.
     *
     * @param {*} replyPartition Value parsed with `parseFloat`; may be nil.
     * @param {object} outgoingMessage Message whose `partition` is set.
     */
    assignReplyPartition(replyPartition, outgoingMessage) {
        if (shared_utils_1.isNil(replyPartition)) {
            return;
        }
        outgoingMessage.partition = parseFloat(replyPartition);
    }
    /**
     * Uses `options.serializer` when provided, otherwise a new
     * `KafkaRequestSerializer`.
     *
     * @param {object} [options] Transport options.
     */
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
    }
}
|
150 | exports.ServerKafka = ServerKafka;
|