import {ChainForkConfig} from "@lodestar/config";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {
  ForkName,
  ForkPostFulu,
  ForkPostGloas,
  ForkPreGloas,
  SLOTS_PER_EPOCH,
  isForkPostDeneb,
  isForkPostFulu,
  isForkPostGloas,
} from "@lodestar/params";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {BLSSignature, RootHex, SignedBeaconBlock, Slot, deneb, fulu} from "@lodestar/types";
import {LodestarError, Logger, byteArrayEquals, pruneSetToMax} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {MAX_LOOK_AHEAD_EPOCHS} from "../../sync/constants.js";
import {IClock} from "../../util/clock.js";
import {CustodyConfig} from "../../util/dataColumns.js";
import {SerializedCache} from "../../util/serializedCache.js";
import {
  BlockInput,
  BlockInputBlobs,
  BlockInputColumns,
  BlockInputNoData,
  BlockInputPreData,
  BlockWithSource,
  DAType,
  ForkBlobsDA,
  IBlockInput,
  LogMetaBasic,
  LogMetaBlobs,
  LogMetaColumns,
  SourceMeta,
  isBlockInputBlobs,
  isBlockInputColumns,
  isDaOutOfRange,
} from "../blocks/blockInput/index.js";
import {ChainEvent, ChainEventEmitter} from "../emitter.js";

// Target size for the block input cache, enforced by pruneToMaxSize() which runs after prune()
// and onFinalized() — NOT on insertion. The cache can temporarily exceed this during range sync
// (e.g. 32 blocks inserted per batch) but is trimmed back after blocks are processed.
//
// Must be large enough to hold blocks from all concurrently downloaded range sync batches.
// Range sync downloads up to MAX_LOOK_AHEAD_EPOCHS batches ahead of the processing head,
// so up to (MAX_LOOK_AHEAD_EPOCHS + 1) batches (current + look-ahead) of SLOTS_PER_EPOCH
// blocks can be in the cache simultaneously. If this value is too small, pruneToMaxSize()
// will evict blocks from the batch being processed before they are persisted to the database,
// causing errors when async handlers like onForkChoiceFinalized run.
const MAX_BLOCK_INPUT_CACHE_SIZE = (MAX_LOOK_AHEAD_EPOCHS + 1) * SLOTS_PER_EPOCH;

export type SeenBlockInputCacheModules = {
  config: ChainForkConfig;
  clock: IClock;
  chainEvents: ChainEventEmitter;
  signal: AbortSignal;
  custodyConfig: CustodyConfig;
  serializedCache: SerializedCache;
  metrics: Metrics | null;
  logger?: Logger;
};

export type GetByBlobOptions = {
  throwErrorIfAlreadyKnown?: boolean;
};

/**
 * Consumers that create BlockInputs or change types of old BlockInputs
 *
 * - gossipHandlers (block and blob)
 * - beaconBlocksMaybeBlobsByRange
 * - unavailableBeaconBlobsByRoot (beaconBlocksMaybeBlobsByRoot)
 * - publishBlock in the beacon/blocks/index.ts API
 *   https://github.com/ChainSafe/lodestar/blob/unstable/packages/beacon-node/src/api/impl/beacon/blocks/index.ts#L62
 * - maybeValidateBlobs in verifyBlocksDataAvailability (is_data_available spec function)
 *   https://github.com/ChainSafe/lodestar/blob/unstable/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts#L111
 *
 *
 * Pruning management for SeenBlockInputCache
 * ------------------------------------------
 * There are four cases for how pruning needs to be handled
 * - Normal operation following head via gossip (and/or reqresp). For this situation the consumer (process pipeline or
 *   caller of processBlock) will call the `prune` method to remove any processed BlockInputs from the cache. This will
 *   also remove any ancestors of the processed BlockInput as that will also need to have been successfully processed
 *   for import to work correctly
 * - onFinalized event handler will help to prune any non-canonical forks once the chain finalizes. Any block-slots that
 *   are before the finalized checkpoint will be pruned.
 * - Range-sync periods.  The range process uses this cache to store and sync blocks with DA data as the chain is pulled
 *   from peers.  We pull batches, by epoch, so 32 slots are pulled at a time and several batches are downloaded
 *   concurrently (up to MAX_LOOK_AHEAD_EPOCHS ahead).  All downloaded blocks are added to this shared cache, so it
 *   must be large enough to hold blocks from all concurrent batches.  If pruneToMaxSize() evicts blocks from the batch
 *   currently being processed, those blocks may not yet be persisted to the database, causing getBlockByRoot() to fail
 *   when async event handlers (e.g. onForkChoiceFinalized) try to look them up.
 * - Non-Finality times.  This is a bit more tricky.  There can be long periods of non-finality and storing everything
 *   will cause OOM.  The pruneToMaxSize will help ensure the number of stored blocks (with DA) is trimmed back to
 *   MAX_BLOCK_INPUT_CACHE_SIZE after each prune() or onFinalized() call
 */

export class SeenBlockInput {
  private readonly config: ChainForkConfig;
  private readonly custodyConfig: CustodyConfig;
  private readonly clock: IClock;
  private readonly chainEvents: ChainEventEmitter;
  private readonly signal: AbortSignal;
  private readonly serializedCache: SerializedCache;
  private readonly metrics: Metrics | null;
  private readonly logger?: Logger;
  private blockInputs = new Map<RootHex, IBlockInput>();
  // using a Map of slot helps it more convenient to prune
  // there should only 1 block root per slot but we need to always compare against rootHex
  // and the signature to ensure we only skip verification if both match
  private verifiedProposerSignatures = new Map<Slot, Map<RootHex, BLSSignature>>();

  constructor({
    config,
    custodyConfig,
    clock,
    chainEvents,
    signal,
    serializedCache,
    metrics,
    logger,
  }: SeenBlockInputCacheModules) {
    this.config = config;
    this.custodyConfig = custodyConfig;
    this.clock = clock;
    this.chainEvents = chainEvents;
    this.signal = signal;
    this.serializedCache = serializedCache;
    this.metrics = metrics;
    this.logger = logger;

    if (metrics) {
      metrics.seenCache.blockInput.blockInputCount.addCollect(() => {
        metrics.seenCache.blockInput.blockInputCount.set(this.blockInputs.size);
        metrics.seenCache.blockInput.serializedObjectRefs.set(
          Array.from(this.blockInputs.values()).reduce(
            (count, blockInput) => count + blockInput.getSerializedCacheKeys().length,
            0
          )
        );
      });
    }

    this.chainEvents.on(ChainEvent.forkChoiceFinalized, this.onFinalized);
    this.signal.addEventListener("abort", () => {
      this.chainEvents.off(ChainEvent.forkChoiceFinalized, this.onFinalized);
    });
  }

  hasBlock(rootHex: RootHex): boolean {
    return this.blockInputs.get(rootHex)?.hasBlock() ?? false;
  }

  get(rootHex: RootHex): IBlockInput | undefined {
    return this.blockInputs.get(rootHex);
  }

  /**
   * Removes the single BlockInput from the cache
   */
  remove(rootHex: RootHex): void {
    const blockInput = this.blockInputs.get(rootHex);
    if (blockInput) {
      this.evictBlockInput(blockInput);
    }
  }

  /**
   * Removes a processed BlockInput from the cache and also removes any ancestors of processed blocks
   */
  prune(rootHex: RootHex): void {
    let blockInput = this.blockInputs.get(rootHex);
    let parentRootHex = blockInput?.parentRootHex;
    let deletedCount = 0;
    while (blockInput) {
      deletedCount++;
      this.evictBlockInput(blockInput);
      blockInput = this.blockInputs.get(parentRootHex ?? "");
      parentRootHex = blockInput?.parentRootHex;
    }
    this.logger?.debug("BlockInputCache.prune deleted cached BlockInputs", {deletedCount});
    this.pruneToMaxSize();
  }

  onFinalized = (checkpoint: CheckpointWithHex) => {
    let deletedCount = 0;
    const cutoffSlot = computeStartSlotAtEpoch(checkpoint.epoch);
    for (const [, blockInput] of this.blockInputs) {
      if (blockInput.slot < cutoffSlot) {
        deletedCount++;
        this.evictBlockInput(blockInput);
      }
    }
    this.logger?.debug("BlockInputCache.onFinalized deleted cached BlockInputs", {deletedCount});
    this.pruneToMaxSize();
  };

  getByBlock({blockRootHex, block, source, seenTimestampSec, peerIdStr}: BlockWithSource): BlockInput {
    // TODO(peerDAS): Why is it necessary to static cast this here. All conditional paths result in a valid value so should be defined correctly below
    let blockInput = this.blockInputs.get(blockRootHex) as IBlockInput;
    if (!blockInput) {
      const {forkName, daOutOfRange} = this.buildCommonProps(block.message.slot);

      if (isForkPostGloas(forkName)) {
        // Post-gloas
        blockInput = BlockInputNoData.createFromBlock({
          block: block as SignedBeaconBlock<ForkPostGloas>,
          blockRootHex,
          daOutOfRange,
          forkName,
          source,
          seenTimestampSec,
          peerIdStr,
        });
      } else if (!isForkPostDeneb(forkName)) {
        // Pre-deneb
        blockInput = BlockInputPreData.createFromBlock({
          block,
          blockRootHex,
          daOutOfRange,
          forkName,
          source,
          seenTimestampSec,
          peerIdStr,
        });
      } else if (isForkPostFulu(forkName)) {
        // Fulu Only
        blockInput = BlockInputColumns.createFromBlock({
          block: block as SignedBeaconBlock<ForkPostFulu & ForkPreGloas>,
          blockRootHex,
          daOutOfRange,
          forkName,
          custodyColumns: this.custodyConfig.custodyColumns,
          sampledColumns: this.custodyConfig.sampledColumns,
          source,
          seenTimestampSec,
          peerIdStr,
        });
      } else {
        // Deneb and Electra
        blockInput = BlockInputBlobs.createFromBlock({
          block: block as SignedBeaconBlock<ForkBlobsDA>,
          blockRootHex,
          daOutOfRange,
          forkName,
          source,
          seenTimestampSec,
          peerIdStr,
        });
      }
      this.metrics?.seenCache.blockInput.createdByBlock.inc();
      this.blockInputs.set(blockInput.blockRootHex, blockInput);
    }

    if (!blockInput.hasBlock()) {
      blockInput.addBlock({block, blockRootHex, source, seenTimestampSec, peerIdStr});
    } else {
      this.logger?.debug("Attempt to cache block but is already cached on BlockInput", blockInput.getLogMeta());
      this.metrics?.seenCache.blockInput.duplicateBlockCount.inc({source});
    }

    return blockInput as BlockInput;
  }

  getByBlob(
    {
      blockRootHex,
      blobSidecar,
      source,
      seenTimestampSec,
      peerIdStr,
    }: SourceMeta & {blockRootHex: RootHex; blobSidecar: deneb.BlobSidecar},
    opts: GetByBlobOptions = {}
  ): BlockInputBlobs {
    // TODO(peerDAS): Why is it necessary to static cast this here. All conditional paths result in a valid value so should be defined correctly below
    let blockInput = this.blockInputs.get(blockRootHex) as IBlockInput;
    let created = false;
    if (!blockInput) {
      created = true;
      const {forkName, daOutOfRange} = this.buildCommonProps(blobSidecar.signedBlockHeader.message.slot);
      blockInput = BlockInputBlobs.createFromBlob({
        blobSidecar,
        blockRootHex,
        daOutOfRange,
        forkName,
        source,
        seenTimestampSec,
        peerIdStr,
      });
      this.metrics?.seenCache.blockInput.createdByBlob.inc();
      this.blockInputs.set(blockRootHex, blockInput);
    }

    if (!isBlockInputBlobs(blockInput)) {
      throw new SeenBlockInputCacheError(
        {
          code: SeenBlockInputCacheErrorCode.WRONG_BLOCK_INPUT_TYPE,
          cachedType: blockInput.type,
          requestedType: DAType.Blobs,
          ...blockInput.getLogMeta(),
        },
        `BlockInputType mismatch adding blobIndex=${blobSidecar.index}`
      );
    }

    if (!blockInput.hasBlob(blobSidecar.index)) {
      blockInput.addBlob({blobSidecar, blockRootHex, source, seenTimestampSec, peerIdStr});
    } else if (!created) {
      this.logger?.debug(
        `Attempt to cache blob index #${blobSidecar.index} but is already cached on BlockInput`,
        blockInput.getLogMeta()
      );
      this.metrics?.seenCache.blockInput.duplicateBlobCount.inc({source});
      if (opts.throwErrorIfAlreadyKnown) {
        throw new SeenBlockInputCacheError({
          code: SeenBlockInputCacheErrorCode.GOSSIP_BLOB_ALREADY_KNOWN,
          ...blockInput.getLogMeta(),
        });
      }
    }

    return blockInput;
  }

  getByColumn(
    {
      blockRootHex,
      columnSidecar,
      seenTimestampSec,
      source,
      peerIdStr,
    }: SourceMeta & {blockRootHex: RootHex; columnSidecar: fulu.DataColumnSidecar},
    opts: GetByBlobOptions = {}
  ): BlockInputColumns {
    let blockInput = this.blockInputs.get(blockRootHex);
    let created = false;
    if (!blockInput) {
      created = true;
      const {forkName, daOutOfRange} = this.buildCommonProps(columnSidecar.signedBlockHeader.message.slot);
      blockInput = BlockInputColumns.createFromColumn({
        columnSidecar,
        blockRootHex,
        daOutOfRange,
        forkName,
        source,
        seenTimestampSec,
        peerIdStr,
        custodyColumns: this.custodyConfig.custodyColumns,
        sampledColumns: this.custodyConfig.sampledColumns,
      });
      this.metrics?.seenCache.blockInput.createdByColumn.inc();
      this.blockInputs.set(blockRootHex, blockInput);
    }

    if (!isBlockInputColumns(blockInput)) {
      throw new SeenBlockInputCacheError(
        {
          code: SeenBlockInputCacheErrorCode.WRONG_BLOCK_INPUT_TYPE,
          cachedType: blockInput.type,
          requestedType: DAType.Columns,
          ...blockInput.getLogMeta(),
        },
        `BlockInputType mismatch adding columnIndex=${columnSidecar.index}`
      );
    }

    if (!blockInput.hasColumn(columnSidecar.index)) {
      blockInput.addColumn({columnSidecar, blockRootHex, source, seenTimestampSec, peerIdStr});
    } else if (!created) {
      this.logger?.debug(
        `Attempt to cache column index #${columnSidecar.index} but is already cached on BlockInput`,
        blockInput.getLogMeta()
      );
      this.metrics?.seenCache.blockInput.duplicateColumnCount.inc({source});
      if (opts.throwErrorIfAlreadyKnown) {
        throw new SeenBlockInputCacheError({
          code: SeenBlockInputCacheErrorCode.GOSSIP_COLUMN_ALREADY_KNOWN,
          ...blockInput.getLogMeta(),
        });
      }
    }

    return blockInput;
  }

  /**
   * Check if a proposer signature has already been verified for this slot and block root.
   */
  isVerifiedProposerSignature(slot: Slot, blockRootHex: RootHex, signature: BLSSignature): boolean {
    const seenMap = this.verifiedProposerSignatures.get(slot);
    const cachedSignature = seenMap?.get(blockRootHex);
    if (!cachedSignature) {
      return false;
    }
    // Only consider verified if the signature matches
    return byteArrayEquals(cachedSignature, signature);
  }

  /**
   * Mark that the proposer signature for this slot and block root has been verified
   * so that we only verify it once per slot
   */
  markVerifiedProposerSignature(slot: Slot, blockRootHex: RootHex, signature: BLSSignature): void {
    let seenMap = this.verifiedProposerSignatures.get(slot);
    if (!seenMap) {
      seenMap = new Map<RootHex, BLSSignature>();
      this.verifiedProposerSignatures.set(slot, seenMap);
    }
    seenMap.set(blockRootHex, signature);
  }

  private buildCommonProps(slot: Slot): {
    daOutOfRange: boolean;
    forkName: ForkName;
  } {
    const forkName = this.config.getForkName(slot);
    return {
      forkName,
      daOutOfRange: isDaOutOfRange(this.config, forkName, slot, this.clock.currentEpoch),
    };
  }

  /**
   * Use custom implementation of pruneSetToMax to allow for sorting by slot
   * and deleting via key/rootHex
   */
  private pruneToMaxSize() {
    let itemsToDelete = this.blockInputs.size - MAX_BLOCK_INPUT_CACHE_SIZE;

    if (itemsToDelete > 0) {
      const sorted = [...this.blockInputs.entries()].sort((a, b) => a[1].slot - b[1].slot);
      for (const [, blockInput] of sorted) {
        this.evictBlockInput(blockInput);
        itemsToDelete--;
        if (itemsToDelete <= 0) return;
      }
    }
    pruneSetToMax(this.verifiedProposerSignatures, MAX_BLOCK_INPUT_CACHE_SIZE);
  }

  private evictBlockInput(blockInput: IBlockInput): void {
    // Without forcefully clearing this cache, we would rely on WeakMap to evict memory which is not reliable
    this.serializedCache.delete(blockInput.getSerializedCacheKeys());
    this.blockInputs.delete(blockInput.blockRootHex);
  }
}

enum SeenBlockInputCacheErrorCode {
  WRONG_BLOCK_INPUT_TYPE = "BLOCK_INPUT_CACHE_ERROR_WRONG_BLOCK_INPUT_TYPE",
  GOSSIP_BLOB_ALREADY_KNOWN = "BLOCK_INPUT_CACHE_ERROR_GOSSIP_BLOB_ALREADY_KNOWN",
  GOSSIP_COLUMN_ALREADY_KNOWN = "BLOCK_INPUT_CACHE_ERROR_GOSSIP_COLUMN_ALREADY_KNOWN",
}

type SeenBlockInputCacheErrorType =
  | (LogMetaBasic & {
      code: SeenBlockInputCacheErrorCode.WRONG_BLOCK_INPUT_TYPE;
      cachedType: DAType;
      requestedType: DAType;
    })
  | (LogMetaBlobs & {
      code: SeenBlockInputCacheErrorCode.GOSSIP_BLOB_ALREADY_KNOWN;
    })
  | (LogMetaColumns & {
      code: SeenBlockInputCacheErrorCode.GOSSIP_COLUMN_ALREADY_KNOWN;
    });

class SeenBlockInputCacheError extends LodestarError<SeenBlockInputCacheErrorType> {}
