// @nestjs/microservices — compiled Kafka transport server (server-kafka.js),
// retrieved via unpkg. ~7 kB of generated CommonJS output.
"use strict";
// CommonJS interop boilerplate emitted by the TypeScript compiler:
// mark the module as an ES-module transpile and pre-declare the export.
Object.defineProperty(exports, "__esModule", { value: true });
exports.ServerKafka = void 0;
const logger_service_1 = require("@nestjs/common/services/logger.service");
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const constants_1 = require("../constants");
const ctx_host_1 = require("../ctx-host");
const kafka_request_deserializer_1 = require("../deserializers/kafka-request.deserializer");
const enums_1 = require("../enums");
const helpers_1 = require("../helpers");
const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
const server_1 = require("./server");
12let kafkaPackage = {};
class ServerKafka extends server_1.Server {
    /**
     * Kafka transport server for NestJS microservices.
     *
     * @param options Kafka transport options; keys used here are `client`,
     *   `consumer`, `producer`, `parser`, `serializer`, `deserializer`,
     *   `subscribe`, `run`, `send` and `postfixId`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.KAFKA;
        this.logger = new logger_service_1.Logger(ServerKafka.name);
        this.client = null;
        this.consumer = null;
        this.producer = null;
        this.parser = null;
        const clientOptions = this.getOptionsProp(this.options, 'client') || {};
        const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
        const postfixId = this.getOptionsProp(this.options, 'postfixId') || '-server';
        this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
        // Append a unique postfix to the clientId and groupId
        // so they don't collide with a microservices client.
        this.clientId =
            (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + postfixId;
        this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + postfixId;
        kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => require('kafkajs'));
        this.parser = new helpers_1.KafkaParser((options && options.parser) || undefined);
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * Creates the kafkajs client and starts consuming. Any startup error is
     * forwarded to `callback` instead of being thrown.
     */
    async listen(callback) {
        try {
            this.client = this.createClient();
            await this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }
    /**
     * Disconnects the consumer and producer (when connected) and resets all
     * connection state so the server can be garbage-collected or restarted.
     */
    async close() {
        this.consumer && (await this.consumer.disconnect());
        this.producer && (await this.producer.disconnect());
        this.consumer = null;
        this.producer = null;
        this.client = null;
    }
    /**
     * Connects the consumer/producer pair, binds the message handlers and
     * finally invokes `callback` to signal readiness.
     */
    async start(callback) {
        // Copy into a fresh object instead of mutating the caller-owned
        // `options.consumer`; the computed `groupId` always wins.
        const consumerOptions = Object.assign({}, this.options.consumer, {
            groupId: this.groupId,
        });
        this.consumer = this.client.consumer(consumerOptions);
        this.producer = this.client.producer(this.options.producer);
        await this.consumer.connect();
        await this.producer.connect();
        await this.bindEvents(this.consumer);
        callback();
    }
    /**
     * Instantiates the kafkajs client with the resolved clientId/brokers and
     * a Nest-aware log creator. Does not mutate `options.client`.
     */
    createClient() {
        return new kafkaPackage.Kafka(Object.assign({}, this.options.client, {
            clientId: this.clientId,
            brokers: this.brokers,
            logCreator: helpers_1.KafkaLogger.bind(null, this.logger),
        }));
    }
    /**
     * Subscribes to every registered message pattern (topic) and starts the
     * consumer run loop with this server's `eachMessage` handler.
     */
    async bindEvents(consumer) {
        const registeredPatterns = [...this.messageHandlers.keys()];
        const consumerSubscribeOptions = this.options.subscribe || {};
        const subscribeToPattern = async (pattern) => consumer.subscribe(Object.assign({ topic: pattern }, consumerSubscribeOptions));
        await Promise.all(registeredPatterns.map(subscribeToPattern));
        // Copy instead of mutating the caller-owned `options.run`.
        const consumerRunOptions = Object.assign({}, this.options.run, {
            eachMessage: this.getMessageHandler(),
        });
        await consumer.run(consumerRunOptions);
    }
    /** Returns the kafkajs `eachMessage` handler bound to this server. */
    getMessageHandler() {
        return async (payload) => this.handleMessage(payload);
    }
    /** Builds a publish function targeting the given reply topic/partition. */
    getPublisher(replyTopic, replyPartition, correlationId) {
        return (data) => this.sendMessage(data, replyTopic, replyPartition, correlationId);
    }
    /**
     * Parses an incoming kafkajs payload and dispatches it either as an event
     * (no correlation id / reply topic) or as a request/response message whose
     * result is published back to the reply topic.
     */
    async handleMessage(payload) {
        const channel = payload.topic;
        const rawMessage = this.parser.parse(Object.assign(payload.message, {
            topic: payload.topic,
            partition: payload.partition,
        }));
        const headers = rawMessage.headers;
        const correlationId = headers[enums_1.KafkaHeaders.CORRELATION_ID];
        const replyTopic = headers[enums_1.KafkaHeaders.REPLY_TOPIC];
        const replyPartition = headers[enums_1.KafkaHeaders.REPLY_PARTITION];
        const packet = await this.deserializer.deserialize(rawMessage, { channel });
        const kafkaContext = new ctx_host_1.KafkaContext([
            rawMessage,
            payload.partition,
            payload.topic,
        ]);
        // If the correlation id or reply topic is not set,
        // this is an event (events could still have a correlation id).
        if (!correlationId || !replyTopic) {
            return this.handleEvent(packet.pattern, packet, kafkaContext);
        }
        const publish = this.getPublisher(replyTopic, replyPartition, correlationId);
        const handler = this.getHandlerByPattern(packet.pattern);
        if (!handler) {
            return publish({
                id: correlationId,
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }
        const response$ = this.transformToObservable(await handler(packet.data, kafkaContext));
        response$ && this.send(response$, publish);
    }
    /**
     * Serializes an outgoing response, attaches reply headers/partition and
     * publishes it to the reply topic. Returns the producer send promise.
     */
    sendMessage(message, replyTopic, replyPartition, correlationId) {
        const outgoingMessage = this.serializer.serialize(message.response);
        this.assignReplyPartition(replyPartition, outgoingMessage);
        this.assignCorrelationIdHeader(correlationId, outgoingMessage);
        this.assignErrorHeader(message, outgoingMessage);
        this.assignIsDisposedHeader(message, outgoingMessage);
        const replyMessage = Object.assign({
            topic: replyTopic,
            messages: [outgoingMessage],
        }, this.options.send || {});
        return this.producer.send(replyMessage);
    }
    /** Marks the reply as the final (disposed) response when applicable. */
    assignIsDisposedHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.isDisposed) {
            return;
        }
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
    }
    /** Copies an error (if any) into the NEST_ERR reply header. */
    assignErrorHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.err) {
            return;
        }
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_ERR] = Buffer.from(outgoingResponse.err);
    }
    /** Copies the request's correlation id into the reply headers. */
    assignCorrelationIdHeader(correlationId, outgoingMessage) {
        outgoingMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID] =
            Buffer.from(correlationId);
    }
    /** Targets the reply at the requester's partition when one was provided. */
    assignReplyPartition(replyPartition, outgoingMessage) {
        if (shared_utils_1.isNil(replyPartition)) {
            return;
        }
        outgoingMessage.partition = parseFloat(replyPartition);
    }
    /** Uses the user-provided serializer or falls back to the default one. */
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
    }
    /**
     * Uses the user-provided deserializer or falls back to the default one.
     * Restored: the constructor calls this method, but it was missing from
     * the file, so every `new ServerKafka(...)` threw a TypeError.
     */
    initializeDeserializer(options) {
        this.deserializer =
            (options && options.deserializer) ||
                new kafka_request_deserializer_1.KafkaRequestDeserializer();
    }
}
158exports.ServerKafka = ServerKafka;