import {BeaconConfig, ForkBoundary} from "@lodestar/config";
import {SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {Epoch, ssz} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {ClockEvent, IClock} from "../../util/clock.js";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {getActiveForkBoundaries} from "../forks.js";
import {GossipType} from "../gossip/index.js";
import {MetadataController} from "../metadata.js";
import {RequestedSubnet, SubnetMap} from "../peers/utils/index.js";
import {CommitteeSubscription, GossipSubscriber, SubnetsService, SubnetsServiceOpts} from "./interface.js";

/**
 * Manage sync committee subnets. Sync committees are long (~27h) so there aren't random long-lived subscriptions
 */
export class SyncnetsService implements SubnetsService {
  /**
   * All currently subscribed subnets. Syncnets do not have additional long-lived
   * random subscriptions since the committees are already active for long periods of time.
   * Also, the node will aggregate through the entire period to simplify the validator logic.
   * So `subscriptionsCommittee` represents subnets to find peers and aggregate data.
   * This class will tell gossip to subscribe and un-subscribe.
   * If a value exists for `SubscriptionId` it means that gossip subscription is active in network.gossip
   */
  private subscriptionsCommittee = new SubnetMap();

  constructor(
    private readonly config: BeaconConfig,
    private readonly clock: IClock,
    private readonly gossip: GossipSubscriber,
    private readonly metadata: MetadataController,
    private readonly logger: Logger,
    private readonly metrics: NetworkCoreMetrics | null,
    private readonly opts?: SubnetsServiceOpts
  ) {
    if (metrics) {
      metrics.syncnetsService.subscriptionsCommittee.addCollect(() => this.onScrapeLodestarMetrics(metrics));
    }

    this.clock.on(ClockEvent.epoch, this.onEpoch);
  }

  close(): void {
    this.clock.off(ClockEvent.epoch, this.onEpoch);
  }

  /**
   * Get all active subnets for the hearbeat.
   */
  getActiveSubnets(): RequestedSubnet[] {
    return this.subscriptionsCommittee.getActiveTtl(this.clock.currentSlot);
  }

  /**
   * Called from the API when validator is a part of a committee.
   */
  addCommitteeSubscriptions(subscriptions: CommitteeSubscription[]): void {
    // Trigger gossip subscription first, in batch
    if (subscriptions.length > 0) {
      this.subscribeToSubnets(subscriptions.map((sub) => sub.subnet));
    }

    // Then, register the subscriptions
    for (const {subnet, slot} of subscriptions) {
      this.subscriptionsCommittee.request({subnet, toSlot: slot});
    }

    // For syncnets regular subscriptions are persisted in the ENR
    this.updateMetadata();
  }

  /** Call ONLY ONCE: Two epoch before the fork, re-subscribe all existing random subscriptions to the new fork  */
  subscribeSubnetsNextBoundary(boundary: ForkBoundary): void {
    this.logger.info("Subscribing to random attnets for next fork boundary", boundary);
    for (const subnet of this.subscriptionsCommittee.getAll()) {
      this.gossip.subscribeTopic({type: GossipType.sync_committee, boundary, subnet});
    }
  }

  /** Call  ONLY ONCE: Two epochs after the fork, un-subscribe all subnets from the old fork */
  unsubscribeSubnetsPrevBoundary(boundary: ForkBoundary): void {
    this.logger.info("Unsubscribing from random attnets of previous fork boundary", boundary);
    for (let subnet = 0; subnet < SYNC_COMMITTEE_SUBNET_COUNT; subnet++) {
      if (!this.opts?.subscribeAllSubnets) {
        this.gossip.unsubscribeTopic({type: GossipType.sync_committee, boundary, subnet});
      }
    }
  }

  /**
   * Run per epoch, clean-up operations that are not urgent
   */
  private onEpoch = (epoch: Epoch): void => {
    try {
      const slot = computeStartSlotAtEpoch(epoch);
      // Unsubscribe to a committee subnet from subscriptionsCommittee.
      this.unsubscribeSubnets(this.subscriptionsCommittee.getExpired(slot));
    } catch (e) {
      this.logger.error("Error on SyncnetsService.onEpoch", {epoch}, e as Error);
    }
  };

  /** Update ENR */
  private updateMetadata(): void {
    const subnets = ssz.altair.SyncSubnets.defaultValue();
    for (const subnet of this.subscriptionsCommittee.getAll()) {
      subnets.set(subnet, true);
    }

    // Only update metadata if necessary, setting `metadata.[key]` triggers a write to disk
    if (!ssz.altair.SyncSubnets.equals(subnets, this.metadata.syncnets)) {
      this.metadata.syncnets = subnets;
    }
  }

  /** Tigger a gossip subcription only if not already subscribed */
  private subscribeToSubnets(subnets: number[]): void {
    const boundaries = getActiveForkBoundaries(this.config, this.clock.currentEpoch);
    for (const subnet of subnets) {
      if (!this.subscriptionsCommittee.has(subnet)) {
        for (const boundary of boundaries) {
          this.gossip.subscribeTopic({type: GossipType.sync_committee, boundary, subnet});
        }
        this.metrics?.syncnetsService.subscribeSubnets.inc({subnet});
      }
    }
  }

  /** Trigger a gossip un-subscrition only if no-one is still subscribed */
  private unsubscribeSubnets(subnets: number[]): void {
    const boundaries = getActiveForkBoundaries(this.config, this.clock.currentEpoch);
    for (const subnet of subnets) {
      // No need to check if active in subscriptionsCommittee since we only have a single SubnetMap
      if (!this.opts?.subscribeAllSubnets) {
        for (const boundary of boundaries) {
          this.gossip.unsubscribeTopic({type: GossipType.sync_committee, boundary, subnet});
        }
        this.metrics?.syncnetsService.unsubscribeSubnets.inc({subnet});
      }
    }
  }

  private onScrapeLodestarMetrics(metrics: NetworkCoreMetrics): void {
    metrics.syncnetsService.subscriptionsCommittee.set(this.subscriptionsCommittee.size);
  }
}
