/**
 * YjsProvider - Yjs provider implementing the Yjs Durable Streams Protocol.
 *
 * This provider uses the DurableStream client to sync Yjs documents:
 * - Single document URL with query parameters
 * - Snapshot discovery via ?offset=snapshot (307 redirects)
 * - Updates via long-polling
 * - Awareness via ?awareness=<name> query parameter
 *
 * Protocol: https://github.com/durable-streams/durable-streams/blob/main/packages/y-durable-streams/PROTOCOL.md
 */

import * as Y from "yjs"
import * as awarenessProtocol from "y-protocols/awareness"
import { ObservableV2 } from "lib0/observable"
import * as decoding from "lib0/decoding"
import * as encoding from "lib0/encoding"
import {
  DurableStream,
  DurableStreamError,
  FetchError,
  IdempotentProducer,
} from "@durable-streams/client"
import type { HeadersRecord } from "@durable-streams/client"

// ---- State Machine ----

/**
 * Primary connection states for the provider.
 */
type ConnectionState = `disconnected` | `connecting` | `connected`

/**
 * Valid state transitions - documents the state machine at a glance.
 * disconnected -> connecting (connect() called)
 * connecting -> connected (initial sync complete)
 * connecting -> disconnected (error or disconnect() called)
 * connected -> disconnected (disconnect() or error)
 */
const VALID_TRANSITIONS: Record<ConnectionState, Array<ConnectionState>> = {
  disconnected: [`connecting`],
  connecting: [`connected`, `disconnected`],
  connected: [`disconnected`],
}

/**
 * Connection context bundles all state for a single connection attempt.
 * Each connect() creates a new context with a unique ID.
 */
interface ConnectionContext {
  /** Unique ID for this connection attempt */
  readonly id: number
  /** Abort signal for this connection */
  readonly controller: AbortController
  /** Starting offset for updates (set after snapshot discovery) */
  startOffset: string
  /** Idempotent producer for sending updates */
  producer: IdempotentProducer | null
}

/**
 * Connection status of the provider.
 */
export type YjsProviderStatus = `disconnected` | `connecting` | `connected`

/**
 * Options for creating a YjsProvider.
 */
export interface YjsProviderOptions {
  /**
   * The Yjs document to synchronize.
   */
  doc: Y.Doc

  /**
   * Base URL of the Yjs server.
   * E.g., "http://localhost:4438/v1/yjs/my-service"
   */
  baseUrl: string

  /**
   * Document path (can include forward slashes).
   * E.g., "my-doc" or "project/chapter-1"
   */
  docId: string

  /**
   * Optional Awareness instance for presence support.
   */
  awareness?: awarenessProtocol.Awareness

  /**
   * Optional HTTP headers for requests.
   */
  headers?: HeadersRecord

  /**
   * Live mode for streaming updates.
   * @default "sse"
   */
  liveMode?: `sse` | `long-poll`

  /**
   * Whether to automatically connect on construction.
   * @default true
   */
  connect?: boolean
}

/**
 * Events emitted by the YjsProvider.
 */
export interface YjsProviderEvents {
  synced: (synced: boolean) => void
  status: (status: YjsProviderStatus) => void
  error: (error: Error) => void
}

/**
 * Internal type for awareness update events.
 */
interface AwarenessUpdate {
  added: Array<number>
  updated: Array<number>
  removed: Array<number>
}

/**
 * Interval for awareness heartbeats (15 seconds).
 */
export const AWARENESS_HEARTBEAT_INTERVAL = 15000

/**
 * YjsProvider for the Yjs Durable Streams Protocol.
 */
export class YjsProvider extends ObservableV2<YjsProviderEvents> {
  readonly doc: Y.Doc
  readonly awareness?: awarenessProtocol.Awareness

  private readonly baseUrl: string
  private readonly docId: string
  private readonly headers: HeadersRecord
  private readonly liveMode: `sse` | `long-poll`

  // ---- State Machine ----
  private _state: ConnectionState = `disconnected`
  private _connectionId = 0
  private _ctx: ConnectionContext | null = null
  private _synced = false

  // ---- Connection-related state ----
  private updatesStreamGeneration = 0
  private updatesSubscription: (() => void) | null = null

  private sendingAwareness = false
  private pendingAwareness: AwarenessUpdate | null = null

  private awarenessHeartbeat: ReturnType<typeof setInterval> | null = null

  constructor(options: YjsProviderOptions) {
    super()
    this.doc = options.doc
    this.awareness = options.awareness
    this.baseUrl = options.baseUrl.replace(/\/$/, ``)
    this.docId = options.docId
    this.headers = options.headers ?? {}
    this.liveMode = options.liveMode ?? `sse`

    this.doc.on(`update`, this.handleDocumentUpdate)

    if (this.awareness) {
      this.awareness.on(`update`, this.handleAwarenessUpdate)
    }

    if (options.connect !== false) {
      this.connect()
    }
  }

  // ---- State getters ----

  get synced(): boolean {
    return this._synced
  }

  private set synced(state: boolean) {
    if (this._synced !== state) {
      this._synced = state
      this.emit(`synced`, [state])
    }
  }

  /** True when connected to the server */
  get connected(): boolean {
    return this._state === `connected`
  }

  /** True when connection is in progress */
  get connecting(): boolean {
    return this._state === `connecting`
  }

  // ---- State Machine Methods ----

  /**
   * Transition to a new connection state.
   * Returns false if the transition is invalid (logs a warning).
   */
  private transition(to: ConnectionState): boolean {
    const allowed = VALID_TRANSITIONS[this._state]
    if (!allowed.includes(to)) {
      console.warn(`[YjsProvider] Invalid transition: ${this._state} -> ${to}`)
      return false
    }

    this._state = to
    // Emit status for all transitions
    this.emit(`status`, [to])
    return true
  }

  /**
   * Create a new connection context with a unique ID.
   */
  private createConnectionContext(): ConnectionContext {
    this._connectionId += 1
    const ctx: ConnectionContext = {
      id: this._connectionId,
      controller: new AbortController(),
      startOffset: `-1`,
      producer: null,
    }
    this._ctx = ctx
    return ctx
  }

  /**
   * Check if a connection context is stale (disconnected or replaced).
   * Use this after every await to detect race conditions.
   */
  private isStale(ctx: ConnectionContext): boolean {
    return this._ctx !== ctx || ctx.controller.signal.aborted
  }

  // ---- Connection management ----

  async connect(): Promise<void> {
    // Only allow connecting from disconnected state
    if (this._state !== `disconnected`) return

    if (!this.transition(`connecting`)) return

    const ctx = this.createConnectionContext()

    try {
      // Step 1: Create document (idempotent — succeeds if already exists)
      await this.ensureDocument(ctx)
      if (this.isStale(ctx)) return

      // Step 2: Discover snapshot and get starting offset
      await this.discoverSnapshot(ctx)
      if (this.isStale(ctx)) return

      // Step 3: Create idempotent producer for sending updates
      this.createUpdatesProducer(ctx)

      // Step 4: Start updates stream (will load snapshot if needed)
      await this.startUpdatesStream(ctx, ctx.startOffset)
      if (this.isStale(ctx)) return

      // Step 5: Start awareness if configured
      if (this.awareness) {
        this.startAwareness(ctx)
      }

      // Note: transition to 'connected' happens in runUpdatesStream.markSynced()
      // so that connected=true before synced=true (tests depend on this ordering)
    } catch (err) {
      const isAborted = err instanceof Error && err.name === `AbortError`
      if (!isAborted && !this.isStale(ctx)) {
        this.emit(`error`, [
          err instanceof Error ? err : new Error(String(err)),
        ])
        this.disconnect()
      }
    }
  }

  async disconnect(): Promise<void> {
    // Guard against concurrent disconnect calls or disconnecting when already disconnected
    const ctx = this._ctx
    if (!ctx || this._state === `disconnected`) return

    // Transition immediately to prevent races
    this.transition(`disconnected`)
    this._ctx = null
    this.synced = false

    if (this.awarenessHeartbeat) {
      clearInterval(this.awarenessHeartbeat)
      this.awarenessHeartbeat = null
    }

    if (this.awareness) {
      this.broadcastAwarenessRemoval()
    }

    this.updatesStreamGeneration += 1
    if (this.updatesSubscription) {
      this.updatesSubscription()
      this.updatesSubscription = null
    }

    // Flush and close producer before aborting
    await this.closeUpdatesProducer(ctx)

    ctx.controller.abort()

    this.pendingAwareness = null
  }

  destroy(): void {
    // Fire-and-forget disconnect - we're destroying anyway
    this.disconnect().catch(() => {})
    this.doc.off(`update`, this.handleDocumentUpdate)
    if (this.awareness) {
      this.awareness.off(`update`, this.handleAwarenessUpdate)
    }
    super.destroy()
  }

  /**
   * Flush any pending updates to the server.
   *
   * @internal This method is primarily for testing to ensure all batched
   * updates have been sent before making assertions. In production, updates
   * are sent automatically via the IdempotentProducer's batching/linger mechanism.
   */
  async flush(): Promise<void> {
    if (this._ctx?.producer) {
      await this._ctx.producer.flush()
    }
  }

  // ---- URL builders ----

  /**
   * Get the document URL.
   */
  private docUrl(): string {
    return `${this.baseUrl}/docs/${this.docId}`
  }

  /**
   * Get the awareness URL for a named stream.
   */
  private awarenessUrl(name: string = `default`): string {
    return `${this.docUrl()}?awareness=${encodeURIComponent(name)}`
  }

  /**
   * Create the document on the server via PUT.
   * Idempotent: succeeds if document already exists with matching config.
   */
  private async ensureDocument(ctx: ConnectionContext): Promise<void> {
    const url = this.docUrl()

    const response = await fetch(url, {
      method: `PUT`,
      headers: {
        ...(this.headers as Record<string, string>),
        "content-type": `application/octet-stream`,
      },
      signal: ctx.controller.signal,
    })

    // 201 Created or 200 OK (already exists) are both fine
    if (response.status === 201 || response.status === 200) {
      await response.arrayBuffer()
      return
    }

    // 409 Conflict means it exists with different config — acceptable
    if (response.status === 409) {
      await response.arrayBuffer()
      return
    }

    // Any other status is an error
    const text = await response.text().catch(() => ``)
    throw new Error(`Failed to create document: ${response.status} ${text}`)
  }

  // ---- Snapshot Discovery ----

  /**
   * Discover the current snapshot state via ?offset=snapshot.
   * Handles 307 redirect to determine starting offset.
   */
  private async discoverSnapshot(ctx: ConnectionContext): Promise<void> {
    const url = `${this.docUrl()}?offset=snapshot`

    const response = await fetch(url, {
      method: `GET`,
      headers: this.headers as Record<string, string>,
      redirect: `manual`, // Don't follow redirects automatically
      signal: ctx.controller.signal,
    })

    if (response.status === 307) {
      // Parse the redirect location
      const location = response.headers.get(`location`)
      if (location) {
        const redirectUrl = new URL(location, url)
        const offset = redirectUrl.searchParams.get(`offset`)
        if (offset) {
          if (offset.endsWith(`_snapshot`)) {
            // Snapshot exists - load it
            await this.loadSnapshot(ctx, offset)
          } else {
            // No snapshot - start from the indicated offset
            ctx.startOffset = offset
          }
          return
        }
      }
    }

    // Fallback: if redirect parsing fails, start from beginning
    ctx.startOffset = `-1`
  }

  /**
   * Load a snapshot from the server.
   */
  private async loadSnapshot(
    ctx: ConnectionContext,
    snapshotOffset: string
  ): Promise<void> {
    const url = `${this.docUrl()}?offset=${encodeURIComponent(snapshotOffset)}`

    try {
      const response = await fetch(url, {
        method: `GET`,
        headers: this.headers as Record<string, string>,
        signal: ctx.controller.signal,
      })

      if (!response.ok) {
        if (response.status === 404) {
          // Snapshot deleted - retry discovery
          await this.discoverSnapshot(ctx)
          return
        }
        throw new Error(`Failed to load snapshot: ${response.status}`)
      }

      // Apply snapshot
      const data = new Uint8Array(await response.arrayBuffer())
      if (data.length > 0) {
        Y.applyUpdate(this.doc, data, `server`)
      }

      // Get the next offset from header
      const nextOffset = response.headers.get(`stream-next-offset`)
      ctx.startOffset = nextOffset ?? `-1`
    } catch (err) {
      if (this.isNotFoundError(err)) {
        // Snapshot deleted - retry discovery
        await this.discoverSnapshot(ctx)
        return
      }
      throw err
    }
  }

  // ---- Updates Producer ----

  private createUpdatesProducer(ctx: ConnectionContext): void {
    const stream = new DurableStream({
      url: this.docUrl(),
      headers: this.headers,
      contentType: `application/octet-stream`,
    })

    // Use doc clientID for unique producer ID per client
    const producerId = `${this.docId}-${this.doc.clientID}`

    ctx.producer = new IdempotentProducer(stream, producerId, {
      autoClaim: true,
      signal: ctx.controller.signal,
      onError: (err) => {
        // Ignore AbortError - this happens during intentional disconnect
        if (err instanceof Error && err.name === `AbortError`) {
          return
        }
        console.error(`[YjsProvider] Producer error:`, err)
        this.emit(`error`, [err])
        // Disconnect and reconnect on producer errors (unless auth error)
        if (!this.isAuthError(err)) {
          this.disconnect()
          this.connect()
        }
      },
    })
  }

  private async closeUpdatesProducer(ctx: ConnectionContext): Promise<void> {
    if (!ctx.producer) return

    try {
      await ctx.producer.close()
    } catch {
      // Ignore errors during close
    }
    ctx.producer = null
  }

  // ---- Live updates streaming ----

  private startUpdatesStream(
    ctx: ConnectionContext,
    offset: string
  ): Promise<void> {
    if (ctx.controller.signal.aborted) {
      return Promise.resolve()
    }

    this.updatesStreamGeneration += 1
    const generation = this.updatesStreamGeneration

    this.updatesSubscription?.()
    this.updatesSubscription = null

    let settled = false
    let resolveInitial: () => void
    let rejectInitial: (error: Error) => void

    const initialPromise = new Promise<void>((resolve, reject) => {
      resolveInitial = () => {
        if (!settled) {
          settled = true
          resolve()
        }
      }
      rejectInitial = (error: Error) => {
        if (!settled) {
          settled = true
          reject(error)
        }
      }
    })

    this.runUpdatesStream(
      ctx,
      offset,
      generation,
      resolveInitial!,
      rejectInitial!
    ).catch((err) => {
      rejectInitial(err instanceof Error ? err : new Error(String(err)))
    })

    return initialPromise
  }

  private async runUpdatesStream(
    ctx: ConnectionContext,
    offset: string,
    generation: number,
    resolveInitialSync: () => void,
    rejectInitialSync: (error: Error) => void
  ): Promise<void> {
    let currentOffset = offset
    let initialSyncPending = true

    const markSynced = (): void => {
      if (!initialSyncPending) return
      initialSyncPending = false
      // Transition to connected BEFORE setting synced, so that when synced event
      // fires, connected is already true (tests depend on this ordering)
      if (this._state === `connecting`) {
        this.transition(`connected`)
      }
      this.synced = true
      resolveInitialSync()
    }

    const isStale = (): boolean =>
      this.isStale(ctx) || this.updatesStreamGeneration !== generation

    while (this.updatesStreamGeneration === generation) {
      if (ctx.controller.signal.aborted) {
        markSynced()
        return
      }

      const stream = new DurableStream({
        url: this.docUrl(),
        headers: this.headers,
        contentType: `application/octet-stream`,
      })

      try {
        const response = await stream.stream({
          offset: currentOffset,
          live: this.liveMode,
          signal: ctx.controller.signal,
        })

        this.updatesSubscription?.()
        // eslint-disable-next-line @typescript-eslint/require-await
        this.updatesSubscription = response.subscribeBytes(async (chunk) => {
          if (isStale()) return

          currentOffset = chunk.offset

          if (chunk.data.length > 0) {
            this.applyUpdates(chunk.data)
          }

          if (initialSyncPending && chunk.upToDate) {
            markSynced()
          } else if (chunk.data.length > 0) {
            this.synced = true
          }
        })

        await response.closed
        markSynced()
        // SSE connection closed (server closes ~60s per protocol) — reconnect
        continue
      } catch (err) {
        if (isStale()) {
          markSynced()
          return
        }

        if (this.isNotFoundError(err)) {
          // Document stream not found — fail (document should be created via PUT)
          // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- markSynced mutates this
          if (initialSyncPending) {
            rejectInitialSync(
              err instanceof Error ? err : new Error(String(err))
            )
            return
          }
          // After initial sync, a 404 means the stream was deleted — disconnect
          this.emit(`error`, [
            err instanceof Error ? err : new Error(String(err)),
          ])
          this.disconnect()
          return
        }

        // Non-404 error during initial sync - fail
        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- markSynced mutates this
        if (initialSyncPending) {
          rejectInitialSync(err instanceof Error ? err : new Error(String(err)))
          return
        }

        await new Promise((resolve) => setTimeout(resolve, 1000))
      } finally {
        if (this.updatesSubscription) {
          this.updatesSubscription()
          this.updatesSubscription = null
        }
      }
    }
  }

  /**
   * Frame data with lib0 length-prefix encoding for transport.
   */
  private static frameUpdate(data: Uint8Array): Uint8Array {
    const encoder = encoding.createEncoder()
    encoding.writeVarUint8Array(encoder, data)
    return encoding.toUint8Array(encoder)
  }

  /**
   * Apply lib0-framed updates from the server.
   */
  private applyUpdates(data: Uint8Array): void {
    if (data.length === 0) return

    const decoder = decoding.createDecoder(data)
    while (decoding.hasContent(decoder)) {
      const update = decoding.readVarUint8Array(decoder)
      Y.applyUpdate(this.doc, update, `server`)
    }
  }

  /**
   * Apply lib0-framed awareness updates from the server.
   */
  private applyAwarenessUpdates(data: Uint8Array): void {
    if (data.length === 0 || !this.awareness) return

    try {
      const decoder = decoding.createDecoder(data)
      while (decoding.hasContent(decoder)) {
        const update = decoding.readVarUint8Array(decoder)
        try {
          awarenessProtocol.applyAwarenessUpdate(
            this.awareness,
            update,
            `server`
          )
        } catch {
          // Ignore invalid awareness updates - they're ephemeral
        }
      }
    } catch {
      // Ignore malformed lib0 frames - awareness is ephemeral
    }
  }

  // ---- Document updates ----

  private handleDocumentUpdate = (
    update: Uint8Array,
    origin: unknown
  ): void => {
    if (origin === `server`) return
    const producer = this._ctx?.producer
    if (!producer || !this.connected) return

    // Mark as unsynced - will become true when our write echoes back
    this.synced = false

    // Frame update with lib0 encoding before appending.
    // This is critical because the IdempotentProducer batches multiple appends
    // by concatenating bytes. Without framing, concatenated raw Yjs updates
    // would be invalid. With framing, each update is length-prefixed so
    // concatenation produces valid lib0-framed data.
    producer.append(YjsProvider.frameUpdate(update))
  }

  // ---- Awareness ----

  private startAwareness(ctx: ConnectionContext): void {
    if (!this.awareness) return
    if (ctx.controller.signal.aborted) return

    this.broadcastAwareness()

    this.awarenessHeartbeat = setInterval(() => {
      this.broadcastAwareness()
    }, AWARENESS_HEARTBEAT_INTERVAL)

    this.subscribeAwareness(ctx)
  }

  private handleAwarenessUpdate = (
    update: AwarenessUpdate,
    origin: unknown
  ): void => {
    if (!this.awareness || origin === `server` || origin === this) return

    const { added, updated, removed } = update
    const changedClients = added.concat(updated).concat(removed)
    if (!changedClients.includes(this.awareness.clientID)) return

    this.pendingAwareness = update
    this.sendAwareness()
  }

  private broadcastAwareness(): void {
    if (!this.awareness) return

    this.pendingAwareness = {
      added: [this.awareness.clientID],
      updated: [],
      removed: [],
    }
    this.sendAwareness()
  }

  private broadcastAwarenessRemoval(): void {
    if (!this.awareness) return

    try {
      this.awareness.setLocalState(null)
      const encoded = awarenessProtocol.encodeAwarenessUpdate(this.awareness, [
        this.awareness.clientID,
      ])

      const stream = new DurableStream({
        url: this.awarenessUrl(),
        headers: this.headers,
        contentType: `application/octet-stream`,
      })

      stream
        .append(YjsProvider.frameUpdate(encoded), {
          contentType: `application/octet-stream`,
        })
        .catch(() => {})
    } catch {
      // Ignore errors during disconnect
    }
  }

  private async sendAwareness(): Promise<void> {
    if (
      !this.awareness ||
      (!this.connected && !this.connecting) ||
      this.sendingAwareness
    )
      return

    this.sendingAwareness = true

    try {
      while (this.pendingAwareness) {
        const update = this.pendingAwareness
        this.pendingAwareness = null

        const { added, updated, removed } = update
        const changedClients = added.concat(updated).concat(removed)

        const encoded = awarenessProtocol.encodeAwarenessUpdate(
          this.awareness,
          changedClients
        )

        const stream = new DurableStream({
          url: this.awarenessUrl(),
          headers: this.headers,
          contentType: `application/octet-stream`,
        })

        await stream.append(YjsProvider.frameUpdate(encoded), {
          contentType: `application/octet-stream`,
        })
      }
    } catch (err) {
      console.error(`[YjsProvider] Failed to send awareness:`, err)
    } finally {
      this.sendingAwareness = false
    }
  }

  private async subscribeAwareness(ctx: ConnectionContext): Promise<void> {
    if (!this.awareness) return
    const signal = ctx.controller.signal
    if (signal.aborted) return

    const stream = new DurableStream({
      url: this.awarenessUrl(),
      headers: this.headers,
      contentType: `application/octet-stream`,
    })

    try {
      const response = await stream.stream({
        offset: `now`,
        live: `sse`,
        signal,
      })
      // Ensure closed promise is handled to avoid unhandled rejections.
      void response.closed.catch(() => {})

      // eslint-disable-next-line @typescript-eslint/require-await
      response.subscribeBytes(async (chunk) => {
        if (signal.aborted) return

        if (chunk.data.length > 0) {
          this.applyAwarenessUpdates(chunk.data)
        }
      })

      await response.closed

      // Stream ended cleanly (EOF) - resubscribe if still connected
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- signal.aborted can change asynchronously
      if (this.connected && !signal.aborted) {
        await new Promise((r) => setTimeout(r, 250))
        this.subscribeAwareness(ctx)
      }
    } catch (err) {
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- signal.aborted can change asynchronously
      if (signal.aborted || (!this.connected && !this.connecting)) return

      if (this.isNotFoundError(err)) {
        // Awareness stream not found — should have been created with document via PUT
        console.error(`[YjsProvider] Awareness stream not found`)
        return // Don't disconnect - awareness is optional
      }

      console.error(`[YjsProvider] Awareness stream error:`, err)
      // Retry after delay for other errors
      await new Promise((resolve) => setTimeout(resolve, 1000))
      if (this.connected) {
        this.subscribeAwareness(ctx)
      }
    }
  }

  // ---- Helpers ----

  private isNotFoundError(err: unknown): boolean {
    return (
      (err instanceof DurableStreamError && err.code === `NOT_FOUND`) ||
      (err instanceof FetchError && err.status === 404)
    )
  }

  private isAuthError(err: unknown): boolean {
    return (
      (err instanceof DurableStreamError &&
        (err.code === `UNAUTHORIZED` || err.code === `FORBIDDEN`)) ||
      (err instanceof FetchError && (err.status === 401 || err.status === 403))
    )
  }
}
