// src/streaming/kafkaAdapter.ts
import { Kafka, Consumer, EachMessagePayload, KafkaMessage } from 'kafkajs';
import { IStreamSourceAdapter } from './interfaces';
import { IStreamProcessor, PipelineContext } from '../core/interfaces';
import { Logger } from 'pino';
import { ConfigurationError, PipelineRunError } from '../core/errors';

// Define expected config structure for Kafka
interface KafkaAdapterConfig {
    brokers: string[];
    groupId: string;
    topic: string;
    // Add other KafkaJS consumer config options (sessionTimeout, etc.)
    consumerOptions?: object;
    fromBeginning?: boolean;
}

// MARK: - KafkaAdapter
export class KafkaAdapter implements IStreamSourceAdapter {
    private kafka: Kafka | null = null;
    private consumer: Consumer | null = null;
    private logger: Logger = {} as Logger; // Initialized in initialize
    private config: KafkaAdapterConfig = {} as KafkaAdapterConfig; // Initialized in initialize
    private processor: IStreamProcessor<KafkaMessage, any> | null = null;
    private pipelineContext: PipelineContext | null = null;
    private isConsuming: boolean = false;

    // MARK: - init
    async initialize(config: any, logger: Logger): Promise<void> {
        this.logger = logger.child({ adapter: 'KafkaAdapter' });
        this.config = config as KafkaAdapterConfig; // Add validation here

        if (!this.config.brokers || !this.config.groupId || !this.config.topic) {
             throw new ConfigurationError('KafkaAdapter requires "brokers", "groupId", and "topic" in source config.');
        }

        this.logger.info({ brokers: this.config.brokers, groupId: this.config.groupId }, 'Initializing Kafka adapter...');
        try {
             this.kafka = new Kafka({
                 clientId: `flowlab-data-<span class="math-inline">\{this\.config\.groupId\}\-</span>{Date.now()}`, // Unique client ID
                 brokers: this.config.brokers,
                 // Add SSL/SASL options if needed
             });
             this.consumer = this.kafka.consumer({
                  groupId: this.config.groupId,
                  ...this.config.consumerOptions
             });
             this.logger.info('Kafka client and consumer created.');
        } catch (error: any) {
             this.logger.error({ err: error }, 'Failed to initialize Kafka client/consumer.');
             throw new ConfigurationError('Failed to initialize Kafka', undefined, error.message);
        }
    }

    // MARK: - consume
    async consume(processor: IStreamProcessor<KafkaMessage, any>, context: PipelineContext): Promise<void> {
        if (!this.consumer) {
            throw new PipelineRunError('Kafka consumer not initialized.', context?.pipelineId, context?.runId);
        }
         if (this.isConsuming) {
            this.logger.warn('Kafka adapter is already consuming.');
            return;
        }

        this.processor = processor;
        this.pipelineContext = context;
        this.isConsuming = true;
        this.logger.info({ topic: this.config.topic }, 'Starting Kafka consumption...');

        try {
             await this.consumer.connect();
             this.logger.info('Kafka consumer connected.');
             await this.consumer.subscribe({ topic: this.config.topic, fromBeginning: this.config.fromBeginning || false });
             this.logger.info(`Subscribed to topic: ${this.config.topic}`);

             await this.consumer.run({
                 eachMessage: this.handleMessage.bind(this),
                 // Add eachBatch handler for batch processing if preferred
             });

             // Normally, consumer.run blocks until consumer is stopped or disconnected.
             // If it returns, it might mean consumption stopped gracefully or an error occurred handled internally by kafkajs.
             this.logger.info('Kafka consumer run loop finished.');

        } catch (error: any) {
             this.logger.error({ err: error, topic: this.config.topic }, 'Error during Kafka consumption setup or run loop.');
             this.isConsuming = false; // Ensure state is reset on error
             throw new PipelineRunError('Kafka consumption failed', context?.pipelineId, context?.runId, error);
        }
        // Note: No finally block to reset isConsuming here, as run() should block.
        // Resetting happens in shutdown or if run() throws.
    }

    // MARK: - handleMessage
    private async handleMessage(payload: EachMessagePayload): Promise<void> {
         if (!this.processor || !this.pipelineContext) {
             this.logger.error('Processor or context not available during message handling. This should not happen.');
             // Potentially stop the consumer here?
             return;
         }
         const { topic, partition, message } = payload;
         const childLogger = this.pipelineContext.logger.child({ topic, partition, offset: message.offset });
         const messageContext = { ...this.pipelineContext, logger: childLogger }; // Use child logger for message context

         childLogger.trace({ offset: message.offset }, 'Received Kafka message.');

         try {
            const result = await this.processor.process(message, messageContext);
             // TODO: Handle result (e.g., if batching is done within the processor via handleBatch)
             // Basic example: Just log completion
             childLogger.debug({ offset: message.offset, result: result !== null && result !== undefined }, 'Message processed.');

             // Implicit commit happens based on consumer config unless using manual commits
         } catch (error: any) {
              childLogger.error({ err: error, offset: message.offset }, 'Error processing Kafka message.');
              if (this.processor.onError) {
                  try {
                     await this.processor.onError(error, message, messageContext);
                     // Decide if processing should continue or consumer should stop/pause
                  } catch (onErrorError) {
                      childLogger.error({ err: onErrorError }, 'Error in processor.onError handler itself.');
                  }
              }
              // Should we stop the consumer on processing errors? Depends on requirements.
              // Rethrowing here might stop the consumer depending on kafkajs config.
              // throw error;
         }
    }

    // MARK: - shutdown
    async shutdown(): Promise<void> {
         if (!this.isConsuming && !this.consumer) {
             this.logger.warn('Kafka adapter already stopped or not initialized.');
             return;
         }
         this.logger.info('Shutting down Kafka adapter...');
         this.isConsuming = false; // Signal intention to stop
         try {
             if (this.consumer) {
                await this.consumer.stop(); // Stop the run loop first
                await this.consumer.disconnect();
                this.logger.info('Kafka consumer stopped and disconnected.');
             }
         } catch (error: any) {
              this.logger.error({ err: error }, 'Error stopping/disconnecting Kafka consumer.');
              // Continue shutdown even if consumer fails to stop cleanly
         } finally {
             this.consumer = null;
             this.kafka = null; // Allow GC
             this.processor = null;
             this.pipelineContext = null;
         }
         this.logger.info('Kafka adapter shutdown complete.');
    }
}