// src/extractors/prismaExtractor.ts
import { IExtractor, PipelineContext, DataSource, DatabaseSourceConfig } from '../core/interfaces';
import { PrismaClient } from '@prisma/client'; // Needs peer dependency

/**
 * Configuration accepted by {@link PrismaExtractor}.
 */
interface PrismaSourceConfig extends DatabaseSourceConfig {
    /** Database flavour behind the client. Add others supported by Prisma as needed. */
    type: 'postgresql' | 'mysql';
    /** An already-instantiated PrismaClient. */
    connection: PrismaClient;
    /** Name of the model delegate on the client, e.g. 'user', 'product'. */
    model: string;
    /**
     * Prisma findMany arguments (where, select, include, etc.).
     * `take`, `skip`, `cursor` and `orderBy` are always overwritten by the extractor,
     * and any `select` must keep the cursor field or extraction will fail.
     */
    queryArgs?: object;
    /** Rows fetched per findMany call. Must be a positive integer; unset or 0 means 1000. */
    batchSize?: number;
    /** Unique field (@id or @unique) used for ordering and as the pagination cursor. Defaults to 'id'. */
    cursorField?: string;
}

/** Minimal shape of a Prisma model delegate that the extractor relies on. */
interface FindManyDelegate {
    findMany(args: object): Promise<unknown[]>;
}

/** Values that can be used as a Prisma cursor for a scalar unique field. */
type CursorValue = string | number | bigint;

/**
 * Streams every row of a Prisma model through cursor-based pagination, so the
 * whole table never has to be held in memory at once.
 */
export class PrismaExtractor<TOutput> implements IExtractor<TOutput> {
    private static readonly DEFAULT_BATCH_SIZE = 1000;
    private static readonly DEFAULT_CURSOR_FIELD = 'id';

    private readonly config: PrismaSourceConfig;
    private readonly batchSize: number;
    private readonly cursorField: string;

    /**
     * @param config Source configuration. The object is stored by reference but never modified.
     * @throws Error if `connection` or `model` is missing, if `batchSize` is not a
     *         positive integer, or if `cursorField` is an empty string.
     */
    constructor(config: PrismaSourceConfig) {
        if (!config.connection || !config.model) {
            throw new Error('PrismaExtractor requires "connection" (PrismaClient instance) and "model" in config.');
        }

        // Unset and 0 both fall back to the default, as they always have.
        const batchSize = config.batchSize || PrismaExtractor.DEFAULT_BATCH_SIZE;
        if (!Number.isInteger(batchSize) || batchSize < 1) {
            throw new Error(`PrismaExtractor "batchSize" must be a positive integer, got ${config.batchSize}.`);
        }

        const cursorField = config.cursorField ?? PrismaExtractor.DEFAULT_CURSOR_FIELD;
        if (cursorField === '') {
            throw new Error('PrismaExtractor "cursorField" must be a non-empty field name.');
        }

        // Resolved values live on the instance so the caller's config object is left untouched.
        this.config = config;
        this.batchSize = batchSize;
        this.cursorField = cursorField;
    }

    /**
     * Looks up the configured model on the client and starts a paginated extraction.
     *
     * @param context Pipeline context; only its `logger` is used here.
     * @returns An async iterable yielding the model's rows in ascending cursor-field order.
     * @throws Error if the client has no usable delegate for the configured model.
     */
    async extract(context: PipelineContext): Promise<DataSource<TOutput>> {
        context.logger.info(`Extracting data using Prisma from model: ${this.config.model}`);
        const modelDelegate = this.resolveDelegate();

        // Cursor-based pagination keeps memory bounded for large tables.
        return this.extractWithCursor(context, modelDelegate);
    }

    /** Fetches the model delegate from the client and checks that it exposes findMany. */
    private resolveDelegate(): FindManyDelegate {
        const client = this.config.connection as unknown as Record<string, unknown>;
        const candidate = client[this.config.model] as Partial<FindManyDelegate> | null | undefined;
        if (!candidate || typeof candidate.findMany !== 'function') {
            throw new Error(`Prisma model "${this.config.model}" not found on the provided client.`);
        }
        return candidate as FindManyDelegate;
    }

    /** Reads `field` from a row, returning undefined when the row is not an object. */
    private static readCursor(row: unknown, field: string): unknown {
        if (typeof row !== 'object' || row === null) {
            return undefined;
        }
        return (row as Record<string, unknown>)[field];
    }

    /**
     * Pages through the model with findMany, yielding rows one at a time.
     *
     * @param context Pipeline context; only its `logger` is used here.
     * @param modelDelegate Delegate whose `findMany` is called once per batch.
     * @throws Error if the last row of a batch has no (or a null) cursor field,
     *         e.g. because a `select` in `queryArgs` dropped it.
     */
    private async *extractWithCursor(context: PipelineContext, modelDelegate: FindManyDelegate): AsyncIterable<TOutput> {
        const take = this.batchSize;
        const idField = this.cursorField;
        let cursor: CursorValue | undefined;

        context.logger.debug(`Starting cursor-based extraction for ${this.config.model} with batch size ${take}`);

        while (true) {
            // Compare against undefined explicitly: 0 and '' are valid cursor values.
            const hasCursor = cursor !== undefined;
            const queryArgs = {
                ...(this.config.queryArgs ?? {}), // Original query args (where, select, etc.)
                take,
                skip: hasCursor ? 1 : 0, // Skip the cursor item itself on subsequent fetches
                cursor: hasCursor ? { [idField]: cursor } : undefined,
                orderBy: { [idField]: 'asc' }, // Must order by the unique cursor field
            };

            context.logger.trace({ queryArgs }, 'Fetching next batch from Prisma');
            const batch = await modelDelegate.findMany(queryArgs);

            if (batch.length === 0) {
                context.logger.debug('Extraction complete, no more items found.');
                break; // No more data
            }

            for (const item of batch) {
                yield item as TOutput;
            }

            // The next page starts after the last row of this one.
            const lastItem = batch[batch.length - 1];
            const nextCursor = PrismaExtractor.readCursor(lastItem, idField);
            if (nextCursor === undefined || nextCursor === null) {
                context.logger.error({ lastItem }, `Could not get cursor field '${idField}' from last item.`);
                throw new Error(`Cursor field '${idField}' not found or null in extracted data.`);
            }
            cursor = nextCursor as CursorValue;
            context.logger.trace(`Cursor updated to: ${cursor}`);

            // A short batch means the data is exhausted; skip the extra empty round trip.
            if (batch.length < take) {
                context.logger.debug('Extraction complete, no more items found.');
                break;
            }
        }
        context.logger.info(`Finished Prisma extraction for ${this.config.model}`);
    }
}