// src/streaming/streamManager.ts
import { StreamProcessingConfig, PipelineContext, IStreamProcessor } from '../core/interfaces';
import { createLogger } from '../utils/logger';
import { Logger } from 'pino';
import { ConfigurationError, PipelineRunError } from '../core/errors';
import { IStreamSourceAdapter, IStreamingFlowNode } from './interfaces';
import { v4 as uuidv4 } from 'uuid'; // Use uuid for run IDs
import { KafkaAdapter } from './kafkaAdapter';
import { RedisStreamAdapter } from './redisStreamAdapter';

// MARK: - Manager
/**
 * Streaming flow node that connects a stream source adapter (Kafka or Redis
 * Streams) to the configured processor and owns the lifecycle of one
 * consumption run at a time.
 *
 * Lifecycle: `configure()` -> `start()` (resolves when consumption ends) -> `stop()`.
 * The adapter instance is released after every run; `start()` builds a fresh one
 * from the stored configuration, so a stopped or failed manager can be started
 * again without calling `configure()` a second time.
 *
 * NOTE: `configure()` replaces the adapter unconditionally. Calling it while a
 * run is active means teardown will target the new adapter, so reconfigure only
 * while stopped.
 */
export class StreamManager implements IStreamingFlowNode {
    readonly id: string;
    readonly type = '@flowlab/data:stream-processor';

    private config: StreamProcessingConfig | null = null;
    private context: PipelineContext | null = null;
    private logger: Logger;
    private adapter: IStreamSourceAdapter | null = null;
    private isRunning = false;
    /** In-flight teardown, shared so that overlapping stop paths shut resources down only once. */
    private stopPromise: Promise<void> | null = null;

    /**
     * @param id Node identifier; a falsy value is replaced by a generated `stream-manager-<uuid>`.
     * @param logger Optional logger; defaults to a child logger tagged with this manager's id.
     */
    constructor(id: string, logger?: Logger) {
        this.id = id || `stream-manager-${uuidv4()}`;
        this.logger = logger || createLogger().child({ streamManagerId: this.id });
    }

    // MARK: - configure
    /**
     * Stores the configuration and instantiates the matching source adapter.
     *
     * The adapter is built before any state is committed, so an unsupported
     * source type leaves the previous configuration (if any) untouched.
     *
     * @param config Stream processing configuration (source descriptor and processor).
     * @throws ConfigurationError when `config.source.type` is not supported.
     */
    configure(config: StreamProcessingConfig): void {
        const adapter = this.createAdapter(config);

        this.config = config;
        this.adapter = adapter;

        // The processor is replaced by a placeholder so the log entry stays serializable.
        this.logger.info({ config: { ...config, processor: '[IStreamProcessor]' } }, 'StreamManager configured.');
        this.logger.info(`Using stream source adapter: ${adapter.constructor.name}`);
    }

    // MARK: - start
    /**
     * Initializes the processor and adapter, then consumes until the adapter
     * stops or an error occurs. Adapter and processor are always shut down
     * before this method settles, on success and on failure alike.
     *
     * Calling it while a run is active only logs a warning.
     *
     * @throws ConfigurationError when `configure()` has not been called successfully.
     * @throws PipelineRunError wrapping any error raised during startup or consumption.
     */
    async start(): Promise<void> {
        if (this.isRunning) {
            this.logger.warn('StreamManager is already running.');
            return;
        }
        if (!this.config || !this.config.processor) {
            throw new ConfigurationError('StreamManager is not configured. Call configure() first.');
        }
        // The adapter is released after each run; rebuild it from the stored configuration.
        if (!this.adapter) {
            this.adapter = this.createAdapter(this.config);
        }

        const config = this.config;
        const processor = this.config.processor;
        const adapter = this.adapter;

        this.isRunning = true;
        const runId = uuidv4();
        const context: PipelineContext = {
            logger: this.logger.child({ runId }),
            runId,
            streamConfig: config
        };
        this.context = context;

        this.logger.info({ runId }, 'Starting StreamManager...');

        try {
            if (processor.initialize) {
                await processor.initialize(context);
            }
            // stop() may have been requested while an earlier step was awaiting;
            // do not initialize or consume on a run that is already being torn down.
            if (this.isActiveRun(context)) {
                await adapter.initialize(config.source.config, context.logger);
            }
            if (this.isActiveRun(context)) {
                // Typically runs until the adapter is shut down or fails.
                await adapter.consume(processor, context);
                this.logger.info({ runId }, 'Stream consumption finished (adapter stopped).');
            }
        // `any` is kept because the accepted cause type of PipelineRunError is declared elsewhere.
        } catch (error: any) {
            this.logger.error({ err: error, runId }, 'StreamManager encountered a fatal error during startup or consumption.');
            throw new PipelineRunError('Stream processing failed', this.id, runId, error);
        } finally {
            // Tear down (or join a teardown already started by stop()) unless this
            // run has already been fully stopped.
            if (this.context === context) {
                await this.stopInternal();
            }
        }
    }

    // MARK: - stop
    /**
     * Shuts down the adapter and the processor of the active run.
     * Logs a warning and returns when no run is active.
     */
    async stop(): Promise<void> {
        if (!this.isRunning) {
            this.logger.warn('StreamManager is not running.');
            return;
        }
        this.logger.info('Stopping StreamManager...');
        await this.stopInternal();
        this.logger.info('StreamManager stopped.');
    }

    // MARK: - stopInternal
    /**
     * Runs teardown at most once per run: callers that arrive while a teardown
     * is in flight await that same teardown instead of starting another.
     */
    private async stopInternal(): Promise<void> {
        if (!this.isRunning) return;

        if (!this.stopPromise) {
            // The reset is attached via .finally() so that it runs after the
            // assignment below, even when teardown completes without awaiting.
            this.stopPromise = this.teardown().finally(() => {
                this.stopPromise = null;
            });
        }
        await this.stopPromise;
    }

    /** Shuts down adapter then processor (errors are logged, not thrown) and resets run state. */
    private async teardown(): Promise<void> {
        const adapter = this.adapter;
        const context = this.context;
        const processor = this.config?.processor;

        // Signal the adapter to stop consuming
        if (adapter) {
            try {
                await adapter.shutdown();
            } catch (error) {
                this.logger.error({ err: error }, 'Error shutting down stream source adapter.');
            }
        }

        // Shutdown the processor
        if (processor?.shutdown && context) {
            try {
                await processor.shutdown(context);
            } catch (error) {
                this.logger.error({ err: error }, 'Error shutting down stream processor.');
            }
        }

        this.isRunning = false;
        this.context = null;
        // Release the instance that was shut down; start() creates a new one on demand.
        if (this.adapter === adapter) {
            this.adapter = null;
        }
    }

    /** True while `context` belongs to the current run and no teardown has begun. */
    private isActiveRun(context: PipelineContext): boolean {
        return this.isRunning && this.stopPromise === null && this.context === context;
    }

    /**
     * Instantiates the adapter matching `config.source.type`.
     * @throws ConfigurationError for an unsupported source type.
     */
    private createAdapter(config: StreamProcessingConfig): IStreamSourceAdapter {
        switch (config.source.type) {
            case 'kafka':
                return new KafkaAdapter();
            case 'redis-streams':
                return new RedisStreamAdapter();
            // Add other types like RabbitMQ, MQTT etc. here
            default:
                throw new ConfigurationError(`Unsupported stream source type: ${config.source.type}`);
        }
    }
}