import type {PeerScoreStatsDump} from "@libp2p/gossipsub/score";
import type {PublishOpts} from "@libp2p/gossipsub/types";
import type {PeerId, PrivateKey} from "@libp2p/interface";
import {peerIdFromPrivateKey} from "@libp2p/peer-id";
import {routes} from "@lodestar/api";
import {BeaconConfig} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {ForkSeq} from "@lodestar/params";
import {ResponseIncoming} from "@lodestar/reqresp";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {
  AttesterSlashing,
  DataColumnSidecar,
  LightClientBootstrap,
  LightClientFinalityUpdate,
  LightClientOptimisticUpdate,
  LightClientUpdate,
  Root,
  SignedAggregateAndProof,
  SignedBeaconBlock,
  SingleAttestation,
  SlotRootHex,
  SubnetID,
  altair,
  capella,
  deneb,
  fulu,
  gloas,
  isGloasDataColumnSidecar,
  phase0,
} from "@lodestar/types";
import {prettyPrintIndices, sleep} from "@lodestar/utils";
import {BlockInputSource} from "../chain/blocks/blockInput/types.js";
import {ChainEvent, IBeaconChain} from "../chain/index.js";
import {computeSubnetForDataColumnSidecar} from "../chain/validation/dataColumnSidecar.js";
import {IBeaconDb} from "../db/interface.js";
import {Metrics, RegistryMetricCreator} from "../metrics/index.js";
import {IClock} from "../util/clock.js";
import {CustodyConfig} from "../util/dataColumns.js";
import {PeerIdStr, peerIdToString} from "../util/peerId.js";
import {promiseAllMaybeAsync} from "../util/promises.js";
import {
  BeaconBlocksByRootRequest,
  BlobSidecarsByRootRequest,
  DataColumnSidecarsByRootRequest,
  ExecutionPayloadEnvelopesByRootRequest,
} from "../util/types.js";
import {INetworkCore, NetworkCore, WorkerNetworkCore} from "./core/index.js";
import {INetworkEventBus, NetworkEvent, NetworkEventBus, NetworkEventData} from "./events.js";
import {getActiveForkBoundaries} from "./forks.js";
import {GossipHandlers, GossipTopicMap, GossipType, GossipTypeMap} from "./gossip/index.js";
import {getGossipSSZType, gossipTopicIgnoreDuplicatePublishError, stringifyGossipTopic} from "./gossip/topic.js";
import {INetwork} from "./interface.js";
import {NetworkOptions} from "./options.js";
import {PeerAction, PeerScoreStats} from "./peers/index.js";
import {PeerSyncMeta} from "./peers/peersData.js";
import {AggregatorTracker} from "./processor/aggregatorTracker.js";
import {NetworkProcessor, PendingGossipsubMessage} from "./processor/index.js";
import {ReqRespMethod} from "./reqresp/index.js";
import {GetReqRespHandlerFn, Version, requestSszTypeByMethod, responseSszTypeByMethod} from "./reqresp/types.js";
import {
  collectExactOneTyped,
  collectMaxResponseTyped,
  collectMaxResponseTypedWithBytes,
} from "./reqresp/utils/collect.js";
import {collectSequentialBlocksInRange} from "./reqresp/utils/collectSequentialBlocksInRange.js";
import {CommitteeSubscription} from "./subnets/index.js";
import {isPublishToZeroPeersError, prettyPrintPeerIdStr} from "./util.js";

/**
 * Fully constructed dependencies handed to the `Network` constructor.
 * `Network.init()` creates the event bus, aggregator tracker, processor and core
 * and then forwards them through this shape.
 */
type NetworkModules = {
  // NOTE(review): not read by the constructor in this file; kept for callers that build the modules object
  opts: NetworkOptions;
  // Used to derive this node's PeerId
  privateKey: PrivateKey;
  config: BeaconConfig;
  logger: LoggerNode;
  // Source of the clock, custody config, status and the chain event emitter
  chain: IBeaconChain;
  // Exposed publicly as `Network.events`
  networkEventBus: NetworkEventBus;
  aggregatorTracker: AggregatorTracker;
  networkProcessor: NetworkProcessor;
  // Either the main-thread or the worker-backed core implementation
  core: INetworkCore;
};

/**
 * Arguments accepted by `Network.init()`, from which the network core, the
 * processor and finally the `Network` instance itself are built.
 */
export type NetworkInitModules = {
  // `opts.useWorker` selects the worker-backed core over the main-thread core
  opts: NetworkOptions;
  config: BeaconConfig;
  // Used to derive this node's PeerId and forwarded to the core
  privateKey: PrivateKey;
  // Forwarded to the core as-is
  peerStoreDir?: string;
  logger: LoggerNode;
  // `null` disables metrics in the core
  metrics: Metrics | null;
  chain: IBeaconChain;
  // Forwarded to the NetworkProcessor
  db: IBeaconDb;
  // Forwarded to the core
  getReqRespHandler: GetReqRespHandlerFn;
  // Optionally pass custom GossipHandlers, for testing
  gossipHandlers?: GossipHandlers;
};

/**
 * Must support running both on a worker and on the main thread.
 *
 * This is the front class that consumers interact with.
 * It multiplexes between:
 * - libp2p in worker
 * - libp2p in main thread
 *
 * Most methods are thin delegations to the `INetworkCore` implementation chosen in `init()`.
 */
export class Network implements INetwork {
  readonly peerId: PeerId;
  readonly custodyConfig: CustodyConfig;
  // TODO: Make private
  readonly events: INetworkEventBus;

  private readonly logger: LoggerNode;
  private readonly config: BeaconConfig;
  private readonly clock: IClock;
  private readonly chain: IBeaconChain;
  // Used only for sleep() statements
  private readonly controller: AbortController;

  // TODO: Review
  private readonly networkProcessor: NetworkProcessor;
  private readonly core: INetworkCore;
  private readonly aggregatorTracker: AggregatorTracker;

  private subscribedToCoreTopics = false;
  // Populated on peerConnected, cleared on peerDisconnected. Backs the getConnectedPeer*() queries
  private connectedPeersSyncMeta = new Map<PeerIdStr, Omit<PeerSyncMeta, "peerId">>();

  /**
   * Stores the provided modules and registers listeners on the network event bus and the chain emitter.
   * Every listener registered here is removed again in `close()`.
   */
  constructor(modules: NetworkModules) {
    this.peerId = peerIdFromPrivateKey(modules.privateKey);
    this.config = modules.config;
    this.custodyConfig = modules.chain.custodyConfig;
    this.logger = modules.logger;
    this.chain = modules.chain;
    this.clock = modules.chain.clock;
    this.controller = new AbortController();
    this.events = modules.networkEventBus;
    this.networkProcessor = modules.networkProcessor;
    this.core = modules.core;
    this.aggregatorTracker = modules.aggregatorTracker;

    this.events.on(NetworkEvent.peerConnected, this.onPeerConnected);
    this.events.on(NetworkEvent.peerDisconnected, this.onPeerDisconnected);
    this.chain.emitter.on(routes.events.EventType.head, this.onHead);
    // Register stable references (not inline closures) so close() can remove exactly these listeners
    this.chain.emitter.on(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdateEvent);
    this.chain.emitter.on(
      routes.events.EventType.lightClientOptimisticUpdate,
      this.onLightClientOptimisticUpdateEvent
    );
    this.chain.emitter.on(ChainEvent.updateTargetCustodyGroupCount, this.onTargetGroupCountUpdated);
    this.chain.emitter.on(ChainEvent.publishDataColumns, this.onPublishDataColumns);
    this.chain.emitter.on(ChainEvent.publishBlobSidecars, this.onPublishBlobSidecars);
    this.chain.emitter.on(ChainEvent.updateStatus, this.onUpdateStatus);
  }

  /**
   * Build a `Network`: creates the event bus and aggregator tracker, initializes the core
   * (worker-backed when `opts.useWorker` is set, main-thread otherwise), creates the
   * `NetworkProcessor` and logs the local PeerId and configured multiaddrs.
   */
  static async init({
    opts,
    config,
    logger,
    metrics,
    chain,
    db,
    gossipHandlers,
    privateKey,
    peerStoreDir,
    getReqRespHandler,
  }: NetworkInitModules): Promise<Network> {
    const events = new NetworkEventBus();
    const aggregatorTracker = new AggregatorTracker();

    // Snapshot of chain values handed to the core at start-up
    const activeValidatorCount = chain.getHeadState().activeValidatorCount;
    const initialStatus = chain.getStatus();
    const initialCustodyGroupCount = chain.custodyConfig.targetCustodyGroupCount;

    if (opts.useWorker) {
      logger.info("running libp2p instance in worker thread");
    }

    const core = opts.useWorker
      ? await WorkerNetworkCore.init({
          opts: {
            ...opts,
            peerStoreDir,
            metricsEnabled: Boolean(metrics),
            activeValidatorCount,
            genesisTime: chain.genesisTime,
            initialStatus,
            initialCustodyGroupCount,
          },
          config,
          privateKey,
          logger,
          events,
          metrics,
          getReqRespHandler,
        })
      : await NetworkCore.init({
          opts,
          config,
          privateKey,
          peerStoreDir,
          logger,
          clock: chain.clock,
          events,
          getReqRespHandler,
          metricsRegistry: metrics ? new RegistryMetricCreator() : null,
          initialStatus,
          initialCustodyGroupCount,
          activeValidatorCount,
        });

    const networkProcessor = new NetworkProcessor(
      {chain, db, config, logger, metrics, events, gossipHandlers, core, aggregatorTracker},
      opts
    );

    const multiaddresses = opts.localMultiaddrs?.join(",");
    const peerId = peerIdFromPrivateKey(privateKey);
    logger.info(`PeerId ${peerIdToString(peerId)}, Multiaddrs ${multiaddresses}`);

    return new Network({
      opts,
      privateKey,
      config,
      logger,
      chain,
      networkEventBus: events,
      aggregatorTracker,
      networkProcessor,
      core,
    });
  }

  /** True once `close()` has aborted the internal controller */
  get closed(): boolean {
    return this.controller.signal.aborted;
  }

  /**
   * Destroy this instance: remove every listener registered by the constructor, close the core
   * and abort pending sleeps. Calls made after a completed close return immediately.
   */
  async close(): Promise<void> {
    if (this.closed) return;

    this.events.off(NetworkEvent.peerConnected, this.onPeerConnected);
    this.events.off(NetworkEvent.peerDisconnected, this.onPeerDisconnected);
    this.chain.emitter.off(routes.events.EventType.head, this.onHead);
    // Must be the very same function references that were passed to on() in the constructor
    this.chain.emitter.off(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdateEvent);
    this.chain.emitter.off(
      routes.events.EventType.lightClientOptimisticUpdate,
      this.onLightClientOptimisticUpdateEvent
    );
    this.chain.emitter.off(ChainEvent.updateTargetCustodyGroupCount, this.onTargetGroupCountUpdated);
    this.chain.emitter.off(ChainEvent.publishDataColumns, this.onPublishDataColumns);
    this.chain.emitter.off(ChainEvent.publishBlobSidecars, this.onPublishBlobSidecars);
    this.chain.emitter.off(ChainEvent.updateStatus, this.onUpdateStatus);
    await this.core.close();

    // Used only for sleep() statements
    this.controller.abort();
    this.logger.debug("network core closed");
  }

  /** Delegates to the core to collect its metrics as a string */
  async scrapeMetrics(): Promise<string> {
    return this.core.scrapeMetrics();
  }

  /**
   * Request att subnets up `toSlot`. Network will ensure to maintain some peers for each.
   * Subscriptions flagged `isAggregator` are also recorded in the aggregator tracker, which is pruned afterwards.
   */
  async prepareBeaconCommitteeSubnets(subscriptions: CommitteeSubscription[]): Promise<void> {
    for (const subscription of subscriptions) {
      if (subscription.isAggregator) {
        this.aggregatorTracker.addAggregator(subscription.subnet, subscription.slot);
      }
    }
    this.aggregatorTracker.prune();

    return this.core.prepareBeaconCommitteeSubnets(subscriptions);
  }

  /** Forward sync committee subnet subscriptions to the core */
  async prepareSyncCommitteeSubnets(subscriptions: CommitteeSubscription[]): Promise<void> {
    return this.core.prepareSyncCommitteeSubnets(subscriptions);
  }

  /**
   * The app layer needs to refresh the status of some peers. The sync have reached a target
   */
  async reStatusPeers(peers: PeerIdStr[]): Promise<void> {
    return this.core.reStatusPeers(peers);
  }

  /** Forward an unknown-block search request to the network processor */
  searchUnknownBlock(slotRoot: SlotRootHex, source: BlockInputSource, peer?: PeerIdStr): void {
    this.networkProcessor.searchUnknownBlock(slotRoot, source, peer);
  }

  /** Forward an unknown-envelope search request to the network processor */
  searchUnknownEnvelope(slotRoot: SlotRootHex, source: BlockInputSource, peer?: PeerIdStr): void {
    this.networkProcessor.searchUnknownEnvelope(slotRoot, source, peer);
  }

  /** Report a peer action to the core */
  async reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): Promise<void> {
    return this.core.reportPeer(peer, action, actionName);
  }

  // REST API queries

  /** Ids of the peers currently tracked as connected */
  getConnectedPeers(): PeerIdStr[] {
    return Array.from(this.connectedPeersSyncMeta.keys());
  }

  /**
   * Sync metadata recorded for a connected peer.
   * @throws Error if `peerId` is not tracked as connected
   */
  getConnectedPeerSyncMeta(peerId: PeerIdStr): PeerSyncMeta {
    const syncMeta = this.connectedPeersSyncMeta.get(peerId);
    if (!syncMeta) {
      throw new Error(`peerId=${prettyPrintPeerIdStr(peerId)} not in connectedPeerSyncMeta`);
    }
    return {peerId, ...syncMeta};
  }

  /** Number of peers currently tracked as connected */
  getConnectedPeerCount(): number {
    return this.connectedPeersSyncMeta.size;
  }

  /** Delegates to the core */
  async getNetworkIdentity(): Promise<routes.node.NetworkIdentity> {
    return this.core.getNetworkIdentity();
  }

  /**
   * Subscribe to all gossip events. Safe to call multiple times
   */
  async subscribeGossipCoreTopics(): Promise<void> {
    if (!this.subscribedToCoreTopics) {
      await this.core.subscribeGossipCoreTopics();
      // Only mark subscribedToCoreTopics if worker resolved this call
      this.subscribedToCoreTopics = true;
    }
  }

  /**
   * Unsubscribe from all gossip events. Safe to call multiple times
   */
  async unsubscribeGossipCoreTopics(): Promise<void> {
    // Drop all the gossip validation queues
    this.networkProcessor.dropAllJobs();

    await this.core.unsubscribeGossipCoreTopics();
    this.subscribedToCoreTopics = false;
  }

  /** Whether the last subscribe/unsubscribe call left this node subscribed to the core topics */
  isSubscribedToGossipCoreTopics(): boolean {
    return this.subscribedToCoreTopics;
  }

  /** Delegates to the aggregator tracker */
  shouldAggregate(subnet: SubnetID, slot: number): boolean {
    return this.aggregatorTracker.shouldAggregate(subnet, slot);
  }

  // Gossip
  //
  // Each publish* method resolves the fork boundary for the message, builds the topic
  // and returns the number of peers the message was sent to.

  /** Publish a signed block on the topic of the fork boundary at the block's slot */
  async publishBeaconBlock(signedBlock: SignedBeaconBlock): Promise<number> {
    const epoch = computeEpochAtSlot(signedBlock.message.slot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.beacon_block>({type: GossipType.beacon_block, boundary}, signedBlock, {
      ignoreDuplicatePublishError: true,
    });
  }

  /** Publish a blob sidecar; the sidecar's `index` is used as the subnet */
  async publishBlobSidecar(blobSidecar: deneb.BlobSidecar): Promise<number> {
    const epoch = computeEpochAtSlot(blobSidecar.signedBlockHeader.message.slot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    const subnet = blobSidecar.index;

    return this.publishGossip<GossipType.blob_sidecar>({type: GossipType.blob_sidecar, boundary, subnet}, blobSidecar, {
      ignoreDuplicatePublishError: true,
    });
  }

  /**
   * Publish a data column sidecar on its computed subnet.
   * The slot is read from the sidecar itself for gloas sidecars, from the signed block header otherwise.
   */
  async publishDataColumnSidecar(dataColumnSidecar: DataColumnSidecar): Promise<number> {
    const slot = isGloasDataColumnSidecar(dataColumnSidecar)
      ? dataColumnSidecar.slot
      : dataColumnSidecar.signedBlockHeader.message.slot;
    const epoch = computeEpochAtSlot(slot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    const subnet = computeSubnetForDataColumnSidecar(this.config, dataColumnSidecar);
    return this.publishGossip<GossipType.data_column_sidecar>(
      {type: GossipType.data_column_sidecar, boundary, subnet},
      dataColumnSidecar,
      {
        ignoreDuplicatePublishError: true,
        // we ensure having all topic peers via prioritizePeers() function
        // in the worse case, if there is 0 peer on the topic, the overall publish operation could be still a success
        // because supernode will rebuild and publish missing data column sidecars for us
        // hence we want to track sent peers as 0 instead of an error
        allowPublishToZeroTopicPeers: true,
      }
    );
  }

  /** Publish a signed aggregate on the topic of the fork boundary at the aggregate's slot */
  async publishBeaconAggregateAndProof(aggregateAndProof: SignedAggregateAndProof): Promise<number> {
    const epoch = computeEpochAtSlot(aggregateAndProof.message.aggregate.data.slot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.beacon_aggregate_and_proof>(
      {type: GossipType.beacon_aggregate_and_proof, boundary},
      aggregateAndProof,
      {ignoreDuplicatePublishError: true}
    );
  }

  /** Publish an attestation on the given attestation subnet */
  async publishBeaconAttestation(attestation: SingleAttestation, subnet: SubnetID): Promise<number> {
    const epoch = computeEpochAtSlot(attestation.data.slot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.beacon_attestation>(
      {type: GossipType.beacon_attestation, boundary, subnet},
      attestation,
      {ignoreDuplicatePublishError: true}
    );
  }

  /** Publish a voluntary exit on the topic of the fork boundary at the exit's own epoch */
  async publishVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): Promise<number> {
    const epoch = voluntaryExit.message.epoch;
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.voluntary_exit>({type: GossipType.voluntary_exit, boundary}, voluntaryExit, {
      ignoreDuplicatePublishError: true,
    });
  }

  /**
   * Publish a BLS-to-execution change on the topic of every active fork boundary that is capella or later.
   *
   * @returns total number of peers the message was sent to, summed over all published topics
   * @throws Error if no capella+ fork boundary is active, or the first publish error if any topic fails
   */
  async publishBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<number> {
    const publishChanges: Promise<number>[] = [];
    for (const boundary of getActiveForkBoundaries(this.config, this.clock.currentEpoch)) {
      if (ForkSeq[boundary.fork] >= ForkSeq.capella) {
        publishChanges.push(
          this.publishGossip<GossipType.bls_to_execution_change>(
            {type: GossipType.bls_to_execution_change, boundary},
            blsToExecutionChange,
            {ignoreDuplicatePublishError: true}
          )
        );
      }
    }

    if (publishChanges.length === 0) {
      throw Error("No capella+ fork active yet to publish blsToExecutionChange");
    }

    // Wait for every topic: resolving on the first success would hide failures on the
    // remaining topics and under-report the number of peers reached
    const sentPeersArr = await Promise.all(publishChanges);
    return sentPeersArr.reduce((total, sentPeers) => total + sentPeers, 0);
  }

  /** Publish a proposer slashing on the topic of the fork boundary at the first header's slot */
  async publishProposerSlashing(proposerSlashing: phase0.ProposerSlashing): Promise<number> {
    // The slot is cast as bigint here and converted to number for the epoch computation
    const epoch = computeEpochAtSlot(Number(proposerSlashing.signedHeader1.message.slot as bigint));
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.proposer_slashing>(
      {type: GossipType.proposer_slashing, boundary},
      proposerSlashing
    );
  }

  /** Publish an attester slashing on the topic of the fork boundary at the first attestation's slot */
  async publishAttesterSlashing(attesterSlashing: AttesterSlashing): Promise<number> {
    // The slot is cast as bigint here and converted to number for the epoch computation
    const epoch = computeEpochAtSlot(Number(attesterSlashing.attestation1.data.slot as bigint));
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.attester_slashing>(
      {type: GossipType.attester_slashing, boundary},
      attesterSlashing
    );
  }

  /** Publish a sync committee message on the given sync committee subnet */
  async publishSyncCommitteeSignature(signature: altair.SyncCommitteeMessage, subnet: SubnetID): Promise<number> {
    const epoch = computeEpochAtSlot(signature.slot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.sync_committee>(
      {type: GossipType.sync_committee, boundary, subnet},
      signature,
      {
        ignoreDuplicatePublishError: true,
      }
    );
  }

  /** Publish a signed sync committee contribution on the topic of the fork boundary at its slot */
  async publishContributionAndProof(contributionAndProof: altair.SignedContributionAndProof): Promise<number> {
    const epoch = computeEpochAtSlot(contributionAndProof.message.contribution.slot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.sync_committee_contribution_and_proof>(
      {type: GossipType.sync_committee_contribution_and_proof, boundary},
      contributionAndProof,
      {ignoreDuplicatePublishError: true}
    );
  }

  /** Publish a light client finality update on the topic of the fork boundary at its `signatureSlot` */
  async publishLightClientFinalityUpdate(update: LightClientFinalityUpdate): Promise<number> {
    const epoch = computeEpochAtSlot(update.signatureSlot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.light_client_finality_update>(
      {type: GossipType.light_client_finality_update, boundary},
      update
    );
  }

  /** Publish a light client optimistic update on the topic of the fork boundary at its `signatureSlot` */
  async publishLightClientOptimisticUpdate(update: LightClientOptimisticUpdate): Promise<number> {
    const epoch = computeEpochAtSlot(update.signatureSlot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.light_client_optimistic_update>(
      {type: GossipType.light_client_optimistic_update, boundary},
      update
    );
  }

  /** Publish a signed execution payload envelope; the epoch is derived from the payload's `slotNumber` */
  async publishSignedExecutionPayloadEnvelope(signedEnvelope: gloas.SignedExecutionPayloadEnvelope): Promise<number> {
    const epoch = computeEpochAtSlot(signedEnvelope.message.payload.slotNumber);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.execution_payload>(
      {type: GossipType.execution_payload, boundary},
      signedEnvelope,
      {ignoreDuplicatePublishError: true}
    );
  }

  /** Publish a payload attestation message on the topic of the fork boundary at its slot */
  async publishPayloadAttestationMessage(payloadAttestationMessage: gloas.PayloadAttestationMessage): Promise<number> {
    const epoch = computeEpochAtSlot(payloadAttestationMessage.data.slot);
    const boundary = this.config.getForkBoundaryAtEpoch(epoch);

    return this.publishGossip<GossipType.payload_attestation_message>(
      {type: GossipType.payload_attestation_message, boundary},
      payloadAttestationMessage,
      {ignoreDuplicatePublishError: true}
    );
  }

  /**
   * Serialize `object` with the SSZ type of `topic` and publish it through the core.
   *
   * Note: `ignoreDuplicatePublishError` is always taken from the per-topic table
   * `gossipTopicIgnoreDuplicatePublishError`; a value supplied by the caller in `opts` is overwritten.
   *
   * @returns number of peers the message was sent to, as reported by the core
   */
  private async publishGossip<K extends GossipType>(
    topic: GossipTopicMap[K],
    object: GossipTypeMap[K],
    opts?: PublishOpts | undefined
  ): Promise<number> {
    const topicStr = stringifyGossipTopic(this.config, topic);
    const sszType = getGossipSSZType(topic);
    const messageData = (sszType.serialize as (object: GossipTypeMap[GossipType]) => Uint8Array)(object);
    opts = {
      ...opts,
      ignoreDuplicatePublishError: gossipTopicIgnoreDuplicatePublishError[topic.type],
    };
    const sentPeers = await this.core.publishGossip(topicStr, messageData, opts);

    this.logger.verbose("Publish to topic", {topic: topicStr, sentPeers, currentSlot: this.clock.currentSlot});
    return sentPeers;
  }

  // ReqResp
  //
  // Each send* method issues the request through sendReqRespRequest() and collects the
  // response stream with a helper that is given the maximum number of expected items.

  async sendBeaconBlocksByRange(
    peerId: PeerIdStr,
    request: phase0.BeaconBlocksByRangeRequest
  ): Promise<SignedBeaconBlock[]> {
    return collectSequentialBlocksInRange(
      this.sendReqRespRequest(
        peerId,
        ReqRespMethod.BeaconBlocksByRange,
        // Before altair, prioritize V2. After altair only request V2
        this.config.getForkSeq(this.clock.currentSlot) >= ForkSeq.altair ? [Version.V2] : [Version.V2, Version.V1],
        request
      ),
      request,
      this.chain.serializedCache
    );
  }

  /** Request blocks by root; at most `request.length` blocks are collected */
  async sendBeaconBlocksByRoot(peerId: PeerIdStr, request: BeaconBlocksByRootRequest): Promise<SignedBeaconBlock[]> {
    return collectMaxResponseTypedWithBytes(
      this.sendReqRespRequest(
        peerId,
        ReqRespMethod.BeaconBlocksByRoot,
        // Before altair, prioritize V2. After altair only request V2
        this.config.getForkSeq(this.clock.currentSlot) >= ForkSeq.altair ? [Version.V2] : [Version.V2, Version.V1],
        request
      ),
      request.length,
      responseSszTypeByMethod[ReqRespMethod.BeaconBlocksByRoot],
      this.chain.serializedCache
    );
  }

  /** Request blocks by head; the collected count is capped at `MAX_REQUEST_BLOCKS_DENEB` */
  async sendBeaconBlocksByHead(
    peerId: PeerIdStr,
    request: fulu.BeaconBlocksByHeadRequest
  ): Promise<SignedBeaconBlock[]> {
    return collectMaxResponseTypedWithBytes(
      this.sendReqRespRequest(peerId, ReqRespMethod.BeaconBlocksByHead, [Version.V1], request),
      Math.min(request.count, this.config.MAX_REQUEST_BLOCKS_DENEB),
      responseSszTypeByMethod[ReqRespMethod.BeaconBlocksByHead],
      this.chain.serializedCache
    );
  }

  /** Request the light client bootstrap for a block root; exactly one response item is expected */
  async sendLightClientBootstrap(peerId: PeerIdStr, request: Root): Promise<LightClientBootstrap> {
    return collectExactOneTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.LightClientBootstrap, [Version.V1], request),
      responseSszTypeByMethod[ReqRespMethod.LightClientBootstrap]
    );
  }

  /** Request the peer's optimistic update (no request body); exactly one response item is expected */
  async sendLightClientOptimisticUpdate(peerId: PeerIdStr): Promise<LightClientOptimisticUpdate> {
    return collectExactOneTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.LightClientOptimisticUpdate, [Version.V1], null),
      responseSszTypeByMethod[ReqRespMethod.LightClientOptimisticUpdate]
    );
  }

  /** Request the peer's finality update (no request body); exactly one response item is expected */
  async sendLightClientFinalityUpdate(peerId: PeerIdStr): Promise<LightClientFinalityUpdate> {
    return collectExactOneTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.LightClientFinalityUpdate, [Version.V1], null),
      responseSszTypeByMethod[ReqRespMethod.LightClientFinalityUpdate]
    );
  }

  /** Request light client updates by range; at most `request.count` updates are collected */
  async sendLightClientUpdatesByRange(
    peerId: PeerIdStr,
    request: altair.LightClientUpdatesByRange
  ): Promise<LightClientUpdate[]> {
    return collectMaxResponseTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.LightClientUpdatesByRange, [Version.V1], request),
      request.count,
      responseSszTypeByMethod[ReqRespMethod.LightClientUpdatesByRange]
    );
  }

  /** Request blob sidecars by slot range; the per-block blob limit is taken at the epoch of `startSlot` */
  async sendBlobSidecarsByRange(
    peerId: PeerIdStr,
    request: deneb.BlobSidecarsByRangeRequest
  ): Promise<deneb.BlobSidecar[]> {
    const epoch = computeEpochAtSlot(request.startSlot);
    return collectMaxResponseTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.BlobSidecarsByRange, [Version.V1], request),
      // request's count represent the slots, so the actual max count received could be slots * blobs per slot
      request.count * this.config.getMaxBlobsPerBlock(epoch),
      responseSszTypeByMethod[ReqRespMethod.BlobSidecarsByRange]
    );
  }

  /** Request blob sidecars by root; at most `request.length` sidecars are collected */
  async sendBlobSidecarsByRoot(peerId: PeerIdStr, request: BlobSidecarsByRootRequest): Promise<deneb.BlobSidecar[]> {
    return collectMaxResponseTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.BlobSidecarsByRoot, [Version.V1], request),
      request.length,
      responseSszTypeByMethod[ReqRespMethod.BlobSidecarsByRoot],
      this.chain.serializedCache
    );
  }

  /** Request data column sidecars by slot range; at most `count * columns.length` sidecars are collected */
  async sendDataColumnSidecarsByRange(
    peerId: PeerIdStr,
    request: fulu.DataColumnSidecarsByRangeRequest
  ): Promise<DataColumnSidecar[]> {
    return collectMaxResponseTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.DataColumnSidecarsByRange, [Version.V1], request),
      request.count * request.columns.length,
      responseSszTypeByMethod[ReqRespMethod.DataColumnSidecarsByRange]
    );
  }

  /** Request data column sidecars by root; the cap is the total number of columns over all request entries */
  async sendDataColumnSidecarsByRoot(
    peerId: PeerIdStr,
    request: DataColumnSidecarsByRootRequest
  ): Promise<DataColumnSidecar[]> {
    return collectMaxResponseTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.DataColumnSidecarsByRoot, [Version.V1], request),
      request.reduce((total, {columns}) => total + columns.length, 0),
      responseSszTypeByMethod[ReqRespMethod.DataColumnSidecarsByRoot],
      this.chain.serializedCache
    );
  }

  /** Request execution payload envelopes by range; at most `request.count` envelopes are collected */
  async sendExecutionPayloadEnvelopesByRange(
    peerId: PeerIdStr,
    request: gloas.ExecutionPayloadEnvelopesByRangeRequest
  ): Promise<gloas.SignedExecutionPayloadEnvelope[]> {
    return collectMaxResponseTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.ExecutionPayloadEnvelopesByRange, [Version.V1], request),
      request.count,
      responseSszTypeByMethod[ReqRespMethod.ExecutionPayloadEnvelopesByRange]
    );
  }

  /** Request execution payload envelopes by root; at most `request.length` envelopes are collected */
  async sendExecutionPayloadEnvelopesByRoot(
    peerId: PeerIdStr,
    request: ExecutionPayloadEnvelopesByRootRequest
  ): Promise<gloas.SignedExecutionPayloadEnvelope[]> {
    return collectMaxResponseTyped(
      this.sendReqRespRequest(peerId, ReqRespMethod.ExecutionPayloadEnvelopesByRoot, [Version.V1], request),
      request.length,
      responseSszTypeByMethod[ReqRespMethod.ExecutionPayloadEnvelopesByRoot],
      this.chain.serializedCache
    );
  }

  /**
   * Serialize `request` with the request SSZ type registered for `method` at the current fork
   * and hand it to the core. Methods without a request type are sent with an empty body.
   */
  private sendReqRespRequest<Req>(
    peerId: PeerIdStr,
    method: ReqRespMethod,
    versions: number[],
    request: Req
  ): AsyncIterable<ResponseIncoming> {
    const fork = this.config.getForkName(this.clock.currentSlot);
    const requestType = requestSszTypeByMethod(fork, this.config)[method];
    const requestData = requestType ? requestType.serialize(request as never) : new Uint8Array();

    // ReqResp outgoing request, emit from main thread to worker
    return this.core.sendReqRespRequest({peerId, method, versions, requestData});
  }

  // Debug
  //
  // The methods below delegate directly to the core, except dumpGossipQueue()
  // which reads from the network processor.

  connectToPeer(peer: string, multiaddr: string[]): Promise<void> {
    return this.core.connectToPeer(peer, multiaddr);
  }

  disconnectPeer(peer: string): Promise<void> {
    return this.core.disconnectPeer(peer);
  }

  addDirectPeer(peer: routes.lodestar.DirectPeer): Promise<string | null> {
    return this.core.addDirectPeer(peer);
  }

  removeDirectPeer(peerId: string): Promise<boolean> {
    return this.core.removeDirectPeer(peerId);
  }

  getDirectPeers(): Promise<string[]> {
    return this.core.getDirectPeers();
  }

  dumpPeer(peerIdStr: string): Promise<routes.lodestar.LodestarNodePeer | undefined> {
    return this.core.dumpPeer(peerIdStr);
  }

  dumpPeers(): Promise<routes.lodestar.LodestarNodePeer[]> {
    return this.core.dumpPeers();
  }

  dumpPeerScoreStats(): Promise<PeerScoreStats> {
    return this.core.dumpPeerScoreStats();
  }

  dumpGossipPeerScoreStats(): Promise<PeerScoreStatsDump> {
    return this.core.dumpGossipPeerScoreStats();
  }

  dumpDiscv5KadValues(): Promise<string[]> {
    return this.core.dumpDiscv5KadValues();
  }

  dumpMeshPeers(): Promise<Record<string, string[]>> {
    return this.core.dumpMeshPeers();
  }

  async dumpGossipQueue(gossipType: GossipType): Promise<PendingGossipsubMessage[]> {
    return this.networkProcessor.dumpGossipQueue(gossipType);
  }

  async writeNetworkThreadProfile(durationMs: number, dirpath: string): Promise<string> {
    return this.core.writeNetworkThreadProfile(durationMs, dirpath);
  }

  async writeDiscv5Profile(durationMs: number, dirpath: string): Promise<string> {
    return this.core.writeDiscv5Profile(durationMs, dirpath);
  }

  async writeNetworkHeapSnapshot(prefix: string, dirpath: string): Promise<string> {
    return this.core.writeNetworkHeapSnapshot(prefix, dirpath);
  }

  async writeDiscv5HeapSnapshot(prefix: string, dirpath: string): Promise<string> {
    return this.core.writeDiscv5HeapSnapshot(prefix, dirpath);
  }

  /**
   * Chain emitter listener for light client finality updates. Unwraps the event payload.
   * Kept as a stable property so the same reference is used for both `on()` and `off()`.
   */
  private onLightClientFinalityUpdateEvent = ({data}: {data: LightClientFinalityUpdate}): Promise<void> =>
    this.onLightClientFinalityUpdate(data);

  /**
   * Chain emitter listener for light client optimistic updates. Unwraps the event payload.
   * Kept as a stable property so the same reference is used for both `on()` and `off()`.
   */
  private onLightClientOptimisticUpdateEvent = ({data}: {data: LightClientOptimisticUpdate}): Promise<void> =>
    this.onLightClientOptimisticUpdate(data);

  /**
   * Wait for the sync message cutoff of the update's signature slot, then publish it.
   * Never rejects: publish-to-zero-peers errors are dropped silently, other errors are logged at debug level.
   */
  private onLightClientFinalityUpdate = async (finalityUpdate: LightClientFinalityUpdate): Promise<void> => {
    // TODO: Review is OK to remove if (this.hasAttachedSyncCommitteeMember())

    try {
      // messages SHOULD be broadcast after SYNC_MESSAGE_DUE_BPS of slot has transpired
      // https://github.com/ethereum/consensus-specs/blob/v1.6.1/specs/altair/light-client/p2p-interface.md#sync-committee
      await this.waitForSyncMessageCutoff(finalityUpdate.signatureSlot);
      await this.publishLightClientFinalityUpdate(finalityUpdate);
    } catch (e) {
      // Non-mandatory route on most of network as of Oct 2022. May not have found any peers on topic yet
      // Remove once https://github.com/ChainSafe/js-libp2p-gossipsub/issues/367
      if (!isPublishToZeroPeersError(e as Error)) {
        this.logger.debug("Error on BeaconGossipHandler.onLightclientFinalityUpdate", {}, e as Error);
      }
    }
  };

  /**
   * Wait for the sync message cutoff of the update's signature slot, then publish it.
   * Never rejects: publish-to-zero-peers errors are dropped silently, other errors are logged at debug level.
   */
  private onLightClientOptimisticUpdate = async (optimisticUpdate: LightClientOptimisticUpdate): Promise<void> => {
    // TODO: Review is OK to remove if (this.hasAttachedSyncCommitteeMember())

    try {
      // messages SHOULD be broadcast after SYNC_MESSAGE_DUE_BPS of slot has transpired
      // https://github.com/ethereum/consensus-specs/blob/v1.6.1/specs/altair/light-client/p2p-interface.md#sync-committee
      await this.waitForSyncMessageCutoff(optimisticUpdate.signatureSlot);
      await this.publishLightClientOptimisticUpdate(optimisticUpdate);
    } catch (e) {
      // Non-mandatory route on most of network as of Oct 2022. May not have found any peers on topic yet
      // Remove once https://github.com/ChainSafe/js-libp2p-gossipsub/issues/367
      if (!isPublishToZeroPeersError(e as Error)) {
        this.logger.debug("Error on BeaconGossipHandler.onLightclientOptimisticUpdate", {}, e as Error);
      }
    }
  };

  /**
   * Sleep for the time remaining until the sync message due time within `slot`.
   * The sleep is abortable through the controller that `close()` aborts.
   * NOTE(review): the delay is negative once the cutoff has passed; presumably `sleep` returns immediately then - confirm
   */
  private waitForSyncMessageCutoff = async (slot: number): Promise<void> => {
    const fork = this.config.getForkName(slot);
    const msToCutoffTime = this.config.getSyncMessageDueMs(fork) - this.chain.clock.msFromSlot(slot);
    await sleep(msToCutoffTime, this.controller.signal);
  };

  /** A new head changes the local status, so push the fresh status to the core */
  private onHead = async (): Promise<void> => {
    await this.onUpdateStatus();
  };

  /** Log the new connection and record the peer's sync metadata */
  private onPeerConnected = (data: NetworkEventData[NetworkEvent.peerConnected]): void => {
    const {peer, clientAgent, custodyColumns, status} = data;
    const earliestAvailableSlot = (status as fulu.Status).earliestAvailableSlot;
    this.logger.verbose("onPeerConnected", {
      peer,
      clientAgent,
      custodyColumns: prettyPrintIndices(custodyColumns),
      earliestAvailableSlot: earliestAvailableSlot ?? "pre-fulu",
    });
    this.connectedPeersSyncMeta.set(peer, {
      client: clientAgent,
      custodyColumns,
      earliestAvailableSlot, // can be undefined pre-fulu
    });
  };

  /** Forget the sync metadata of a disconnected peer */
  private onPeerDisconnected = (data: NetworkEventData[NetworkEvent.peerDisconnected]): void => {
    this.connectedPeersSyncMeta.delete(data.peer);
  };

  /** Forward the new target custody group count to the core */
  private onTargetGroupCountUpdated = (count: number): void => {
    this.core.setTargetGroupCount(count);
  };

  /** Publish every data column sidecar requested by the chain; resolves to the per-sidecar sent-peer counts */
  private onPublishDataColumns = (sidecars: DataColumnSidecar[]): Promise<number[]> => {
    return promiseAllMaybeAsync(sidecars.map((sidecar) => () => this.publishDataColumnSidecar(sidecar)));
  };

  /** Publish every blob sidecar requested by the chain; resolves to the per-sidecar sent-peer counts */
  private onPublishBlobSidecars = (sidecars: deneb.BlobSidecar[]): Promise<number[]> => {
    return promiseAllMaybeAsync(sidecars.map((sidecar) => () => this.publishBlobSidecar(sidecar)));
  };

  /** Push the chain's current status to the core */
  private onUpdateStatus = async (): Promise<void> => {
    await this.core.updateStatus(this.chain.getStatus());
  };
}
