// src/core/pipeline.ts
import {
  IExtractor,
  ICleaner,
  ITransformer,
  ILoader,
  DataSource,
  PipelineContext,
  IFlowLabNode,
  IFlowLabRegistry,
  BaseSourceConfig,
  BaseTargetConfig,
  SourceConfig,
  TargetConfig,
  FailedItemInfo,
  ProcessingErrorStrategy,
  FileTargetConfig,
} from './interfaces';
import { createLogger } from '../utils/logger';
import { performRetry } from '../utils/retry';
// import { v4 as uuidv4 } from 'uuid'; // Use uuid for run IDs
import { Logger } from 'pino';
import { loadPipelineFromConfig as loadConfig } from '../config/loader';
import pLimit from 'p-limit'; // Import p-limit
import { FileDlqLoader } from '../loaders/fileDlqLoader'; 
import { createLoader as createComponentLoader } from './registry';



// Helper type for pipeline steps
type PipelineStep<TInput, TOutput> = ICleaner<TInput> | ITransformer<TInput, TOutput>;

// Type guards
function isCleaner<T>(step: any): step is ICleaner<T> {
  return typeof step.clean === 'function';
}

function isTransformer<TInput, TOutput>(step: any): step is ITransformer<TInput, TOutput> {
  return typeof step.transform === 'function';
}

function isAsyncIterable<T>(dataSource: DataSource<T>): dataSource is AsyncIterable<T> {
  return typeof (dataSource as any)[Symbol.asyncIterator] === 'function';
}


// MARK: - DataPipeline
export class DataPipeline<TStart, TEnd> implements IFlowLabNode {
  readonly id: string; // Can be user-defined or generated
  readonly type = '@flowlab/data:pipeline';

  private extractorInstance: IExtractor<TStart> | null = null;
  private steps: PipelineStep<any, any>[] = [];
  private loaderInstance: ILoader<TEnd> | null = null;
  private config: {
      batchSize: number;
      retries: number;
      retryDelay: number;
      logger: Logger;
      concurrency: number; // Add concurrency setting
      itemProcessingErrorStrategy: ProcessingErrorStrategy;
      dlqLoaderInstance?: ILoader<FailedItemInfo>; 
  };

  // Track types through the chain - using 'any' here for simplicity in the builder,
  // but the run method ensures type flow based on step order. A more complex builder
  // could maintain stricter compile-time types between steps.
  private currentOutputType: any; // Internal helper


  constructor(id?: string, options?: { logger?: Logger }) {
    this.id = id || `pipeline-${uuidv4()}`;
    this.config = {
        batchSize: 100,
        retries: 3,
        retryDelay: 1000,
        logger: options?.logger || createLogger(),
        concurrency: 1, // Default to sequential processing
        itemProcessingErrorStrategy: 'fail', // Default to failing the pipeline on item error
        dlqLoaderInstance: undefined,
    };
      this.currentOutputType = null; // Placeholder
  }

  /**
   * MARK: - Configure
   * Configure pipeline options.
   */
  configure(options: Partial<typeof this.config> & { dlqTarget?: TargetConfig }): this {
    const { dlqTarget, ...pipelineOptions } = options;
    this.config = { ...this.config, ...pipelineOptions };
    if (dlqTarget) {
        try {
           // Use a simple file DLQ loader for now, could be made dynamic
        //TO chinese: 使用一个简单的文件DLQ加载器，可以动态创建
           if (dlqTarget.type === 'file-dlq') { // Use a specific type for DLQ file
               this.config.dlqLoaderInstance = new FileDlqLoader(dlqTarget as FileTargetConfig);
               this.config.logger.info(`Configured File DLQ Loader target: ${dlqTarget.path}`);
           } else {
                // Allow registering other DLQ loader types via registry if needed
                // this.config.dlqLoaderInstance = createComponentLoader(dlqTarget);
                //TO chinese: 允许通过注册表动态创建其他DLQ加载器类型，如果需要
                this.config.logger.warn(`Unsupported dlqTarget type: ${dlqTarget.type}. Only 'file-dlq' supported currently.`);
                // Fallback strategy if DLQ loader fails?
                // 如果DLQ加载器失败，回退策略是什么？
                if (this.config.itemProcessingErrorStrategy === 'dlq') {    
                    this.config.itemProcessingErrorStrategy = 'fail';
                    this.config.logger.warn("DLQ configured but loader couldn't be created. Defaulting item error strategy to 'fail'.");
                }
           }
           // Ensure DLQ strategy is set if target is provided
           if (!pipelineOptions.itemProcessingErrorStrategy) {
                this.config.itemProcessingErrorStrategy = 'dlq'; // Default to DLQ if target is set
           } else if (pipelineOptions.itemProcessingErrorStrategy !== 'dlq') {
                 this.config.logger.warn(`DLQ target configured, but itemProcessingErrorStrategy is set to '${pipelineOptions.itemProcessingErrorStrategy}'. DLQ will not be used.`);
           }

       } catch (error) {
           this.config.logger.error({ err: error }, "Failed to instantiate DLQ loader. DLQ strategy disabled.");
            if (this.config.itemProcessingErrorStrategy === 'dlq') {
                this.config.itemProcessingErrorStrategy = 'fail'; // Fallback needed
            }
       }
   } else if (this.config.itemProcessingErrorStrategy === 'dlq') {
        this.config.itemProcessingErrorStrategy = 'fail'; // Can't use DLQ strategy without a target
        this.config.logger.warn("itemProcessingErrorStrategy set to 'dlq' but no dlqTarget provided. Defaulting to 'fail'.");
   }

  return this;
  }


  /**
   * MARK: - handleProcessingError
   * Handle processing errors.
   */
  private async handleProcessingError(
    error: Error,
    item: any,
    stepName: string,
    context: PipelineContext
    ): Promise<boolean> { // Returns true if pipeline should continue, false if it should fail
        context.logger.error({ err: error, item, step: stepName }, `Error during pipeline step: ${stepName}`);

        const strategy = this.config.itemProcessingErrorStrategy;

        if (strategy === 'dlq' && this.config.dlqLoaderInstance) {
            const failedInfo: FailedItemInfo = {
                pipelineId: this.id,
                runId: context.runId,
                timestamp: new Date().toISOString(),
                step: stepName,
                error: {
                    message: error.message,
                    stack: error.stack,
                    name: error.name,
                },
                originalItem: item, // Be careful with large items
            };
            try {
                // Load batch takes an array
                await this.config.dlqLoaderInstance.loadBatch([failedInfo], context);
                context.logger.warn(`Item sent to DLQ due to processing error in step ${stepName}.`);
                return true; // Continue pipeline
            } catch (dlqError) {
                context.logger.error({ err: dlqError }, `Failed to send item to DLQ! Falling back to 'fail' strategy for this run.`);
                return false; // Fail pipeline if DLQ fails
            }
        } else if (strategy === 'log') {
            context.logger.warn(`Skipping item due to processing error in step ${stepName} (strategy: log).`);
            return true; // Continue pipeline
        } else if (strategy === 'skip') {
            context.logger.info(`Skipping item due to processing error in step ${stepName} (strategy: skip).`);
            return true; // Continue pipeline
        } else { // strategy === 'fail'
            context.logger.error(`Pipeline failed due to processing error in step ${stepName} (strategy: fail).`);
            return false; // Fail the pipeline
        }
    }


  /**
   * MARK: - Extractor
   * Set the data extractor.
   */
  extract<TOutput>(extractor: IExtractor<TOutput>): DataPipeline<TOutput, TOutput> { // Reset TEnd initially
      if (this.extractorInstance) {
          this.config.logger.warn({ pipelineId: this.id }, 'Extractor is already set. Overwriting.');
      }
      this.extractorInstance = extractor as unknown as IExtractor<TStart>; // Cast needed due to generic reset
      this.currentOutputType = null; // Reset output type marker
       // Explicitly cast the return type to reflect the new TStart and TEnd
      return this as unknown as DataPipeline<TOutput, TOutput>;
  }

   /**
   * MARK: - Cleaning
   * Add a cleaning step. The output type remains the same.
   */
  clean<TCurrent>(cleaner: ICleaner<TCurrent>): DataPipeline<TStart, TEnd> {
      // Basic type check simulation based on prior step (won't catch all errors at compile time here)
      // if (this.currentOutputType && /* check compatibility */) { throw new Error('Type mismatch'); }
      this.steps.push(cleaner);
      // Cleaner doesn't change the core type, just filters or modifies in place
      return this;
  }

  /**
   * MARK: - Transformation
   * Add a transformation step. Changes the current data type.
   * We need 'any' here in the builder pattern unless we use more complex generic chaining.
   */
  transform<TCurrent, TNext>(transformer: ITransformer<TCurrent, TNext>): DataPipeline<TStart, any> {
      // Basic type check simulation
      // if (this.currentOutputType && /* check compatibility */) { throw new Error('Type mismatch'); }
      this.steps.push(transformer);
      this.currentOutputType = null; // Mark type as changed, could store TNext constructor potentially
      // The TEnd type becomes unknown until load is called
      return this as unknown as DataPipeline<TStart, any>;
  }

   /**
   * MARK: - Loading
   * Set the data loader. This defines the final TEnd type for the pipeline.
   */
  load<TCurrent>(loader: ILoader<TCurrent>): DataPipeline<TStart, TCurrent> {
      // Basic type check simulation
       // if (this.currentOutputType && /* check compatibility */) { throw new Error('Type mismatch'); }
      if (this.loaderInstance) {
          this.config.logger.warn({ pipelineId: this.id }, 'Loader is already set. Overwriting.');
      }
      this.loaderInstance = loader as unknown as ILoader<TEnd>; // Cast needed
      // Explicitly cast the return type to set the final TEnd
      return this as unknown as DataPipeline<TStart, TCurrent>;
  }


  /**
   * MARK: - Execution
   * Execute the pipeline.
   * This is the core logic where data flows through steps.
   */
  async run(): Promise<void> {
      const runId = uuidv4();
      const context: PipelineContext = {
          logger: this.config.logger.child({ pipelineId: this.id, runId }),
          runId: runId,
          // Add any other initial context
      };

      context.logger.info('Pipeline run started.');

      if (!this.extractorInstance) throw new Error('Extractor is not defined for pipeline.');
      if (!this.loaderInstance) throw new Error('Loader is not defined for pipeline.');

      let processedCount = 0;
      let loadedCount = 0;
      let errorCount = 0;
      let currentBatch: TEnd[] = []; // Batch for the loader

      try {
          const dataSource = await performRetry(
              () => this.extractorInstance!.extract(context),
              this.config.retries,
              this.config.retryDelay,
              context.logger,
              "Extractor"
          );

          const processItem = async (item: TStart): Promise<TEnd | null | undefined> => {
               let currentData: any = item; // Start with the extracted item

               for (const step of this.steps) {
                   if (currentData === null || currentData === undefined) {
                       return null; // Skip remaining steps if item was filtered out
                   }

                   try {
                       if (isCleaner(step)) {
                           currentData = await step.clean(currentData, context);
                       } else if (isTransformer(step)) {
                           currentData = await step.transform(currentData, context);
                       }
                   } catch (error: any) {
                       context.logger.error({ err: error, item }, `Error during pipeline step ${step.constructor.name}`);
                       throw error; // Propagate error to main handler for the item
                   }
               }
               return currentData as TEnd; // Final type after all steps
          };


          const loadBatchIfNeeded = async () => {
              if (currentBatch.length > 0) {
                  context.logger.debug(`Loading batch of ${currentBatch.length} items.`);
                  await performRetry(
                      () => this.loaderInstance!.loadBatch(currentBatch, context),
                      this.config.retries,
                      this.config.retryDelay,
                      context.logger,
                      "Loader"
                  );
                  loadedCount += currentBatch.length;
                  currentBatch = []; // Clear the batch
              }
          };

          // Process data (handles both arrays and async iterables)
          if (isAsyncIterable(dataSource)) {
               context.logger.info('Processing data as AsyncIterable (stream/batched extraction).');
               for await (const item of dataSource) {
                  processedCount++;
                  try {
                      const result = await processItem(item);
                      if (result !== null && result !== undefined) {
                          currentBatch.push(result);
                          if (currentBatch.length >= this.config.batchSize) {
                              await loadBatchIfNeeded();
                          }
                      }
                  } catch (error) {
                       errorCount++;
                       // Decide how to handle item processing errors (e.g., log, send to DLQ)
                       context.logger.error({ err: error, item, runId }, 'Failed to process item.');
                       // Continue processing next item? Or fail the run? For now, just log and count.
                  }
                  if (processedCount % 1000 === 0) { // Log progress periodically
                     context.logger.info(`Processed ${processedCount} items...`);
                  }
               }
          } else { // Handle simple array input
               context.logger.info(`Processing data as an Array (${dataSource.length} items).`);
               for (const item of dataSource) {
                   processedCount++;
                    try {
                       const result = await processItem(item);
                        if (result !== null && result !== undefined) {
                           currentBatch.push(result);
                           if (currentBatch.length >= this.config.batchSize) {
                               await loadBatchIfNeeded();
                           }
                       }
                   } catch (error) {
                       errorCount++;
                       context.logger.error({ err: error, item, runId }, 'Failed to process item.');
                   }
               }
          }

          // Load any remaining items in the last batch
          await loadBatchIfNeeded();

          context.logger.info(
              `Pipeline run finished. Processed: ${processedCount}, Loaded: ${loadedCount}, Errors: ${errorCount}`
          );

      } catch (error: any) {
          context.logger.error({ err: error, runId }, 'Pipeline run failed.');
          // Rethrow or handle the fatal error appropriately
          throw error;
      }
  }

  // --- IFlowLabNode Implementation ---

  /**
   * MARK: - execute
   * Basic execution for FlowLab integration. Assumes payload might contain
   * overrides or context, but primarily runs the predefined pipeline.
   */
  async execute(payload?: any, context?: any): Promise<void> {
      this.config.logger.info({ payload, context }, `Executing pipeline node ${this.id} via FlowLab.`);
      // Potentially merge payload/context into pipeline context or config
      await this.run();
  }

  /**
   * Static method to create a pipeline from a configuration object.
   * This requires a way to instantiate components based on config strings.
   * (Requires a registry or factory pattern - more complex setup)
   */
  // MARK: - fromConfig
  static fromConfig(config: /* PipelineConfig from interfaces.ts */ any, componentRegistry: any): DataPipeline<any, any> {
       // TODO: Implement logic to:
       // 1. Parse config
       // 2. Instantiate Extractor based on config.source and registry
       // 3. Instantiate Steps based on config.steps and registry
       // 4. Instantiate Loader based on config.target and registry
       // 5. Configure pipeline options
       throw new Error("fromConfig not yet implemented. Requires component registry.");
  }

  /**
   * MARK: - register
   * Registers the pipeline itself as a node.
   * Individual components (extractors, loaders) could also be registered.
   */
  register(registry: IFlowLabRegistry): void {
      registry.register(this);
      this.config.logger.info(`Registered pipeline node ${this.id} with FlowLab.`);
  }
}