// examples/basic-etl.ts
import { DataPipeline, PrismaExtractor, PrismaLoader, RequiredFieldCleaner, MappingTransformer } from '@flowlab/data';
import { PrismaClient } from '@prisma/client';

// This example assumes a Prisma schema in prisma/schema.prisma that defines models such as:
//   model SourceUser { id Int @id @default(autoincrement()) email String @unique name String? }
//   model TargetUser { userId Int @id @default(autoincrement()) emailAddress String @unique fullName String status String @default("active") }

// Single Prisma client shared by the extractor and the loader configured in main();
// main() disconnects it in the finally block after pipeline.run() settles.
const prisma = new PrismaClient();

/**
 * Shape of a row read from the `SourceUser` Prisma model by the extractor.
 */
interface SourceUser {
    /** Primary key of the source row (`id Int @id` in the example schema). */
    id: number;
    /** Unique email address of the user. */
    email: string;
    /** Optional display name; `null` when the column is unset (`String?`). */
    name: string | null;
}

/**
 * Shape of a row written to the `TargetUser` Prisma model by the loader.
 *
 * `status` is optional because the pipeline's MappingTransformer never sets it.
 * The database default from the model definition (`@default("active")` in the
 * example schema) supplies the value instead. Declaring it required would
 * misdescribe the objects the transformer actually produces.
 */
interface TargetUser {
    /** Primary key of the target row. */
    userId: number;
    /** Unique email address; the loader uses it as the upsert key. */
    emailAddress: string;
    /** Display name; never null in the target model. */
    fullName: string;
    /** Account status; omitted by the pipeline so the DB default applies. */
    status?: string;
}

/**
 * Runs the example ETL pipeline.
 *
 * - Reads `sourceUser` rows through Prisma.
 * - Passes them through a RequiredFieldCleaner on `email`.
 * - Maps them onto the TargetUser shape.
 * - Upserts them into `targetUser`, keyed by `emailAddress`.
 *
 * A pipeline failure is logged and reported via a non-zero process exit code.
 * The shared Prisma client is disconnected after the run settles, whether it
 * succeeded or failed.
 */
async function main() {
    const pipeline = new DataPipeline<SourceUser, TargetUser>('user-etl');

    // Configure Extractor
    const extractor = new PrismaExtractor<SourceUser>({
        type: 'postgresql', // Or 'mysql' etc. depending on your Prisma setup
        connection: prisma,
        model: 'sourceUser', // Make sure model name matches Prisma schema
        // queryArgs: { where: { /* filter if needed */ } },
        batchSize: 50 // Smaller batch size for extraction iteration
    });

    // Configure Cleaner (ensure email exists)
    const cleaner = new RequiredFieldCleaner<SourceUser>('email');

    // Configure Transformer (map fields and generate fullName)
    const transformer = new MappingTransformer<SourceUser, TargetUser>([
        // NOTE(review): the example schema declares TargetUser.userId as
        // @default(autoincrement()). On PostgreSQL, writing explicit ids does not
        // advance the backing sequence, so later inserts that rely on the default
        // may collide with these ids. Confirm this mapping is intended.
        { sourcePath: 'id', targetPath: 'userId' },
        { sourcePath: 'email', targetPath: 'emailAddress' },
        {
            sourcePath: 'name',
            targetPath: 'fullName',
            // `||` (not `??`) so that both null and empty names fall back to a generated name.
            transform: (name, sourceItem) => name || `User ${sourceItem.id}`,
            // NOTE(review): the transform above always returns a non-empty string.
            // Confirm whether MappingTransformer ever applies this default here.
            defaultValue: 'Unknown User'
        },
        // 'status' is intentionally not mapped: the database default from the
        // TargetUser model definition fills it in.
    ]);

    // Configure Loader
    const loader = new PrismaLoader<TargetUser>({
        type: 'postgresql',
        connection: prisma,
        model: 'targetUser',
        operation: 'upsert', // Create or update based on email
        upsertWhereField: 'emailAddress' // Use email to check for existing users
    });

    // Chain the steps
    pipeline
        .configure({ batchSize: 100, retries: 2, retryDelay: 500 }) // Configure pipeline run options
        .extract(extractor)
        .clean(cleaner) // Clean before transforming
        .transform(transformer)
        .load(loader);

    try {
        await pipeline.run();
        console.log('ETL Pipeline finished successfully.');
    } catch (error) {
        console.error('ETL Pipeline failed:', error);
        // The error is handled here and never reaches the top-level .catch().
        // Without this, a failed ETL would exit with status 0. Setting exitCode
        // (rather than calling process.exit) still lets the finally block disconnect.
        process.exitCode = 1;
    } finally {
        await prisma.$disconnect();
    }
}

/**
 * Last-resort handler for any rejection that escapes main().
 * It logs the error and terminates the process with exit status 1.
 */
function exitOnFatalError(fatal: unknown): never {
    console.error(fatal);
    process.exit(1);
}

main().catch(exitOnFatalError);