// src/extractors/redisExtractor.ts
import { Redis as IORedisClient } from 'ioredis';
import { IExtractor, PipelineContext, DataSource, DatabaseSourceConfig, RedisConnection } from '../core/interfaces';
import { ComponentError } from '../core/errors';

export class RedisExtractor<TOutput> implements IExtractor<TOutput> {
    private config: DatabaseSourceConfig;
    private client: IORedisClient;
    private matchPattern: string;
    private keyType: 'string' | 'hash' | 'list'; // Add 'set', 'zset' etc. if needed
    private scanCount: number;

    constructor(config: DatabaseSourceConfig) {
        if (config.type !== 'redis' || !(config.connection as RedisConnection)?.client) {
             throw new ComponentError('RedisExtractor requires config type "redis" and "connection" object with "client" (ioredis Redis instance).');
        }
        this.config = config;
        const redisConn = config.connection as RedisConnection;
        this.client = redisConn.client;
        this.matchPattern = config.redisScanMatch || '*'; // Default to all keys
        this.keyType = config.redisKeyType || 'string'; // Default to simple strings
        this.scanCount = config.batchSize || 100; // Use batchSize for SCAN count hint
    }

    async extract(context: PipelineContext): Promise<DataSource<TOutput>> {
        context.logger.info({ pattern: this.matchPattern, type: this.keyType }, `Extracting data from Redis using SCAN (pattern: ${this.matchPattern}, type: ${this.keyType})`);

        // Use AsyncIterable with SCAN
        return this.extractWithScan(context);
    }

    private async *extractWithScan(context: PipelineContext): AsyncIterable<TOutput> {
        let cursor = '0';
        let keysProcessed = 0;

        try {
            do {
                context.logger.trace(`Scanning Redis keys with cursor ${cursor}, pattern ${this.matchPattern}`);
                const scanResult = await this.client.scan(cursor, 'MATCH', this.matchPattern, 'COUNT', this.scanCount);
                cursor = scanResult[0]; // New cursor
                const keys = scanResult[1]; // Keys found in this iteration

                if (keys.length > 0) {
                    context.logger.debug(`Found ${keys.length} keys in SCAN iteration.`);
                    // Fetch values based on keyType
                    switch (this.keyType) {
                        case 'string':
                            // Use MGET for efficiency if possible, handle potential nulls for non-existent keys
                            const values = await this.client.mget(keys);
                            for (let i = 0; i < keys.length; i++) {
                                 if (values[i] !== null) {
                                     // Attempt to parse if it looks like JSON, otherwise return as string
                                     try {
                                         yield JSON.parse(values[i] as string) as TOutput;
                                     } catch {
                                         yield values[i] as unknown as TOutput;
                                     }
                                     keysProcessed++;
                                 }
                            }
                            break;
                        case 'hash':
                            // Fetch hashes one by one (or use pipeline for batching)
                            for (const key of keys) {
                                 const hashData = await this.client.hgetall(key);
                                 if (hashData && Object.keys(hashData).length > 0) {
                                     yield hashData as unknown as TOutput; // Assuming TOutput is Record<string, string>
                                     keysProcessed++;
                                 }
                            }
                            break;
                         case 'list':
                             // Fetch lists one by one
                             for (const key of keys) {
                                 const listData = await this.client.lrange(key, 0, -1); // Get all elements
                                 if (listData && listData.length > 0) {
                                     // Yield the whole list? Or individual items? Depends on desired output.
                                     yield listData as unknown as TOutput; // Assuming TOutput is string[]
                                     keysProcessed++;
                                 }
                             }
                             break;
                        // Add cases for 'set', 'zset' if needed
                        default:
                            context.logger.warn(`Unsupported Redis key type for extraction: ${this.keyType}. Skipping keys.`);
                    }
                }
            } while (cursor !== '0'); // Continue until SCAN cursor returns to 0

            context.logger.info(`Finished Redis SCAN. Total keys processed matching type '${this.keyType}': ${keysProcessed}`);
        } catch (error: any) {
             context.logger.error({ err: error, pattern: this.matchPattern, cursor }, `Error during Redis SCAN operation`);
             throw new ComponentError(`Error during Redis SCAN`, 'RedisExtractor', error);
        }
    }
}