import path from "node:path";
import {ChainForkConfig} from "@lodestar/config";
import {KeyValue} from "@lodestar/db";
import {CheckpointWithHex, IForkChoice, PayloadStatus, ProtoBlock} from "@lodestar/fork-choice";
import {ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {Epoch, Slot} from "@lodestar/types";
import {Logger, fromAsync, fromHex, prettyPrintIndices, toRootHex} from "@lodestar/utils";
import {IBeaconDb} from "../../../db/index.js";
import {BlockArchiveBatchPutBinaryItem} from "../../../db/repositories/index.js";
import {ensureDir, writeIfNotExist} from "../../../util/file.js";
import {BlockRootHex} from "../../../util/sszBytes.js";
import {LightClientServer} from "../../lightClient/index.js";

// Migrations walk the finalized chain in fixed-size chunks so that a long stretch of history
// is never loaded into memory all at once (avoids OOM).
// The block chunk size below is exercised by the e2e test blockArchive.test.ts.
// TODO: Review after merge since the size of blocks will increase significantly
const BLOCK_BATCH_SIZE = 256;
// Chunk size used when migrating blob sidecars and data column sidecars
const BLOB_SIDECAR_BATCH_SIZE = 32;

/** Slot of a block paired with its root as raw bytes */
type BlockRootSlot = {slot: Slot; root: Uint8Array};

/**
 * Write the serialized bytes of an orphaned (non-canonical) block to `<dir>/<slot>_<root>.ssz`.
 *
 * The directory is created first via `ensureDir`; the file itself is written through `writeIfNotExist`.
 */
async function persistOrphanedBlock(
  slot: Slot,
  blockRoot: BlockRootHex,
  bytes: Uint8Array,
  opts: {
    persistOrphanedBlocksDir: string;
  }
) {
  const {persistOrphanedBlocksDir} = opts;
  const dirpath = path.join(persistOrphanedBlocksDir ?? "orphaned_blocks");
  await ensureDir(dirpath);
  await writeIfNotExist(path.join(dirpath, `${slot}_${blockRoot}.ssz`), bytes);
}

/**
 * Archives finalized blocks from the active (hot) bucket to the archive (cold) bucket.
 *
 * Only blocks on the same chain as the finalized checkpoint are archived. Each run should move all
 * finalized blocks to the blockArchive db so that it stays consistent with stateArchive and the node
 * always works well after a restart.
 * Note that the finalized block stays in fork choice (to check the finalized checkpoint of subsequent
 * onBlock calls), so the next run's ancestor walk ends at it again; its data was already migrated by
 * this run and is skipped there as missing from the hot db.
 *
 * Steps, in order:
 * 1. Migrate canonical blocks from hot db to cold db. Depending on the fork of the finalized checkpoint,
 *    also migrate blob sidecars (deneb+), data column sidecars (fulu+) and execution payload envelopes (gloas+).
 * 2. Optionally persist orphaned (non-canonical) blocks to disk, then delete all non-canonical data from hot db.
 * 3. Prune archived blob / data column sidecars that are older than the retention window.
 * 4. Let the light client server prune data of blocks that can never become checkpoints.
 *
 * @param archiveDataEpochs Epochs of sidecars to keep in the archive. The effective window is never smaller than
 * the spec minimum for the sidecar type. `Infinity` disables sidecar pruning.
 * @param persistOrphanedBlocks If true, write non-canonical blocks to disk before deleting them from hot db.
 * @param persistOrphanedBlocksDir Directory for persisted orphaned blocks, "orphaned_blocks" if not set.
 */
export async function archiveBlocks(
  config: ChainForkConfig,
  db: IBeaconDb,
  forkChoice: IForkChoice,
  lightclientServer: LightClientServer | undefined,
  logger: Logger,
  finalizedCheckpoint: CheckpointWithHex,
  currentEpoch: Epoch,
  archiveDataEpochs?: number,
  persistOrphanedBlocks?: boolean,
  persistOrphanedBlocksDir?: string
): Promise<void> {
  // Use fork choice to determine the blocks to archive and delete.
  // `ancestors` is the canonical walk back from the finalized root, including the previous finalized
  // block as its last element.
  const {ancestors: finalizedCanonicalBlocks, nonAncestors: finalizedNonCanonicalBlocks} =
    forkChoice.getAllAncestorAndNonAncestorBlocksDefaultStatus(finalizedCheckpoint.rootHex);

  // NOTE: The finalized block will be exactly the first block of `epoch` or previous
  const finalizedPostDeneb = finalizedCheckpoint.epoch >= config.DENEB_FORK_EPOCH;
  const finalizedPostFulu = finalizedCheckpoint.epoch >= config.FULU_FORK_EPOCH;
  const finalizedPostGloas = finalizedCheckpoint.epoch >= config.GLOAS_FORK_EPOCH;

  const finalizedCanonicalBlockRoots: BlockRootSlot[] = finalizedCanonicalBlocks.map((block) => ({
    slot: block.slot,
    root: fromHex(block.blockRoot),
  }));

  const logCtx: ArchiveLogCtx = {
    currentEpoch,
    finalizedEpoch: finalizedCheckpoint.epoch,
    finalizedRoot: finalizedCheckpoint.rootHex,
  };

  // 1. Migrate finalized canonical data from hot db to cold db
  if (finalizedCanonicalBlockRoots.length > 0) {
    const migratedBlockSlots = await migrateBlocksFromHotToColdDb(db, logger, finalizedCanonicalBlockRoots);
    logger.verbose("Migrated blocks from hot DB to cold DB", {
      ...logCtx,
      fromSlot: finalizedCanonicalBlockRoots[0].slot,
      toSlot: finalizedCanonicalBlockRoots.at(-1)?.slot,
      size: finalizedCanonicalBlockRoots.length,
      migratedEntries: migratedBlockSlots.length,
      slotRange: prettyPrintIndices(migratedBlockSlots),
    });

    if (finalizedPostDeneb) {
      const migratedEntries = await migrateBlobSidecarsFromHotToColdDb(
        config,
        db,
        logger,
        finalizedCanonicalBlockRoots,
        currentEpoch
      );
      logger.verbose("Migrated blobSidecars from hot DB to cold DB", {...logCtx, migratedEntries});
    }

    if (finalizedPostFulu) {
      const migratedColumnSlots = await migrateDataColumnSidecarsFromHotToColdDb(
        config,
        db,
        logger,
        finalizedCanonicalBlocks,
        currentEpoch
      );
      logger.verbose("Migrated dataColumnSidecars from hot DB to cold DB", {
        ...logCtx,
        migratedEntries: migratedColumnSlots.length,
        slotRange: prettyPrintIndices(migratedColumnSlots),
      });
    }

    if (finalizedPostGloas) {
      const migratedEnvelopeSlots = await migrateExecutionPayloadEnvelopesFromHotToColdDb(
        config,
        db,
        logger,
        finalizedCanonicalBlocks
      );
      logger.verbose("Migrated executionPayloadEnvelopes from hot DB to cold DB", {
        ...logCtx,
        migratedEntries: migratedEnvelopeSlots.length,
        slotRange: prettyPrintIndices(migratedEnvelopeSlots),
      });
    }
  }

  // 2. Delete non-canonical data from hot db (fork choice is walked a single time above)
  const nonCanonicalBlockRoots = finalizedNonCanonicalBlocks.map((summary) => fromHex(summary.blockRoot));
  if (nonCanonicalBlockRoots.length > 0) {
    if (persistOrphanedBlocks) {
      // Persist orphaned blocks to disk before deleting them from hot db
      await Promise.all(
        nonCanonicalBlockRoots.map(async (root, index) => {
          const block = finalizedNonCanonicalBlocks[index];
          const blockBytes = await db.block.getBinary(root);
          const blockLogCtx = {slot: block.slot, root: block.blockRoot};
          if (blockBytes) {
            await persistOrphanedBlock(block.slot, block.blockRoot, blockBytes, {
              persistOrphanedBlocksDir: persistOrphanedBlocksDir ?? "orphaned_blocks",
            });
            logger.verbose("Persisted orphaned block", {...logCtx, ...blockLogCtx});
          } else {
            logger.warn("Tried to persist orphaned block but no block found", {...logCtx, ...blockLogCtx});
          }
        })
      );
    }

    const nonCanonicalSlots = finalizedNonCanonicalBlocks.map((summary) => summary.slot).sort((a, b) => a - b);
    const nonCanonicalLogCtx = {
      ...logCtx,
      count: nonCanonicalBlockRoots.length,
      slotRange: prettyPrintIndices(nonCanonicalSlots),
    };

    await db.block.batchDelete(nonCanonicalBlockRoots);
    logger.verbose("Deleted non canonical blocks from hot DB", nonCanonicalLogCtx);

    if (finalizedPostDeneb) {
      await db.blobSidecars.batchDelete(nonCanonicalBlockRoots);
      logger.verbose("Deleted non canonical blobSidecars from hot DB", nonCanonicalLogCtx);
    }

    if (finalizedPostFulu) {
      await db.dataColumnSidecar.deleteMany(nonCanonicalBlockRoots);
      logger.verbose("Deleted non canonical dataColumnSidecars from hot DB", nonCanonicalLogCtx);
    }

    if (finalizedPostGloas) {
      await db.executionPayloadEnvelope.batchDelete(nonCanonicalBlockRoots);
      logger.verbose("Deleted non canonical executionPayloadEnvelopes from hot DB", nonCanonicalLogCtx);
    }
  }

  // 3. Prune archived sidecars that fall outside of the retention window
  if (finalizedPostDeneb) {
    await pruneExpiredBlobSidecars(config, db, logger, currentEpoch, archiveDataEpochs, logCtx);
  }
  if (finalizedPostFulu) {
    await pruneExpiredDataColumnSidecars(config, db, logger, currentEpoch, archiveDataEpochs, logCtx);
  }

  // 4. Pruning potential checkpoint data: non-canonical blocks plus canonical blocks that can never be checkpoints
  if (lightclientServer) {
    const nonCheckpointBlockRoots: Uint8Array[] = [
      ...nonCanonicalBlockRoots,
      ...getNonCheckpointBlocks(finalizedCanonicalBlockRoots).map((block) => block.root),
    ];
    await lightclientServer.pruneNonCheckpointData(nonCheckpointBlockRoots);
  }

  logger.verbose("Archiving of finalized blocks complete", {
    ...logCtx,
    totalArchived: finalizedCanonicalBlocks.length,
  });
}

/** Log context shared by all messages emitted during one `archiveBlocks` run */
type ArchiveLogCtx = {currentEpoch: Epoch; finalizedEpoch: Epoch; finalizedRoot: string};

/**
 * Delete expired blob sidecars from the archive.
 * Keeps only `[current_epoch - max(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, archiveDataEpochs)]`;
 * if `archiveDataEpochs` is Infinity nothing is pruned.
 */
async function pruneExpiredBlobSidecars(
  config: ChainForkConfig,
  db: IBeaconDb,
  logger: Logger,
  currentEpoch: Epoch,
  archiveDataEpochs: number | undefined,
  logCtx: ArchiveLogCtx
): Promise<void> {
  if (archiveDataEpochs === Infinity) {
    logger.verbose("blobSidecars pruning skipped: archiveDataEpochs set to Infinity", logCtx);
    return;
  }

  const blobsArchiveWindow = Math.max(config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, archiveDataEpochs ?? 0);
  const blobSidecarsMinEpoch = currentEpoch - blobsArchiveWindow;
  if (blobSidecarsMinEpoch >= config.DENEB_FORK_EPOCH) {
    const slotsToDelete = await db.blobSidecarsArchive.keys({lt: computeStartSlotAtEpoch(blobSidecarsMinEpoch)});
    if (slotsToDelete.length > 0) {
      await db.blobSidecarsArchive.batchDelete(slotsToDelete);
      logger.verbose(`blobSidecars prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete.at(-1)}`, logCtx);
    } else {
      logger.verbose(`blobSidecars prune: no entries before epoch ${blobSidecarsMinEpoch}`, logCtx);
    }
  } else {
    // Mirror the data column pruning so the skip is visible in logs
    logger.verbose(
      `blobSidecars pruning skipped: ${blobSidecarsMinEpoch} is before deneb fork epoch ${config.DENEB_FORK_EPOCH}`,
      logCtx
    );
  }
}

/**
 * Delete expired data column sidecars from the archive.
 * Keeps only `[current_epoch - max(MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS, archiveDataEpochs)]`;
 * if `archiveDataEpochs` is Infinity nothing is pruned.
 */
async function pruneExpiredDataColumnSidecars(
  config: ChainForkConfig,
  db: IBeaconDb,
  logger: Logger,
  currentEpoch: Epoch,
  archiveDataEpochs: number | undefined,
  logCtx: ArchiveLogCtx
): Promise<void> {
  if (archiveDataEpochs === Infinity) {
    logger.verbose("dataColumnSidecars pruning skipped: archiveDataEpochs set to Infinity", logCtx);
    return;
  }

  const dataColumnSidecarsArchiveWindow = Math.max(
    config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS,
    archiveDataEpochs ?? 0
  );
  const dataColumnSidecarsMinEpoch = currentEpoch - dataColumnSidecarsArchiveWindow;
  if (dataColumnSidecarsMinEpoch >= config.FULU_FORK_EPOCH) {
    const prefixedKeys = await db.dataColumnSidecarArchive.keys({
      // The `id` value `0` refers to the column index. So we want to fetch all sidecars less than zero column of `dataColumnSidecarsMinEpoch`
      lt: {prefix: computeStartSlotAtEpoch(dataColumnSidecarsMinEpoch), id: 0},
    });
    // for each slot there could be multiple dataColumnSidecar, so we need to deduplicate it
    const slotsToDelete = [...new Set(prefixedKeys.map(({prefix}) => prefix))].sort((a, b) => a - b);

    if (slotsToDelete.length > 0) {
      await db.dataColumnSidecarArchive.deleteMany(slotsToDelete);
      logger.verbose("dataColumnSidecars prune", {
        ...logCtx,
        slotRange: prettyPrintIndices(slotsToDelete),
        numOfSlots: slotsToDelete.length,
        totalNumOfSidecars: prefixedKeys.length,
      });
    } else {
      logger.verbose(`dataColumnSidecars prune: no entries before epoch ${dataColumnSidecarsMinEpoch}`, logCtx);
    }
  } else {
    logger.verbose(
      `dataColumnSidecars pruning skipped: ${dataColumnSidecarsMinEpoch} is before fulu fork epoch ${config.FULU_FORK_EPOCH}`,
      logCtx
    );
  }
}

/**
 * Move canonical blocks from the hot `block` bucket to `blockArchive`, BLOCK_BATCH_SIZE blocks at a time.
 *
 * The input includes the previous finalized block as the last ancestor; its SignedBeaconBlock was archived
 * on a previous run and is no longer in hot db. A block missing from hot db is therefore skipped (logged at
 * debug level) rather than treated as an error.
 * @returns slots of the blocks that were actually moved, ascending
 */
async function migrateBlocksFromHotToColdDb(db: IBeaconDb, logger: Logger, blocks: BlockRootSlot[]): Promise<Slot[]> {
  const migratedSlots: Slot[] = [];

  for (let start = 0; start < blocks.length; start += BLOCK_BATCH_SIZE) {
    const chunk = blocks.slice(start, start + BLOCK_BATCH_SIZE);

    // Work on raw bytes rather than a deserialized SignedBeaconBlock; the parent root is sliced straight out
    // TODO: Benchmark if faster to slice Buffer or fromHex()
    const loaded = await Promise.all(
      chunk.map(async ({slot, root}): Promise<BlockArchiveBatchPutBinaryItem | null> => {
        const value = await db.block.getBinary(root);
        if (!value) {
          logger.debug("Block in forkchoice but missing in hot db, could be already archived", {
            slot,
            root: toRootHex(root),
          });
          return null;
        }
        return {key: slot, value, slot, blockRoot: root, parentRoot: getParentRootFromSignedBlock(value)};
      })
    );

    const entries: BlockArchiveBatchPutBinaryItem[] = [];
    for (const item of loaded) {
      if (item !== null) entries.push(item);
    }
    if (entries.length === 0) continue;

    await Promise.all([
      db.blockArchive.batchPutBinary(entries),
      db.block.batchDelete(entries.map(({blockRoot}) => blockRoot)),
    ]);
    migratedSlots.push(...entries.map(({slot}) => slot));
  }

  // The ancestor walk yields newest → oldest; sort ascending so `prettyPrintIndices` output reads cleanly
  return migratedSlots.sort((a, b) => a - b);
}

/**
 * Migrate blobSidecars of finalized canonical blocks from hot db to cold db, BLOB_SIDECAR_BATCH_SIZE blocks at a time.
 *
 * Only blocks from the deneb fork up to (excluding) fulu, whose epoch is within
 * `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS` of `currentEpoch`, are written to the archive. The hot-db entries of
 * every block in the input are deleted, whether they were archived or not.
 * @returns number of blobSidecars archive entries written (one per block, keyed by slot)
 */
async function migrateBlobSidecarsFromHotToColdDb(
  config: ChainForkConfig,
  db: IBeaconDb,
  logger: Logger,
  blocks: BlockRootSlot[],
  currentEpoch: Epoch
): Promise<number> {
  // Blocks before this epoch are out of the blob sidecar retention window and are not archived
  const minEpoch = currentEpoch - config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
  let migratedWrappedBlobSidecars = 0;

  for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
    const canonicalBlocks = blocks.slice(i, i + BLOB_SIDECAR_BATCH_SIZE);

    const blobBlocks = canonicalBlocks.filter((block) => {
      const forkSeq = config.getForkSeq(block.slot);
      return forkSeq >= ForkSeq.deneb && forkSeq < ForkSeq.fulu && computeEpochAtSlot(block.slot) >= minEpoch;
    });

    // load Buffer instead of ssz deserialized to improve performance
    const canonicalBlobSidecarsEntries: KeyValue<Slot, Uint8Array>[] = (
      await Promise.all(
        blobBlocks.map(async (block): Promise<KeyValue<Slot, Uint8Array> | null> => {
          // The ancestor walk includes the boundary (previous finalized) block; on first
          // finalization that boundary is the anchor which has no blob sidecars in hot db.
          // Treat a null hot-db entry as "nothing to migrate" rather than an error.
          const bytes = await db.blobSidecars.getBinary(block.root);
          if (!bytes) {
            logger.debug("BlobSidecars in forkchoice but missing in hot db, could be already archived", {
              slot: block.slot,
              root: toRootHex(block.root),
            });
            return null;
          }
          return {key: block.slot, value: bytes};
        })
      )
    ).filter((e): e is KeyValue<Slot, Uint8Array> => e !== null);

    // Put to blobSidecarsArchive db and delete from the hot blobSidecars db. Every block of the batch is
    // deleted from hot db, including those filtered out above (outside the fork range or retention window).
    await Promise.all([
      db.blobSidecarsArchive.batchPutBinary(canonicalBlobSidecarsEntries),
      db.blobSidecars.batchDelete(canonicalBlocks.map((block) => block.root)),
    ]);
    migratedWrappedBlobSidecars += canonicalBlobSidecarsEntries.length;
  }

  return migratedWrappedBlobSidecars;
}

// TODO: This function can be simplified further by reducing layers of promises in a loop
/**
 * Migrate data column sidecars of finalized canonical blocks from hot db to cold db, BLOB_SIDECAR_BATCH_SIZE
 * blocks at a time.
 *
 * Post-gloas the data columns of a Gloas block are tied to its execution payload envelope — columns only
 * exist once the FULL variant of the block is in the proto-array. So Gloas blocks are only considered when
 * their canonical variant is FULL, while pre-Gloas blocks are always considered. Blocks whose canonical
 * variant is PENDING/EMPTY are skipped here — their columns will be picked up on a later run once the FULL
 * variant appears in the ancestor walk.
 *
 * Within a batch, pre-fulu blocks and blocks older than `MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS` epochs
 * are not archived, but the hot-db columns of every block in the batch are deleted.
 * @returns slots whose columns were written to the archive, ascending
 */
async function migrateDataColumnSidecarsFromHotToColdDb(
  config: ChainForkConfig,
  db: IBeaconDb,
  logger: Logger,
  canonicalBlocks: ProtoBlock[],
  currentEpoch: Epoch
): Promise<Slot[]> {
  const blocks: BlockRootSlot[] = [];
  for (const block of canonicalBlocks) {
    const columnsEligible =
      config.getForkSeq(block.slot) < ForkSeq.gloas || block.payloadStatus === PayloadStatus.FULL;
    if (columnsEligible) {
      blocks.push({slot: block.slot, root: fromHex(block.blockRoot)});
    }
  }
  if (blocks.length === 0) return [];

  // Blocks before this epoch are out of the data column retention window
  const minEpoch = currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS;
  const migratedSlots: Slot[] = [];

  for (let start = 0; start < blocks.length; start += BLOB_SIDECAR_BATCH_SIZE) {
    const chunk = blocks.slice(start, start + BLOB_SIDECAR_BATCH_SIZE);
    const writes: Promise<void>[] = [];

    for (const {slot, root} of chunk) {
      const isPreFulu = config.getForkSeq(slot) < ForkSeq.fulu;
      const isOutOfRange = computeEpochAtSlot(slot) < minEpoch;
      if (isPreFulu || isOutOfRange) continue;

      // Sidecars are read as raw bytes from the hot db (read sequentially, one block at a time)
      const sidecars = await fromAsync(db.dataColumnSidecar.valuesStreamBinary(root));
      if (sidecars.length === 0) {
        // Empty stream: either the block has no blobs, or columns were already archived on a
        // previous run (boundary block). Nothing to migrate.
        logger.debug("DataColumnSidecars in forkchoice but missing in hot db, could be already archived", {
          slot,
          root: toRootHex(root),
        });
        continue;
      }

      logger.verbose("Migrated dataColumnSidecars for block", {
        currentEpoch,
        slot,
        root: toRootHex(root),
        numSidecars: sidecars.length,
      });
      writes.push(
        db.dataColumnSidecarArchive.putManyBinary(
          slot,
          sidecars.map(({id, value}) => ({key: id, value}))
        )
      );
      migratedSlots.push(slot);
    }

    writes.push(db.dataColumnSidecar.deleteMany(chunk.map(({root}) => root)));
    await Promise.all(writes);
  }

  // The ancestor walk yields newest → oldest; sort ascending so `prettyPrintIndices` output reads cleanly
  return migratedSlots.sort((a, b) => a - b);
}

/**
 * Migrate execution payload envelopes of finalized canonical blocks from hot db to cold db, BLOCK_BATCH_SIZE
 * blocks at a time.
 *
 * Only Gloas blocks whose canonical variant is FULL are considered. Envelopes are a Gloas construct, so
 * pre-gloas blocks never have one in hot db. Looking them up would only cost db reads and emit misleading
 * "missing in hot db" debug logs.
 * Post-gloas, given a finalized checkpoint at a block root, the payload of that block root is not considered
 * finalized, hence it is archived in the next run.
 * @returns slots of the migrated envelopes, ascending
 */
async function migrateExecutionPayloadEnvelopesFromHotToColdDb(
  config: ChainForkConfig,
  db: IBeaconDb,
  logger: Logger,
  canonicalBlocks: ProtoBlock[]
): Promise<Slot[]> {
  const payloadBlocks = canonicalBlocks.filter(
    (block) => config.getForkSeq(block.slot) >= ForkSeq.gloas && block.payloadStatus === PayloadStatus.FULL
  );
  const migratedSlots: Slot[] = [];

  // Envelopes contain the execution payload, so chunk them like blocks to bound memory usage
  for (let i = 0; i < payloadBlocks.length; i += BLOCK_BATCH_SIZE) {
    const batch: BlockRootSlot[] = payloadBlocks
      .slice(i, i + BLOCK_BATCH_SIZE)
      .map((block) => ({slot: block.slot, root: fromHex(block.blockRoot)}));

    const loaded = await Promise.all(
      batch.map(async (block) => ({block, bytes: await db.executionPayloadEnvelope.getBinary(block.root)}))
    );

    const envelopeEntries: KeyValue<Slot, Uint8Array>[] = [];
    const migratedRoots: Uint8Array[] = [];
    for (const {block, bytes} of loaded) {
      if (!bytes) {
        logger.debug("ExecutionPayloadEnvelope in forkchoice but missing in hot db, could be already archived", {
          slot: block.slot,
          root: toRootHex(block.root),
        });
        continue;
      }
      envelopeEntries.push({key: block.slot, value: bytes});
      migratedRoots.push(block.root);
    }

    if (envelopeEntries.length === 0) continue;

    await Promise.all([
      db.executionPayloadEnvelopeArchive.batchPutBinary(envelopeEntries),
      db.executionPayloadEnvelope.batchDelete(migratedRoots),
    ]);
    for (const entry of envelopeEntries) migratedSlots.push(entry.key);
  }

  // The ancestor walk yields newest → oldest; sort ascending so `prettyPrintIndices` output reads cleanly
  return migratedSlots.sort((a, b) => a - b);
}

/**
 * Extract `message.parent_root` from a serialized SignedBeaconBlock without deserializing it.
 *
 * ```
 * class SignedBeaconBlock(Container):
 *   message: BeaconBlock [offset - 4 bytes]
 *   signature: BLSSignature [fixed - 96 bytes]
 *
 * class BeaconBlock(Container):
 *   slot: Slot [fixed - 8 bytes]
 *   proposer_index: ValidatorIndex [fixed - 8 bytes]
 *   parent_root: Root [fixed - 32 bytes]
 *   state_root: Root
 *   body: BeaconBlockBody
 * ```
 * From byte: `4 + 96 + 8 + 8 = 116`
 * To byte: `116 + 32 = 148`
 *
 * @returns the 32 bytes at `[116, 148)`, produced by `bytes.slice`
 */
export function getParentRootFromSignedBlock(bytes: Uint8Array): Uint8Array {
  const messageOffsetSize = 4;
  const signatureSize = 96;
  const slotSize = 8;
  const proposerIndexSize = 8;
  const rootSize = 32;

  const parentRootStart = messageOffsetSize + signatureSize + slotSize + proposerIndexSize;
  return bytes.slice(parentRootStart, parentRootStart + rootSize);
}

/**
 * Return the blocks that are guaranteed to never be an epoch checkpoint block.
 *
 * The checkpoint block of epoch `e` is the latest block at or before slot `e * SLOTS_PER_EPOCH`. A block is
 * therefore a (potential) checkpoint if either:
 * - it sits at the first slot of its epoch, or
 * - it is the last block of its epoch and the next epoch has no block at its first slot. This includes the case
 *   where one or more following epochs contain no blocks at all.
 * The newest block is always treated as a potential checkpoint, since future blocks are unknown.
 *
 * @param blocks sequence of linear blocks, from child to ancestor (decreasing slot).
 * In ProtoArray.getAllAncestorNodes child nodes are pushed first to the returned array.
 */
export function getNonCheckpointBlocks<T extends {slot: Slot}>(blocks: T[]): T[] {
  if (blocks.length < 1) {
    return [];
  }

  const nonCheckpointBlocks: T[] = [];
  // Epoch of the previously visited (newer) block.
  // Start with Infinity to always trigger `blockEpoch < epochPtr` in the first loop
  let epochPtr = Infinity;
  // Assume worst case, since it's unknown if a future epoch will skip the first slot or not.
  // This function must return only blocks that are guaranteed to never become checkpoints.
  let epochPtrHasFirstSlot = false;

  // blocks order: from child to ancestor, decreasing slot
  for (const block of blocks) {
    let isCheckpoint = false;
    const blockEpoch = computeEpochAtSlot(block.slot);

    if (blockEpoch < epochPtr) {
      // `block` is the last block of its epoch, so it is the checkpoint of every later epoch whose first
      // slot it "covers":
      // - `epochPtr` itself, if that epoch skipped its first slot
      // - any fully empty epoch in between (`blockEpoch < epochPtr - 1`), even if `epochPtr` has its first slot
      if (!epochPtrHasFirstSlot || blockEpoch < epochPtr - 1) {
        isCheckpoint = true;
      }

      // Reset epoch pointer
      epochPtr = blockEpoch;
      epochPtrHasFirstSlot = false;
    }

    // The block in the first slot of an epoch is always a checkpoint slot
    if (block.slot % SLOTS_PER_EPOCH === 0) {
      epochPtrHasFirstSlot = true;
      isCheckpoint = true;
    }

    if (!isCheckpoint) {
      nonCheckpointBlocks.push(block);
    }
  }

  return nonCheckpointBlocks;
}
