'use strict';

const fp = require('fastify-plugin');
const core = require('@kafka-ts/core');

/**
 * Builds one connected producer per Kafka client described by `options`.
 *
 * Each option object may carry a `producerOptions` property; every other
 * property is forwarded to `core.createKafkaClients`. Producer options are
 * keyed by the option's `clientId`, falling back to `core.DEFAULT_CLIENT`.
 *
 * Connection is best-effort: a producer whose creation or `connect()` rejects
 * is left out of the result instead of failing the whole call.
 *
 * @param {object|object[]} options - A single client option object or a list of them.
 * @returns {Promise<Object<string, object>>} Map of client id to connected producer.
 */
async function createProducers(options) {
  // Normalise into a local list instead of reassigning the parameter.
  const optionList = Array.isArray(options) ? options : [options];

  const clientOptions = [];
  const producerOptionsByClient = {};
  for (const { producerOptions, ...rest } of optionList) {
    clientOptions.push(rest);
    producerOptionsByClient[rest.clientId || core.DEFAULT_CLIENT] = producerOptions || {};
  }

  const kafkaClients = await core.createKafkaClients(clientOptions);

  const settledProducers = await Promise.allSettled(
    Object.entries(kafkaClients.getAllClients()).map(async ([clientId, client]) => {
      const { createPartitioner, ...rest } = producerOptionsByClient[clientId];
      const producer = client.producer({
        ...rest,
        // Default to the legacy partitioner unless the caller supplied one.
        createPartitioner: createPartitioner || core.Partitioners.LegacyPartitioner,
      });
      await producer.connect();
      return { clientId, producer };
    }),
  );

  // Keep only the producers that were created and connected successfully.
  const producersByClient = {};
  for (const settled of settledProducers) {
    if (settled.status === 'fulfilled') {
      const { clientId, producer } = settled.value;
      producersByClient[clientId] = producer;
    }
  }
  return producersByClient;
}

/**
 * Fastify plugin that decorates the instance with `kafkaProducer`, exposing
 * `publish` and `getProducer`, and disconnects every producer on close.
 *
 * @param {object} fastify - The Fastify instance being decorated.
 * @param {object|object[]} options - Client/producer options passed to `createProducers`.
 * @returns {Promise<void>}
 */
const kafkaProducer = async (fastify, options) => {
  const clientProducers = await createProducers(options);

  /**
   * Sends messages through the producer registered for `clientId`.
   *
   * When `topicMessages` is an array the call is forwarded to
   * `producer.sendBatch`; otherwise `topicMessages` is spread into the
   * argument of `producer.send`.
   *
   * @param {object} params
   * @param {string} [params.clientId=core.DEFAULT_CLIENT] - Producer to use.
   * @param {object[]|object} [params.topicMessages=[]] - Batch (array) or single record (object).
   * @returns {Promise<*>} Whatever the underlying producer call resolves to.
   * @throws {Error} If no producer is registered for `clientId`.
   */
  const publish = async ({ clientId = core.DEFAULT_CLIENT, topicMessages = [], ...rest }) => {
    const producer = clientProducers[clientId];
    if (!producer) {
      // Throw a real Error (not a bare string) so callers get a stack trace.
      throw new Error(`Unknown client ${clientId}`);
    }
    if (Array.isArray(topicMessages)) {
      return producer.sendBatch({
        ...rest,
        topicMessages,
      });
    }
    return producer.send({
      ...rest,
      ...topicMessages,
    });
  };

  /**
   * Looks up the producer registered for a client.
   *
   * @param {string} [clientId=core.DEFAULT_CLIENT] - Client id to look up.
   * @returns {object|undefined} The producer, or `undefined` when none is registered.
   */
  const getProducer = (clientId = core.DEFAULT_CLIENT) => clientProducers[clientId];

  /**
   * onClose hook: disconnects every producer.
   *
   * The combined promise is returned so the close sequence waits for the
   * disconnects and a failure is reported instead of becoming an unhandled
   * rejection.
   *
   * @returns {Promise<Array>} Settles once every producer has disconnected.
   */
  function onClose() {
    return Promise.all(
      Object.values(clientProducers).map((producer) => producer.disconnect()),
    );
  }

  fastify.decorate('kafkaProducer', {
    publish,
    getProducer,
  });
  fastify.addHook('onClose', onClose);
};

const index = fp(kafkaProducer, {
  fastify: '>=3',
  name: '@kafka-ts/fastify-producer',
});

module.exports = index;