UNPKG

6.78 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerKafka = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
6const constants_1 = require("../constants");
7const ctx_host_1 = require("../ctx-host");
8const enums_1 = require("../enums");
9const helpers_1 = require("../helpers");
10const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
11const server_1 = require("./server");
12let kafkaPackage = {};
13class ServerKafka extends server_1.Server {
14 constructor(options) {
15 super();
16 this.options = options;
17 this.transportId = enums_1.Transport.KAFKA;
18 this.logger = new logger_service_1.Logger(ServerKafka.name);
19 this.client = null;
20 this.consumer = null;
21 this.producer = null;
22 const clientOptions = this.getOptionsProp(this.options, 'client') || {};
23 const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
24 const postfixId = this.getOptionsProp(this.options, 'postfixId') || '-server';
25 this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
26 // append a unique id to the clientId and groupId
27 // so they don't collide with a microservices client
28 this.clientId =
29 (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
30 this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
31 kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => require('kafkajs'));
32 this.initializeSerializer(options);
33 this.initializeDeserializer(options);
34 }
35 async listen(callback) {
36 this.client = this.createClient();
37 await this.start(callback);
38 }
39 async close() {
40 this.consumer && (await this.consumer.disconnect());
41 this.producer && (await this.producer.disconnect());
42 this.consumer = null;
43 this.producer = null;
44 this.client = null;
45 }
46 async start(callback) {
47 const consumerOptions = Object.assign(this.options.consumer || {}, {
48 groupId: this.groupId,
49 });
50 this.consumer = this.client.consumer(consumerOptions);
51 this.producer = this.client.producer(this.options.producer);
52 await this.consumer.connect();
53 await this.producer.connect();
54 await this.bindEvents(this.consumer);
55 callback();
56 }
57 createClient() {
58 return new kafkaPackage.Kafka(Object.assign(this.options.client || {}, {
59 clientId: this.clientId,
60 brokers: this.brokers,
61 logCreator: helpers_1.KafkaLogger.bind(null, this.logger),
62 }));
63 }
64 async bindEvents(consumer) {
65 const registeredPatterns = [...this.messageHandlers.keys()];
66 const consumerSubscribeOptions = this.options.subscribe || {};
67 const subscribeToPattern = async (pattern) => consumer.subscribe(Object.assign({ topic: pattern }, consumerSubscribeOptions));
68 await Promise.all(registeredPatterns.map(subscribeToPattern));
69 const consumerRunOptions = Object.assign(this.options.run || {}, {
70 eachMessage: this.getMessageHandler(),
71 });
72 await consumer.run(consumerRunOptions);
73 }
74 getMessageHandler() {
75 return async (payload) => this.handleMessage(payload);
76 }
77 getPublisher(replyTopic, replyPartition, correlationId) {
78 return (data) => this.sendMessage(data, replyTopic, replyPartition, correlationId);
79 }
80 async handleMessage(payload) {
81 const channel = payload.topic;
82 const rawMessage = helpers_1.KafkaParser.parse(Object.assign(payload.message, {
83 topic: payload.topic,
84 partition: payload.partition,
85 }));
86 const headers = rawMessage.headers;
87 const correlationId = headers[enums_1.KafkaHeaders.CORRELATION_ID];
88 const replyTopic = headers[enums_1.KafkaHeaders.REPLY_TOPIC];
89 const replyPartition = headers[enums_1.KafkaHeaders.REPLY_PARTITION];
90 const packet = this.deserializer.deserialize(rawMessage, { channel });
91 const kafkaContext = new ctx_host_1.KafkaContext([
92 rawMessage,
93 payload.partition,
94 payload.topic,
95 ]);
96 // if the correlation id or reply topic is not set
97 // then this is an event (events could still have correlation id)
98 if (!correlationId || !replyTopic) {
99 return this.handleEvent(packet.pattern, packet, kafkaContext);
100 }
101 const publish = this.getPublisher(replyTopic, replyPartition, correlationId);
102 const handler = this.getHandlerByPattern(packet.pattern);
103 if (!handler) {
104 return publish({
105 id: correlationId,
106 err: constants_1.NO_MESSAGE_HANDLER,
107 });
108 }
109 const response$ = this.transformToObservable(await handler(packet.data, kafkaContext));
110 response$ && this.send(response$, publish);
111 }
112 sendMessage(message, replyTopic, replyPartition, correlationId) {
113 const outgoingMessage = this.serializer.serialize(message.response);
114 this.assignReplyPartition(replyPartition, outgoingMessage);
115 this.assignCorrelationIdHeader(correlationId, outgoingMessage);
116 this.assignErrorHeader(message, outgoingMessage);
117 this.assignIsDisposedHeader(message, outgoingMessage);
118 const replyMessage = Object.assign({
119 topic: replyTopic,
120 messages: [outgoingMessage],
121 }, this.options.send || {});
122 return this.producer.send(replyMessage);
123 }
124 assignIsDisposedHeader(outgoingResponse, outgoingMessage) {
125 if (!outgoingResponse.isDisposed) {
126 return;
127 }
128 outgoingMessage.headers[enums_1.KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
129 }
130 assignErrorHeader(outgoingResponse, outgoingMessage) {
131 if (!outgoingResponse.err) {
132 return;
133 }
134 outgoingMessage.headers[enums_1.KafkaHeaders.NEST_ERR] = Buffer.from(outgoingResponse.err);
135 }
136 assignCorrelationIdHeader(correlationId, outgoingMessage) {
137 outgoingMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID] = Buffer.from(correlationId);
138 }
139 assignReplyPartition(replyPartition, outgoingMessage) {
140 if (shared_utils_1.isNil(replyPartition)) {
141 return;
142 }
143 outgoingMessage.partition = parseFloat(replyPartition);
144 }
145 initializeSerializer(options) {
146 this.serializer =
147 (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
148 }
149}
150exports.ServerKafka = ServerKafka;