import {EventEmitter} from "node:events";
import {StrictEventEmitter} from "strict-event-emitter-types";
import {BeaconConfig} from "@lodestar/config";
import {IBeaconStateViewGloas, computeStartSlotAtEpoch, isStatePostGloas} from "@lodestar/state-transition";
import {Epoch, Status, fulu} from "@lodestar/types";
import {Logger, prettyPrintIndices, toRootHex} from "@lodestar/utils";
import {IBlockInput} from "../../chain/blocks/blockInput/types.js";
import {AttestationImportOpt, ImportBlockOpts} from "../../chain/blocks/index.js";
import {assertLinearChainSegment} from "../../chain/blocks/utils/chainSegment.js";
import {BlockError} from "../../chain/errors/index.js";
import {IBeaconChain} from "../../chain/index.js";
import {Metrics} from "../../metrics/index.js";
import {INetwork} from "../../network/index.js";
import {PeerIdStr} from "../../util/peerId.js";
import {
  DownloadByRangeError,
  DownloadByRangeErrorCode,
  cacheByRangeResponses,
  downloadByRange,
} from "../utils/downloadByRange.js";
import {RangeSyncType, getRangeSyncTarget, rangeSyncTypes} from "../utils/remoteSyncType.js";
import {ChainTarget, SyncChain, SyncChainDebugState, SyncChainFns} from "./chain.js";
import {updateChains} from "./utils/index.js";

export enum RangeSyncEvent {
  completedChain = "RangeSync-completedChain",
}

type RangeSyncEvents = {
  [RangeSyncEvent.completedChain]: () => void;
};

type RangeSyncEmitter = StrictEventEmitter<EventEmitter, RangeSyncEvents>;

export enum RangeSyncStatus {
  /** A finalized chain is being synced */
  Finalized,
  /** There are no finalized chains and we are syncing one more head chains */
  Head,
  /** There are no head or finalized chains and no long range sync is in progress */
  Idle,
}

type RangeSyncState =
  | {status: RangeSyncStatus.Finalized; target: ChainTarget}
  | {status: RangeSyncStatus.Head; targets: ChainTarget[]}
  | {status: RangeSyncStatus.Idle};

export type RangeSyncModules = {
  chain: IBeaconChain;
  network: INetwork;
  metrics: Metrics | null;
  config: BeaconConfig;
  logger: Logger;
};

export type RangeSyncOpts = {
  disableProcessAsChainSegment?: boolean;
};

/**
 * RangeSync groups peers by their `status` into static target `SyncChain` instances
 * Peers on each chain will be queried for batches until reaching their target.
 *
 * Not all SyncChain-s will sync at once, and are grouped by sync type:
 * - Finalized Chain Sync
 * - Head Chain Sync
 *
 * ### Finalized Chain Sync
 *
 * At least one peer's status finalized checkpoint is greater than ours. Then we'll form
 * a chain starting from our finalized epoch and sync up to their finalized checkpoint.
 * - Only one finalized chain can sync at a time
 * - The finalized chain with the largest peer pool takes priority
 * - As peers' status progresses we will switch to a SyncChain with a better target
 *
 * ### Head Chain Sync
 *
 * If no Finalized Chain Sync is active, and the peer's STATUS head is beyond
 * `SLOT_IMPORT_TOLERANCE`, then we'll form a chain starting from our finalized epoch and sync
 * up to their head.
 * - More than one head chain can sync in parallel
 * - If there are many head chains the ones with more peers take priority
 */
export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
  private readonly chain: IBeaconChain;
  private readonly network: INetwork;
  private readonly metrics: Metrics | null;
  private readonly config: BeaconConfig;
  private readonly logger: Logger;
  /** There is a single chain per type, 1 finalized sync, 1 head sync */
  private readonly chains = new Map<RangeSyncType, SyncChain>();

  private opts?: RangeSyncOpts;

  constructor(modules: RangeSyncModules, opts?: RangeSyncOpts) {
    super();
    const {chain, network, metrics, config, logger} = modules;
    this.chain = chain;
    this.network = network;
    this.metrics = metrics;
    this.config = config;
    this.logger = logger;
    this.opts = opts;

    if (metrics) {
      metrics.syncStatus.addCollect(() => this.scrapeMetrics(metrics));
    }
  }

  /** Throw / return all AsyncGenerators inside every SyncChain instance */
  close(): void {
    for (const chain of this.chains.values()) {
      chain.remove();
    }
  }

  /**
   * A peer with a relevant STATUS message has been found, which also is advanced from us.
   * Add this peer to an existing chain or create a new one. The update the chains status.
   */
  addPeer(peerId: PeerIdStr, localStatus: Status, peerStatus: Status): void {
    // Compute if we should do a Finalized or Head sync with this peer
    const {syncType, startEpoch, target} = getRangeSyncTarget(localStatus, peerStatus, this.chain);
    this.logger.debug("Sync peer joined", {
      peer: peerId,
      syncType,
      startEpoch,
      targetSlot: target.slot,
      targetRoot: toRootHex(target.root),
      localHeadSlot: localStatus.headSlot,
      earliestAvailableSlot: (peerStatus as fulu.Status).earliestAvailableSlot ?? Infinity,
    });

    // If the peer existed in any other chain, remove it.
    // re-status'd peers can exist in multiple finalized chains, only one sync at a time
    if (syncType === RangeSyncType.Head) {
      this.removePeer(peerId);
    }

    this.addPeerOrCreateChain(startEpoch, target, peerId, syncType);
    this.update(localStatus.finalizedEpoch);
  }

  /**
   * Remove this peer from all head and finalized chains. A chain may become peer-empty and be dropped
   */
  removePeer(peerId: PeerIdStr): void {
    for (const syncChain of this.chains.values()) {
      syncChain.removePeer(peerId);
    }
  }

  /**
   * Compute the current RangeSync state, not cached
   */
  get state(): RangeSyncState {
    const syncingHeadTargets: ChainTarget[] = [];
    for (const chain of this.chains.values()) {
      if (chain.isSyncing) {
        if (chain.syncType === RangeSyncType.Finalized) {
          return {status: RangeSyncStatus.Finalized, target: chain.target};
        }
        syncingHeadTargets.push(chain.target);
      }
    }

    if (syncingHeadTargets.length > 0) {
      return {status: RangeSyncStatus.Head, targets: syncingHeadTargets};
    }
    return {status: RangeSyncStatus.Idle};
  }

  /** Full debug state for lodestar API */
  getSyncChainsDebugState(): SyncChainDebugState[] {
    return Array.from(this.chains.values())
      .map((syncChain) => syncChain.getDebugState())
      .reverse(); // Newest additions first
  }

  /** Convenience method for `SyncChain` */
  private processChainSegment: SyncChainFns["processChainSegment"] = async (blocks, payloadEnvelopes, syncType) => {
    // Not trusted, verify signatures
    const flags: ImportBlockOpts = {
      // Only skip importing attestations for finalized sync. For head sync attestation are valuable.
      // Importing attestations also triggers a head update, see https://github.com/ChainSafe/lodestar/issues/3804
      // TODO: Review if this is okay, can we prevent some attacks by importing attestations?
      importAttestations: syncType === RangeSyncType.Finalized ? AttestationImportOpt.Skip : undefined,
      // Ignores ALREADY_KNOWN or GENESIS_BLOCK errors, and continues with the next block in chain segment
      ignoreIfKnown: true,
      // Ignore WOULD_REVERT_FINALIZED_SLOT error, continue with the next block in chain segment
      ignoreIfFinalized: true,
      // We won't attest to this block so it's okay to ignore a SYNCING message from execution layer
      fromRangeSync: true,
      // when this runs, syncing is the most important thing and gossip is not likely to run
      // so we can utilize worker threads to verify signatures
      blsVerifyOnMainThread: false,
    };

    if (this.opts?.disableProcessAsChainSegment) {
      // Should only be used for debugging or testing
      for (const block of blocks) {
        await this.chain.processBlock(block, flags);
        const payloadEnvelope = payloadEnvelopes?.get(block.slot);
        if (payloadEnvelope) {
          await this.chain.processExecutionPayload(payloadEnvelope);
        }
      }
    } else {
      await this.chain.processChainSegment(blocks, payloadEnvelopes, flags);
    }
  };

  private downloadByRange: SyncChainFns["downloadByRange"] = async (peer, batch) => {
    const batchBlocks = batch.getBlocks();
    const requests = batch.getRequestsForPeer(peer);
    const parentRoot = requests.parentPayloadRequest?.envelopeBlockRoot ?? requests.parentPayloadRequest?.blockRoot;
    const parentPayloadCommitments = parentRoot ? batch.getParentPayloadCommitments(parentRoot) : undefined;
    const {result, warnings} = await downloadByRange({
      config: this.config,
      network: this.network,
      logger: this.logger,
      peerIdStr: peer.peerId,
      batchBlocks,
      parentPayloadCommitments,
      peerDasMetrics: this.chain.metrics?.peerDas,
      ...requests,
    });
    const {responses, payloadEnvelopes: downloadedPayloadEnvelopes} = result;
    const {blocks, payloadEnvelopes} = cacheByRangeResponses({
      cache: this.chain.seenBlockInputCache,
      seenPayloadEnvelopeInputCache: this.chain.seenPayloadEnvelopeInputCache,
      peerIdStr: peer.peerId,
      responses,
      batchBlocks,
      downloadedPayloadEnvelopes,
      existingPayloadEnvelopes: batch.getPayloadEnvelopes(),
      custodyConfig: this.chain.custodyConfig,
      seenTimestampSec: Date.now() / 1000,
    });

    const segmentBlocks = blocks.filter((b) => b.hasBlock()).sort((a, b) => a.slot - b.slot);
    const envelopeSlots = payloadEnvelopes
      ? Array.from(payloadEnvelopes.entries())
          .filter(([, pi]) => pi.hasPayloadEnvelope())
          .map(([slot]) => slot)
          .sort((a, b) => a - b)
      : [];
    this.logger.verbose("downloadByRange batch ready", {
      peer: peer.peerId,
      blockSlots: prettyPrintIndices(segmentBlocks.map((b) => b.slot)),
      envelopeSlots: prettyPrintIndices(envelopeSlots),
      ...batch.getMetadata(),
    });

    if (segmentBlocks.length > 1) {
      try {
        assertLinearChainSegment(this.config, segmentBlocks, payloadEnvelopes, null);
      } catch (err) {
        if (err instanceof BlockError) {
          this.logger.debug(
            "downloadByRange segment validation failed",
            {
              peer: peer.peerId,
              reason: err.type.code,
              slot: err.signedBlock.message.slot,
              detail: JSON.stringify(err.type),
              ...batch.getMetadata(),
            },
            err
          );
          // with this error, the peer will be penalized inside SyncChain
          throw new DownloadByRangeError({
            code: DownloadByRangeErrorCode.INVALID_CHAIN_SEGMENT,
            slot: err.signedBlock.message.slot,
            reason: err.type.code,
          });
        }

        throw err;
      }
    }

    return {result: {blocks, payloadEnvelopes}, warnings};
  };

  private pruneBlockInputs: SyncChainFns["pruneBlockInputs"] = (blocks: IBlockInput[]) => {
    for (const block of blocks) {
      this.chain.seenBlockInputCache.prune(block.blockRootHex);
    }
  };

  /** Convenience method for `SyncChain` */
  private reportPeer: SyncChainFns["reportPeer"] = (peer, action, actionName) => {
    this.network.reportPeer(peer, action, actionName);
  };

  private getConnectedPeerSyncMeta: SyncChainFns["getConnectedPeerSyncMeta"] = (peerId) => {
    return this.network.getConnectedPeerSyncMeta(peerId);
  };

  /** Convenience method for `SyncChain` */
  private onSyncChainEnd: SyncChainFns["onEnd"] = (err, target) => {
    this.update(this.chain.forkChoice.getFinalizedCheckpoint().epoch);
    this.emit(RangeSyncEvent.completedChain);

    if (err === null && target !== null) {
      this.metrics?.syncRange.syncChainHighestTargetSlotCompleted.set(target.slot);
    }
  };

  private addPeerOrCreateChain(startEpoch: Epoch, target: ChainTarget, peer: PeerIdStr, syncType: RangeSyncType): void {
    let syncChain = this.chains.get(syncType);
    if (!syncChain) {
      // The first batch of a new sync chain may need to detect whether the parent block was an
      // gloas "empty" block (no envelope produced). It does so by comparing the first
      // downloaded block's `bid.parentBlockHash` against the head state's `latestExecutionPayloadBid.blockHash`.
      const headState = this.chain.getHeadState();
      const latestBid = isStatePostGloas(headState)
        ? (headState as IBeaconStateViewGloas).latestExecutionPayloadBid
        : undefined;

      syncChain = new SyncChain(
        startEpoch,
        target,
        syncType,
        {
          processChainSegment: this.processChainSegment,
          downloadByRange: this.downloadByRange,
          reportPeer: this.reportPeer,
          getConnectedPeerSyncMeta: this.getConnectedPeerSyncMeta,
          pruneBlockInputs: this.pruneBlockInputs,
          onEnd: this.onSyncChainEnd,
        },
        {
          config: this.config,
          clock: this.chain.clock,
          logger: this.logger,
          custodyConfig: this.chain.custodyConfig,
          metrics: this.metrics,
        },
        latestBid
      );
      this.chains.set(syncType, syncChain);

      this.metrics?.syncRange.syncChainsEvents.inc({syncType: syncChain.syncType, event: "add"});
      this.logger.debug("SyncChain added", {
        syncType,
        firstEpoch: syncChain.firstBatchEpoch,
        targetSlot: syncChain.target.slot,
        targetRoot: toRootHex(syncChain.target.root),
        peer,
      });
    }

    syncChain.addPeer(peer, target);
  }

  private update(localFinalizedEpoch: Epoch): void {
    const localFinalizedSlot = computeStartSlotAtEpoch(localFinalizedEpoch);

    // Remove chains that are out-dated, peer-empty, completed or failed
    for (const [id, syncChain] of this.chains.entries()) {
      // Checks if a Finalized or Head chain should be removed
      if (
        // Sync chain has completed syncing or encountered an error
        syncChain.isRemovable ||
        // Sync chain has no more peers to download from
        syncChain.peers === 0 ||
        // Outdated: our chain has progressed beyond this sync chain
        syncChain.target.slot < localFinalizedSlot ||
        this.chain.forkChoice.hasBlock(syncChain.target.root)
      ) {
        syncChain.remove();
        this.chains.delete(id);

        this.metrics?.syncRange.syncChainsEvents.inc({syncType: syncChain.syncType, event: "remove"});
        this.logger.debug("SyncChain removed", {
          id: syncChain.logId,
          localFinalizedSlot,
          lastValidatedSlot: syncChain.lastValidatedSlot,
          firstEpoch: syncChain.firstBatchEpoch,
          targetSlot: syncChain.target.slot,
          targetRoot: toRootHex(syncChain.target.root),
          validatedEpochs: syncChain.validatedEpochs,
        });

        // Re-status peers from successful chain. Potentially trigger a Head sync
        this.network
          .reStatusPeers(syncChain.getPeers())
          .catch((e) => this.logger.error("Error resyncing peers", {}, e));
      }
    }

    const {toStop, toStart} = updateChains(Array.from(this.chains.values()));

    for (const syncChain of toStop) {
      syncChain.stopSyncing();
      if (syncChain.isSyncing) {
        this.metrics?.syncRange.syncChainsEvents.inc({syncType: syncChain.syncType, event: "stop"});
      }
    }

    for (const syncChain of toStart) {
      syncChain.startSyncing(localFinalizedEpoch);
      if (!syncChain.isSyncing) {
        this.metrics?.syncRange.syncChainsEvents.inc({syncType: syncChain.syncType, event: "start"});
      }
    }
  }

  private scrapeMetrics(metrics: Metrics): void {
    metrics.syncRange.syncChainsPeers.reset();
    const syncChainsByType: Record<RangeSyncType, number> = {
      [RangeSyncType.Finalized]: 0,
      [RangeSyncType.Head]: 0,
    };

    for (const chain of this.chains.values()) {
      metrics.syncRange.syncChainsPeers.observe({syncType: chain.syncType}, chain.peers);
      syncChainsByType[chain.syncType]++;
    }

    for (const syncType of rangeSyncTypes) {
      metrics.syncRange.syncChains.set({syncType}, syncChainsByType[syncType]);
    }
  }
}
