'use strict'; var core = require('@kafka-ts/core'); var common = require('@nestjs/common'); const PAYLOAD_TYPE = 3; const CONTEXT_TYPE = 6; const ROUTE_ARGS_METADATA = '__routeArguments__'; const PATTERN_METADATA = 'microservices:pattern'; const PATTERN_HANDLER_METADATA = 'microservices:handler_type'; function handleSubscribeParams(descriptor, params) { Reflect.defineMetadata(PATTERN_METADATA, [ { ...params, isKafka: true, }, ], descriptor.value); Reflect.defineMetadata(PATTERN_HANDLER_METADATA, 1, descriptor.value); } function Subscribe(params) { return (_target, _key, descriptor) => { handleSubscribeParams(descriptor, { ...params, type: 'batch_message', }); return descriptor; }; } function SubscribeMessage(params) { return (_target, _key, descriptor) => { handleSubscribeParams(descriptor, { ...params, type: 'message', }); return descriptor; }; } function createDecorator(type) { return () => { return (target, key = '', index) => { Reflect.defineMetadata(ROUTE_ARGS_METADATA, common.assignMetadata(Reflect.getMetadata(ROUTE_ARGS_METADATA, target.constructor, key) || {}, type, index), target.constructor, key); }; }; } const Ctx = createDecorator(CONTEXT_TYPE); const Payload = createDecorator(PAYLOAD_TYPE); async function createConsumers(options) { if (!Array.isArray(options)) { options = [options]; } const [clientOptions, consumerOptions] = options.reduce((result, option) => { const { consumerOptions, ...rest } = option; result[0].push(rest); result[1][rest.clientId || core.DEFAULT_CLIENT] = consumerOptions; return result; }, [[], {}]); const kafkaClients = await core.createKafkaClients(clientOptions); const kafkaConsumers = await Promise.allSettled(Object.entries(kafkaClients.getAllClients()).map(async ([clientId, client]) => { const consumer = client.consumer(consumerOptions[clientId]); await consumer.connect(); return { clientId, consumer, }; })); return kafkaConsumers.reduce((result, settledKafkaConsumer) => { if (settledKafkaConsumer.status === 'fulfilled') { const { clientId, consumer } = settledKafkaConsumer.value; result[clientId] = consumer; } return result; }, {}); } function omit(data, keys) { return Object.entries(data).reduce((result, [key, value]) => { if (!keys.includes(key)) { result[key] = value; } return result; }, {}); } function isObject(data) { return !(data instanceof Date) && !!data && typeof data === 'object'; } function sortedObj(obj) { return Object.keys(obj) .sort() .reduce((result, key) => { const value = obj[key]; if (Array.isArray(value)) { result[key] = value.map((v) => sortedObj(v)); } else if (isObject(value)) { result[key] = sortedObj(value); } else { result[key] = value; } return result; }, {}); } function normalizePattern(pattern) { return JSON.stringify(sortedObj(pattern)); } class KafkaConsumer { #options; #clientConsumers = {}; #patterns = []; #logger = new common.Logger('KafkaConsumer'); #messageHandlers = new Map(); constructor(props) { this.#options = props; } static createService(params) { return { strategy: new KafkaConsumer(params), }; } addHandler(pattern, callback, _isEventHandler = false, _extras = {}) { if (typeof pattern === 'object' && pattern.isKafka) { pattern = omit(pattern, ['isKafka']); this.#patterns.push(pattern); } const normalizedPattern = normalizePattern(pattern); this.#messageHandlers.set(normalizedPattern, callback); } async listen(callback) { const patterns = this.#patterns; const handleMessage = this.#handleMessage.bind(this); await this.#initClientConsumers(this.#options); patterns.forEach(async (pattern) => { const { type, topics, options = {}, fromBeginning, clientId = core.DEFAULT_CLIENT, } = pattern; const consumer = this.#clientConsumers[clientId]; if (!consumer) { return this.#logger.error(`Unknown client ${clientId}`); } await consumer.subscribe({ topics, fromBeginning, }); let providedOpts = {}; if (type === 'batch_message') { providedOpts = { eachBatch: async ({ batch, ...rest }) => { const { messages } = batch; const context = new core.KafkaBatchMessageContext({ ...rest, batch, }); const formattedMessages = messages.map((message) => { return core.parseMessage(message); }); return handleMessage(formattedMessages, context, pattern); }, }; } else { providedOpts = { eachMessage: async ({ message, ...rest }) => { const context = new core.KafkaMessageContext({ ...rest, message, }); return handleMessage(core.parseMessage(message), context, pattern); }, }; } consumer.run({ ...options, ...providedOpts, }); }); callback(); } async close() { Object.values(this.#clientConsumers).forEach((consumer) => { consumer.disconnect(); }); } async #handleMessage(message, context, pattern) { if (message === null) { return; } const patternAsString = normalizePattern(pattern); const handler = this.#messageHandlers.get(patternAsString); if (!handler) { return this.#logger.error(`There is no matching event handler defined in the consumer. Event pattern: ${patternAsString}`); } return handler(message, context); } async #initClientConsumers(options) { this.#clientConsumers = await createConsumers(options); } } Object.defineProperty(exports, 'KafkaBatchMessageContext', { enumerable: true, get: function () { return core.KafkaBatchMessageContext; } }); Object.defineProperty(exports, 'KafkaMessageContext', { enumerable: true, get: function () { return core.KafkaMessageContext; } }); exports.Ctx = Ctx; exports.KafkaConsumer = KafkaConsumer; exports.Payload = Payload; exports.Subscribe = Subscribe; exports.SubscribeMessage = SubscribeMessage;