// src/streaming/redisStreamAdapter.ts
import { Redis as IORedisClient, RedisOptions } from 'ioredis';
import { IStreamSourceAdapter } from './interfaces';
import { IStreamProcessor, PipelineContext } from '../core/interfaces';
import { Logger } from 'pino';
import { ConfigurationError, PipelineRunError } from '../core/errors';

// Define expected config structure for Redis Streams
interface RedisStreamAdapterConfig {
    connectionOptions: RedisOptions; // ioredis connection options
    streamKey: string;
    groupName: string;
    consumerName: string;
    blockMs?: number; // BLOCK timeout in ms for XREADGROUP
    count?: number; // COUNT for XREADGROUP
    // Option for auto-claiming pending messages?
    claimMinIdleTimeMs?: number;
    autoCreateGroup?: boolean; // Attempt to create consumer group if it doesn't exist
}

// Helper to parse Redis Stream message format [messageId, [key1, value1, key2, value2, ...]]
function parseRedisStreamMessage(rawMessage: [string, string[]]): { id: string; data: Record<string, string> } | null {
     if (!rawMessage || rawMessage.length < 2) return null;
     const id = rawMessage[0];
     const rawData = rawMessage[1];
     const data: Record<string, string> = {};
     if (rawData && rawData.length % 2 === 0) {
         for (let i = 0; i < rawData.length; i += 2) {
             data[rawData[i]] = rawData[i + 1];
         }
     }
     return { id, data };
}


export class RedisStreamAdapter implements IStreamSourceAdapter {
    // Use separate clients: one for blocking reads, one for other commands (like ack, claim)
    private blockingClient: IORedisClient | null = null;
    private commandClient: IORedisClient | null = null;
    private logger: Logger = {} as Logger;
    private config: RedisStreamAdapterConfig = {} as RedisStreamAdapterConfig;
    private processor: IStreamProcessor<ReturnType<typeof parseRedisStreamMessage>, any> | null = null;
    private pipelineContext: PipelineContext | null = null;
    private isConsuming: boolean = false;
    private shouldStop: boolean = false; // Flag to signal the read loop to exit

    async initialize(config: any, logger: Logger): Promise<void> {
        this.logger = logger.child({ adapter: 'RedisStreamAdapter' });
        this.config = config as RedisStreamAdapterConfig; // Add validation

        if (!this.config.connectionOptions || !this.config.streamKey || !this.config.groupName || !this.config.consumerName) {
             throw new ConfigurationError('RedisStreamAdapter requires "connectionOptions", "streamKey", "groupName", and "consumerName".');
        }
        this.config.blockMs = this.config.blockMs ?? 5000; // Default block 5s
        this.config.count = this.config.count ?? 10; // Default count 10
        this.config.autoCreateGroup = this.config.autoCreateGroup !== false; // Default true

        this.logger.info({ stream: this.config.streamKey, group: this.config.groupName }, 'Initializing Redis Stream adapter...');
        try {
             // Blocking client might need different options if applicable (e.g., readOnly replicas)
             this.blockingClient = new IORedisClient(this.config.connectionOptions);
             this.commandClient = new IORedisClient(this.config.connectionOptions);

             // Add error listeners
             this.blockingClient.on('error', (err) => this.logger.error({ err }, 'Redis blocking client error'));
             this.commandClient.on('error', (err) => this.logger.error({ err }, 'Redis command client error'));

             await Promise.all([this.blockingClient.ping(), this.commandClient.ping()]); // Test connection

             // Optionally create consumer group if it doesn't exist
             if (this.config.autoCreateGroup) {
                 await this.createGroupIfNeeded();
             }

             this.logger.info('Redis clients connected and group checked/created.');
        } catch (error: any) {
             this.logger.error({ err: error }, 'Failed to initialize Redis clients or check group.');
             await this.cleanupClients(); // Ensure clients are closed on init failure
             throw new ConfigurationError('Failed to initialize Redis Streams', undefined, error.message);
        }
    }

    private async createGroupIfNeeded(): Promise<void> {
         if (!this.commandClient) return;
         try {
             // MKSTREAM creates stream+group, ignores error if group exists
             await this.commandClient.xgroup('CREATE', this.config.streamKey, this.config.groupName, '$', 'MKSTREAM');
             this.logger.info(`Ensured Redis consumer group "<span class="math-inline">\{this\.config\.groupName\}" exists for stream "</span>{this.config.streamKey}".`);
         } catch (error: any) {
              // Ignore 'BUSYGROUP Consumer Group name already exists' error
             if (error.message.includes('BUSYGROUP')) {
                  this.logger.info(`Redis consumer group "${this.config.groupName}" already exists.`);
             } else {
                  this.logger.error({ err: error }, `Failed to create Redis consumer group "${this.config.groupName}".`);
                  throw error; // Rethrow other errors
             }
         }
    }


    async consume(processor: IStreamProcessor<ReturnType<typeof parseRedisStreamMessage>, any>, context: PipelineContext): Promise<void> {
        if (!this.blockingClient || !this.commandClient) {
            throw new PipelineRunError('Redis clients not initialized.', context?.pipelineId, context?.runId);
        }
        if (this.isConsuming) {
            this.logger.warn('Redis Stream adapter is already consuming.');
            return;
        }

        this.processor = processor;
        this.pipelineContext = context;
        this.isConsuming = true;
        this.shouldStop = false;
        this.logger.info({ stream: this.config.streamKey, group: this.config.groupName, consumer: this.config.consumerName }, 'Starting Redis Stream consumption...');

        // Read loop
        while (this.isConsuming && !this.shouldStop) {
            try {
                // 1. TODO: Optionally claim old pending messages from other consumers
                // await this.claimPendingMessages();

                // 2. Read new messages for this consumer
                // '>' ID means only new messages never delivered to any consumer in this group
                const response = await this.blockingClient.xreadgroup(
                    'GROUP', this.config.groupName, this.config.consumerName,
                    'COUNT', this.config.count!,
                    'BLOCK', this.config.blockMs!,
                    'STREAMS', this.config.streamKey, '>' // Read new messages
                );

                // Check response format: [[streamName, [[messageId, [data]], ...]]] or null on timeout/empty
                if (!response || response.length === 0 || !response[0] || response[0].length < 2) {
                    // Block timeout expired, no new messages
                    this.logger.trace('XREADGROUP timed out or returned no new messages.');
                    continue; // Loop again
                }

                const messages = response[0][1]; // Array of [messageId, [data]]
                this.logger.debug(`Received ${messages.length} messages from stream ${this.config.streamKey}`);

                for (const rawMessage of messages) {
                     if (this.shouldStop) break; // Check stop flag between messages

                     const parsedMessage = parseRedisStreamMessage(rawMessage);
                     if (!parsedMessage) {
                          this.logger.warn({ rawMessage }, 'Failed to parse Redis stream message.');
                          continue;
                     }

                     const childLogger = this.pipelineContext.logger.child({ stream: this.config.streamKey, messageId: parsedMessage.id });
                     const messageContext = { ...this.pipelineContext, logger: childLogger };

                     try {
                         const result = await this.processor.process(parsedMessage, messageContext);
                          // TODO: Handle result if batching is needed

                          // Acknowledge the message
                          await this.commandClient.xack(this.config.streamKey, this.config.groupName, parsedMessage.id);
                          childLogger.trace({ messageId: parsedMessage.id }, 'Message processed and acknowledged.');

                     } catch (processError: any) {
                          childLogger.error({ err: processError, messageId: parsedMessage.id }, 'Error processing Redis stream message.');
                          if (this.processor.onError) {
                              try {
                                 await this.processor.onError(processError, parsedMessage, messageContext);
                              } catch (onErrorError) {
                                 childLogger.error({ err: onErrorError }, 'Error in processor.onError handler itself.');
                              }
                          }
                          // Decide if we should ACK on error? Probably not. Let it be re-delivered or claimed.
                          // Should we stop consumption on processing error?
                          // For now, continue processing other messages in the batch.
                     }
                } // end for loop messages

            } catch (error: any) {
                // Handle potential client connection errors or command errors
                this.logger.error({ err: error }, 'Error during Redis Stream XREADGROUP loop.');
                 if (!this.isRedisConnected()) {
                     this.logger.error('Redis connection lost. Stopping consumption.');
                     this.isConsuming = false; // Stop the loop
                     // Propagate error to allow StreamManager to handle restart/failure?
                     throw new PipelineRunError('Redis connection lost during consumption', context?.pipelineId, context?.runId, error);
                 } else {
                      // Transient error? Log and continue after a short delay?
                      this.logger.warn('Recoverable error in XREADGROUP loop, pausing before retry...');
                      await new Promise(resolve => setTimeout(resolve, 1000)); // 1s delay
                 }
            }
        } // end while loop

        this.logger.info('Redis Stream consumption loop finished.');
        this.isConsuming = false; // Ensure state is updated
    }


    // TODO: Implement claiming logic if needed
    // private async claimPendingMessages(): Promise<void> { ... }

    private isRedisConnected(): boolean {
        return (this.blockingClient?.status === 'ready' || this.blockingClient?.status === 'connect') &&
               (this.commandClient?.status === 'ready' || this.commandClient?.status === 'connect');
    }

    private async cleanupClients(): Promise<void> {
         const promises: Promise<any>[] = [];
         if (this.blockingClient) promises.push(this.blockingClient.quit().catch(e => this.logger.warn({err: e}, "Error quitting blocking Redis client")));
         if (this.commandClient) promises.push(this.commandClient.quit().catch(e => this.logger.warn({err: e}, "Error quitting command Redis client")));
         await Promise.allSettled(promises);
         this.blockingClient = null;
         this.commandClient = null;
    }

    async shutdown(): Promise<void> {
        if (!this.isConsuming && !this.blockingClient && !this.commandClient) {
            this.logger.warn('Redis Stream adapter already stopped or not initialized.');
            return;
        }
        this.logger.info('Shutting down Redis Stream adapter...');
        this.shouldStop = true; // Signal the consume loop to stop
        this.isConsuming = false; // Prevent new iterations

        // Give the loop a moment to exit gracefully if blocked
        await new Promise(resolve => setTimeout(resolve, 100));

        await this.cleanupClients();

        this.processor = null;
        this.pipelineContext = null;
        this.logger.info('Redis Stream adapter shutdown complete.');
    }
}