#!/usr/bin/env tsx
import * as I from '@augceo/iterators';
import { Command } from '@commander-js/extra-typings';
import * as fs from 'fs';
import * as glob from 'glob';
import cluster from 'node:cluster';
import { Readable } from 'node:stream';
import postgres from 'postgres';
import { parseHand, parseHandIdentifiers } from '../formats/pokerstars/parse.ts';
import * as Poker from '../index.ts';

/**
 * Parses a CLI option value as a base-10 integer.
 *
 * Commander invokes custom option parsers as `parser(value, previousValue)`.
 * Passing the built-in `parseInt` directly therefore turns the option's default
 * into the radix: with a default of `2`, `--max-concurrency 4` would be parsed
 * in base 2 and yield NaN. This wrapper ignores the second argument.
 */
const parseIntegerOption = (value: string): number => parseInt(value, 10);

// Command-line interface definition.
const program = new Command()
  .name('parse-hands')
  .description('Parse poker hand history and output stats as CSV or PostgreSQL table')
  .argument('<output>', 'output file path (for CSV) or table name (for PostgreSQL)')
  .option('-i, --input <pattern>', 'input file glob pattern (e.g., "files/**/*.txt")', String)
  .option(
    '-l, --limit <number>',
    'limit number of hands to parse',
    parseIntegerOption,
    Infinity
  )
  .option('-p, --progress', 'show progress while parsing')
  .option('-v, --verbose', 'output extra information')
  .option('-e, --error-file <path>', 'file to save hand texts that caused parsing errors')
  .option('--skip-timeouts', 'skip hands that include timeout messages or incomplete showdowns')
  .option(
    '-c, --connection <string>',
    'PostgreSQL connection string (e.g., postgresql://user:pass@host:port/db)'
  )
  .option('-m, --method <string>', 'PostgreSQL import method (copy or insert)', 'insert')
  .option(
    '--max-concurrency <number>',
    'maximum number of concurrent database operations',
    parseIntegerOption,
    2
  )
  .version('1.0.0');

program.parse();

// Parsed options and the single positional argument (output path / table name).
const options = program.opts();
const [outputPath] = program.args;

if (cluster.isPrimary) {
  // The primary process only supervises: it forks one worker, which runs the
  // rest of this file, and then parks on a promise that never settles.
  const worker = cluster.fork();

  /** Exits with the worker's exit code, or 1 if the worker was killed by a signal. */
  const exitWithWorkerStatus = (code: number | null, signal: string | null) => {
    process.exit(code ?? (signal ? 1 : 0));
  };

  worker.on('exit', exitWithWorkerStatus);

  // Installing a SIGINT listener replaces Node's default "terminate" behaviour,
  // so the primary stays alive and exits through the worker 'exit' handlers.
  // NOTE(review): presumably this gives the worker time to shut down gracefully.
  process.on('SIGINT', () => {
    console.log('Received SIGINT');
  });
  process.on('SIGTERM', () => {
    console.log('Received SIGTERM');
    process.exit(0);
  });

  cluster.on('exit', (_worker, code, signal) => exitWithWorkerStatus(code, signal));

  // Never resolves: everything below this block runs in the worker only.
  await new Promise(() => {});
} else {
  // Worker: the first SIGINT (Ctrl+C) or SIGTERM starts a graceful shutdown;
  // a repeated signal force-quits (see triggerGracefulShutdown).
  process.on('SIGINT', () => {
    triggerGracefulShutdown();
  });
  process.on('SIGTERM', () => {
    triggerGracefulShutdown();
  });
}

// Optional stream that receives the text (and stack) of hands that failed to
// parse or analyze; only created when --error-file is given
let errorStream: fs.WriteStream | null = null;
if (options.errorFile) {
  errorStream = fs.createWriteStream(options.errorFile);
}

// PostgreSQL client; stays null until processFiles() creates it for --connection runs
let sql: postgres.Sql | null = null;

// Run-wide counters and totals reported by printStats()
let readCount = 0;
let parsedCount = 0;
let analyzedCount = 0;
let skippedCount = 0;
let errorCount = 0;
let duplicateCount = 0;
let writtenCount = 0;
let processedFilesCount = 0;
let totalInputSize = 0;
// Wall-clock start used for the duration and hands/second figures
const startTime = Date.now();
// Hand IDs currently being checked by filterBatch() (removed again after the DB lookup)
const seenGameIds = new Set<number>();
// Number of errors per short error key (first clause of the error message)
const errorCounts: Record<string, number> = {};
// Controller of the running pipeline; set by processFiles(), aborted by shutdown()
let abortController: AbortController | null = null;
// Timer that redraws progress output when --progress is enabled
let statsInterval: NodeJS.Timeout | null = null;

/**
 * Renders one value as a field in tab-separated text form.
 *
 * - `null` / `undefined` become the two-character marker `\N`.
 * - `Date` values become ISO-8601 strings.
 * - Everything else is stringified, with backslash, tab, newline and carriage
 *   return replaced by their backslash escapes.
 *
 * @param value Value to serialise.
 * @returns The escaped field text.
 */
function escapeText(value: any): string {
  if (value == null) return '\\N';
  if (value instanceof Date) return value.toISOString();

  // Backslash must be handled first so the escapes added afterwards are not doubled.
  const replacements: ReadonlyArray<readonly [string, string]> = [
    ['\\', '\\\\'],
    ['\t', '\\t'],
    ['\n', '\\n'],
    ['\r', '\\r'],
  ];

  return replacements.reduce(
    (text, [raw, escaped]) => text.split(raw).join(escaped),
    String(value)
  );
}

// Number of filterBatch() calls currently in flight (only used for debugging)
let filterBatchProcessing = 0;

/**
 * Removes hands from a batch that are duplicates, either of another hand that
 * is currently being checked or of a row already stored in `game_states`.
 *
 * Each dropped hand increments `duplicateCount`. The venue used for the
 * database lookup is taken from the first unique hand of the batch, so the
 * batch is assumed to come from a single venue.
 *
 * @param gameBatch Raw hand texts; the first line of each must carry the identifiers.
 * @returns The hands whose IDs were not found in memory or in the database.
 * @throws Error if the PostgreSQL client has not been initialised.
 */
async function filterBatch(gameBatch: string[]) {
  filterBatchProcessing++;
  try {
    if (!sql) throw new Error('PostgreSQL not initialized');
    if (gameBatch.length === 0) return [];

    // Header (first) line of a hand; a text without any newline is its own header.
    const headerLine = (handText: string): string => {
      const end = handText.indexOf('\n');
      return end === -1 ? handText : handText.substring(0, end);
    };

    // First pass: drop hands whose ID is already being checked by any in-flight batch.
    const ids: number[] = [];
    const uniqueBatch = gameBatch.filter(handText => {
      const { hand: id } = parseHandIdentifiers(headerLine(handText));
      if (seenGameIds.has(id)) {
        duplicateCount++;
        return false;
      }
      ids.push(id);
      seenGameIds.add(id);
      return true;
    });

    // The in-flight counter is decremented exactly once, in `finally`.
    if (uniqueBatch.length === 0) return [];

    const { venue } = parseHandIdentifiers(headerLine(uniqueBatch[0]));

    const result =
      await sql`SELECT id FROM game_states WHERE id = ANY(${ids}) AND venue = ${venue}`;

    // Second pass: drop hands that already exist in the database. `ids` is
    // index-aligned with `uniqueBatch`. IDs leave the in-memory set here.
    const existingGameIds = new Set(result.map(row => parseInt(row.id, 10)));
    return uniqueBatch.filter((_, index) => {
      const id = ids[index];
      seenGameIds.delete(id);
      if (existingGameIds.has(id)) {
        duplicateCount++;
        return false;
      }
      return true;
    });
  } finally {
    filterBatchProcessing--;
  }
}

/**
 * Formats a byte count for display using binary (1024-based) steps.
 *
 * Values below 1024 are printed verbatim with a `B` suffix; larger values are
 * scaled to KB, MB or GB with two decimals. GB is the largest unit used.
 *
 * @param bytes Size in bytes.
 * @returns Human-readable size such as `1.50 KB`.
 */
function formatFileSize(bytes: number): string {
  const step = 1024;
  if (bytes < step) return `${bytes} B`;

  let scaled = bytes / step;
  for (const unit of ['KB', 'MB']) {
    if (scaled < step) return `${scaled.toFixed(2)} ${unit}`;
    scaled /= step;
  }
  return `${scaled.toFixed(2)} GB`;
}

/**
 * Writes the run statistics to stdout.
 *
 * Progress updates (`isFinal` false) clear the terminal first. The final
 * summary additionally reports the size of `outputPath` when no database
 * connection is configured and stops the 'Total time' console timer.
 * Any failure is logged instead of thrown.
 *
 * @param isFinal Whether this is the end-of-run summary rather than a progress tick.
 */
function printStats(isFinal = false) {
  try {
    if (!isFinal) {
      // ANSI: clear the screen and move the cursor to the top-left corner.
      process.stdout.write('\u001B[2J\u001B[0;0f');
    }

    // The output size is only reported in the final summary of a CSV run.
    const reportOutputSize = isFinal && !options.connection;
    let outputSize = 0;
    if (reportOutputSize) {
      try {
        outputSize = fs.statSync(outputPath).size;
      } catch (error) {
        console.error(`Error getting output file size: ${error}`);
      }
    }

    const elapsedSeconds = (Date.now() - startTime) / 1000;
    const perSecond = (count: number) => (elapsedSeconds > 0 ? count / elapsedSeconds : 0);

    const errorBreakdown =
      errorCount > 0
        ? '\n' +
          Object.entries(errorCounts)
            .map(([key, value]) => `  - ${key}: ${value}`)
            .join('\n')
        : '';

    const lines = [
      isFinal ? '\nSummary:' : 'Progress:',
      `- Read: ${readCount} hands`,
      `- Parsed: ${parsedCount} hands`,
      `- Analyzed: ${analyzedCount} hands`,
      `- Skipped: ${skippedCount} hands`,
      `- Duplicates: ${duplicateCount} hands`,
      `- Written: ${writtenCount} hands`,
      `- Files processed: ${processedFilesCount}`,
      `- Input size: ${formatFileSize(totalInputSize)}`,
      `- Errored: ${errorCount} hands${errorBreakdown}`,
      reportOutputSize ? `\n- Output size: ${formatFileSize(outputSize)}` : '',
      `- Duration: ${elapsedSeconds.toFixed(2)} seconds`,
      `- Input: ${perSecond(readCount).toFixed(2)} hands/second`,
      `- Output: ${perSecond(writtenCount).toFixed(2)} hands/second`,
      `- Memory: ${Math.floor(process.memoryUsage().rss / 1024 / 1024)} MB`,
      '',
    ];

    process.stdout.write(lines.join('\n') + '\n');
    if (isFinal) {
      console.timeEnd('Total time');
    }
  } catch (error) {
    console.error('Error during final stats output:', error);
  }
}

/**
 * Signal handler body for the worker process.
 *
 * The first call starts a graceful shutdown and exits with code 0 once it has
 * finished. If the pipeline has already been aborted (for example by an
 * earlier signal), the process is terminated immediately with code 1.
 *
 * @returns Always `false`.
 */
function triggerGracefulShutdown() {
  const alreadyAborted = abortController?.signal.aborted;
  if (alreadyAborted) {
    console.error('\nForce quitting...');
    process.exit(1);
  }

  process.stdout.write('\nProcess interrupted. Shutting down gracefully...\n');
  shutdown().then(() => process.exit(0));

  return false;
}

// Promise of the shutdown sequence once it has started, shared by all callers
let shutdownInProgress: Promise<void> | null = null;

/**
 * Gracefully shuts the pipeline down: aborts it, stops the progress timer,
 * closes the database client and the error stream, and prints the summary.
 *
 * The sequence runs at most once. A caller arriving while it is still running
 * receives the same promise, so it cannot exit the process before the
 * connections are closed and the summary is printed. If the pipeline was
 * aborted by something other than this function, the call is a no-op.
 *
 * @returns A promise that settles when shutdown has finished; it never rejects.
 */
function shutdown(): Promise<void> {
  if (shutdownInProgress) return shutdownInProgress;
  if (abortController?.signal.aborted) return Promise.resolve();

  shutdownInProgress = runShutdownSequence();
  return shutdownInProgress;
}

// Performs the actual teardown steps; each step is isolated so that one failure
// does not prevent the remaining cleanup or the final summary.
async function runShutdownSequence(): Promise<void> {
  abortController?.abort();

  if (statsInterval) {
    clearInterval(statsInterval);
  }

  try {
    // Close database connection
    if (sql) {
      console.log('Closing connections');
      await sql.end();
    }
  } catch (error) {
    console.error('Error during shutdown:', error);
  }

  try {
    // Flush and close the error stream. The callback passed to end() also fires
    // when the stream has already finished or errored, so this cannot hang.
    const stream = errorStream;
    if (stream) {
      await new Promise<void>(resolve => stream.end(() => resolve()));
    }
  } catch (error) {
    console.error('Error during shutdown:', error);
  }

  // Output final stats (printStats handles its own errors)
  printStats(true);
}

// Handle process exit to ensure clean shutdown
//process.on('exit', async () => {
//  if (!isInterrupted) {
//    await shutdown();
//  }
//});
/**
 * Records a failed hand: bumps the error counters and, when an error file is
 * configured, appends the hand text (preceded by the parsed hand as JSON when
 * available) followed by the stack trace.
 *
 * Errors are grouped by the part of their message before the first `:`, `.`,
 * `,` or newline.
 *
 * @param error The thrown value; may be any type, including `null`.
 * @param text Raw text of the hand that caused the error.
 * @param hand The parsed hand, if parsing itself succeeded.
 */
function logError(error: any, text: string, hand: Poker.Hand | null = null) {
  errorCount++;
  // Optional chaining keeps a thrown `null`/`undefined` from crashing the logger.
  const errorMessage = String(error?.message || error);
  const errorKey = errorMessage.split(/[:.,\n]/)[0].trim();
  errorCounts[errorKey] = (errorCounts[errorKey] || 0) + 1;

  if (errorStream) {
    const context = hand ? JSON.stringify(hand, null, 2) + '\n' + text : text;
    // Non-Error throws carry no stack; fall back to the message instead of "undefined".
    const trace = error?.stack ?? errorMessage;
    errorStream.write(context + '\n' + trace + '\n\n');
  }
}
/**
 * Parses one raw hand and builds its game analysis.
 *
 * With `--skip-timeouts`, hands containing any of the known marker phrases are
 * counted as skipped and ignored. Parse and analysis failures are recorded via
 * `logError` and yield `null` rather than throwing.
 *
 * @param handText Raw text of a single hand.
 * @returns The `[hand, game]` pair, or `null` if the hand was skipped or failed.
 */
function processHand(handText: string): readonly [Poker.Hand, Poker.Game] | null {
  if (options.skipTimeouts) {
    const skipMarkers = [
      'has timed out',
      ' showed  and ',
      'Hand was run twice',
      'Hand cancelled',
    ];
    if (skipMarkers.some(marker => handText.includes(marker))) {
      skippedCount++;
      return null;
    }
  }

  let hand: Poker.Hand | null = null;
  try {
    hand = parseHand(handText);
  } catch (error: unknown) {
    // Diagnostics go to stderr: stdout carries the CSV rows when no database is used.
    console.error(error);
    logError(error, handText);
    return null;
  }

  if (!hand) return null;

  parsedCount++;

  let game: Poker.Game | null = null;
  try {
    game = Poker.Game(hand);
    analyzedCount++;
  } catch (error: unknown) {
    logError(error, handText, hand);
    return null;
  }

  if (!game) return null;

  return [hand, game] as const;
}

// Columns of `game_states` written by writeGames(), each paired with the array
// cast used by the unnest-based bulk insert. Keeping name and cast together
// guarantees exactly one placeholder (and one parameter) per column.
const GAME_STATE_COLUMNS: ReadonlyArray<readonly [string, string]> = [
  ['id', 'bigint[]'],
  ['type', 'text[]'],
  ['variant', 'text[]'],
  ['venue', 'text[]'],
  ['table', 'text[]'],
  ['event', 'text[]'],
  ['url', 'text[]'],
  ['seat_count', 'bigint[]'],
  ['player_count', 'integer[]'],
  ['created_at', 'timestamptz[]'],
  ['updated_at', 'timestamptz[]'],
  ['finished_at', 'timestamptz[]'],
  ['state', 'jsonb[]'],
];

// Builds the row for one hand, in GAME_STATE_COLUMNS order. All three timestamp
// columns receive the hand's timestamp (epoch 0 when it is missing).
function gameStateRow(game: Poker.Hand): any[] {
  const createdAt = new Date(game.timestamp || 0);
  return [
    game.hand,
    'poker',
    game.variant,
    game.venue,
    game.table,
    game.event,
    game.url,
    game.seatCount,
    game.players.length,
    createdAt,
    createdAt,
    createdAt,
    JSON.stringify(game),
  ];
}

// Converts one row value into the form sent as an element of an array parameter:
// missing values become SQL NULL, dates become ISO strings, and values bound
// for text columns are stringified.
function toInsertValue(value: any, arrayCast: string): any {
  if (value == null) return null;
  if (value instanceof Date) return value.toISOString();
  if (arrayCast === 'text[]' && typeof value !== 'string') return String(value);
  return value;
}

/**
 * Writes a batch of hands to the `game_states` table.
 *
 * Does nothing unless a database connection is configured. With
 * `--method copy` the rows are streamed through `COPY ... FROM STDIN` on a
 * reserved connection; otherwise they are inserted with a single
 * `INSERT ... SELECT FROM unnest(...)` that ignores `(id, venue)` conflicts.
 * Insert failures are logged and not rethrown. `writtenCount` is incremented
 * for every hand handed to the database, before the write is confirmed.
 *
 * @param games Hands to persist.
 */
async function writeGames(games: Poker.Hand[]) {
  if (!options.connection || !sql) return;

  const columnNames = GAME_STATE_COLUMNS.map(([name]) => name);

  if (options.method === 'copy') {
    const reserved = await sql.reserve();
    try {
      async function* generateData() {
        for (const game of games) {
          writtenCount++;
          yield gameStateRow(game).map(escapeText).join('\t') + '\n';
        }
      }

      const readable = Readable.from(generateData());
      const copyStream =
        await reserved`COPY game_states (${sql(columnNames)}) FROM STDIN`.writable();

      await new Promise((resolve, reject) => {
        readable.pipe(copyStream).on('finish', resolve).on('error', reject);
      });
    } finally {
      await reserved.release();
    }
    return;
  }

  // Insert path: transpose the rows into one array per column for unnest().
  const columnsData: any[][] = GAME_STATE_COLUMNS.map(() => []);
  for (const game of games) {
    writtenCount++;
    gameStateRow(game).forEach((value, i) => {
      columnsData[i].push(toInsertValue(value, GAME_STATE_COLUMNS[i][1]));
    });
  }

  if (games.length === 0) return;

  const placeholders = GAME_STATE_COLUMNS.map(([, cast], i) => `$${i + 1}::${cast}`).join(', ');
  const query = `INSERT INTO game_states ("${columnNames.join(
    '","'
  )}") SELECT * FROM unnest(${placeholders}) ON CONFLICT (id, venue) DO NOTHING`;

  try {
    await sql.unsafe(query, columnsData, { prepare: true });
  } catch (error) {
    // Best effort: a failed batch is reported but does not stop the pipeline.
    console.error('Error inserting games:', error);
  }
}

/**
 * Writes the per-hand statistics of a batch of analyzed games.
 *
 * - Without a database connection, rows are written to stdout as
 *   comma-joined fields (escaped with `escapeText`).
 * - With `--method copy`, rows are streamed into the `outputPath` table via
 *   `COPY ... FROM STDIN` on a reserved connection.
 * - Otherwise rows are bulk-inserted with `INSERT ... SELECT FROM unnest(...)`,
 *   ignoring conflicts.
 *
 * In both database paths the `createdAt` column is taken from the game's
 * `gameTimestamp` rather than from the stat itself.
 *
 * @param tables Analyzed games whose `stats` should be written.
 */
async function writeStats(tables: Poker.Game[]) {
  if (!options.connection) {
    if (process.stdout.writable) {
      for (const table of tables) {
        for (const stat of table.stats) {
          const row = Poker.Stats.columns.map(col => stat[col as keyof typeof stat]);
          process.stdout.write(row.map(escapeText).join(',') + '\n');
        }
      }
    }
    return;
  }
  if (!sql) return;

  const columnNames = Poker.Stats.getColumnNames();

  if (options.method === 'copy') {
    const reserved = await sql.reserve();
    try {
      async function* generateData() {
        for (const table of tables) {
          for (const stat of table.stats) {
            const row = Poker.Stats.columns.map(col =>
              col === 'createdAt' ? new Date(table.gameTimestamp) : stat[col as keyof typeof stat]
            );
            yield row.map(escapeText).join('\t') + '\n';
          }
        }
      }

      const readable = Readable.from(generateData());
      const copyStream =
        await reserved`COPY ${sql(outputPath)} (${sql(columnNames)}) FROM STDIN`.writable();

      await new Promise((resolve, reject) => {
        readable.pipe(copyStream).on('finish', resolve).on('error', reject);
      });
    } finally {
      await reserved.release();
    }
    return;
  }

  // Insert path: transpose the rows into one array per column for unnest().
  const columns = Poker.Stats.columns;
  const columnsData: any[][] = Array.from({ length: columnNames.length }, () => []);
  let rowCount = 0;

  for (const table of tables) {
    for (const stat of table.stats) {
      rowCount++;
      columns.forEach((col, i) => {
        columnsData[i].push(
          col === 'createdAt'
            ? new Date(table.gameTimestamp).toISOString()
            : // Same normalisation as writeGames: a missing value is sent as SQL NULL.
              (stat[col as keyof typeof stat] ?? null)
        );
      });
    }
  }

  if (rowCount === 0) return;

  // Quote identifiers the SQL way (embedded double quotes are doubled) because
  // the table name comes straight from the command line.
  const quoteIdentifier = (name: string) => `"${name.replace(/"/g, '""')}"`;

  const textColumns = new Set(['player', 'venue', 'gameId', 'table', 'street', 'currency']);
  const timestampColumns = new Set(['createdAt']);
  const casts = columns.map(col => {
    if (textColumns.has(col)) return 'text[]';
    if (timestampColumns.has(col)) return 'timestamptz[]';
    return 'numeric[]';
  });
  const placeholders = casts.map((cast, i) => `$${i + 1}::${cast}`).join(', ');
  const query = `INSERT INTO ${quoteIdentifier(String(outputPath))} (${columnNames
    .map(quoteIdentifier)
    .join(',')}) SELECT * FROM unnest(${placeholders}) ON CONFLICT DO NOTHING`;

  await sql.unsafe(query, columnsData, { prepare: true });
}

/**
 * Resolves a glob pattern and yields every matching file path.
 *
 * Terminates the process with exit code 1 when nothing matches.
 *
 * @param pattern Glob pattern of the input files.
 */
async function* getFilePaths(pattern: string) {
  const matches = await glob.glob(pattern);
  if (!matches.length) {
    console.error(`Error: No files found matching pattern "${pattern}"`);
    process.exit(1);
  }

  if (options.verbose) {
    //console.log(`Processing ${matches.length} files matching "${pattern}"`);
  }

  yield* matches;
}

/**
 * Opens an input file as a UTF-8 read stream and records it in the run totals.
 *
 * The file counter is bumped immediately; the file size is added to the input
 * total when it can be determined. A failing size lookup is only logged.
 *
 * @param file Path of the file to read.
 * @returns A read stream that yields the file content as strings.
 */
async function readFile(file: string) {
  processedFilesCount += 1;

  try {
    const { size } = await fs.promises.stat(file);
    totalInputSize += size;
  } catch (error) {
    console.error(`Error getting file size for ${file}: ${error}`);
  }

  return fs.createReadStream(file, { encoding: 'utf8' });
}

/**
 * Splits a stream of text chunks into individual hands.
 *
 * A hand starts at each occurrence of `PokerStars Hand #`. Carriage returns
 * are stripped, every hand is trimmed, and empty hands are dropped. Text is
 * buffered across chunk boundaries, so a hand or header split between two
 * chunks is still emitted whole. `readCount` is incremented once for every
 * hand yielded, including the last one of the stream.
 *
 * @param chunks Text chunks of one input file.
 * @returns (generator return value) an array holding the final hand, if any.
 */
async function* splitHands(chunks: AsyncIterable<string>): AsyncGenerator<string> {
  const handHeader = 'PokerStars Hand #';
  let buffer = '';

  for await (const chunk of chunks) {
    buffer += chunk.replace(/\r/g, '');
    let start = 0;
    let next: number;

    // Searching from `start + 1` skips the header that opens the current hand.
    while ((next = buffer.indexOf(handHeader, start + 1)) !== -1) {
      const hand = buffer.substring(start, next).trim();
      if (hand) {
        yield hand;
        readCount++;
      }
      start = next;
    }

    // Keep the unfinished tail; the next chunk may complete it.
    buffer = buffer.substring(start);
  }

  // Whatever remains after the last chunk is the final hand of the stream.
  const hands: string[] = [];
  const lastHand = buffer.trim();
  if (lastHand) {
    yield lastHand;
    readCount++;
    hands.push(lastHand);
  }
  return hands;
}

/**
 * Runs the whole import: sets up progress reporting and the database client,
 * builds the processing pipeline, drains it, and always finishes with
 * `shutdown()` (which closes resources and prints the summary).
 *
 * @throws Error if no `--input` pattern was given, or whatever the pipeline throws.
 */
async function processFiles() {
  console.time('Total time');

  if (options.progress) {
    statsInterval = setInterval(() => printStats(), 250);
  }

  if (options.connection) {
    sql = postgres(options.connection, { max: 10 });
  }
  try {
    const inputPattern = options.input;
    if (!inputPattern) {
      throw new Error('Input pattern is required');
    }

    // Create the processing pipeline.
    // NOTE(review): the numeric arguments to map/concat look like concurrency
    // and buffering limits of the iterator library; confirm against its docs.
    const [result, controller] = I.abortable(I =>
      I.pipe(
        // Get file paths
        getFilePaths(inputPattern),
        I.map(readFile, 4),
        I.map(splitHands, 4),
        I.concat(4, 4),
        // Drop hands that are already known, in batches
        I.chunk(5000),
        I.map(filterBatch, 5),
        I.concat(5, 1),
        // Parse and analyze; failed or skipped hands come back as null
        I.map(processHand),
        I.filter(I.identity),
        I.take(options.limit),
        // Persist hands and their stats in separately sized batches
        I.dispatch(([hand, game]) => ({ hand, game }), {
          hand: I.pipe(I.chunk(2000), I.map(writeGames, 3)),
          game: I.pipe(I.chunk(500), I.map(writeStats, 3)),
        })
      )
    );
    abortController = controller;

    // Drain the pipeline; its work happens as a side effect of iterating.
    for await (const _stream of result) {
      // intentionally empty
    }
    console.log('DONE');
  } finally {
    await shutdown();
  }
}

// Entry point: run the pipeline and treat any error escaping it as fatal.
const exitOnProcessingError = (error: unknown) => {
  console.error('Error during processing:', error);
  process.exit(1);
};

processFiles().catch(exitOnProcessingError);
