import {PeerId} from "@libp2p/interface";
import {Libp2p} from "libp2p";
import {BeaconConfig, ForkBoundary} from "@lodestar/config";
import {ForkName, ForkSeq, GENESIS_EPOCH} from "@lodestar/params";
import {
  Encoding,
  ProtocolDescriptor,
  ProtocolHandler,
  ReqResp,
  ReqRespOpts,
  ReqRespRequest,
  RequestError,
  ResponseIncoming,
  ResponseOutgoing,
} from "@lodestar/reqresp";
import {Metadata, Status, phase0, ssz} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {callInNextEventLoop} from "../../util/eventLoop.js";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {INetworkEventBus, NetworkEvent} from "../events.js";
import {MetadataController} from "../metadata.js";
import {ClientKind} from "../peers/client.js";
import {PeersData} from "../peers/peersData.js";
import {IPeerRpcScoreStore, PeerAction} from "../peers/score/index.js";
import {StatusCache} from "../statusCache.js";
import * as protocols from "./protocols.js";
import {onOutgoingReqRespError} from "./score.js";
import {
  GetReqRespHandlerFn,
  ProtocolNoHandler,
  ReqRespMethod,
  RequestTypedContainer,
  Version,
  requestSszTypeByMethod,
  responseSszTypeByMethod,
} from "./types.js";
import {collectExactOneTyped} from "./utils/collect.js";

export {getReqRespHandlers} from "./handlers/index.js";
export {ReqRespMethod, type RequestTypedContainer} from "./types.js";

/**
 * Dependencies required to construct a `ReqRespBeaconNode`.
 * The whole object is also forwarded to the `ReqResp` base class constructor.
 */
export interface ReqRespBeaconNodeModules {
  /** Not read directly by `ReqRespBeaconNode`; forwarded to the `ReqResp` base class with the other modules */
  libp2p: Libp2p;
  /** Per-peer data: queried for encoding preference, client kind and agent version; updated with encoding preference */
  peersData: PeersData;
  logger: Logger;
  /** Passed to the protocol definitions and to the request ssz type lookup */
  config: BeaconConfig;
  /** Optional metrics; when present its `register` is handed to the base class and rate limit errors are counted */
  metrics: NetworkCoreMetrics | null;
  /** Source of the local `seqNumber` (Ping) and of the metadata served on Metadata requests */
  metadata: MetadataController;
  /** Receives peer actions on rate limiting and on outgoing request errors */
  peerRpcScores: IPeerRpcScoreStore;
  /** Bus on which incoming request bodies are emitted as `NetworkEvent.reqRespRequest` */
  events: INetworkEventBus;
  /** Provides the local status returned in Status responses */
  statusCache: StatusCache;
  /** Returns the handler to register for a given `ReqRespMethod` */
  getHandler: GetReqRespHandlerFn;
}

/**
 * Options for `ReqRespBeaconNode`: the base `ReqRespOpts` plus `disableLightClientServer`.
 * When `disableLightClientServer` is true the light client protocols are not registered; omitted means false.
 */
export type ReqRespBeaconNodeOpts = ReqRespOpts & {disableLightClientServer?: boolean};

/**
 * Implementation of Ethereum Consensus p2p Req/Resp domain.
 * For the spec that this code is based on, see:
 * https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-reqresp-domain
 * https://github.com/ethereum/consensus-specs/blob/v1.6.1/specs/altair/light-client/p2p-interface.md#the-reqresp-domain
 *
 * On top of the generic `ReqResp` base class this:
 * - registers / unregisters the set of protocols required at a fork boundary,
 * - sends Ping, Status, Goodbye and Metadata requests,
 * - serves Ping, Status, Goodbye and Metadata requests itself and delegates the rest to `getHandler`.
 */
export class ReqRespBeaconNode extends ReqResp {
  private readonly metadataController: MetadataController;
  private readonly peerRpcScores: IPeerRpcScoreStore;
  private readonly networkEventBus: INetworkEventBus;
  private readonly peersData: PeersData;
  private readonly statusCache: StatusCache;
  private readonly getHandler: GetReqRespHandlerFn;

  /** Fork set by the last `registerProtocolsAtBoundary()` call, tracked to only send to known protocols */
  private currentRegisteredFork: ForkSeq = ForkSeq.phase0;

  private readonly config: BeaconConfig;
  protected readonly logger: Logger;
  protected readonly disableLightClientServer: boolean;

  /**
   * @param modules - dependencies; forwarded to the base class together with `metricsRegister`
   * @param options - base req/resp options plus `disableLightClientServer` (false when omitted)
   */
  constructor(modules: ReqRespBeaconNodeModules, options: ReqRespBeaconNodeOpts = {}) {
    const {logger, metrics, peersData, peerRpcScores} = modules;

    super(
      {...modules, metricsRegister: metrics?.register ?? null},
      {
        ...options,
        // A rate limited peer is logged, scored with a fatal action and counted in metrics
        onRateLimit: (peerId, method) => {
          logger.debug("Do not serve request due to rate limit", {peerId: peerId.toString()});
          peerRpcScores.applyAction(peerId, PeerAction.Fatal, "rate_limit_rpc");
          metrics?.reqResp.rateLimitErrors.inc({method});
        },
        // Falls back to the whole agent version for unknown clients, which is good for debugging
        getPeerLogMetadata: (peerId) => peersData.getPeerKind(peerId) ?? peersData.getAgentVersion(peerId),
      }
    );

    this.disableLightClientServer = options.disableLightClientServer ?? false;
    this.peerRpcScores = peerRpcScores;
    this.peersData = peersData;
    this.config = modules.config;
    this.logger = logger;
    this.metadataController = modules.metadata;
    this.networkEventBus = modules.events;
    this.statusCache = modules.statusCache;
    this.getHandler = modules.getHandler;
  }

  /** Starts the underlying `ReqResp` instance. */
  async start(): Promise<void> {
    await super.start();
  }

  /** Stops the underlying `ReqResp` instance. */
  async stop(): Promise<void> {
    await super.stop();
  }

  // NOTE: Do not pruneOnPeerDisconnect. Persist peer rate limit data until pruned by time
  // pruneOnPeerDisconnect(peerId: PeerId): void {
  //   this.rateLimiter.prune(peerId);
  // }

  /**
   * Makes the set of registered protocols match the set required at `boundary`:
   * protocols that are registered but no longer required are unregistered, all required ones are registered.
   * Also records the boundary fork as the current registered fork.
   *
   * Registration and un-registration are fire-and-forget; failures are logged, not thrown.
   */
  registerProtocolsAtBoundary(boundary: ForkBoundary): void {
    this.currentRegisteredFork = ForkSeq[boundary.fork];

    const requiredProtocols = this.getProtocolsAtBoundary(boundary);
    const requiredProtocolIDs = new Set(requiredProtocols.map(([protocol]) => this.formatProtocolID(protocol)));

    // Drop every registered protocol that is not required anymore
    for (const protocolID of this.getRegisteredProtocols()) {
      if (requiredProtocolIDs.has(protocolID)) {
        continue;
      }

      // Async because of writing to peerstore -_- should never throw
      this.unregisterProtocol(protocolID).catch((e) => {
        this.logger.error("Error on ReqResp.unregisterProtocol", {protocolID}, e);
      });
    }

    // Register every required protocol
    for (const [protocol, handler] of requiredProtocols) {
      this.registerProtocol({...protocol, handler}).catch((e) => {
        this.logger.error("Error on ReqResp.registerProtocol", {protocolID: this.formatProtocolID(protocol)}, e);
      });
    }
  }

  /**
   * Sends an already serialized request to `peerId`.
   *
   * The encoding is the preference stored for the peer, or `Encoding.SSZ_SNAPPY` if none is stored.
   * For Ping the given `requestData` is only a placeholder: it is replaced with the serialized
   * current sequence number of the local metadata.
   *
   * @returns the response stream produced by `sendRequest`
   */
  sendRequestWithoutEncoding(
    peerId: PeerId,
    method: ReqRespMethod,
    versions: number[],
    requestData: Uint8Array
  ): AsyncIterable<ResponseIncoming> {
    // Remember preferred encoding
    const encoding = this.peersData.getEncodingPreference(peerId.toString()) ?? Encoding.SSZ_SNAPPY;

    // Overwrite placeholder requestData from main thread with correct sequenceNumber
    const payload =
      method === ReqRespMethod.Ping
        ? requestSszTypeByMethod(ForkName.phase0, this.config)[ReqRespMethod.Ping].serialize(
            this.metadataController.seqNumber
          )
        : requestData;

    // ReqResp outgoing request, emit from main thread to worker
    return this.sendRequest(peerId, method, versions, encoding, payload);
  }

  /** Sends a Ping (V1) to `peerId` and resolves with the single typed response. */
  async sendPing(peerId: PeerId): Promise<phase0.Ping> {
    // Ping request data is overwritten in sendRequestWithoutEncoding() with the correct sequence number
    const seqNumber = this.metadataController.seqNumber;
    const responses = this.sendReqRespRequest(peerId, ReqRespMethod.Ping, [Version.V1], seqNumber);
    return collectExactOneTyped(responses, responseSszTypeByMethod[ReqRespMethod.Ping]);
  }

  /**
   * Sends a Status request to `peerId` and resolves with the single typed response.
   * Version V2 is used once the registered fork is fulu or later, V1 before that.
   */
  async sendStatus(peerId: PeerId, request: Status): Promise<Status> {
    const versions: number[] = this.currentRegisteredFork >= ForkSeq.fulu ? [Version.V2] : [Version.V1];
    const responses = this.sendReqRespRequest(peerId, ReqRespMethod.Status, versions, request);
    return collectExactOneTyped(responses, responseSszTypeByMethod[ReqRespMethod.Status]);
  }

  /** Sends a Goodbye (V1) to `peerId`; waits for the single response and discards it. */
  async sendGoodbye(peerId: PeerId, request: phase0.Goodbye): Promise<void> {
    // TODO: Replace with "ignore response after request"
    const responses = this.sendReqRespRequest(peerId, ReqRespMethod.Goodbye, [Version.V1], request);
    await collectExactOneTyped(responses, responseSszTypeByMethod[ReqRespMethod.Goodbye]);
  }

  /**
   * Sends a Metadata request to `peerId` and resolves with the single typed response.
   * Versions offered depend on the registered fork:
   * fulu and later `[V3]`, altair up to fulu `[V3, V2]`, before altair `[V2, V1]`.
   */
  async sendMetadata(peerId: PeerId): Promise<Metadata> {
    let versions: number[];
    if (this.currentRegisteredFork >= ForkSeq.fulu) {
      versions = [Version.V3];
    } else if (this.currentRegisteredFork >= ForkSeq.altair) {
      versions = [Version.V3, Version.V2];
    } else {
      versions = [Version.V2, Version.V1];
    }

    const responses = this.sendReqRespRequest(peerId, ReqRespMethod.Metadata, versions, null);
    return collectExactOneTyped(responses, responseSszTypeByMethod[ReqRespMethod.Metadata]);
  }

  /**
   * Serializes `request` with the request type of `method` at the current registered fork and sends it.
   * Methods without a request type are sent with an empty body.
   */
  private sendReqRespRequest<Req>(
    peerId: PeerId,
    method: ReqRespMethod,
    versions: number[],
    request: Req
  ): AsyncIterable<ResponseIncoming> {
    const forkName = ForkSeq[this.currentRegisteredFork] as ForkName;
    const requestType = requestSszTypeByMethod(ForkName[forkName], this.config)[method];

    if (!requestType) {
      return this.sendRequestWithoutEncoding(peerId, method, versions, new Uint8Array());
    }

    return this.sendRequestWithoutEncoding(peerId, method, versions, requestType.serialize(request as never));
  }

  /**
   * Returns the list of protocols that must be subscribed during a specific fork.
   * Any protocol not in this list must be un-subscribed.
   *
   * Each entry pairs a protocol definition with the handler that serves it.
   */
  private getProtocolsAtBoundary(boundary: ForkBoundary): [ProtocolNoHandler, ProtocolHandler][] {
    const {fork} = boundary;
    const {config} = this;
    const forkSeq = ForkSeq[fork];

    // Handlers served by this class, bound once and reused across protocol versions
    const onPing = this.onPing.bind(this);
    const onGoodbye = this.onGoodbye.bind(this);
    const onStatus = this.onStatus.bind(this);
    const onMetadata = this.onMetadata.bind(this);

    const required: [ProtocolNoHandler, ProtocolHandler][] = [];

    // Protocols declared at every fork
    required.push([protocols.Ping(fork, config), onPing]);
    required.push([protocols.Goodbye(fork, config), onGoodbye]);
    // Support V3 methods as soon as implemented (for fulu)
    // Follows pattern for altair:
    // Ref https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2
    required.push([protocols.MetadataV3(fork, config), onMetadata]);
    required.push([protocols.BeaconBlocksByRangeV2(fork, config), this.getHandler(ReqRespMethod.BeaconBlocksByRange)]);
    required.push([protocols.BeaconBlocksByRootV2(fork, config), this.getHandler(ReqRespMethod.BeaconBlocksByRoot)]);

    if (forkSeq < ForkSeq.altair) {
      // Unregister V1 topics at the fork boundary, so only declare for pre-altair
      required.push([protocols.Metadata(fork, config), onMetadata]);
      required.push([protocols.BeaconBlocksByRange(fork, config), this.getHandler(ReqRespMethod.BeaconBlocksByRange)]);
      required.push([protocols.BeaconBlocksByRoot(fork, config), this.getHandler(ReqRespMethod.BeaconBlocksByRoot)]);
    }

    if (forkSeq >= ForkSeq.altair && !this.disableLightClientServer) {
      // Should be okay to enable before altair, but for consistency only enable afterwards
      required.push([
        protocols.LightClientBootstrap(fork, config),
        this.getHandler(ReqRespMethod.LightClientBootstrap),
      ]);
      required.push([
        protocols.LightClientFinalityUpdate(fork, config),
        this.getHandler(ReqRespMethod.LightClientFinalityUpdate),
      ]);
      required.push([
        protocols.LightClientOptimisticUpdate(fork, config),
        this.getHandler(ReqRespMethod.LightClientOptimisticUpdate),
      ]);
      required.push([
        protocols.LightClientUpdatesByRange(fork, config),
        this.getHandler(ReqRespMethod.LightClientUpdatesByRange),
      ]);
    }

    if (forkSeq >= ForkSeq.deneb) {
      required.push([protocols.BlobSidecarsByRoot(fork, config), this.getHandler(ReqRespMethod.BlobSidecarsByRoot)]);
      required.push([protocols.BlobSidecarsByRange(fork, config), this.getHandler(ReqRespMethod.BlobSidecarsByRange)]);
    }

    if (forkSeq < ForkSeq.fulu) {
      // Unregister StatusV1, MetadataV2 at the fork boundary, so only declare for pre-fulu
      required.push([protocols.Status(fork, config), onStatus]);
      required.push([protocols.MetadataV2(fork, config), onMetadata]);
    } else {
      // We can't handle StatusV2 correctly pre-fulu as request type is selected based on fork
      // instead of protocol version. This is not easily fixable with our current architecture.
      // See https://github.com/ChainSafe/lodestar/pull/8168 for more details.
      required.push([protocols.StatusV2(fork, config), onStatus]);
      required.push([protocols.BeaconBlocksByHead(fork, config), this.getHandler(ReqRespMethod.BeaconBlocksByHead)]);
      required.push([
        protocols.DataColumnSidecarsByRoot(fork, config),
        this.getHandler(ReqRespMethod.DataColumnSidecarsByRoot),
      ]);
      required.push([
        protocols.DataColumnSidecarsByRange(fork, config),
        this.getHandler(ReqRespMethod.DataColumnSidecarsByRange),
      ]);
    }

    if (forkSeq >= ForkSeq.gloas) {
      required.push([
        protocols.ExecutionPayloadEnvelopesByRoot(fork, config),
        this.getHandler(ReqRespMethod.ExecutionPayloadEnvelopesByRoot),
      ]);
      required.push([
        protocols.ExecutionPayloadEnvelopesByRange(fork, config),
        this.getHandler(ReqRespMethod.ExecutionPayloadEnvelopesByRange),
      ]);
    }

    return required;
  }

  /**
   * Publishes a decoded incoming request on the network event bus as `NetworkEvent.reqRespRequest`,
   * together with the sending peer and its client kind (`ClientKind.Unknown` when not known).
   */
  protected onIncomingRequestBody(request: RequestTypedContainer, peer: PeerId): void {
    const peerClient = this.peersData.getPeerKind(peer.toString()) ?? ClientKind.Unknown;
    const event = {request, peer, peerClient};

    // Allow onRequest to return and close the stream
    // For Goodbye there may be a race condition where the listener of `receivedGoodbye`
    // disconnects in the same synchronous call, preventing the stream from ending cleanly
    callInNextEventLoop(() => this.networkEventBus.emit(NetworkEvent.reqRespRequest, event));
  }

  /** Stores the encoding a peer used for a Status request as that peer's preferred encoding. */
  protected onIncomingRequest(peerId: PeerId, protocol: ProtocolDescriptor): void {
    if (protocol.method !== ReqRespMethod.Status) {
      return;
    }

    // Remember preferred encoding
    this.peersData.setEncodingPreference(peerId.toString(), protocol.encoding);
  }

  /** Applies the peer action derived from an outgoing request error, if there is one. */
  protected onOutgoingRequestError(peerId: PeerId, method: ReqRespMethod, error: RequestError): void {
    const peerAction = onOutgoingReqRespError(error, method);
    if (peerAction === null) {
      return;
    }

    this.peerRpcScores.applyAction(peerId, peerAction, error.type.code);
  }

  /** Serves Status: emits the peer's decoded status, then responds with the status from the status cache. */
  private async *onStatus(req: ReqRespRequest, peerId: PeerId): AsyncIterable<ResponseOutgoing> {
    // Fork is ignored in responseSszTypeByMethod, type is determined by protocol version that is negotiated
    const statusType = responseSszTypeByMethod[ReqRespMethod.Status](ForkName.phase0, req.version);

    // Request uses the same type as response
    const body = statusType.deserialize(req.data);
    this.onIncomingRequestBody({method: ReqRespMethod.Status, body}, peerId);

    const localStatus = this.statusCache.get();
    const data = statusType.serialize(localStatus);

    // Status topic is fork-agnostic
    yield {data, boundary: {fork: ForkName.phase0, epoch: GENESIS_EPOCH}};
  }

  /** Serves Goodbye: emits the peer's decoded goodbye, then responds with a Goodbye of value 0. */
  private async *onGoodbye(req: ReqRespRequest, peerId: PeerId): AsyncIterable<ResponseOutgoing> {
    const goodbyeType = ssz.phase0.Goodbye;

    const body = goodbyeType.deserialize(req.data);
    this.onIncomingRequestBody({method: ReqRespMethod.Goodbye, body}, peerId);

    const data = goodbyeType.serialize(BigInt(0));

    // Goodbye topic is fork-agnostic
    yield {data, boundary: {fork: ForkName.phase0, epoch: GENESIS_EPOCH}};
  }

  /** Serves Ping: emits the peer's decoded ping, then responds with the local metadata sequence number. */
  private async *onPing(req: ReqRespRequest, peerId: PeerId): AsyncIterable<ResponseOutgoing> {
    const pingType = ssz.phase0.Ping;

    const body = pingType.deserialize(req.data);
    this.onIncomingRequestBody({method: ReqRespMethod.Ping, body}, peerId);

    const data = pingType.serialize(this.metadataController.seqNumber);

    // Ping topic is fork-agnostic
    yield {data, boundary: {fork: ForkName.phase0, epoch: GENESIS_EPOCH}};
  }

  /** Serves Metadata: emits the (body-less) request, then responds with the local metadata. */
  private async *onMetadata(req: ReqRespRequest, peerId: PeerId): AsyncIterable<ResponseOutgoing> {
    this.onIncomingRequestBody({method: ReqRespMethod.Metadata, body: null}, peerId);

    const localMetadata = this.metadataController.json;

    // Fork is ignored in responseSszTypeByMethod, type is determined by protocol version that is negotiated
    const metadataType = responseSszTypeByMethod[ReqRespMethod.Metadata](ForkName.phase0, req.version);
    const data = metadataType.serialize(localMetadata);

    // Metadata topic is fork-agnostic
    yield {data, boundary: {fork: ForkName.phase0, epoch: GENESIS_EPOCH}};
  }
}
