import {ChainForkConfig} from "@lodestar/config";
import {Epoch, Root, Slot, gloas} from "@lodestar/types";
import {ErrorAborted, LodestarError, Logger, prettyPrintIndices, toRootHex} from "@lodestar/utils";
import {isBlockInputBlobs, isBlockInputColumns} from "../../chain/blocks/blockInput/blockInput.js";
import {BlockInputErrorCode} from "../../chain/blocks/blockInput/errors.js";
import {IBlockInput} from "../../chain/blocks/blockInput/types.js";
import {PayloadEnvelopeInput} from "../../chain/blocks/payloadEnvelopeInput/payloadEnvelopeInput.js";
import {BlobSidecarErrorCode} from "../../chain/errors/blobSidecarError.js";
import {DataColumnSidecarErrorCode} from "../../chain/errors/dataColumnSidecarError.js";
import {Metrics} from "../../metrics/metrics.js";
import {PeerAction, prettyPrintPeerIdStr} from "../../network/index.js";
import {PeerSyncMeta} from "../../network/peers/peersData.js";
import {IClock} from "../../util/clock.js";
import {CustodyConfig} from "../../util/dataColumns.js";
import {ItTrigger} from "../../util/itTrigger.js";
import {PeerIdStr} from "../../util/peerId.js";
import {WarnResult, wrapError} from "../../util/wrapError.js";
import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, MAX_LOOK_AHEAD_EPOCHS} from "../constants.js";
import {DownloadByRangeError, DownloadByRangeErrorCode} from "../utils/downloadByRange.js";
import {getRateLimitedUntilMs} from "../utils/rateLimit.js";
import {RangeSyncType} from "../utils/remoteSyncType.js";
import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchStatus} from "./batch.js";
import {
  ChainPeersBalancer,
  PeerSyncInfo,
  batchStartEpochIsAfterSlot,
  computeHighestTarget,
  getBatchSlotRange,
  getNextBatchToProcess,
  isSyncChainDone,
  toArr,
  toBeDownloadedStartEpoch,
  validateBatchesStatus,
} from "./utils/index.js";

/** Shared services injected into every `SyncChain` through its constructor. */
export type SyncChainModules = {
  /** Forwarded to every `Batch` this chain creates */
  config: ChainForkConfig;
  /** Forwarded to every `Batch` this chain creates */
  clock: IClock;
  /** Forwarded to batches and to the peer balancer; its `sampledColumns` also drive the per-column peer metrics */
  custodyConfig: CustodyConfig;
  logger: Logger;
  /** When `null` the chain registers no metrics collector and records no metrics */
  metrics: Metrics | null;
};

/** Callbacks injected into a `SyncChain` by its owner. */
export type SyncChainFns = {
  /**
   * Must resolve only if ALL blocks are processed successfully.
   * If only SOME blocks are processed it must throw BlockProcessorError()
   */
  processChainSegment: (
    blocks: IBlockInput[],
    payloadEnvelopes: Map<Slot, PayloadEnvelopeInput> | null,
    syncType: RangeSyncType
  ) => Promise<void>;
  /** Must download blocks, and validate their range */
  downloadByRange: (
    peer: PeerSyncMeta,
    batch: Batch,
    syncType: RangeSyncType
  ) => Promise<
    WarnResult<{blocks: IBlockInput[]; payloadEnvelopes: Map<Slot, PayloadEnvelopeInput> | null}, DownloadByRangeError>
  >;
  /** Report peer for negative actions. Decouples from the full network instance */
  reportPeer: (peer: PeerIdStr, action: PeerAction, actionName: string) => void;
  /**
   * Gets current peer custodyColumns and earliestAvailableSlot.
   * May throw: every call site in this file wraps it in try/catch and skips the peer on failure.
   */
  getConnectedPeerSyncMeta: (peerId: string) => PeerSyncMeta;
  /**
   * Hook called once when the chain's main sync promise settles:
   * `(null, target)` when it resolves, `(err, null)` when it rejects.
   */
  onEnd: (err: Error | null, target: ChainTarget | null) => void;
  /** Deletes an array of BlockInputs from the BlockInputCache */
  pruneBlockInputs: (blockInputs: IBlockInput[]) => void;
};

/**
 * The point a `SyncChain` syncs up to. Uses a slot instead of an epoch to re-use logic between
 * finalized sync and head sync. The root is used to uniquely identify this chain on different forks.
 */
export type ChainTarget = {
  slot: Slot;
  root: Root;
};

/** Thrown by `SyncChain.startSyncing()` when the chain has already ended (status `Error` or `Done`). */
export class SyncChainStartError extends Error {}

/** Snapshot of a `SyncChain`'s internal state, as produced by `SyncChain.getDebugState()`. */
export type SyncChainDebugState = {
  /** Hex encoding of the current target root */
  targetRoot: string | null;
  targetSlot: number | null;
  syncType: RangeSyncType;
  status: SyncChainStatus;
  /** Populated from the chain's `lastEpochWithProcessBlocks` */
  startEpoch: number;
  /** Number of peers currently in the chain's peer set */
  peers: number;
  /** Metadata of every batch currently tracked by the chain */
  batches: BatchMetadata[];
};

/** Lifecycle status of a `SyncChain`. */
export enum SyncChainStatus {
  /** Initial status, and the status after `stopSyncing()`. Batches are neither requested nor processed */
  Stopped = "Stopped",
  /** Set by `startSyncing()`. Batches are requested and processed */
  Syncing = "Syncing",
  /** The main sync loop finished without error. The chain cannot be started again */
  Done = "Done",
  /** The main sync loop failed with an error other than `ErrorAborted`. The chain cannot be started again */
  Error = "Error",
}

// Module-wide counter used to identify each chain over time; it is incremented every time a new chain is created.
// A chain type can be Finalized or Head, so the sync type is combined with this id to make each chain's log id unique.
let nextChainId = 0;

/**
 * Dynamic target sync chain. Peers with multiple targets but with the same syncType are added
 * through the `addPeer()` hook.
 *
 * A chain of blocks that need to be downloaded. Peers who claim to contain the target head
 * root are grouped into the peer pool and queried for batches when downloading the chain.
 */
export class SyncChain {
  /** Short string id to identify this SyncChain in logs: `<syncType>-<sequential chain id>` */
  readonly logId: string;
  readonly syncType: RangeSyncType;
  /**
   * Should sync up until this slot, then stop.
   * The target is dynamic: it is recomputed from the targets of the current peers every time a peer
   * is added or removed. If the chain is left with no peers the last computed target is kept.
   */
  target: ChainTarget;

  /** Number of validated epochs. For the SyncRange to prevent switching chains too fast */
  validatedEpochs = 0;

  /** Epoch this chain was created with; never changes afterwards */
  readonly firstBatchEpoch: Epoch;
  /**
   * The start of the chain segment. Any epoch previous to this one has been validated.
   * Note: `lastEpochWithProcessBlocks` signals the epoch at which 1 or more blocks have been processed
   * successfully. So that epoch itself may or may not be valid.
   */
  private lastEpochWithProcessBlocks: Epoch;
  private status = SyncChainStatus.Stopped;

  private readonly processChainSegment: SyncChainFns["processChainSegment"];
  private readonly downloadByRange: SyncChainFns["downloadByRange"];
  private readonly reportPeer: SyncChainFns["reportPeer"];
  private readonly getConnectedPeerSyncMeta: SyncChainFns["getConnectedPeerSyncMeta"];
  private readonly pruneBlockInputs: SyncChainFns["pruneBlockInputs"];

  /** AsyncIterable that guarantees processChainSegment runs at most once at any given time */
  private readonly batchProcessor = new ItTrigger();
  /** Sorted map of batches undergoing some kind of processing. */
  private readonly batches = new Map<Epoch, Batch>();
  /**
   * `true` until the first `Batch` is constructed via `includeNextBatch`
   */
  private isFirstBatch = true;
  /** Peers of this chain, mapped to the target each of them advertised */
  private readonly peerset = new Map<PeerIdStr, ChainTarget>();
  /**
   * Tracks peers that have rate-limited us, mapped to the timestamp (ms) until which we should avoid them.
   * This is a sync-layer optimization to avoid assigning batches to backed-off peers.
   * The reqresp SelfRateLimiter independently enforces backoff at the protocol level as a safety net.
   */
  private readonly rateLimitedPeers = new Map<PeerIdStr, number>();
  /** Pending timer that re-triggers the downloader when the earliest peer backoff expires */
  private rateLimitBackoffTimeout: NodeJS.Timeout | undefined;

  private readonly logger: Logger;
  private readonly config: ChainForkConfig;
  private readonly clock: IClock;
  private readonly metrics: Metrics | null;
  private readonly custodyConfig: CustodyConfig;
  /** Only handed to the first `Batch` created by this chain */
  private readonly latestBid: gloas.ExecutionPayloadBid | undefined;

  /**
   * Creates the chain in `Stopped` status and immediately starts the main `sync()` loop, which idles
   * until `startSyncing()` is called. `fns.onEnd` is invoked once when that loop settles.
   */
  constructor(
    initialBatchEpoch: Epoch,
    initialTarget: ChainTarget,
    syncType: RangeSyncType,
    fns: SyncChainFns,
    modules: SyncChainModules,
    latestBid: gloas.ExecutionPayloadBid | undefined
  ) {
    const {config, clock, custodyConfig, logger, metrics} = modules;
    this.firstBatchEpoch = initialBatchEpoch;
    this.lastEpochWithProcessBlocks = initialBatchEpoch;
    this.target = initialTarget;
    this.syncType = syncType;
    this.processChainSegment = fns.processChainSegment;
    this.downloadByRange = fns.downloadByRange;
    this.reportPeer = fns.reportPeer;
    this.pruneBlockInputs = fns.pruneBlockInputs;
    this.getConnectedPeerSyncMeta = fns.getConnectedPeerSyncMeta;
    this.config = config;
    this.clock = clock;
    this.metrics = metrics;
    this.custodyConfig = custodyConfig;
    this.latestBid = latestBid;
    this.logger = logger;
    this.logId = `${syncType}-${nextChainId++}`;

    if (metrics) {
      // The collector is always attached to `headSyncPeers`; `scrapeMetrics()` itself picks the gauge
      // that matches this chain's sync type.
      // NOTE(review): this collector is never unregistered in this class — confirm whether chains
      // that have ended should be detached from the metric.
      metrics.syncRange.headSyncPeers.addCollect(() => this.scrapeMetrics(metrics));
    }

    // Trigger event on parent class
    this.sync().then(
      () => fns.onEnd(null, this.target),
      (e) => fns.onEnd(e, null)
    );
  }

  /**
   * Start syncing a new chain or an old one with an existing peer list.
   * In the same call, advance the chain if `localFinalizedEpoch` is at least one whole batch beyond
   * `lastEpochWithProcessBlocks`.
   *
   * No-op if the chain is already syncing.
   *
   * @throws SyncChainStartError if the chain has already ended (status `Error` or `Done`)
   */
  startSyncing(localFinalizedEpoch: Epoch): void {
    switch (this.status) {
      case SyncChainStatus.Stopped:
        break; // Ok, continue
      case SyncChainStatus.Syncing:
        return; // Skip, already started
      case SyncChainStatus.Error:
      case SyncChainStatus.Done:
        throw new SyncChainStartError(`Attempted to start an ended SyncChain ${this.status}`);
    }

    this.status = SyncChainStatus.Syncing;

    this.logger.debug("SyncChain startSyncing", {
      localFinalizedEpoch,
      lastEpochWithProcessBlocks: this.lastEpochWithProcessBlocks,
      targetSlot: this.target.slot,
    });

    // to avoid dropping local progress, we advance the chain with its batch boundaries.
    // get the aligned epoch that produces a batch containing the `localFinalizedEpoch`
    const lastEpochWithProcessBlocksAligned =
      this.lastEpochWithProcessBlocks +
      Math.floor((localFinalizedEpoch - this.lastEpochWithProcessBlocks) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH;
    this.advanceChain(lastEpochWithProcessBlocksAligned);

    // Potentially download new batches and process pending
    this.triggerBatchDownloader();
    this.triggerBatchProcessor();

    // `stopSyncing()` cancels the rate-limit backoff timer but keeps the recorded backoffs. Re-arm the
    // timer here, otherwise a chain whose peers are all still backed off has nothing to wake the
    // downloader once the backoff expires. This is a no-op when no peer is rate-limited.
    this.scheduleRateLimitBackoffRetry();
  }

  /**
   * Temporarily stop the chain. Will prevent batches from being requested and processed.
   * Downloads already in flight are not cancelled.
   */
  stopSyncing(): void {
    this.status = SyncChainStatus.Stopped;
    this.clearRateLimitBackoffTimer();
    this.logger.debug("SyncChain stopSyncing", {id: this.logId});
  }

  /**
   * Permanently remove this chain. Ends the main AsyncIterable with an `ErrorAborted`, which the
   * `sync()` loop swallows.
   *
   * NOTE(review): `status` is left untouched here, so a chain removed while `Syncing` still passes the
   * status guard in `requestBatches()` when an in-flight download completes — confirm this is intended.
   */
  remove(): void {
    this.logger.debug("SyncChain remove", {id: this.logId});
    this.clearRateLimitBackoffTimer();
    this.batchProcessor.end(new ErrorAborted("SyncChain"));
  }

  /**
   * Add peer to the chain and request batches if active.
   * Adding an already known peer overwrites its target.
   */
  addPeer(peer: PeerIdStr, target: ChainTarget): void {
    this.peerset.set(peer, target);
    this.computeTarget();
    this.triggerBatchDownloader();
  }

  /**
   * Returns true if the peer existed and has been removed
   * NOTE: The RangeSync will take care of deleting the SyncChain if peers = 0
   */
  removePeer(peerId: PeerIdStr): boolean {
    const deleted = this.peerset.delete(peerId);
    this.rateLimitedPeers.delete(peerId);
    this.computeTarget();
    return deleted;
  }

  /**
   * Helper to print internal state for debugging when chain gets stuck
   */
  getBatchesState(): BatchMetadata[] {
    return toArr(this.batches).map((batch) => batch.getMetadata());
  }

  get lastValidatedSlot(): Slot {
    // Slot right before the start of the batch that follows `lastEpochWithProcessBlocks`
    return getBatchSlotRange(this.lastEpochWithProcessBlocks + EPOCHS_PER_BATCH).startSlot - 1;
  }

  get isSyncing(): boolean {
    return this.status === SyncChainStatus.Syncing;
  }

  /** `true` once the chain has ended, either successfully or with an error */
  get isRemovable(): boolean {
    return this.status === SyncChainStatus.Error || this.status === SyncChainStatus.Done;
  }

  /** Number of peers currently in this chain */
  get peers(): number {
    return this.peerset.size;
  }

  getPeers(): PeerIdStr[] {
    return Array.from(this.peerset.keys());
  }

  /** Full debug state for lodestar API */
  getDebugState(): SyncChainDebugState {
    return {
      targetRoot: toRootHex(this.target.root),
      targetSlot: this.target.slot,
      syncType: this.syncType,
      status: this.status,
      startEpoch: this.lastEpochWithProcessBlocks,
      peers: this.peers,
      batches: this.getBatchesState(),
    };
  }

  /**
   * Recomputes `target` as the highest target among the current peers.
   * With an empty peer set the previous target is kept.
   */
  private computeTarget(): void {
    if (this.peerset.size > 0) {
      const targets = Array.from(this.peerset.values());
      this.target = computeHighestTarget(targets);
    }
  }

  /**
   * Main Promise that handles the sync process. Resolves when the chain is done, i.e. when
   * `isSyncChainDone()` reports that everything up to `target.slot` has been processed, or when the
   * chain is aborted through `remove()`. Rejects with any other error, after pruning the block inputs
   * of all pending batches and, for some batch errors, reporting every peer of the chain.
   */
  private async sync(): Promise<void> {
    try {
      // Start processing batches on demand in strict sequence
      for await (const _ of this.batchProcessor) {
        // Triggers received while stopped are dropped; `startSyncing()` triggers the processor again
        if (this.status !== SyncChainStatus.Syncing) {
          continue;
        }

        // TODO: Consider running this check less often after the sync is well tested
        validateBatchesStatus(toArr(this.batches));

        // Returns true if SyncChain has processed all possible blocks with slot <= target.slot
        if (isSyncChainDone(toArr(this.batches), this.lastEpochWithProcessBlocks, this.target.slot)) {
          break;
        }

        // Processes the next batch if ready
        const batch = getNextBatchToProcess(toArr(this.batches));
        if (batch) await this.processBatch(batch);
      }

      this.status = SyncChainStatus.Done;
      this.logger.verbose("SyncChain Done", {id: this.logId});
    } catch (e) {
      if (e instanceof ErrorAborted) {
        return; // Ignore
      }

      for (const batch of this.batches.values()) {
        this.pruneBlockInputs(batch.getBlocks());
      }

      this.status = SyncChainStatus.Error;
      this.logger.verbose("SyncChain Error", {id: this.logId}, e as Error);

      // If a batch exceeds its retry limit, maybe downscore peers.
      // shouldReportPeerOnBatchError() enforces that all BatchErrorCode values are covered
      if (e instanceof BatchError) {
        const shouldReportPeer = shouldReportPeerOnBatchError(e.type.code);
        if (shouldReportPeer) {
          for (const peer of this.peerset.keys()) {
            this.reportPeer(peer, shouldReportPeer.action, shouldReportPeer.reason);
          }
        }
      }

      throw e;
    } finally {
      // Whatever the outcome, an ended chain must not keep a pending retry timer
      this.clearRateLimitBackoffTimer();
    }
  }

  /**
   * Request to process batches if possible
   */
  private triggerBatchProcessor(): void {
    this.batchProcessor.trigger();
  }

  /**
   * Request to download batches if possible.
   * Never throws: an error raised while requesting batches ends the main loop instead.
   */
  private triggerBatchDownloader(): void {
    try {
      this.requestBatches();
    } catch (e) {
      // bubble the error up to the main async iterable loop
      this.batchProcessor.end(e as Error);
    }
  }

  /**
   * (Re)arms a single timer that fires when the earliest still-active peer backoff expires, so that
   * the downloader is re-triggered at that moment. Backoffs that have already expired are dropped.
   * Does nothing beyond clearing the previous timer if the chain is not syncing or no peer is backed off.
   */
  private scheduleRateLimitBackoffRetry(): void {
    this.clearRateLimitBackoffTimer();

    if (this.status !== SyncChainStatus.Syncing || this.rateLimitedPeers.size === 0) {
      return;
    }

    const now = Date.now();
    let retryAt: number | null = null;
    for (const [peerId, rateLimitedUntil] of this.rateLimitedPeers.entries()) {
      if (rateLimitedUntil <= now) {
        this.rateLimitedPeers.delete(peerId);
        continue;
      }
      retryAt = Math.min(retryAt ?? rateLimitedUntil, rateLimitedUntil);
    }

    if (retryAt === null) {
      return;
    }

    this.rateLimitBackoffTimeout = setTimeout(
      () => {
        this.rateLimitBackoffTimeout = undefined;
        this.triggerBatchDownloader();
        // Other peers may still be backed off: schedule the next wake-up
        this.scheduleRateLimitBackoffRetry();
      },
      Math.max(0, retryAt - now)
    );
  }

  /** Cancels the pending backoff retry timer, if any */
  private clearRateLimitBackoffTimer(): void {
    if (this.rateLimitBackoffTimeout !== undefined) {
      clearTimeout(this.rateLimitBackoffTimeout);
      this.rateLimitBackoffTimeout = undefined;
    }
  }

  /**
   * Attempts to request the next required batches from the peer pool if the chain is syncing.
   * It first retries batches that are awaiting download, then creates new batches until either no
   * idle peer is available or `includeNextBatch()` declines to create another one.
   */
  private requestBatches(): void {
    if (this.status !== SyncChainStatus.Syncing) {
      return;
    }

    const now = Date.now();
    const peersSyncInfo: PeerSyncInfo[] = [];
    for (const [peerId, target] of this.peerset.entries()) {
      // Skip peers that are currently in rate-limit backoff
      const rateLimitedUntil = this.rateLimitedPeers.get(peerId);
      if (rateLimitedUntil !== undefined) {
        if (now < rateLimitedUntil) {
          continue;
        }
        // Backoff expired: the peer is eligible again
        this.rateLimitedPeers.delete(peerId);
      }

      try {
        peersSyncInfo.push({...this.getConnectedPeerSyncMeta(peerId), target});
      } catch (e) {
        // The peer is left out of this round only; it stays in the peer set
        this.logger.debug("Failed to get peer sync meta", {peerId}, e as Error);
      }
    }

    const peerBalancer = new ChainPeersBalancer(peersSyncInfo, toArr(this.batches), this.custodyConfig, this.syncType);

    // Retry download of existing batches
    for (const batch of this.batches.values()) {
      if (batch.state.status !== BatchStatus.AwaitingDownload) {
        continue;
      }

      const peer = peerBalancer.bestPeerToRetryBatch(batch);
      if (peer) {
        void this.sendBatch(batch, peer);
      }
    }

    // find the next pending batch and request it from the peer
    let batch = this.includeNextBatch();
    while (batch != null) {
      const peer = peerBalancer.idlePeerForBatch(batch);
      if (!peer) {
        // if there is no peer available, we stop requesting batches because next batches will have greater startEpoch with the same sampling groups
        break;
      }
      void this.sendBatch(batch, peer);
      batch = this.includeNextBatch();
    }
  }

  /**
   * Creates the next required batch from the chain and registers it in `this.batches`.
   * Returns `null` if no batch should be created right now: the download buffer is full, the
   * look-ahead limit is reached, the next batch would start after the target slot, or a batch with
   * the same start epoch already exists.
   */
  private includeNextBatch(): Batch | null {
    const batches = toArr(this.batches);

    // Only request batches up to the buffer size limit
    // Note: Don't count batches in the AwaitingValidation state, to prevent stalling sync
    // if the current processing window is contained in a long range of skip slots.
    const batchesInBuffer = batches.filter((batch) => {
      return batch.state.status === BatchStatus.Downloading || batch.state.status === BatchStatus.AwaitingProcessing;
    });
    if (batchesInBuffer.length > BATCH_BUFFER_SIZE) {
      return null;
    }

    // if last processed epoch is n, we don't want to request batches with epoch > n + MAX_LOOK_AHEAD_EPOCHS
    // we should have enough batches to process in the buffer: n + 1, ..., n + MAX_LOOK_AHEAD_EPOCHS
    // let's focus on redownloading these batches first because it may have to reach different peers to get enough sampled columns
    if (
      batches.length > 0 &&
      Math.max(...batches.map((b) => b.startEpoch)) >= this.lastEpochWithProcessBlocks + MAX_LOOK_AHEAD_EPOCHS
    ) {
      return null;
    }

    // This line decides the starting epoch of the next batch. MUST ensure no duplicate batch for the same startEpoch
    const startEpoch = toBeDownloadedStartEpoch(batches, this.lastEpochWithProcessBlocks);

    // Don't request batches beyond the target head slot: stop once the to-be-downloaded batch would
    // start strictly after target.slot
    if (batchStartEpochIsAfterSlot(startEpoch, this.target.slot)) {
      return null;
    }

    if (this.batches.has(startEpoch)) {
      this.logger.error("Attempting to add existing Batch to SyncChain", {id: this.logId, startEpoch});
      return null;
    }

    const batch = new Batch(
      startEpoch,
      this.config,
      this.clock,
      this.custodyConfig,
      this.isFirstBatch,
      // `latestBid` is only meaningful for the first batch's parent-payload check
      this.isFirstBatch ? this.latestBid : undefined,
      this.target.slot
    );
    this.isFirstBatch = false;
    this.batches.set(startEpoch, batch);
    return batch;
  }

  /**
   * Downloads `batch` from `peer` and feeds the outcome back into the batch:
   * - download error: the peer may be reported depending on the error code; a rate-limit error puts
   *   the peer in backoff without counting as a failed attempt, any other error counts as one
   * - download success: the blocks are handed to the batch and, if the batch is complete, the
   *   processor is triggered
   *
   * Errors thrown inside the `try` block end the main loop through `batchProcessor`. In every case
   * the downloader is triggered again at the end.
   */
  private async sendBatch(batch: Batch, peer: PeerSyncMeta): Promise<void> {
    this.logger.verbose("Downloading batch", {
      id: this.logId,
      ...batch.getMetadata(),
      peer: prettyPrintPeerIdStr(peer.peerId),
    });
    try {
      batch.startDownloading(peer);

      // wrapError ensures to never call both batch success() and batch error()
      const res = await wrapError(this.downloadByRange(peer, batch, this.syncType));

      if (res.err) {
        // There's several known error cases where we want to take action on the peer
        const errCode = (res.err as LodestarError<{code: string}>).type?.code;
        this.metrics?.syncRange.downloadByRange.error.inc({client: peer.client, code: errCode ?? "UNKNOWN"});
        if (this.syncType === RangeSyncType.Finalized) {
          // For finalized sync, we are stricter with peers as there is no ambiguity about which chain we're syncing.
          // The below cases indicate the peer may be on a different chain, so are not penalized during head sync.
          switch (errCode) {
            case BlockInputErrorCode.MISMATCHED_ROOT_HEX:
            case DownloadByRangeErrorCode.MISSING_BLOBS:
            case DownloadByRangeErrorCode.EXTRA_BLOBS:
            case DownloadByRangeErrorCode.MISSING_COLUMNS:
            case DownloadByRangeErrorCode.EXTRA_COLUMNS:
            case BlobSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
            case BlobSidecarErrorCode.INCORRECT_BLOCK:
            case DataColumnSidecarErrorCode.INCORRECT_SIDECAR_COUNT:
            case DataColumnSidecarErrorCode.INCORRECT_BLOCK:
              this.reportPeer(peer.peerId, PeerAction.LowToleranceError, res.err.message);
          }
        }
        // These cases are penalized for both finalized and head sync
        switch (errCode) {
          case DownloadByRangeErrorCode.EXTRA_BLOCKS:
          case DownloadByRangeErrorCode.OUT_OF_ORDER_BLOCKS:
          case DownloadByRangeErrorCode.OUT_OF_RANGE_BLOCKS:
          case DownloadByRangeErrorCode.PARENT_ROOT_MISMATCH:
          case DownloadByRangeErrorCode.INVALID_ENVELOPE_BEACON_BLOCK_ROOT:
          case DownloadByRangeErrorCode.INVALID_CHAIN_SEGMENT:
          case BlobSidecarErrorCode.INCLUSION_PROOF_INVALID:
          case BlobSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
          case DataColumnSidecarErrorCode.INCORRECT_KZG_COMMITMENTS_COUNT:
          case DataColumnSidecarErrorCode.INCORRECT_KZG_PROOF_COUNT:
          case DataColumnSidecarErrorCode.INVALID_KZG_PROOF_BATCH:
          case DataColumnSidecarErrorCode.INCLUSION_PROOF_INVALID:
            this.reportPeer(peer.peerId, PeerAction.LowToleranceError, res.err.message);
        }
        this.logger.verbose(
          "Batch download error",
          {id: this.logId, ...batch.getMetadata(), peer: prettyPrintPeerIdStr(peer.peerId)},
          res.err
        );
        const rateLimitedUntilMs = getRateLimitedUntilMs(res.err);
        if (rateLimitedUntilMs !== null) {
          // Peer rate-limited us — don't count as a failed download attempt and mark peer for backoff
          this.rateLimitedPeers.set(peer.peerId, rateLimitedUntilMs);
          this.scheduleRateLimitBackoffRetry();
          batch.downloadingRateLimited();
          this.triggerBatchDownloader();
        } else {
          batch.downloadingError(peer.peerId); // Throws after MAX_DOWNLOAD_ATTEMPTS
        }
      } else {
        this.logger.verbose("Batch download success", {
          id: this.logId,
          ...batch.getMetadata(),
          peer: prettyPrintPeerIdStr(peer.peerId),
        });
        this.metrics?.syncRange.downloadByRange.success.inc();
        const {warnings, result} = res.result;
        const {blocks: downloadedBlocks, payloadEnvelopes} = result;
        const downloadSuccessOutput = batch.downloadingSuccess(peer.peerId, downloadedBlocks, payloadEnvelopes);
        const logMeta: Record<string, number> = {
          blockCount: downloadSuccessOutput.blocks.length,
        };

        if (warnings && warnings.length > 0) {
          for (const warning of warnings) {
            this.metrics?.syncRange.downloadByRange.warn.inc({client: peer.client, code: warning.type.code});
            this.logger.debug(
              "Batch downloaded with warning",
              {id: this.logId, ...batch.getMetadata(), ...logMeta, peer: prettyPrintPeerIdStr(peer.peerId)},
              warning
            );
          }
        }

        // Aggregate blob / column counts over the whole batch, for logging only
        for (const block of downloadSuccessOutput.blocks) {
          if (isBlockInputBlobs(block)) {
            const blockLogMeta = block.getLogMeta();
            const expectedBlobs = typeof blockLogMeta.expectedBlobs === "number" ? blockLogMeta.expectedBlobs : 0;
            logMeta.expectedBlobCount = (logMeta.expectedBlobCount ?? 0) + expectedBlobs;
            logMeta.receivedBlobCount = (logMeta.receivedBlobCount ?? 0) + blockLogMeta.receivedBlobs;
          } else if (isBlockInputColumns(block)) {
            logMeta.columnCount = (logMeta.columnCount ?? 0) + block.getLogMeta().receivedColumns;
          }
        }

        let logMessage: string;
        if (downloadSuccessOutput.status === BatchStatus.AwaitingProcessing) {
          logMessage = "Finished downloading batch by range";
          this.triggerBatchProcessor();
        } else {
          logMessage = "Partially downloaded batch by range. Attempting another round of downloads";
          // the flow will continue to call triggerBatchDownloader() below
        }

        const blockSlots = downloadSuccessOutput.blocks.map((b) => b.slot);
        const envelopeSlots = downloadSuccessOutput.payloadEnvelopes
          ? Array.from(downloadSuccessOutput.payloadEnvelopes.keys())
          : null;

        this.logger.debug(logMessage, {
          id: this.logId,
          ...batch.getMetadata(),
          ...logMeta,
          blockSlots: prettyPrintIndices(blockSlots),
          ...(envelopeSlots ? {envelopeSlots: prettyPrintIndices(envelopeSlots)} : {}),
          peer: prettyPrintPeerIdStr(peer.peerId),
        });
      }

      // Preemptively request more blocks from peers whilst we process current blocks
      //
      // TODO(fulu): why is this second call here.  should fall through to the one below the catch block. commenting
      //      for now and will resolve during PR process
      // this.triggerBatchDownloader();
    } catch (e) {
      // bubble the error up to the main async iterable loop
      this.batchProcessor.end(e as Error);
    }

    // Preemptively request more blocks from peers whilst we process current blocks
    this.triggerBatchDownloader();
  }

  /**
   * Sends `batch` to the processor. Note: batch may be empty.
   *
   * On success the chain is advanced to the batch's start epoch (only if the batch had blocks) and the
   * processor is triggered again. On failure the batch and all batches before it are marked as failed.
   * Errors thrown by the batch methods propagate to the caller, the main `sync()` loop.
   */
  private async processBatch(batch: Batch): Promise<void> {
    const {blocks, payloadEnvelopes, peers} = batch.startProcessing();

    const logCtx = {
      id: this.logId,
      ...batch.getMetadata(),
      blockCount: blocks.length,
      blockSlots: prettyPrintIndices(blocks.map((b) => b.slot)),
      ...(payloadEnvelopes ? {envelopeSlots: prettyPrintIndices(Array.from(payloadEnvelopes.keys()))} : {}),
      peers: peers.map(prettyPrintPeerIdStr).join(","),
    };
    this.logger.verbose("Processing batch", logCtx);

    // wrapError ensures to never call both batch success() and batch error()
    const res = await wrapError(this.processChainSegment(blocks, payloadEnvelopes, this.syncType));

    if (!res.err) {
      batch.processingSuccess();
      // Metadata is spread again so the log reflects the batch state after `processingSuccess()`
      this.logger.verbose("Processed batch", {...logCtx, ...batch.getMetadata()});

      // If the processed batch is not empty, validate previous AwaitingValidation blocks.
      if (blocks.length > 0) {
        this.advanceChain(batch.startEpoch);
      }

      // Potentially process next AwaitingProcessing batch
      this.triggerBatchProcessor();
    } else {
      this.logger.verbose("Batch process error", logCtx, res.err);
      batch.processingError(res.err); // Throws after MAX_BATCH_PROCESSING_ATTEMPTS

      // At least one block was successfully verified and imported, so we can be sure all
      // previous batches are valid and we only need to download the current failed batch.
      // TODO: Disabled for now
      // if (res.err instanceof ChainSegmentError && res.err.importedBlocks > 0) {
      //   this.advanceChain(batch.startEpoch);
      // }

      // The current batch could not be processed, so either this or previous batches are invalid.
      // All previous batches (AwaitingValidation) are potentially faulty and marked for retry.
      // `this.lastEpochWithProcessBlocks` is not advanced in this branch.
      for (const pendingBatch of this.batches.values()) {
        if (pendingBatch.startEpoch < batch.startEpoch) {
          this.logger.verbose("Batch validation error", {id: this.logId, ...pendingBatch.getMetadata()});
          pendingBatch.validationError(res.err); // Throws after MAX_BATCH_PROCESSING_ATTEMPTS
        }
      }
    }

    // A batch is no longer in Processing status, queue has an empty spot to download next batch
    this.triggerBatchDownloader();
  }

  /**
   * Drops any batches previous to `newLastEpochWithProcessBlocks` and updates the chain boundaries.
   * Each dropped batch counts as validated; peers that served a failed attempt whose hash differs
   * from the validated attempt are reported.
   * No-op unless `newLastEpochWithProcessBlocks` is greater than the current value.
   */
  private advanceChain(newLastEpochWithProcessBlocks: Epoch): void {
    // make sure this epoch produces an advancement
    if (newLastEpochWithProcessBlocks <= this.lastEpochWithProcessBlocks) {
      return;
    }

    for (const [batchKey, batch] of this.batches.entries()) {
      if (batch.startEpoch < newLastEpochWithProcessBlocks) {
        this.batches.delete(batchKey);
        this.validatedEpochs += EPOCHS_PER_BATCH;

        // The last batch attempt is right, all others are wrong. Penalize other peers
        const attemptOk = batch.validationSuccess();
        for (const attempt of batch.failedProcessingAttempts) {
          if (attempt.hash !== attemptOk.hash) {
            for (const badAttemptPeer of attempt.peers) {
              if (attemptOk.peers.includes(badAttemptPeer)) {
                // The same peer corrected its previous attempt
                this.reportPeer(badAttemptPeer, PeerAction.MidToleranceError, "SyncChainInvalidBatchSelf");
              } else {
                // A different peer sent a bad batch
                this.reportPeer(badAttemptPeer, PeerAction.LowToleranceError, "SyncChainInvalidBatchOther");
              }
            }
          }
        }
      }
    }

    this.lastEpochWithProcessBlocks = newLastEpochWithProcessBlocks;
    this.logger.verbose("Advanced chain", {
      id: this.logId,
      lastEpochWithProcessBlocks: this.lastEpochWithProcessBlocks,
    });
  }

  /**
   * Metrics collector: for every sampled column, sets on the gauge matching this chain's sync type
   * the number of chain peers whose custody columns include it. The gauge label `columnIndex` is the
   * position of the column within `custodyConfig.sampledColumns`, not the column value itself.
   */
  private scrapeMetrics(metrics: Metrics): void {
    const syncPeersMetric =
      this.syncType === RangeSyncType.Finalized
        ? metrics.syncRange.finalizedSyncPeers
        : metrics.syncRange.headSyncPeers;

    const peersSyncMeta = new Map<PeerIdStr, PeerSyncMeta>();
    for (const peerId of this.peerset.keys()) {
      try {
        peersSyncMeta.set(peerId, this.getConnectedPeerSyncMeta(peerId));
      } catch (_) {
        // ignore for metric as peer could be disconnected
      }
    }

    const peersByColumnIndex = new Map<number, number>();
    for (const [columnIndex, column] of this.custodyConfig.sampledColumns.entries()) {
      for (const {custodyColumns} of peersSyncMeta.values()) {
        if (custodyColumns.includes(column)) {
          peersByColumnIndex.set(columnIndex, (peersByColumnIndex.get(columnIndex) ?? 0) + 1);
        }
      }
    }

    // Always set every index, so columns without peers are reported as 0 rather than left stale
    for (let columnIndex = 0; columnIndex < this.custodyConfig.sampledColumns.length; columnIndex++) {
      const peerCount = peersByColumnIndex.get(columnIndex) ?? 0;
      syncPeersMetric.set({columnIndex}, peerCount);
    }
  }
}

/**
 * Maps a `BatchErrorCode` to the peer report that should follow it, or `null` when peers must not
 * be downscored. The switch deliberately has no `default` branch so that every `BatchErrorCode`
 * has to be handled explicitly.
 */
export function shouldReportPeerOnBatchError(
  code: BatchErrorCode
): {action: PeerAction.LowToleranceError; reason: string} | null {
  switch (code) {
    // No peer is reported for any of these codes.
    // TODO: Should peers be reported for MAX_DOWNLOAD_ATTEMPTS?
    case BatchErrorCode.MAX_DOWNLOAD_ATTEMPTS:
    case BatchErrorCode.INVALID_COUNT:
    case BatchErrorCode.WRONG_STATUS:
    case BatchErrorCode.MAX_EXECUTION_ENGINE_ERROR_ATTEMPTS:
      return null;

    // A batch could not be processed after max retry limit. It's likely that all peers
    // in this chain are sending invalid batches repeatedly so are either malicious or faulty.
    // We drop the chain and report all peers.
    // There are some edge cases with forks that could cause this situation, but it's unlikely.
    case BatchErrorCode.MAX_PROCESSING_ATTEMPTS: {
      const reason = "SyncChainMaxProcessingAttempts";
      return {action: PeerAction.LowToleranceError, reason};
    }
  }
}
