import type {Message} from "@libp2p/gossipsub";
import type {RPC} from "@libp2p/gossipsub/message";
import type {DataTransform} from "@libp2p/gossipsub/types";
// snappyjs is better for compression for smaller payloads
import xxhashFactory from "xxhash-wasm";
import {digest} from "@chainsafe/as-sha256";
import snappyWasm from "@chainsafe/snappy-wasm";
import {ForkName} from "@lodestar/params";
import {intToBytes} from "@lodestar/utils";
import {MESSAGE_DOMAIN_VALID_SNAPPY} from "./constants.js";
import {Eth2GossipsubMetrics} from "./metrics.js";
import {GossipTopicCache, getGossipSSZType} from "./topic.js";

// Load WASM
const xxhash = await xxhashFactory();

// Use salt to prevent msgId from being mined for collisions
const h64Seed = BigInt(Math.floor(Math.random() * 1e9));

// create singleton snappy encoder + decoder
const encoder = new snappyWasm.Encoder();
const decoder = new snappyWasm.Decoder();

// Shared buffer to convert msgId to string
const sharedMsgIdBuf = Buffer.alloc(20);

// Cache topic -> seed to avoid per-message allocations on the hot path.
// Topics are a fixed set per fork (changes only at fork boundaries).
const topicSeedCache = new Map<string, bigint>();

/**
 * The function used to generate a gossipsub message id
 * We use the first 8 bytes of SHA256(data) for content addressing
 */
export function fastMsgIdFn(rpcMsg: RPC.Message): string {
  if (rpcMsg.data) {
    if (rpcMsg.topic) {
      // Use topic-derived seed to prevent cross-topic deduplication of identical messages.
      // SyncCommitteeMessages are published to multiple sync_committee_{subnet} topics with
      // identical data, so hashing only the data incorrectly deduplicates across subnets.
      // See https://github.com/ChainSafe/lodestar/issues/8294
      let topicSeed = topicSeedCache.get(rpcMsg.topic);
      if (topicSeed === undefined) {
        topicSeed = xxhash.h64Raw(Buffer.from(rpcMsg.topic), h64Seed);
        topicSeedCache.set(rpcMsg.topic, topicSeed);
      }
      return xxhash.h64Raw(rpcMsg.data, topicSeed).toString(16);
    }
    return xxhash.h64Raw(rpcMsg.data, h64Seed).toString(16);
  }
  return "0000000000000000";
}

export function msgIdToStrFn(msgId: Uint8Array): string {
  // this is the same logic to `toHex(msgId)` with better performance
  sharedMsgIdBuf.set(msgId);
  return `0x${sharedMsgIdBuf.toString("hex")}`;
}

/**
 * Only valid msgId. Messages that fail to snappy_decompress() are not tracked
 */
export function msgIdFn(gossipTopicCache: GossipTopicCache, msg: Message): Uint8Array {
  const topic = gossipTopicCache.getTopic(msg.topic);

  let vec: Uint8Array[];

  if (topic.boundary.fork === ForkName.phase0) {
    // message id for phase0.
    // ```
    // SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + snappy_decompress(message.data))[:20]
    // ```
    vec = [MESSAGE_DOMAIN_VALID_SNAPPY, msg.data];
  } else {
    // message id for altair and subsequent future forks.
    // ```
    // SHA256(
    //   MESSAGE_DOMAIN_VALID_SNAPPY +
    //   uint_to_bytes(uint64(len(message.topic))) +
    //   message.topic +
    //   snappy_decompress(message.data)
    // )[:20]
    // ```
    // https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.7/specs/altair/p2p-interface.md#topics-and-messages
    vec = [MESSAGE_DOMAIN_VALID_SNAPPY, intToBytes(msg.topic.length, 8), Buffer.from(msg.topic), msg.data];
  }

  return digest(Buffer.concat(vec)).subarray(0, 20);
}

export class DataTransformSnappy implements DataTransform {
  constructor(
    private readonly gossipTopicCache: GossipTopicCache,
    private readonly maxSizePerMessage: number,
    private readonly metrics: Eth2GossipsubMetrics | null
  ) {}

  /**
   * Takes the data published by peers on a topic and transforms the data.
   * Should be the reverse of outboundTransform(). Example:
   * - `inboundTransform()`: decompress snappy payload
   * - `outboundTransform()`: compress snappy payload
   */
  inboundTransform(topicStr: string, data: Uint8Array): Uint8Array {
    // check uncompressed data length before we actually decompress
    const uncompressedDataLength = snappyWasm.decompress_len(data);
    if (uncompressedDataLength > this.maxSizePerMessage) {
      throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} > ${this.maxSizePerMessage}`);
    }

    const topic = this.gossipTopicCache.getTopic(topicStr);
    const sszType = getGossipSSZType(topic);
    this.metrics?.dataTransform.inbound.inc({type: topic.type});

    if (uncompressedDataLength < sszType.minSize) {
      throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} < ${sszType.minSize}`);
    }
    if (uncompressedDataLength > sszType.maxSize) {
      throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} > ${sszType.maxSize}`);
    }

    // Only after sanity length checks, we can decompress the data
    // Using Buffer.alloc() instead of Buffer.allocUnsafe() to mitigate high GC pressure observed in some environments
    const uncompressedData = Buffer.alloc(uncompressedDataLength);
    decoder.decompress_into(data, uncompressedData);
    return uncompressedData;
  }

  /**
   * Takes the data to be published (a topic and associated data) transforms the data. The
   * transformed data will then be used to create a `RawGossipsubMessage` to be sent to peers.
   */
  outboundTransform(topicStr: string, data: Uint8Array): Uint8Array {
    const topic = this.gossipTopicCache.getTopic(topicStr);
    this.metrics?.dataTransform.outbound.inc({type: topic.type});
    if (data.length > this.maxSizePerMessage) {
      throw Error(`ssz_snappy encoded data length ${data.length} > ${this.maxSizePerMessage}`);
    }

    // Using Buffer.alloc() instead of Buffer.allocUnsafe() to mitigate high GC pressure observed in some environments
    const compressedData = Buffer.alloc(snappyWasm.max_compress_len(data.length));
    const compressedLen = encoder.compress_into(data, compressedData);
    return compressedData.subarray(0, compressedLen);
  }
}
