'use strict'; var fp = require('fastify-plugin'); var core = require('@kafka-ts/core'); async function createConsumers(options) { if (!Array.isArray(options)) { options = [options]; } const [clientOptions, consumerOptions] = options.reduce((result, option) => { const { consumerOptions, ...rest } = option; result[0].push(rest); result[1][rest.clientId || core.DEFAULT_CLIENT] = consumerOptions; return result; }, [[], {}]); const kafkaClients = await core.createKafkaClients(clientOptions); const kafkaConsumers = await Promise.allSettled(Object.entries(kafkaClients.getAllClients()).map(async ([clientId, client]) => { const consumer = client.consumer(consumerOptions[clientId]); await consumer.connect(); return { clientId, consumer, }; })); return kafkaConsumers.reduce((result, settledKafkaConsumer) => { if (settledKafkaConsumer.status === 'fulfilled') { const { clientId, consumer } = settledKafkaConsumer.value; result[clientId] = consumer; } return result; }, {}); } const kafkaConsumer = async (fastify, options) => { const clientConsumers = await createConsumers(options); const subscribe = async (subscribeOpts, impl, opts) => { const { topics, fromBeginning, type = 'batch_message', clientId = core.DEFAULT_CLIENT, } = subscribeOpts; const consumer = clientConsumers[clientId]; if (!consumer) { throw `Unknown client ${clientId}`; } await consumer.subscribe({ topics, fromBeginning, }); let providedOpts = {}; if (type === 'batch_message') { providedOpts = { eachBatch: async ({ batch, ...rest }) => { const { messages } = batch; const context = new core.KafkaBatchMessageContext({ ...rest, batch, }); const formattedMessages = messages.map((message) => { return core.parseMessage(message); }); return impl(formattedMessages, context); }, }; } else { providedOpts = { eachMessage: async ({ message, ...rest }) => { const context = new core.KafkaMessageContext({ ...rest, message, }); return impl(core.parseMessage(message), context); }, }; } consumer.run({ ...opts, ...providedOpts, }); }; const getConsumer = (clientId = core.DEFAULT_CLIENT) => { return clientConsumers[clientId]; }; fastify.decorate('kafkaConsumer', { subscribe, getConsumer, }); fastify.addHook('onClose', onClose); function onClose() { Object.values(clientConsumers).forEach((consumer) => { consumer.disconnect(); }); } }; var index = fp(kafkaConsumer, { fastify: '>=3', name: '@kafka-ts/fastify-consumer', }); module.exports = index;