1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ServerKafka = void 0;
|
4 | const logger_service_1 = require("@nestjs/common/services/logger.service");
|
5 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
6 | const rxjs_1 = require("rxjs");
|
7 | const constants_1 = require("../constants");
|
8 | const ctx_host_1 = require("../ctx-host");
|
9 | const kafka_request_deserializer_1 = require("../deserializers/kafka-request.deserializer");
|
10 | const enums_1 = require("../enums");
|
11 | const exceptions_1 = require("../exceptions");
|
12 | const helpers_1 = require("../helpers");
|
13 | const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
|
14 | const server_1 = require("./server");
|
15 | let kafkaPackage = {};
|
16 | class ServerKafka extends server_1.Server {
|
17 | constructor(options) {
|
18 | var _a;
|
19 | super();
|
20 | this.options = options;
|
21 | this.transportId = enums_1.Transport.KAFKA;
|
22 | this.logger = new logger_service_1.Logger(ServerKafka.name);
|
23 | this.client = null;
|
24 | this.consumer = null;
|
25 | this.producer = null;
|
26 | this.parser = null;
|
27 | const clientOptions = this.getOptionsProp(this.options, 'client') || {};
|
28 | const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
|
29 | const postfixId = (_a = this.getOptionsProp(this.options, 'postfixId')) !== null && _a !== void 0 ? _a : '-server';
|
30 | this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
|
31 |
|
32 |
|
33 | this.clientId =
|
34 | (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
|
35 | this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
|
36 | kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => require('kafkajs'));
|
37 | this.parser = new helpers_1.KafkaParser((options && options.parser) || undefined);
|
38 | this.initializeSerializer(options);
|
39 | this.initializeDeserializer(options);
|
40 | }
|
41 | async listen(callback) {
|
42 | try {
|
43 | this.client = this.createClient();
|
44 | await this.start(callback);
|
45 | }
|
46 | catch (err) {
|
47 | callback(err);
|
48 | }
|
49 | }
|
50 | async close() {
|
51 | this.consumer && (await this.consumer.disconnect());
|
52 | this.producer && (await this.producer.disconnect());
|
53 | this.consumer = null;
|
54 | this.producer = null;
|
55 | this.client = null;
|
56 | }
|
57 | async start(callback) {
|
58 | const consumerOptions = Object.assign(this.options.consumer || {}, {
|
59 | groupId: this.groupId,
|
60 | });
|
61 | this.consumer = this.client.consumer(consumerOptions);
|
62 | this.producer = this.client.producer(this.options.producer);
|
63 | await this.consumer.connect();
|
64 | await this.producer.connect();
|
65 | await this.bindEvents(this.consumer);
|
66 | callback();
|
67 | }
|
68 | createClient() {
|
69 | return new kafkaPackage.Kafka(Object.assign({ logCreator: helpers_1.KafkaLogger.bind(null, this.logger) }, this.options.client, { clientId: this.clientId, brokers: this.brokers }));
|
70 | }
|
71 | async bindEvents(consumer) {
|
72 | const registeredPatterns = [...this.messageHandlers.keys()];
|
73 | const consumerSubscribeOptions = this.options.subscribe || {};
|
74 | const subscribeToPattern = async (pattern) => consumer.subscribe(Object.assign({ topic: pattern }, consumerSubscribeOptions));
|
75 | await Promise.all(registeredPatterns.map(subscribeToPattern));
|
76 | const consumerRunOptions = Object.assign(this.options.run || {}, {
|
77 | eachMessage: this.getMessageHandler(),
|
78 | });
|
79 | await consumer.run(consumerRunOptions);
|
80 | }
|
81 | getMessageHandler() {
|
82 | return async (payload) => this.handleMessage(payload);
|
83 | }
|
84 | getPublisher(replyTopic, replyPartition, correlationId) {
|
85 | return (data) => this.sendMessage(data, replyTopic, replyPartition, correlationId);
|
86 | }
|
87 | async handleMessage(payload) {
|
88 | const channel = payload.topic;
|
89 | const rawMessage = this.parser.parse(Object.assign(payload.message, {
|
90 | topic: payload.topic,
|
91 | partition: payload.partition,
|
92 | }));
|
93 | const headers = rawMessage.headers;
|
94 | const correlationId = headers[enums_1.KafkaHeaders.CORRELATION_ID];
|
95 | const replyTopic = headers[enums_1.KafkaHeaders.REPLY_TOPIC];
|
96 | const replyPartition = headers[enums_1.KafkaHeaders.REPLY_PARTITION];
|
97 | const packet = await this.deserializer.deserialize(rawMessage, { channel });
|
98 | const kafkaContext = new ctx_host_1.KafkaContext([
|
99 | rawMessage,
|
100 | payload.partition,
|
101 | payload.topic,
|
102 | this.consumer,
|
103 | payload.heartbeat,
|
104 | ]);
|
105 | const handler = this.getHandlerByPattern(packet.pattern);
|
106 |
|
107 |
|
108 | if ((handler === null || handler === void 0 ? void 0 : handler.isEventHandler) || !correlationId || !replyTopic) {
|
109 | return this.handleEvent(packet.pattern, packet, kafkaContext);
|
110 | }
|
111 | const publish = this.getPublisher(replyTopic, replyPartition, correlationId);
|
112 | if (!handler) {
|
113 | return publish({
|
114 | id: correlationId,
|
115 | err: constants_1.NO_MESSAGE_HANDLER,
|
116 | });
|
117 | }
|
118 | const response$ = this.transformToObservable(await handler(packet.data, kafkaContext));
|
119 | const replayStream$ = new rxjs_1.ReplaySubject();
|
120 | await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);
|
121 | this.send(replayStream$, publish);
|
122 | }
|
123 | combineStreamsAndThrowIfRetriable(response$, replayStream$) {
|
124 | return new Promise((resolve, reject) => {
|
125 | let isPromiseResolved = false;
|
126 | response$.subscribe({
|
127 | next: val => {
|
128 | replayStream$.next(val);
|
129 | if (!isPromiseResolved) {
|
130 | isPromiseResolved = true;
|
131 | resolve();
|
132 | }
|
133 | },
|
134 | error: err => {
|
135 | if (err instanceof exceptions_1.KafkaRetriableException && !isPromiseResolved) {
|
136 | isPromiseResolved = true;
|
137 | reject(err);
|
138 | }
|
139 | replayStream$.error(err);
|
140 | },
|
141 | complete: () => replayStream$.complete(),
|
142 | });
|
143 | });
|
144 | }
|
145 | async sendMessage(message, replyTopic, replyPartition, correlationId) {
|
146 | const outgoingMessage = await this.serializer.serialize(message.response);
|
147 | this.assignReplyPartition(replyPartition, outgoingMessage);
|
148 | this.assignCorrelationIdHeader(correlationId, outgoingMessage);
|
149 | this.assignErrorHeader(message, outgoingMessage);
|
150 | this.assignIsDisposedHeader(message, outgoingMessage);
|
151 | const replyMessage = Object.assign({
|
152 | topic: replyTopic,
|
153 | messages: [outgoingMessage],
|
154 | }, this.options.send || {});
|
155 | return this.producer.send(replyMessage);
|
156 | }
|
157 | assignIsDisposedHeader(outgoingResponse, outgoingMessage) {
|
158 | if (!outgoingResponse.isDisposed) {
|
159 | return;
|
160 | }
|
161 | outgoingMessage.headers[enums_1.KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
|
162 | }
|
163 | assignErrorHeader(outgoingResponse, outgoingMessage) {
|
164 | if (!outgoingResponse.err) {
|
165 | return;
|
166 | }
|
167 | const stringifiedError = typeof outgoingResponse.err === 'object'
|
168 | ? JSON.stringify(outgoingResponse.err)
|
169 | : outgoingResponse.err;
|
170 | outgoingMessage.headers[enums_1.KafkaHeaders.NEST_ERR] =
|
171 | Buffer.from(stringifiedError);
|
172 | }
|
173 | assignCorrelationIdHeader(correlationId, outgoingMessage) {
|
174 | outgoingMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID] =
|
175 | Buffer.from(correlationId);
|
176 | }
|
177 | assignReplyPartition(replyPartition, outgoingMessage) {
|
178 | if ((0, shared_utils_1.isNil)(replyPartition)) {
|
179 | return;
|
180 | }
|
181 | outgoingMessage.partition = parseFloat(replyPartition);
|
182 | }
|
183 | async handleEvent(pattern, packet, context) {
|
184 | const handler = this.getHandlerByPattern(pattern);
|
185 | if (!handler) {
|
186 | return this.logger.error((0, constants_1.NO_EVENT_HANDLER) `${pattern}`);
|
187 | }
|
188 | const resultOrStream = await handler(packet.data, context);
|
189 | if ((0, rxjs_1.isObservable)(resultOrStream)) {
|
190 | await (0, rxjs_1.lastValueFrom)(resultOrStream);
|
191 | }
|
192 | }
|
193 | initializeSerializer(options) {
|
194 | this.serializer =
|
195 | (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
|
196 | }
|
197 | initializeDeserializer(options) {
|
198 | var _a;
|
199 | this.deserializer = (_a = options === null || options === void 0 ? void 0 : options.deserializer) !== null && _a !== void 0 ? _a : new kafka_request_deserializer_1.KafkaRequestDeserializer();
|
200 | }
|
201 | }
|
202 | exports.ServerKafka = ServerKafka;
|