1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerKafka = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
6const rxjs_1 = require("rxjs");
7const constants_1 = require("../constants");
8const ctx_host_1 = require("../ctx-host");
9const kafka_request_deserializer_1 = require("../deserializers/kafka-request.deserializer");
10const enums_1 = require("../enums");
11const exceptions_1 = require("../exceptions");
12const helpers_1 = require("../helpers");
13const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
14const server_1 = require("./server");
15let kafkaPackage = {};
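// ServerKafka is the Kafka transport strategy for a NestJS microservice server.
// A minimal bootstrap sketch (not part of this file; the broker address and
// group id are illustrative placeholders, assuming the public
// NestFactory.createMicroservice API):
//
//   const app = await NestFactory.createMicroservice(AppModule, {
//     transport: Transport.KAFKA,
//     options: {
//       client: { brokers: ['localhost:9092'] },
//       consumer: { groupId: 'my-consumer' },
//     },
//   });
//   await app.listen();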
class ServerKafka extends server_1.Server {
    constructor(options) {
        var _a;
        super();
        this.options = options;
        this.transportId = enums_1.Transport.KAFKA;
        this.logger = new logger_service_1.Logger(ServerKafka.name);
        this.client = null;
        this.consumer = null;
        this.producer = null;
        this.parser = null;
        const clientOptions = this.getOptionsProp(this.options, 'client') || {};
        const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
        const postfixId = (_a = this.getOptionsProp(this.options, 'postfixId')) !== null && _a !== void 0 ? _a : '-server';
        this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
        // append a unique id to the clientId and groupId
        // so they don't collide with a microservices client
        this.clientId =
            (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
        this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
        kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => require('kafkajs'));
        this.parser = new helpers_1.KafkaParser((options && options.parser) || undefined);
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
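    // Invoked when the microservice starts listening; connection errors are
    // forwarded to the callback rather than thrown.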
    async listen(callback) {
        try {
            this.client = this.createClient();
            await this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }
    async close() {
        this.consumer && (await this.consumer.disconnect());
        this.producer && (await this.producer.disconnect());
        this.consumer = null;
        this.producer = null;
        this.client = null;
    }
    async start(callback) {
        const consumerOptions = Object.assign(this.options.consumer || {}, {
            groupId: this.groupId,
        });
        this.consumer = this.client.consumer(consumerOptions);
        this.producer = this.client.producer(this.options.producer);
        await this.consumer.connect();
        await this.producer.connect();
        await this.bindEvents(this.consumer);
        callback();
    }
    createClient() {
        return new kafkaPackage.Kafka(Object.assign({ logCreator: helpers_1.KafkaLogger.bind(null, this.logger) }, this.options.client, { clientId: this.clientId, brokers: this.brokers }));
    }
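    // Every registered pattern becomes one Kafka topic subscription. Handlers are
    // typically declared with decorators in a controller; a sketch with an
    // illustrative topic name:
    //
    //   @MessagePattern('math.sum')
    //   sum(@Payload() data: number[], @Ctx() context: KafkaContext) {
    //     return data.reduce((a, b) => a + b, 0);
    //   }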
    async bindEvents(consumer) {
        const registeredPatterns = [...this.messageHandlers.keys()];
        const consumerSubscribeOptions = this.options.subscribe || {};
        const subscribeToPattern = async (pattern) => consumer.subscribe(Object.assign({ topic: pattern }, consumerSubscribeOptions));
        await Promise.all(registeredPatterns.map(subscribeToPattern));
        const consumerRunOptions = Object.assign(this.options.run || {}, {
            eachMessage: this.getMessageHandler(),
        });
        await consumer.run(consumerRunOptions);
    }
    getMessageHandler() {
        return async (payload) => this.handleMessage(payload);
    }
    getPublisher(replyTopic, replyPartition, correlationId) {
        return (data) => this.sendMessage(data, replyTopic, replyPartition, correlationId);
    }
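    // Dispatch logic: a message carrying both a correlation id and a reply topic
    // header is treated as a request-response call and its handler result is
    // published back; anything else is routed to handleEvent().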
    async handleMessage(payload) {
        const channel = payload.topic;
        const rawMessage = this.parser.parse(Object.assign(payload.message, {
            topic: payload.topic,
            partition: payload.partition,
        }));
        const headers = rawMessage.headers;
        const correlationId = headers[enums_1.KafkaHeaders.CORRELATION_ID];
        const replyTopic = headers[enums_1.KafkaHeaders.REPLY_TOPIC];
        const replyPartition = headers[enums_1.KafkaHeaders.REPLY_PARTITION];
        const packet = await this.deserializer.deserialize(rawMessage, { channel });
        const kafkaContext = new ctx_host_1.KafkaContext([
            rawMessage,
            payload.partition,
            payload.topic,
            this.consumer,
            payload.heartbeat,
        ]);
        const handler = this.getHandlerByPattern(packet.pattern);
        // if the correlation id or reply topic is not set
        // then this is an event (events could still have correlation id)
        if ((handler === null || handler === void 0 ? void 0 : handler.isEventHandler) || !correlationId || !replyTopic) {
            return this.handleEvent(packet.pattern, packet, kafkaContext);
        }
        const publish = this.getPublisher(replyTopic, replyPartition, correlationId);
        if (!handler) {
            return publish({
                id: correlationId,
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }
        const response$ = this.transformToObservable(await handler(packet.data, kafkaContext));
        const replayStream$ = new rxjs_1.ReplaySubject();
        await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);
        this.send(replayStream$, publish);
    }
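    // Bridges the handler's observable into the reply stream. If the stream
    // errors with a KafkaRetriableException before emitting a value, the promise
    // rejects so the error propagates out of the kafkajs eachMessage callback
    // (presumably allowing the message to be retried); otherwise errors are
    // forwarded to the reply stream only.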
    combineStreamsAndThrowIfRetriable(response$, replayStream$) {
        return new Promise((resolve, reject) => {
            let isPromiseResolved = false;
            response$.subscribe({
                next: val => {
                    replayStream$.next(val);
                    if (!isPromiseResolved) {
                        isPromiseResolved = true;
                        resolve();
                    }
                },
                error: err => {
                    if (err instanceof exceptions_1.KafkaRetriableException && !isPromiseResolved) {
                        isPromiseResolved = true;
                        reject(err);
                    }
                    replayStream$.error(err);
                },
                complete: () => replayStream$.complete(),
            });
        });
    }
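    // Builds the reply: the serialized response is enriched with the correlation
    // id, error, and "disposed" headers plus an optional target partition, then
    // published to the reply topic taken from the incoming message headers.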
    async sendMessage(message, replyTopic, replyPartition, correlationId) {
        const outgoingMessage = await this.serializer.serialize(message.response);
        this.assignReplyPartition(replyPartition, outgoingMessage);
        this.assignCorrelationIdHeader(correlationId, outgoingMessage);
        this.assignErrorHeader(message, outgoingMessage);
        this.assignIsDisposedHeader(message, outgoingMessage);
        const replyMessage = Object.assign({
            topic: replyTopic,
            messages: [outgoingMessage],
        }, this.options.send || {});
        return this.producer.send(replyMessage);
    }
    assignIsDisposedHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.isDisposed) {
            return;
        }
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
    }
    assignErrorHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.err) {
            return;
        }
        const stringifiedError = typeof outgoingResponse.err === 'object'
            ? JSON.stringify(outgoingResponse.err)
            : outgoingResponse.err;
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_ERR] =
            Buffer.from(stringifiedError);
    }
    assignCorrelationIdHeader(correlationId, outgoingMessage) {
        outgoingMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID] =
            Buffer.from(correlationId);
    }
    assignReplyPartition(replyPartition, outgoingMessage) {
        if ((0, shared_utils_1.isNil)(replyPartition)) {
            return;
        }
        outgoingMessage.partition = parseFloat(replyPartition);
    }
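    // Event handlers produce no reply. A controller-side sketch (illustrative
    // topic name):
    //
    //   @EventPattern('user.created')
    //   handleUserCreated(@Payload() data: Record<string, unknown>) {
    //     // fire-and-forget side effects only
    //   }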
    async handleEvent(pattern, packet, context) {
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            return this.logger.error((0, constants_1.NO_EVENT_HANDLER) `${pattern}`);
        }
        const resultOrStream = await handler(packet.data, context);
        if ((0, rxjs_1.isObservable)(resultOrStream)) {
            await (0, rxjs_1.lastValueFrom)(resultOrStream);
        }
    }
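    // Custom (de)serializers may be supplied via the options object; otherwise
    // the Kafka-specific defaults below are used.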
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
    }
    initializeDeserializer(options) {
        var _a;
        this.deserializer = (_a = options === null || options === void 0 ? void 0 : options.deserializer) !== null && _a !== void 0 ? _a : new kafka_request_deserializer_1.KafkaRequestDeserializer();
    }
}
exports.ServerKafka = ServerKafka;