import {ChainConfig, ChainForkConfig} from "@lodestar/config";
import {
  KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH,
  KZG_COMMITMENTS_SUBTREE_INDEX,
  NUMBER_OF_COLUMNS,
} from "@lodestar/params";
import {
  computeEpochAtSlot,
  computeStartSlotAtEpoch,
  getBlockHeaderProposerSignatureSetByHeaderSlot,
  getBlockHeaderProposerSignatureSetByParentStateSlot,
} from "@lodestar/state-transition";
import {DataColumnSidecar, Root, Slot, SubnetID, fulu, gloas, ssz} from "@lodestar/types";
import {byteArrayEquals, toRootHex, verifyMerkleBranch} from "@lodestar/utils";
import {BeaconMetrics} from "../../metrics/metrics/beacon.js";
import {Metrics} from "../../metrics/metrics.js";
import {getDataColumnSidecarSlot} from "../../util/dataColumns.js";
import {kzg} from "../../util/kzg.js";
import {PayloadEnvelopeInput} from "../blocks/payloadEnvelopeInput/index.js";
import {
  DataColumnSidecarErrorCode,
  DataColumnSidecarGossipError,
  DataColumnSidecarValidationError,
} from "../errors/dataColumnSidecarError.js";
import {GossipAction} from "../errors/gossipValidation.js";
import {IBeaconChain} from "../interface.js";
import {RegenCaller} from "../regen/interface.js";

// SPEC FUNCTION
// https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.4/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
export async function validateGossipFuluDataColumnSidecar(
  chain: IBeaconChain,
  dataColumnSidecar: fulu.DataColumnSidecar,
  gossipSubnet: SubnetID,
  metrics: Metrics | null
): Promise<void> {
  const blockHeader = dataColumnSidecar.signedBlockHeader.message;
  const blockRootHex = toRootHex(ssz.phase0.BeaconBlockHeader.hashTreeRoot(blockHeader));

  // 1) [REJECT] The sidecar is valid as verified by verify_data_column_sidecar
  verifyFuluDataColumnSidecar(chain.config, dataColumnSidecar);

  // 2) [REJECT] The sidecar is for the correct subnet -- i.e. compute_subnet_for_data_column_sidecar(sidecar.index) == subnet_id
  if (computeSubnetForDataColumnSidecar(chain.config, dataColumnSidecar) !== gossipSubnet) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INVALID_SUBNET,
      columnIndex: dataColumnSidecar.index,
      gossipSubnet: gossipSubnet,
    });
  }

  // 3) [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance)
  //             -- i.e. validate that sidecar.slot <= current_slot (a client MAY queue future blocks
  //             for processing at the appropriate slot).
  const currentSlotWithGossipDisparity = chain.clock.currentSlotWithGossipDisparity;
  if (currentSlotWithGossipDisparity < blockHeader.slot) {
    throw new DataColumnSidecarGossipError(GossipAction.IGNORE, {
      code: DataColumnSidecarErrorCode.FUTURE_SLOT,
      currentSlot: currentSlotWithGossipDisparity,
      blockSlot: blockHeader.slot,
    });
  }

  // 4) [IGNORE] The sidecar is from a slot greater than the latest finalized slot -- i.e. validate that
  //             sidecar.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)
  const finalizedCheckpoint = chain.forkChoice.getFinalizedCheckpoint();
  const finalizedSlot = computeStartSlotAtEpoch(finalizedCheckpoint.epoch);
  if (blockHeader.slot <= finalizedSlot) {
    throw new DataColumnSidecarGossipError(GossipAction.IGNORE, {
      code: DataColumnSidecarErrorCode.WOULD_REVERT_FINALIZED_SLOT,
      blockSlot: blockHeader.slot,
      finalizedSlot,
    });
  }

  // 6) [IGNORE] The sidecar's block's parent (defined by block_header.parent_root) has been seen (via gossip
  //             or non-gossip sources)
  const parentRoot = toRootHex(blockHeader.parentRoot);
  const parentBlock = chain.forkChoice.getBlockHexDefaultStatus(parentRoot);
  if (parentBlock === null) {
    // If fork choice does *not* consider the parent to be a descendant of the finalized block,
    // then there are two more cases:
    //
    // 1. We have the parent stored in our database. Because fork-choice has confirmed the
    //    parent is *not* in our post-finalization DAG, all other blocks must be either
    //    pre-finalization or conflicting with finalization.
    // 2. The parent is unknown to us, we probably want to download it since it might actually
    //    descend from the finalized root.
    // (Non-Lighthouse): Since we prune all blocks non-descendant from finalized checking the `db.block` database won't be useful to guard
    // against known bad fork blocks, so we throw PARENT_UNKNOWN for cases (1) and (2)
    throw new DataColumnSidecarGossipError(GossipAction.IGNORE, {
      code: DataColumnSidecarErrorCode.PARENT_UNKNOWN,
      parentRoot,
      slot: blockHeader.slot,
    });
  }

  // 8) [REJECT] The sidecar is from a higher slot than the sidecar's block's parent
  if (parentBlock.slot >= blockHeader.slot) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.NOT_LATER_THAN_PARENT,
      parentSlot: parentBlock.slot,
      slot: blockHeader.slot,
    });
  }

  // getBlockSlotState also checks for whether the current finalized checkpoint is an ancestor of the block.
  // As a result, we throw an IGNORE (whereas the spec says we should REJECT for this scenario).
  // this is something we should change this in the future to make the code airtight to the spec.
  // 7) [REJECT] The sidecar's block's parent passes validation.
  const blockState = await chain.regen
    .getBlockSlotState(parentBlock, blockHeader.slot, {dontTransferCache: true}, RegenCaller.validateGossipDataColumn)
    .catch(() => {
      throw new DataColumnSidecarGossipError(GossipAction.IGNORE, {
        code: DataColumnSidecarErrorCode.PARENT_UNKNOWN,
        parentRoot,
        slot: blockHeader.slot,
      });
    });

  // 13) [REJECT] The sidecar is proposed by the expected proposer_index for the block's slot in the context of the current
  //              shuffling (defined by block_header.parent_root/block_header.slot). If the proposer_index cannot
  //              immediately be verified against the expected shuffling, the sidecar MAY be queued for later processing
  //              while proposers for the block's branch are calculated -- in such a case do not REJECT, instead IGNORE
  //              this message.
  const proposerIndex = blockHeader.proposerIndex;
  const expectedProposerIndex = blockState.getBeaconProposer(blockHeader.slot);

  if (proposerIndex !== expectedProposerIndex) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INCORRECT_PROPOSER,
      actualProposerIndex: proposerIndex,
      expectedProposerIndex,
    });
  }

  // 5) [REJECT] The proposer signature of sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey.
  const signature = dataColumnSidecar.signedBlockHeader.signature;
  if (!chain.seenBlockInputCache.isVerifiedProposerSignature(blockHeader.slot, blockRootHex, signature)) {
    const signatureSet = getBlockHeaderProposerSignatureSetByParentStateSlot(
      chain.config,
      blockState.slot,
      dataColumnSidecar.signedBlockHeader
    );

    if (
      !(await chain.bls.verifySignatureSets([signatureSet], {
        // verify on main thread so that we only need to verify block proposer signature once per block
        verifyOnMainThread: true,
      }))
    ) {
      throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
        code: DataColumnSidecarErrorCode.PROPOSAL_SIGNATURE_INVALID,
        blockRoot: blockRootHex,
        index: dataColumnSidecar.index,
        slot: blockHeader.slot,
      });
    }

    chain.seenBlockInputCache.markVerifiedProposerSignature(blockHeader.slot, blockRootHex, signature);
  }

  // 9) [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's block
  //             -- i.e. get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch)
  //                     == store.finalized_checkpoint.root
  // Handled by 7)

  // 10) [REJECT] The sidecar's kzg_commitments field inclusion proof is valid as verified by
  //              verify_data_column_sidecar_inclusion_proof
  //              TODO: Can cache result on (commitments, proof, header) in the future
  const timer = metrics?.peerDas.dataColumnSidecarInclusionProofVerificationTime.startTimer();
  const valid = verifyDataColumnSidecarInclusionProof(dataColumnSidecar);
  timer?.();

  if (!valid) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INCLUSION_PROOF_INVALID,
      slot: dataColumnSidecar.signedBlockHeader.message.slot,
      columnIndex: dataColumnSidecar.index,
    });
  }

  // single data column is being verified here
  const kzgProofTimer = metrics?.peerDas.dataColumnSidecarKzgProofsVerificationTime.startTimer();
  // 11) [REJECT] The sidecar's column data is valid as verified by verify_data_column_sidecar_kzg_proofs
  try {
    await verifyDataColumnSidecarKzgProofs(
      dataColumnSidecar.kzgCommitments,
      Array.from({length: dataColumnSidecar.column.length}, () => dataColumnSidecar.index),
      dataColumnSidecar.column,
      dataColumnSidecar.kzgProofs
    );
  } catch {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INVALID_KZG_PROOF,
      slot: blockHeader.slot,
      columnIndex: dataColumnSidecar.index,
    });
  } finally {
    kzgProofTimer?.();
  }

  // 12) [IGNORE] The sidecar is the first sidecar for the tuple (block_header.slot, block_header.proposer_index,
  //              sidecar.index) with valid header signature, sidecar inclusion proof, and kzg proof
  //              -- Handled in seenGossipBlockInput
}

// SPEC FUNCTION
// https://github.com/ethereum/consensus-specs/blob/v1.7.0-alpha.3/specs/gloas/p2p-interface.md#data_column_sidecar_subnet_id
export async function validateGossipGloasDataColumnSidecar(
  chain: IBeaconChain,
  payloadInput: PayloadEnvelopeInput,
  dataColumnSidecar: gloas.DataColumnSidecar,
  gossipSubnet: SubnetID,
  metrics: Metrics | null
): Promise<void> {
  const blockRootHex = toRootHex(dataColumnSidecar.beaconBlockRoot);
  const block = chain.forkChoice.getBlockHexDefaultStatus(blockRootHex);

  // [IGNORE] A valid block for the sidecar's `slot` has been seen.
  if (block === null) {
    throw new DataColumnSidecarGossipError(GossipAction.IGNORE, {
      code: DataColumnSidecarErrorCode.BLOCK_UNKNOWN,
      blockRoot: blockRootHex,
      slot: dataColumnSidecar.slot,
    });
  }

  // [REJECT] The sidecar slot matches the slot of the block with root beacon_block_root.
  if (block.slot !== dataColumnSidecar.slot) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INCORRECT_SIDECAR_SLOT,
      columnIndex: dataColumnSidecar.index,
      expected: block.slot,
      actual: dataColumnSidecar.slot,
    });
  }

  // [REJECT] The sidecar must pass verify_data_column_sidecar against the block commitments
  const kzgCommitments = payloadInput.getBlobKzgCommitments();
  verifyGloasDataColumnSidecar(dataColumnSidecar, kzgCommitments);

  // [REJECT] The sidecar must be on the correct subnet
  if (computeSubnetForDataColumnSidecar(chain.config, dataColumnSidecar) !== gossipSubnet) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INVALID_SUBNET,
      columnIndex: dataColumnSidecar.index,
      gossipSubnet,
    });
  }

  // [REJECT] The sidecar kzg proofs must verify
  const kzgProofTimer = metrics?.peerDas.dataColumnSidecarKzgProofsVerificationTime.startTimer();
  try {
    await verifyDataColumnSidecarKzgProofs(
      kzgCommitments,
      Array.from({length: dataColumnSidecar.column.length}, () => dataColumnSidecar.index),
      dataColumnSidecar.column,
      dataColumnSidecar.kzgProofs
    );
  } catch {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INVALID_KZG_PROOF,
      slot: dataColumnSidecar.slot,
      columnIndex: dataColumnSidecar.index,
    });
  } finally {
    kzgProofTimer?.();
  }
}

/**
 * SPEC FUNCTION
 * https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.4/specs/fulu/p2p-interface.md#verify_data_column_sidecar
 */
function verifyFuluDataColumnSidecar(config: ChainForkConfig, dataColumnSidecar: fulu.DataColumnSidecar): void {
  if (dataColumnSidecar.index >= NUMBER_OF_COLUMNS) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INVALID_INDEX,
      slot: dataColumnSidecar.signedBlockHeader.message.slot,
      columnIndex: dataColumnSidecar.index,
    });
  }

  if (dataColumnSidecar.kzgCommitments.length === 0) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.NO_COMMITMENTS,
      slot: dataColumnSidecar.signedBlockHeader.message.slot,
      columnIndex: dataColumnSidecar.index,
    });
  }

  const epoch = computeEpochAtSlot(dataColumnSidecar.signedBlockHeader.message.slot);
  const maxBlobsPerBlock = config.getMaxBlobsPerBlock(epoch);

  if (dataColumnSidecar.kzgCommitments.length > maxBlobsPerBlock) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.TOO_MANY_KZG_COMMITMENTS,
      slot: dataColumnSidecar.signedBlockHeader.message.slot,
      columnIndex: dataColumnSidecar.index,
      count: dataColumnSidecar.kzgCommitments.length,
      limit: maxBlobsPerBlock,
    });
  }

  if (
    dataColumnSidecar.column.length !== dataColumnSidecar.kzgCommitments.length ||
    dataColumnSidecar.column.length !== dataColumnSidecar.kzgProofs.length
  ) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.MISMATCHED_LENGTHS,
      columnLength: dataColumnSidecar.column.length,
      commitmentsLength: dataColumnSidecar.kzgCommitments.length,
      proofsLength: dataColumnSidecar.kzgProofs.length,
    });
  }
}

/**
 * SPEC FUNCTION
 * https://github.com/ethereum/consensus-specs/blob/v1.7.0-alpha.3/specs/gloas/p2p-interface.md#modified-verify_data_column_sidecar
 */
function verifyGloasDataColumnSidecar(dataColumnSidecar: gloas.DataColumnSidecar, kzgCommitments: Uint8Array[]): void {
  const slot = getDataColumnSidecarSlot(dataColumnSidecar);
  if (dataColumnSidecar.index >= NUMBER_OF_COLUMNS) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.INVALID_INDEX,
      slot,
      columnIndex: dataColumnSidecar.index,
    });
  }

  if (dataColumnSidecar.column.length === 0) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.NO_COMMITMENTS,
      slot,
      columnIndex: dataColumnSidecar.index,
    });
  }

  if (
    dataColumnSidecar.column.length !== kzgCommitments.length ||
    dataColumnSidecar.column.length !== dataColumnSidecar.kzgProofs.length
  ) {
    throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
      code: DataColumnSidecarErrorCode.MISMATCHED_LENGTHS,
      columnLength: dataColumnSidecar.column.length,
      commitmentsLength: kzgCommitments.length,
      proofsLength: dataColumnSidecar.kzgProofs.length,
    });
  }
}

/**
 * SPEC FUNCTION
 * https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.4/specs/fulu/p2p-interface.md#verify_data_column_sidecar_kzg_proofs
 */
export async function verifyDataColumnSidecarKzgProofs(
  commitments: Uint8Array[],
  cellIndices: number[],
  cells: Uint8Array[],
  proofs: Uint8Array[]
): Promise<void> {
  let valid: boolean;
  try {
    valid = await kzg.asyncVerifyCellKzgProofBatch(commitments, cellIndices, cells, proofs);
  } catch (e) {
    (e as Error).message = `Error on asyncVerifyCellKzgProofBatch: ${(e as Error).message}`;
    throw e;
  }
  if (!valid) {
    throw Error("Invalid verifyCellKzgProofBatch");
  }
}

/**
 * SPEC FUNCTION
 * https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.4/specs/fulu/p2p-interface.md#verify_data_column_sidecar_inclusion_proof
 */
export function verifyDataColumnSidecarInclusionProof(dataColumnSidecar: fulu.DataColumnSidecar): boolean {
  return verifyMerkleBranch(
    ssz.deneb.BlobKzgCommitments.hashTreeRoot(dataColumnSidecar.kzgCommitments),
    dataColumnSidecar.kzgCommitmentsInclusionProof,
    KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH,
    KZG_COMMITMENTS_SUBTREE_INDEX,
    dataColumnSidecar.signedBlockHeader.message.bodyRoot
  );
}

/**
 * Validate a subset of fulu data column sidecars against a block
 *
 * Requires the block to be known to the node
 *
 * NOTE: chain is optional to skip signature verification. Helpful for testing purposes and so that can control whether
 * signature gets checked depending on the reqresp method that is being checked
 */
export async function validateFuluBlockDataColumnSidecars(
  chain: IBeaconChain | null,
  blockSlot: Slot,
  blockRoot: Root,
  blockBlobCount: number,
  dataColumnSidecars: fulu.DataColumnSidecar[],
  metrics?: BeaconMetrics["peerDas"] | null
): Promise<void> {
  metrics?.dataColumnSidecarProcessingRequests.inc(dataColumnSidecars.length);
  const verificationTimer = metrics?.dataColumnSidecarGossipVerificationTime.startTimer();
  try {
    if (dataColumnSidecars.length === 0) {
      return;
    }

    if (blockBlobCount === 0) {
      throw new DataColumnSidecarValidationError(
        {
          code: DataColumnSidecarErrorCode.INCORRECT_SIDECAR_COUNT,
          slot: blockSlot,
          expected: 0,
          actual: dataColumnSidecars.length,
        },
        "Block has no blob commitments but data column sidecars were provided"
      );
    }
    // Hash the first sidecar block header and compare the rest via (cheaper) equality
    const firstSidecarSignedBlockHeader = dataColumnSidecars[0].signedBlockHeader;
    const firstSidecarBlockHeader = firstSidecarSignedBlockHeader.message;
    const firstBlockRoot = ssz.phase0.BeaconBlockHeader.hashTreeRoot(firstSidecarBlockHeader);
    if (!byteArrayEquals(blockRoot, firstBlockRoot)) {
      throw new DataColumnSidecarValidationError(
        {
          code: DataColumnSidecarErrorCode.INCORRECT_BLOCK,
          slot: blockSlot,
          columnIndex: 0,
          expected: toRootHex(blockRoot),
          actual: toRootHex(firstBlockRoot),
        },
        "DataColumnSidecar doesn't match corresponding block"
      );
    }

    if (chain !== null) {
      const rootHex = toRootHex(blockRoot);
      const slot = firstSidecarSignedBlockHeader.message.slot;
      const signature = firstSidecarSignedBlockHeader.signature;
      if (!chain.seenBlockInputCache.isVerifiedProposerSignature(slot, rootHex, signature)) {
        const signatureSet = getBlockHeaderProposerSignatureSetByHeaderSlot(
          chain.config,
          firstSidecarSignedBlockHeader
        );

        if (
          !(await chain.bls.verifySignatureSets([signatureSet], {
            verifyOnMainThread: true,
          }))
        ) {
          throw new DataColumnSidecarValidationError({
            code: DataColumnSidecarErrorCode.PROPOSAL_SIGNATURE_INVALID,
            blockRoot: rootHex,
            slot: blockSlot,
            index: dataColumnSidecars[0].index,
          });
        }

        chain.seenBlockInputCache.markVerifiedProposerSignature(slot, rootHex, signature);
      }
    }

    const commitments: Uint8Array[] = [];
    const cellIndices: number[] = [];
    const cells: Uint8Array[] = [];
    const proofs: Uint8Array[] = [];
    for (let i = 0; i < dataColumnSidecars.length; i++) {
      const columnSidecar = dataColumnSidecars[i];

      if (
        i !== 0 &&
        !ssz.phase0.SignedBeaconBlockHeader.equals(firstSidecarSignedBlockHeader, columnSidecar.signedBlockHeader)
      ) {
        throw new DataColumnSidecarValidationError({
          code: DataColumnSidecarErrorCode.INCORRECT_HEADER_ROOT,
          slot: blockSlot,
          expected: toRootHex(blockRoot),
          actual: toRootHex(ssz.phase0.BeaconBlockHeader.hashTreeRoot(columnSidecar.signedBlockHeader.message)),
        });
      }

      if (columnSidecar.index >= NUMBER_OF_COLUMNS) {
        throw new DataColumnSidecarValidationError(
          {
            code: DataColumnSidecarErrorCode.INVALID_INDEX,
            slot: blockSlot,
            columnIndex: columnSidecar.index,
          },
          "DataColumnSidecar has invalid index"
        );
      }

      if (columnSidecar.column.length !== blockBlobCount) {
        throw new DataColumnSidecarValidationError({
          code: DataColumnSidecarErrorCode.INCORRECT_CELL_COUNT,
          slot: blockSlot,
          columnIndex: columnSidecar.index,
          expected: blockBlobCount,
          actual: columnSidecar.column.length,
        });
      }

      if (columnSidecar.column.length !== columnSidecar.kzgCommitments.length) {
        throw new DataColumnSidecarValidationError({
          code: DataColumnSidecarErrorCode.INCORRECT_KZG_COMMITMENTS_COUNT,
          slot: blockSlot,
          columnIndex: columnSidecar.index,
          expected: columnSidecar.column.length,
          actual: columnSidecar.kzgCommitments.length,
        });
      }

      if (columnSidecar.column.length !== columnSidecar.kzgProofs.length) {
        throw new DataColumnSidecarValidationError({
          code: DataColumnSidecarErrorCode.INCORRECT_KZG_PROOF_COUNT,
          slot: blockSlot,
          columnIndex: columnSidecar.index,
          expected: columnSidecar.column.length,
          actual: columnSidecar.kzgProofs.length,
        });
      }

      const inclusionProofTimer = metrics?.dataColumnSidecarInclusionProofVerificationTime.startTimer();
      const validInclusionProof = verifyDataColumnSidecarInclusionProof(columnSidecar);
      inclusionProofTimer?.();
      if (!validInclusionProof) {
        throw new DataColumnSidecarValidationError(
          {
            code: DataColumnSidecarErrorCode.INCLUSION_PROOF_INVALID,
            slot: blockSlot,
            columnIndex: columnSidecar.index,
          },
          "DataColumnSidecar has invalid inclusion proof"
        );
      }

      commitments.push(...columnSidecar.kzgCommitments);
      cellIndices.push(...Array.from({length: columnSidecar.column.length}, () => columnSidecar.index));
      cells.push(...columnSidecar.column);
      proofs.push(...columnSidecar.kzgProofs);
    }

    let reason: string | undefined;
    // batch verification for the cases: downloadByRange and downloadByRoot
    const kzgVerificationTimer = metrics?.kzgVerificationDataColumnBatchTime.startTimer();
    try {
      const valid = await kzg.asyncVerifyCellKzgProofBatch(commitments, cellIndices, cells, proofs);
      if (!valid) {
        reason = "Invalid KZG proof batch";
      }
    } catch (e) {
      reason = (e as Error).message;
    } finally {
      kzgVerificationTimer?.();
    }
    if (reason !== undefined) {
      throw new DataColumnSidecarValidationError(
        {
          code: DataColumnSidecarErrorCode.INVALID_KZG_PROOF_BATCH,
          slot: blockSlot,
          reason,
        },
        "DataColumnSidecar has invalid KZG proof batch"
      );
    }

    metrics?.dataColumnSidecarProcessingSuccesses.inc();
  } finally {
    verificationTimer?.();
  }
}

/**
 * Validate a subset of gloas data column sidecars against a block
 * Gloas sidecars don't carry signed block headers, kzg commitments, or inclusion proofs
 */
export async function validateGloasBlockDataColumnSidecars(
  blockSlot: Slot,
  blockRoot: Root,
  blockKzgCommitments: Uint8Array[],
  dataColumnSidecars: gloas.DataColumnSidecar[],
  metrics?: BeaconMetrics["peerDas"] | null
): Promise<void> {
  metrics?.dataColumnSidecarProcessingRequests.inc(dataColumnSidecars.length);
  const verificationTimer = metrics?.dataColumnSidecarGossipVerificationTime.startTimer();
  try {
    if (dataColumnSidecars.length === 0) {
      return;
    }

    if (blockKzgCommitments.length === 0) {
      throw new DataColumnSidecarValidationError(
        {
          code: DataColumnSidecarErrorCode.INCORRECT_SIDECAR_COUNT,
          slot: blockSlot,
          expected: 0,
          actual: dataColumnSidecars.length,
        },
        "Block has no blob commitments but data column sidecars were provided"
      );
    }

    const commitments: Uint8Array[] = [];
    const cellIndices: number[] = [];
    const cells: Uint8Array[] = [];
    const proofs: Uint8Array[] = [];
    for (const columnSidecar of dataColumnSidecars) {
      if (columnSidecar.slot !== blockSlot) {
        throw new DataColumnSidecarValidationError({
          code: DataColumnSidecarErrorCode.INCORRECT_SIDECAR_SLOT,
          columnIndex: columnSidecar.index,
          expected: blockSlot,
          actual: columnSidecar.slot,
        });
      }

      if (!byteArrayEquals(columnSidecar.beaconBlockRoot, blockRoot)) {
        throw new DataColumnSidecarValidationError({
          code: DataColumnSidecarErrorCode.INCORRECT_BLOCK,
          slot: blockSlot,
          columnIndex: columnSidecar.index,
          expected: toRootHex(blockRoot),
          actual: toRootHex(columnSidecar.beaconBlockRoot),
        });
      }

      if (columnSidecar.index >= NUMBER_OF_COLUMNS) {
        throw new DataColumnSidecarValidationError(
          {
            code: DataColumnSidecarErrorCode.INVALID_INDEX,
            slot: blockSlot,
            columnIndex: columnSidecar.index,
          },
          "DataColumnSidecar has invalid index"
        );
      }

      if (columnSidecar.column.length !== blockKzgCommitments.length) {
        throw new DataColumnSidecarValidationError({
          code: DataColumnSidecarErrorCode.INCORRECT_CELL_COUNT,
          slot: blockSlot,
          columnIndex: columnSidecar.index,
          expected: blockKzgCommitments.length,
          actual: columnSidecar.column.length,
        });
      }

      if (columnSidecar.column.length !== columnSidecar.kzgProofs.length) {
        throw new DataColumnSidecarValidationError({
          code: DataColumnSidecarErrorCode.INCORRECT_KZG_PROOF_COUNT,
          slot: blockSlot,
          columnIndex: columnSidecar.index,
          expected: columnSidecar.column.length,
          actual: columnSidecar.kzgProofs.length,
        });
      }

      commitments.push(...blockKzgCommitments);
      cellIndices.push(...Array.from({length: columnSidecar.column.length}, () => columnSidecar.index));
      cells.push(...columnSidecar.column);
      proofs.push(...columnSidecar.kzgProofs);
    }

    let reason: string | undefined;
    // batch verification for the cases: downloadByRange and downloadByRoot
    const kzgVerificationTimer = metrics?.kzgVerificationDataColumnBatchTime.startTimer();
    try {
      const valid = await kzg.asyncVerifyCellKzgProofBatch(commitments, cellIndices, cells, proofs);
      if (!valid) {
        reason = "Invalid KZG proof batch";
      }
    } catch (e) {
      reason = (e as Error).message;
    } finally {
      kzgVerificationTimer?.();
    }
    if (reason !== undefined) {
      throw new DataColumnSidecarValidationError(
        {
          code: DataColumnSidecarErrorCode.INVALID_KZG_PROOF_BATCH,
          slot: blockSlot,
          reason,
        },
        "DataColumnSidecar has invalid KZG proof batch"
      );
    }

    metrics?.dataColumnSidecarProcessingSuccesses.inc();
  } finally {
    verificationTimer?.();
  }
}

/**
 * SPEC FUNCTION
 * https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.4/specs/fulu/p2p-interface.md#compute_subnet_for_data_column_sidecar
 */
export function computeSubnetForDataColumnSidecar(config: ChainConfig, columnSidecar: DataColumnSidecar): SubnetID {
  return columnSidecar.index % config.DATA_COLUMN_SIDECAR_SUBNET_COUNT;
}
