import {
  concat as concatOperator,
  distinct,
  filter,
  join as joinOperator,
  map,
  reduce,
  tap,
} from '@tanstack/db-ivm'
import { optimizeQuery } from '../optimizer.js'
import {
  CollectionInputNotFoundError,
  DistinctRequiresSelectError,
  DuplicateAliasInSubqueryError,
  FnSelectWithGroupByError,
  HavingRequiresGroupByError,
  LimitOffsetRequireOrderByError,
  UnsupportedFromTypeError,
} from '../../errors.js'
import { VIRTUAL_PROP_NAMES } from '../../virtual-props.js'
import {
  ConditionalSelect,
  IncludesSubquery,
  PropRef,
  Value as ValClass,
  getWhereExpression,
  isExpressionLike,
} from '../ir.js'
import { ensureIndexForField } from '../../indexes/auto-index.js'
import { inArray } from '../builder/functions.js'
import {
  compileExpression,
  isCaseWhenConditionTrue,
  toBooleanPredicate,
} from './evaluators.js'
import { processJoins } from './joins.js'
import { containsAggregate, processGroupBy } from './group-by.js'
import { getLazyLoadTargets } from './lazy-targets.js'
import { processOrderBy } from './order-by.js'
import { processSelect } from './select.js'
import type { CollectionSubscription } from '../../collection/subscription.js'
import type { OrderByOptimizationInfo } from './order-by.js'
import type {
  BasicExpression,
  CollectionRef,
  IncludesMaterialization,
  QueryIR,
  QueryRef,
  UnionAll,
  UnionFrom,
} from '../ir.js'
import type { LazyCollectionCallbacks } from './joins.js'
import type { Collection } from '../../collection/index.js'
import type {
  KeyedStream,
  NamespacedAndKeyedStream,
  ResultStream,
} from '../../types.js'
import type { QueryCache, QueryMapping, WindowOptions } from './types.js'

export type { WindowOptions } from './types.js'

/** Symbol used to tag parent $selected with routing metadata for includes */
export const INCLUDES_ROUTING = Symbol(`includesRouting`)
export const FN_SELECT_STATE = Symbol(`fnSelectState`)
const SKIP_INCLUDE = Symbol(`skipInclude`)

type ConditionalSelectGuard = {
  condition: BasicExpression
  expected: boolean
}

type SourceInclude = {
  sourceAlias: string
  include: IncludesCompilationResult
}

type ProjectedSourceIncludePath = {
  path: Array<string>
  guards: Array<ConditionalSelectGuard>
}

/**
 * Result of compiling an includes subquery, including the child pipeline
 * and metadata needed to route child results to parent-scoped Collections.
 */
export interface IncludesCompilationResult {
  /** Filtered child pipeline (post inner-join with parent keys) */
  pipeline: ResultStream
  /** Result field name on parent (e.g., "issues") */
  fieldName: string
  /** Path where the included value is written in the parent result */
  resultPath: Array<string>
  /** Parent-side correlation ref (e.g., project.id) */
  correlationField: PropRef
  /** Child-side correlation ref (e.g., issue.projectId) */
  childCorrelationField: PropRef
  /** Whether the child query has an ORDER BY clause */
  hasOrderBy: boolean
  /** Full compilation result for the child query (for nested includes + alias tracking) */
  childCompilationResult: CompilationResult
  /** Parent-side projection refs for parent-referencing filters */
  parentProjection?: Array<PropRef>
  /** How the output layer materializes the child result on the parent row */
  materialization: IncludesMaterialization
  /** Internal field used to unwrap scalar child selects */
  scalarField?: string
}

/**
 * Result of query compilation including both the pipeline and source-specific WHERE clauses
 */
export interface CompilationResult {
  /** The ID of the main collection */
  collectionId: string

  /** The compiled query pipeline (D2 stream) */
  pipeline: ResultStream

  /** Map of source aliases to their WHERE clauses for index optimization */
  sourceWhereClauses: Map<string, BasicExpression<boolean>>

  /**
   * Maps each source alias to its collection ID. Enables per-alias subscriptions for self-joins.
   * Example: `{ employee: 'employees-col-id', manager: 'employees-col-id' }`
   */
  aliasToCollectionId: Record<string, string>

  /**
   * Flattened mapping from outer alias to innermost alias for subqueries.
   * Always provides one-hop lookups, never recursive chains.
   *
   * Example: `{ activeUser: 'user' }` when `.from({ activeUser: subquery })`
   * where the subquery uses `.from({ user: collection })`.
   *
   * For deeply nested subqueries, the mapping goes directly to the innermost alias:
   * `{ author: 'user' }` (not `{ author: 'activeUser' }`), so `aliasRemapping[alias]`
   * always resolves in a single lookup.
   *
   * Used to resolve subscriptions during lazy loading when join aliases differ from
   * the inner aliases where collection subscriptions were created.
   */
  aliasRemapping: Record<string, string>

  /** Child pipelines for includes subqueries */
  includes?: Array<IncludesCompilationResult>
}

/**
 * Compiles a query IR into a D2 pipeline
 * @param rawQuery The query IR to compile
 * @param inputs Mapping of source aliases to input streams (e.g., `{ employee: input1, manager: input2 }`)
 * @param collections Mapping of collection IDs to Collection instances
 * @param subscriptions Mapping of source aliases to CollectionSubscription instances
 * @param callbacks Mapping of source aliases to lazy loading callbacks
 * @param lazySources Set of source aliases that should load data lazily
 * @param optimizableOrderByCollections Map of collection IDs to order-by optimization info
 * @param cache Optional cache for compiled subqueries (used internally for recursion)
 * @param queryMapping Optional mapping from optimized queries to original queries
 * @returns A CompilationResult with the pipeline, source WHERE clauses, and alias metadata
 */
export function compileQuery(
  rawQuery: QueryIR,
  inputs: Record<string, KeyedStream>,
  collections: Record<string, Collection<any, any, any, any, any>>,
  subscriptions: Record<string, CollectionSubscription>,
  callbacks: Record<string, LazyCollectionCallbacks>,
  lazySources: Set<string>,
  optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
  setWindowFn: (windowFn: (options: WindowOptions) => void) => void,
  cache: QueryCache = new WeakMap(),
  queryMapping: QueryMapping = new WeakMap(),
  // For includes: parent key stream to inner-join with this query's FROM
  parentKeyStream?: KeyedStream,
  childCorrelationField?: PropRef,
): CompilationResult {
  // Check if the original raw query has already been compiled
  const cachedResult = cache.get(rawQuery)
  if (cachedResult) {
    return cachedResult
  }

  // Validate the raw query BEFORE optimization to check user's original structure.
  // This must happen before optimization because the optimizer may create internal
  // subqueries (e.g., for predicate pushdown) that reuse aliases, which is fine.
  validateQueryStructure(rawQuery)

  // Optimize the query before compilation
  const { optimizedQuery, sourceWhereClauses } = optimizeQuery(rawQuery)
  // Use a mutable binding so we can shallow-clone select before includes mutation
  let query = optimizedQuery

  // Create mapping from optimized query to original for caching
  queryMapping.set(query, rawQuery)
  mapNestedQueries(query, rawQuery, queryMapping)

  // Create a copy of the inputs map to avoid modifying the original
  const allInputs = { ...inputs }

  // Track alias to collection id relationships discovered during compilation.
  // This includes all user-declared aliases plus inner aliases from subqueries.
  const aliasToCollectionId: Record<string, string> = {}

  // Track alias remapping for subqueries (outer alias → inner alias)
  // e.g., when .join({ activeUser: subquery }) where subquery uses .from({ user: collection })
  // we store: aliasRemapping['activeUser'] = 'user'
  const aliasRemapping: Record<string, string> = {}

  // Create a map of source aliases to input streams.
  // Inputs MUST be keyed by alias (e.g., `{ employee: input1, manager: input2 }`),
  // not by collection ID. This enables per-alias subscriptions where different aliases
  // of the same collection (e.g., self-joins) maintain independent filtered streams.
  const sources: Record<string, KeyedStream> = {}

  // Process the FROM clause to get the source stream.
  const {
    alias: mainSource,
    collectionId: mainCollectionId,
    pipeline: initialPipeline,
    sources: fromSources,
    sourceIncludes,
    directIncludes,
    isUnionFrom,
  } = processFromClause(
    query.from,
    allInputs,
    collections,
    subscriptions,
    callbacks,
    lazySources,
    optimizableOrderByCollections,
    setWindowFn,
    cache,
    queryMapping,
    aliasToCollectionId,
    aliasRemapping,
    sourceWhereClauses,
  )
  Object.assign(sources, fromSources)

  // If this is an includes child query, inner-join the raw input with parent keys.
  // This filters the child collection to only rows matching parents in the result set.
  // The inner join happens BEFORE namespace wrapping / WHERE / SELECT / ORDER BY,
  // so the child pipeline only processes rows that match parents.
  let pipeline: NamespacedAndKeyedStream = initialPipeline
  if (!isUnionFrom && parentKeyStream && childCorrelationField) {
    const mainInput = sources[mainSource]!
    let filteredMainInput = mainInput
    // Re-key child input by correlation field: [correlationValue, [childKey, childRow]]
    const childFieldPath = childCorrelationField.path.slice(1) // remove alias prefix
    const childRekeyed = mainInput.pipe(
      map(([key, row]: [unknown, any]) => {
        const correlationValue = getNestedValue(row, childFieldPath)
        return [correlationValue, [key, row]] as [unknown, [unknown, any]]
      }),
    )

    // Inner join: only children whose correlation key exists in parent keys pass through
    const joined = childRekeyed.pipe(joinOperator(parentKeyStream, `inner`))

    // Extract: [correlationValue, [[childKey, childRow], parentContext]] → [childKey, childRow]
    // Tag the row with __correlationKey for output routing
    // If parentSide is non-null (parent context projected), attach as __parentContext
    filteredMainInput = joined.pipe(
      filter(([_correlationValue, [childSide]]: any) => {
        return childSide != null
      }),
      map(([correlationValue, [childSide, parentSide]]: any) => {
        const [childKey, childRow] = childSide
        const tagged: any = { ...childRow, __correlationKey: correlationValue }
        if (parentSide != null) {
          tagged.__parentContext = parentSide
        }
        const effectiveKey =
          parentSide != null
            ? `${String(childKey)}::${JSON.stringify(parentSide)}`
            : childKey
        return [effectiveKey, tagged]
      }),
    )

    // Update sources so the rest of the pipeline uses the filtered input
    sources[mainSource] = filteredMainInput

    pipeline = wrapInputWithAlias(filteredMainInput, mainSource)
  }

  // Process JOIN clauses if they exist
  if (query.join && query.join.length > 0) {
    pipeline = processJoins(
      pipeline,
      query.join,
      sources,
      mainCollectionId,
      mainSource,
      allInputs,
      cache,
      queryMapping,
      collections,
      subscriptions,
      callbacks,
      lazySources,
      optimizableOrderByCollections,
      setWindowFn,
      rawQuery,
      compileQuery,
      aliasToCollectionId,
      aliasRemapping,
      sourceWhereClauses,
    )
  }

  // Process the WHERE clause if it exists
  if (query.where && query.where.length > 0) {
    // Apply each WHERE condition as a filter (they are ANDed together)
    for (const where of query.where) {
      const whereExpression = getWhereExpression(where)
      const compiledWhere = compileExpression(whereExpression)
      pipeline = pipeline.pipe(
        filter(([_key, namespacedRow]) => {
          return toBooleanPredicate(compiledWhere(namespacedRow))
        }),
      )
    }
  }

  // Process functional WHERE clauses if they exist
  if (query.fnWhere && query.fnWhere.length > 0) {
    for (const fnWhere of query.fnWhere) {
      pipeline = pipeline.pipe(
        filter(([_key, namespacedRow]) => {
          return toBooleanPredicate(fnWhere(namespacedRow))
        }),
      )
    }
  }

  // Extract includes from SELECT, compile child pipelines, and replace with placeholders.
  // This must happen AFTER WHERE (so parent pipeline is filtered) but BEFORE processSelect
  // (so IncludesSubquery nodes are stripped before select compilation).
  const includesResults: Array<IncludesCompilationResult> = !query.select
    ? [...directIncludes]
    : []
  const includesRoutingFns: Array<{
    fieldName: string
    getRouting: (nsRow: any) => {
      correlationKey: unknown
      parentContext: Record<string, any> | null
    }
  }> = []
  for (const { sourceAlias, include } of sourceIncludes) {
    const projectedPaths =
      query.select != null
        ? findProjectedSourceIncludePaths(
            query.select,
            sourceAlias,
            include.resultPath,
          )
        : query.fnSelect
          ? []
          : [
              {
                path: [sourceAlias, ...include.resultPath],
                guards: [],
              },
            ]

    if (projectedPaths.length === 0) {
      continue
    }

    for (const { path: resultPath, guards } of projectedPaths) {
      const fieldName = getUniqueIncludesRoutingKey(
        `${sourceAlias}.${resultPath.join(`.`)}`,
        includesRoutingFns,
      )
      const compiledGuards = guards.map((guard) => ({
        condition: compileExpression(guard.condition),
        expected: guard.expected,
      }))
      includesResults.push({
        ...include,
        fieldName,
        resultPath,
      })

      includesRoutingFns.push({
        fieldName,
        getRouting: (nsRow: any) => {
          if (!matchesConditionalSelectGuards(compiledGuards, nsRow)) {
            return { correlationKey: null, parentContext: null }
          }
          return (
            nsRow[sourceAlias]?.[INCLUDES_ROUTING]?.[include.fieldName] ?? {
              correlationKey: null,
              parentContext: null,
            }
          )
        },
      })
    }
  }
  if (query.select && directIncludes.length > 0) {
    for (const include of directIncludes) {
      const projectedPaths = findProjectedResultIncludePaths(
        query.select,
        include.resultPath,
      )

      for (const { path: resultPath, guards } of projectedPaths) {
        const fieldName = getUniqueIncludesRoutingKey(
          resultPath.join(`.`),
          includesRoutingFns,
        )
        const compiledGuards = guards.map((guard) => ({
          condition: compileExpression(guard.condition),
          expected: guard.expected,
        }))

        includesResults.push({
          ...include,
          fieldName,
          resultPath,
        })

        includesRoutingFns.push({
          fieldName,
          getRouting: (nsRow: any) => {
            if (!matchesConditionalSelectGuards(compiledGuards, nsRow)) {
              return { correlationKey: null, parentContext: null }
            }
            return (
              nsRow[INCLUDES_ROUTING]?.[include.fieldName] ?? {
                correlationKey: null,
                parentContext: null,
              }
            )
          },
        })
      }
    }
  }
  if (query.select) {
    const includesEntries = extractIncludesFromSelect(query.select)
    if (includesEntries.length > 0) {
      query = { ...query, select: { ...query.select } }
    }
    for (const { key, path, subquery, guards } of includesEntries) {
      const fieldName = getUniqueIncludesRoutingKey(key, includesRoutingFns)
      // Branch parent pipeline: map to [correlationValue, parentContext]
      // When parentProjection exists, project referenced parent fields; otherwise null (zero overhead)
      const compiledCorrelation = compileExpression(subquery.correlationField)
      const compiledGuards = guards.map((guard) => ({
        condition: compileExpression(guard.condition),
        expected: guard.expected,
      }))
      let parentKeys: any
      if (subquery.parentProjection && subquery.parentProjection.length > 0) {
        const compiledProjections = subquery.parentProjection.map((ref) => ({
          alias: ref.path[0]!,
          field: ref.path.slice(1),
          compiled: compileExpression(ref),
        }))
        parentKeys = pipeline.pipe(
          map(([_key, nsRow]: any) => {
            if (!matchesConditionalSelectGuards(compiledGuards, nsRow)) {
              return [SKIP_INCLUDE, null] as any
            }
            const parentContext: Record<string, Record<string, any>> = {}
            for (const proj of compiledProjections) {
              if (!parentContext[proj.alias]) {
                parentContext[proj.alias] = {}
              }
              const value = proj.compiled(nsRow)
              // Set nested field in the alias namespace
              let target = parentContext[proj.alias]!
              for (let i = 0; i < proj.field.length - 1; i++) {
                if (!target[proj.field[i]!]) {
                  target[proj.field[i]!] = {}
                }
                target = target[proj.field[i]!]
              }
              target[proj.field[proj.field.length - 1]!] = value
            }
            return [compiledCorrelation(nsRow), parentContext] as any
          }),
        )
      } else {
        parentKeys = pipeline.pipe(
          map(([_key, nsRow]: any) => {
            if (!matchesConditionalSelectGuards(compiledGuards, nsRow)) {
              return [SKIP_INCLUDE, null] as any
            }
            return [compiledCorrelation(nsRow), null] as any
          }),
        )
      }
      parentKeys = parentKeys.pipe(
        filter(([correlationValue]: any) => correlationValue !== SKIP_INCLUDE),
      )

      // Deduplicate: when multiple parents share the same correlation key (and
      // parentContext), clamp multiplicity to 1 so the inner join doesn't
      // produce duplicate child entries that cause incorrect deletions.
      parentKeys = parentKeys.pipe(
        reduce((values: Array<[any, number]>) =>
          values.map(([v, mult]) => [v, mult > 0 ? 1 : 0] as [any, number]),
        ),
      )

      // --- Includes lazy loading (mirrors join lazy loading in joins.ts) ---
      // Resolve the child correlation field to concrete collection targets so
      // subquery and union child sources can load by branch when it is safe.
      const childCorrelationAlias = subquery.childCorrelationField.path[0]!
      const directChildCollection =
        subquery.query.from.type === `collectionRef`
          ? subquery.query.from.collection
          : undefined
      const lazyTargets = getLazyLoadTargets(
        subquery.query,
        subquery.query.from,
        childCorrelationAlias,
        subquery.childCorrelationField,
        directChildCollection,
        aliasRemapping,
      )

      if (lazyTargets.length > 0) {
        // 1. Mark child source as lazy so CollectionSubscriber skips initial full load
        for (const target of lazyTargets) {
          lazySources.add(target.alias)
        }

        // 2. Ensure an index on the correlation field for efficient lookups
        for (const target of lazyTargets) {
          const targetFieldName = target.path[0]
          if (targetFieldName) {
            ensureIndexForField(targetFieldName, target.path, target.collection)
          }
        }

        // 3. Tap parent keys to intercept correlation values and request
        //    matching child rows on-demand via the child's subscription
        parentKeys = parentKeys.pipe(
          tap((data: any) => {
            const joinKeys = [
              ...new Set(
                data
                  .getInner()
                  .map(
                    ([[correlationValue]]: any) => correlationValue as unknown,
                  )
                  .filter((joinKey: unknown) => joinKey != null),
              ),
            ]

            if (joinKeys.length === 0) {
              return
            }

            for (const target of lazyTargets) {
              const lazySourceSubscription = subscriptions[target.alias]

              if (!lazySourceSubscription) {
                continue
              }

              if (lazySourceSubscription.hasLoadedInitialState()) {
                continue
              }

              const lazyJoinRef = new PropRef(target.path)
              lazySourceSubscription.requestSnapshot({
                where: inArray(lazyJoinRef, joinKeys),
              })
            }
          }),
        )
      }

      // If parent filters exist, append them to the child query's WHERE
      const childQuery =
        subquery.parentFilters && subquery.parentFilters.length > 0
          ? {
              ...subquery.query,
              where: [
                ...(subquery.query.where || []),
                ...subquery.parentFilters,
              ],
            }
          : subquery.query

      // Recursively compile child query WITH the parent key stream
      const childResult = compileQuery(
        childQuery,
        allInputs,
        collections,
        subscriptions,
        callbacks,
        lazySources,
        optimizableOrderByCollections,
        setWindowFn,
        cache,
        queryMapping,
        parentKeys,
        subquery.childCorrelationField,
      )

      // Merge child's alias metadata into parent's
      Object.assign(aliasToCollectionId, childResult.aliasToCollectionId)
      Object.assign(aliasRemapping, childResult.aliasRemapping)
      for (const [alias, whereClause] of childResult.sourceWhereClauses) {
        sourceWhereClauses.set(alias, whereClause)
      }

      includesResults.push({
        pipeline: childResult.pipeline,
        fieldName,
        resultPath: path,
        correlationField: subquery.correlationField,
        childCorrelationField: subquery.childCorrelationField,
        hasOrderBy: !!(
          subquery.query.orderBy && subquery.query.orderBy.length > 0
        ),
        childCompilationResult: childResult,
        parentProjection: subquery.parentProjection,
        materialization: subquery.materialization,
        scalarField: subquery.scalarField,
      })

      // Capture routing function for INCLUDES_ROUTING tagging
      if (subquery.parentProjection && subquery.parentProjection.length > 0) {
        const compiledProjs = subquery.parentProjection.map((ref) => ({
          alias: ref.path[0]!,
          field: ref.path.slice(1),
          compiled: compileExpression(ref),
        }))
        const compiledCorr = compiledCorrelation
        const compiledRoutingGuards = compiledGuards
        includesRoutingFns.push({
          fieldName,
          getRouting: (nsRow: any) => {
            if (!matchesConditionalSelectGuards(compiledRoutingGuards, nsRow)) {
              return { correlationKey: null, parentContext: null }
            }
            const parentContext: Record<string, Record<string, any>> = {}
            for (const proj of compiledProjs) {
              if (!parentContext[proj.alias]) {
                parentContext[proj.alias] = {}
              }
              const value = proj.compiled(nsRow)
              let target = parentContext[proj.alias]!
              for (let i = 0; i < proj.field.length - 1; i++) {
                if (!target[proj.field[i]!]) {
                  target[proj.field[i]!] = {}
                }
                target = target[proj.field[i]!]
              }
              target[proj.field[proj.field.length - 1]!] = value
            }
            return { correlationKey: compiledCorr(nsRow), parentContext }
          },
        })
      } else {
        const compiledRoutingGuards = compiledGuards
        includesRoutingFns.push({
          fieldName,
          getRouting: (nsRow: any) => {
            if (!matchesConditionalSelectGuards(compiledRoutingGuards, nsRow)) {
              return { correlationKey: null, parentContext: null }
            }
            return {
              correlationKey: compiledCorrelation(nsRow),
              parentContext: null,
            }
          },
        })
      }

      // Replace includes entry in select with a null placeholder
      query = {
        ...query,
        select: replaceIncludesInSelect(query.select!, path),
      }
    }
  }

  if (
    query.distinct &&
    !query.fnSelect &&
    !query.select &&
    query.from.type !== `unionAll`
  ) {
    throw new DistinctRequiresSelectError()
  }

  if (query.fnSelect && query.groupBy && query.groupBy.length > 0) {
    throw new FnSelectWithGroupByError()
  }

  // Process the SELECT clause early - always create $selected
  // This eliminates duplication and allows for DISTINCT implementation
  if (query.fnSelect) {
    // Handle functional select - apply the function to transform the row
    pipeline = pipeline.pipe(
      map(([key, namespacedRow]) => {
        const selectResults = query.fnSelect!(namespacedRow)
        if (selectResults && typeof selectResults === `object`) {
          const routing = (namespacedRow as any)[INCLUDES_ROUTING]
          if (routing) {
            selectResults[INCLUDES_ROUTING] = routing
          }
          if (directIncludes.length > 0) {
            Object.defineProperty(selectResults, FN_SELECT_STATE, {
              value: {
                sourceRow: namespacedRow,
                fnSelect: query.fnSelect!,
              },
              enumerable: true,
              configurable: true,
            })
          }
        }
        return [
          key,
          {
            ...namespacedRow,
            $selected: selectResults,
          },
        ] as [string, typeof namespacedRow & { $selected: any }]
      }),
    )
  } else if (query.select) {
    pipeline = processSelect(pipeline, query.select, allInputs)
  } else {
    // If no SELECT clause, create $selected with the main table data
    pipeline = pipeline.pipe(
      map(([key, namespacedRow]) => {
        const selectResults =
          !isUnionFrom && !query.join && !query.groupBy
            ? namespacedRow[mainSource]
            : namespacedRow

        return [
          key,
          {
            ...namespacedRow,
            $selected: selectResults,
          },
        ] as [string, typeof namespacedRow & { $selected: any }]
      }),
    )
  }

  // Tag $selected with routing metadata for includes.
  // This lets collection-config-builder extract routing info (correlationKey + parentContext)
  // from parent results without depending on the user's select.
  if (includesRoutingFns.length > 0) {
    pipeline = pipeline.pipe(
      map(([key, namespacedRow]: any) => {
        const routing: Record<
          string,
          { correlationKey: unknown; parentContext: Record<string, any> | null }
        > = {}
        for (const { fieldName, getRouting } of includesRoutingFns) {
          routing[fieldName] = getRouting(namespacedRow)
        }
        namespacedRow.$selected[INCLUDES_ROUTING] = routing
        return [key, namespacedRow]
      }),
    )
  }

  // Process the GROUP BY clause if it exists.
  // When in includes mode (parentKeyStream), pass mainSource so that groupBy
  // preserves __correlationKey for per-parent aggregation.
  const groupByMainSource = parentKeyStream ? mainSource : undefined
  if (query.groupBy && query.groupBy.length > 0) {
    pipeline = processGroupBy(
      pipeline,
      query.groupBy,
      query.having,
      query.select,
      query.fnHaving,
      mainCollectionId,
      groupByMainSource,
    )
  } else if (query.select) {
    // Check if SELECT contains aggregates but no GROUP BY (implicit single-group aggregation)
    const hasAggregates = Object.values(query.select).some(
      (expr) => expr.type === `agg` || containsAggregate(expr),
    )
    if (hasAggregates) {
      // Handle implicit single-group aggregation
      pipeline = processGroupBy(
        pipeline,
        [], // Empty group by means single group
        query.having,
        query.select,
        query.fnHaving,
        mainCollectionId,
        groupByMainSource,
      )
    }
  }

  // Process the HAVING clause if it exists (only applies after GROUP BY)
  if (query.having && (!query.groupBy || query.groupBy.length === 0)) {
    // Check if we have aggregates in SELECT that would trigger implicit grouping
    const hasAggregates = query.select
      ? Object.values(query.select).some((expr) => expr.type === `agg`)
      : false

    if (!hasAggregates) {
      throw new HavingRequiresGroupByError()
    }
  }

  // Process functional HAVING clauses outside of GROUP BY (treat as additional WHERE filters)
  if (
    query.fnHaving &&
    query.fnHaving.length > 0 &&
    (!query.groupBy || query.groupBy.length === 0)
  ) {
    // If there's no GROUP BY but there are fnHaving clauses, apply them as filters
    for (const fnHaving of query.fnHaving) {
      pipeline = pipeline.pipe(
        filter(([_key, namespacedRow]) => {
          return fnHaving(namespacedRow)
        }),
      )
    }
  }

  // Process the DISTINCT clause if it exists
  if (query.distinct) {
    pipeline = pipeline.pipe(distinct(([_key, row]) => row.$selected))
  }

  // Process orderBy parameter if it exists
  if (query.orderBy && query.orderBy.length > 0) {
    // When in includes mode with limit/offset, use grouped ordering so that
    // the limit is applied per parent (per correlation key), not globally.
    const includesGroupKeyFn =
      parentKeyStream &&
      (query.limit !== undefined || query.offset !== undefined)
        ? (_key: unknown, row: unknown) => {
            const correlationKey = (row as any)?.[mainSource]?.__correlationKey
            const parentContext = (row as any)?.__parentContext
            if (parentContext != null) {
              return JSON.stringify([correlationKey, parentContext])
            }
            return correlationKey
          }
        : undefined

    const orderedPipeline = processOrderBy(
      rawQuery,
      pipeline,
      query.orderBy,
      query.select || {},
      collections[mainCollectionId]!,
      optimizableOrderByCollections,
      setWindowFn,
      query.limit,
      query.offset,
      includesGroupKeyFn,
    )

    // Final step: extract the $selected and include orderBy index
    const resultPipeline: ResultStream = orderedPipeline.pipe(
      map(([key, [row, orderByIndex]]) => {
        // Extract the final results from $selected and include orderBy index
        const raw = (row as any).$selected
        const finalResults = attachVirtualPropsToSelected(
          unwrapValue(raw),
          row as Record<string, any>,
        )
        // When in includes mode, embed the correlation key and parentContext
        if (parentKeyStream) {
          const correlationKey = (row as any)[mainSource]?.__correlationKey
          const parentContext = (row as any).__parentContext ?? null
          // Strip internal routing properties that may leak via spread selects
          delete finalResults.__correlationKey
          delete finalResults.__parentContext
          return [
            key,
            [finalResults, orderByIndex, correlationKey, parentContext],
          ] as any
        }
        return [key, [finalResults, orderByIndex]] as [unknown, [any, string]]
      }),
    ) as ResultStream

    const result = resultPipeline
    // Cache the result before returning (use original query as key)
    const compilationResult: CompilationResult = {
      collectionId: mainCollectionId,
      pipeline: result,
      sourceWhereClauses,
      aliasToCollectionId,
      aliasRemapping,
      includes: includesResults.length > 0 ? includesResults : undefined,
    }
    cache.set(rawQuery, compilationResult)

    return compilationResult
  } else if (query.limit !== undefined || query.offset !== undefined) {
    // If there's a limit or offset without orderBy, throw an error
    throw new LimitOffsetRequireOrderByError()
  }

  // Final step: extract the $selected and return tuple format (no orderBy)
  const resultPipeline: ResultStream = pipeline.pipe(
    map(([key, row]) => {
      // Extract the final results from $selected and return [key, [results, undefined]]
      const raw = (row as any).$selected
      const finalResults = attachVirtualPropsToSelected(
        unwrapValue(raw),
        row as Record<string, any>,
      )
      // When in includes mode, embed the correlation key and parentContext
      if (parentKeyStream) {
        const correlationKey = (row as any)[mainSource]?.__correlationKey
        const parentContext = (row as any).__parentContext ?? null
        // Strip internal routing properties that may leak via spread selects
        delete finalResults.__correlationKey
        delete finalResults.__parentContext
        return [
          key,
          [finalResults, undefined, correlationKey, parentContext],
        ] as any
      }
      return [key, [finalResults, undefined]] as [
        unknown,
        [any, string | undefined],
      ]
    }),
  )

  const result = resultPipeline
  // Cache the result before returning (use original query as key)
  const compilationResult: CompilationResult = {
    collectionId: mainCollectionId,
    pipeline: result,
    sourceWhereClauses,
    aliasToCollectionId,
    aliasRemapping,
    includes: includesResults.length > 0 ? includesResults : undefined,
  }
  cache.set(rawQuery, compilationResult)

  return compilationResult
}

/**
 * Collects aliases used for DIRECT collection references (not subqueries).
 * Used to validate that subqueries don't reuse parent query collection aliases.
 * Only direct CollectionRef aliases matter - QueryRef aliases don't cause conflicts.
 */
function collectDirectCollectionAliases(query: QueryIR): Set<string> {
  const aliases = new Set<string>()

  // Collect FROM alias only if it's a direct collection reference
  for (const source of getFromSources(query.from)) {
    if (source.type === `collectionRef`) {
      aliases.add(source.alias)
    }
  }

  // Collect JOIN aliases only for direct collection references
  if (query.join) {
    for (const joinClause of query.join) {
      if (joinClause.from.type === `collectionRef`) {
        aliases.add(joinClause.from.alias)
      }
    }
  }

  return aliases
}

/**
 * Validates the structure of a query and its subqueries.
 * Checks that subqueries don't reuse collection aliases from parent queries.
 * This must be called on the RAW query before optimization.
 */
function validateQueryStructure(
  query: QueryIR,
  parentCollectionAliases: Set<string> = new Set(),
): void {
  // Collect direct collection aliases from this query level
  const currentLevelAliases = collectDirectCollectionAliases(query)

  // Check if any current alias conflicts with parent aliases
  for (const alias of currentLevelAliases) {
    if (parentCollectionAliases.has(alias)) {
      throw new DuplicateAliasInSubqueryError(
        alias,
        Array.from(parentCollectionAliases),
      )
    }
  }

  // Combine parent and current aliases for checking nested subqueries
  const combinedAliases = new Set([
    ...parentCollectionAliases,
    ...currentLevelAliases,
  ])

  // Recursively validate FROM subqueries
  if (query.from.type === `unionAll`) {
    for (const branch of query.from.queries) {
      validateQueryStructure(branch, combinedAliases)
    }
  } else {
    for (const source of getFromSources(query.from)) {
      if (source.type === `queryRef`) {
        validateQueryStructure(source.query, combinedAliases)
      }
    }
  }

  // Recursively validate JOIN subqueries
  if (query.join) {
    for (const joinClause of query.join) {
      if (joinClause.from.type === `queryRef`) {
        validateQueryStructure(joinClause.from.query, combinedAliases)
      }
    }
  }
}

/**
 * Processes the FROM clause, handling direct collection references and subqueries.
 * Populates `aliasToCollectionId` and `aliasRemapping` for per-alias subscription tracking.
 */
function processFromClause(
  from: CollectionRef | QueryRef | UnionFrom | UnionAll,
  allInputs: Record<string, KeyedStream>,
  collections: Record<string, Collection>,
  subscriptions: Record<string, CollectionSubscription>,
  callbacks: Record<string, LazyCollectionCallbacks>,
  lazySources: Set<string>,
  optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
  setWindowFn: (windowFn: (options: WindowOptions) => void) => void,
  cache: QueryCache,
  queryMapping: QueryMapping,
  aliasToCollectionId: Record<string, string>,
  aliasRemapping: Record<string, string>,
  sourceWhereClauses: Map<string, BasicExpression<boolean>>,
): {
  alias: string
  pipeline: NamespacedAndKeyedStream
  collectionId: string
  sources: Record<string, KeyedStream>
  sourceIncludes: Array<SourceInclude>
  directIncludes: Array<IncludesCompilationResult>
  isUnionFrom: boolean
} {
  if (from.type === `unionAll`) {
    return processUnionAll(
      from,
      allInputs,
      collections,
      subscriptions,
      callbacks,
      lazySources,
      optimizableOrderByCollections,
      setWindowFn,
      cache,
      queryMapping,
      aliasToCollectionId,
      aliasRemapping,
      sourceWhereClauses,
    )
  }

  if (from.type !== `unionFrom`) {
    const { alias, input, collectionId, sourceIncludes } = processFrom(
      from,
      allInputs,
      collections,
      subscriptions,
      callbacks,
      lazySources,
      optimizableOrderByCollections,
      setWindowFn,
      cache,
      queryMapping,
      aliasToCollectionId,
      aliasRemapping,
      sourceWhereClauses,
    )

    return {
      alias,
      pipeline: wrapInputWithAlias(input, alias),
      collectionId,
      sources: { [alias]: input },
      sourceIncludes,
      directIncludes: [],
      isUnionFrom: false,
    }
  }

  if (from.sources.length === 0) {
    throw new UnsupportedFromTypeError(`empty unionFrom`)
  }

  const sources: Record<string, KeyedStream> = {}
  const sourceIncludes: Array<SourceInclude> = []
  let pipeline: NamespacedAndKeyedStream | undefined
  let mainAlias = ``
  let mainCollectionId = ``

  for (const source of from.sources) {
    const {
      alias,
      input,
      collectionId,
      sourceIncludes: childSourceIncludes,
    } = processFrom(
      source,
      allInputs,
      collections,
      subscriptions,
      callbacks,
      lazySources,
      optimizableOrderByCollections,
      setWindowFn,
      cache,
      queryMapping,
      aliasToCollectionId,
      aliasRemapping,
      sourceWhereClauses,
    )

    if (!mainAlias) {
      mainAlias = alias
      mainCollectionId = collectionId
    }
    sources[alias] = input
    sourceIncludes.push(...childSourceIncludes)

    const branch = wrapInputWithAlias(input, alias).pipe(
      map(([key, row]) => {
        return [`${alias}:${encodeKeyForUnionBranch(key)}`, row] as [
          string,
          typeof row,
        ]
      }),
    )

    pipeline = pipeline ? pipeline.pipe(concatOperator(branch)) : branch
  }

  return {
    alias: mainAlias,
    pipeline: pipeline!,
    collectionId: mainCollectionId,
    sources,
    sourceIncludes,
    directIncludes: [],
    isUnionFrom: true,
  }
}

function processUnionAll(
  from: UnionAll,
  allInputs: Record<string, KeyedStream>,
  collections: Record<string, Collection>,
  subscriptions: Record<string, CollectionSubscription>,
  callbacks: Record<string, LazyCollectionCallbacks>,
  lazySources: Set<string>,
  optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
  setWindowFn: (windowFn: (options: WindowOptions) => void) => void,
  cache: QueryCache,
  queryMapping: QueryMapping,
  aliasToCollectionId: Record<string, string>,
  aliasRemapping: Record<string, string>,
  sourceWhereClauses: Map<string, BasicExpression<boolean>>,
): {
  alias: string
  pipeline: NamespacedAndKeyedStream
  collectionId: string
  sources: Record<string, KeyedStream>
  sourceIncludes: Array<SourceInclude>
  directIncludes: Array<IncludesCompilationResult>
  isUnionFrom: boolean
} {
  if (from.queries.length === 0) {
    throw new UnsupportedFromTypeError(`empty unionAll`)
  }

  const sources: Record<string, KeyedStream> = {}
  const sourceIncludes: Array<SourceInclude> = []
  const directIncludes: Array<IncludesCompilationResult> = []
  let pipeline: NamespacedAndKeyedStream | undefined
  let mainCollectionId = ``
  const branchAliases = new Set<string>()

  for (let index = 0; index < from.queries.length; index++) {
    const branch = from.queries[index]!
    for (const source of getAllSources(branch)) {
      if (branchAliases.has(source.alias)) {
        throw new Error(
          `Duplicate source alias "${source.alias}" in unionAll query branches. ` +
            `Use distinct aliases in each branch before passing them to unionAll().`,
        )
      }
      branchAliases.add(source.alias)
    }
    const branchResult = compileQuery(
      branch,
      allInputs,
      collections,
      subscriptions,
      callbacks,
      lazySources,
      optimizableOrderByCollections,
      setWindowFn,
      cache,
      queryMapping,
    )

    if (!mainCollectionId) {
      mainCollectionId = branchResult.collectionId
    }
    Object.assign(aliasToCollectionId, branchResult.aliasToCollectionId)
    Object.assign(aliasRemapping, branchResult.aliasRemapping)
    directIncludes.push(...(branchResult.includes ?? []))
    Object.assign(sources, allInputs)
    for (const [alias, where] of branchResult.sourceWhereClauses) {
      sourceWhereClauses.set(alias, where)
    }

    const branchPipeline = branchResult.pipeline.pipe(
      map(([key, [row]]) => {
        return [`${index}:${encodeKeyForUnionBranch(key)}`, row] as [
          string,
          Record<string, any>,
        ]
      }),
    )

    pipeline = pipeline
      ? pipeline.pipe(concatOperator(branchPipeline))
      : branchPipeline
  }

  return {
    alias: ``,
    pipeline: pipeline!,
    collectionId: mainCollectionId,
    sources,
    sourceIncludes,
    directIncludes,
    isUnionFrom: true,
  }
}

function wrapInputWithAlias(
  input: KeyedStream,
  alias: string,
): NamespacedAndKeyedStream {
  return input.pipe(
    map(([key, row]) => {
      // Initialize the record with a nested structure.
      // If __parentContext exists (from parent-referencing includes), merge parent
      // aliases into the namespaced row so WHERE can resolve parent refs.
      const { __parentContext, ...cleanRow } = row as any
      const nsRow: Record<string, any> = { [alias]: cleanRow }
      if (__parentContext) {
        Object.assign(nsRow, __parentContext)
        ;(nsRow as any).__parentContext = __parentContext
      }
      return [key, nsRow] as [unknown, Record<string, typeof row>]
    }),
  )
}

function encodeKeyForUnionBranch(key: unknown): string {
  if (typeof key === `string`) {
    return `string:${key}`
  }
  if (typeof key === `number`) {
    return `number:${String(key)}`
  }
  if (typeof key === `bigint`) {
    return `bigint:${String(key)}`
  }
  return `${typeof key}:${JSON.stringify(key)}`
}

function processFrom(
  from: CollectionRef | QueryRef,
  allInputs: Record<string, KeyedStream>,
  collections: Record<string, Collection>,
  subscriptions: Record<string, CollectionSubscription>,
  callbacks: Record<string, LazyCollectionCallbacks>,
  lazySources: Set<string>,
  optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
  setWindowFn: (windowFn: (options: WindowOptions) => void) => void,
  cache: QueryCache,
  queryMapping: QueryMapping,
  aliasToCollectionId: Record<string, string>,
  aliasRemapping: Record<string, string>,
  sourceWhereClauses: Map<string, BasicExpression<boolean>>,
): {
  alias: string
  input: KeyedStream
  collectionId: string
  sourceIncludes: Array<SourceInclude>
} {
  switch (from.type) {
    case `collectionRef`: {
      const input = allInputs[from.alias]
      if (!input) {
        throw new CollectionInputNotFoundError(
          from.alias,
          from.collection.id,
          Object.keys(allInputs),
        )
      }
      aliasToCollectionId[from.alias] = from.collection.id
      return {
        alias: from.alias,
        input,
        collectionId: from.collection.id,
        sourceIncludes: [],
      }
    }
    case `queryRef`: {
      // Find the original query for caching purposes
      const originalQuery = queryMapping.get(from.query) || from.query

      // Recursively compile the sub-query with cache
      const subQueryResult = compileQuery(
        originalQuery,
        allInputs,
        collections,
        subscriptions,
        callbacks,
        lazySources,
        optimizableOrderByCollections,
        setWindowFn,
        cache,
        queryMapping,
      )

      // Pull up alias mappings from subquery to parent scope.
      // This includes both the innermost alias-to-collection mappings AND
      // any existing remappings from nested subquery levels.
      Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId)
      Object.assign(aliasRemapping, subQueryResult.aliasRemapping)

      // Pull up source WHERE clauses from subquery to parent scope.
      // This enables loadSubset to receive the correct where clauses for subquery collections.
      //
      // IMPORTANT: Skip pull-up for optimizer-created subqueries. These are detected when:
      // 1. The outer alias (from.alias) matches the inner alias (from.query.from.alias)
      // 2. The subquery was found in queryMapping (it's a user-defined subquery, not optimizer-created)
      //
      // For optimizer-created subqueries, the parent already has the sourceWhereClauses
      // extracted from the original raw query, so pulling up would be redundant.
      // More importantly, pulling up for optimizer-created subqueries can cause issues
      // when the optimizer has restructured the query.
      const isUserDefinedSubquery = queryMapping.has(from.query)
      const subqueryFromAlias = getFirstFromAlias(from.query.from)
      const isOptimizerCreated =
        !isUserDefinedSubquery && from.alias === subqueryFromAlias

      if (!isOptimizerCreated) {
        for (const [alias, whereClause] of subQueryResult.sourceWhereClauses) {
          sourceWhereClauses.set(alias, whereClause)
        }
      }

      // Create a FLATTENED remapping from outer alias to innermost alias.
      // For nested subqueries, this ensures one-hop lookups (not recursive chains).
      //
      // Example with 3-level nesting:
      //   Inner:  .from({ user: usersCollection })
      //   Middle: .from({ activeUser: innerSubquery })     → creates: activeUser → user
      //   Outer:  .from({ author: middleSubquery })        → creates: author → user (not author → activeUser)
      //
      // The key insight: We search through the PULLED-UP aliasToCollectionId (which contains
      // the innermost 'user' alias), so we always map directly to the deepest level.
      // This means aliasRemapping[alias] is always a single lookup, never recursive.
      // Needed for subscription resolution during lazy loading.
      const innerAlias = Object.keys(subQueryResult.aliasToCollectionId).find(
        (alias) =>
          subQueryResult.aliasToCollectionId[alias] ===
          subQueryResult.collectionId,
      )
      if (innerAlias && innerAlias !== from.alias) {
        aliasRemapping[from.alias] = innerAlias
      }

      // Extract the pipeline from the compilation result
      const subQueryInput = subQueryResult.pipeline

      // Subqueries may return [key, [value, orderByIndex]] (with ORDER BY) or [key, value] (without ORDER BY)
      // We need to extract just the value for use in parent queries
      const extractedInput = subQueryInput.pipe(
        map((data: any) => {
          const [key, [value, _orderByIndex]] = data
          // Unwrap Value expressions that might have leaked through as the entire row
          const unwrapped = unwrapValue(value)
          return [key, unwrapped] as [unknown, any]
        }),
      )

      return {
        alias: from.alias,
        input: extractedInput,
        collectionId: subQueryResult.collectionId,
        sourceIncludes:
          subQueryResult.includes?.map((include) => ({
            sourceAlias: from.alias,
            include,
          })) ?? [],
      }
    }
    default:
      throw new UnsupportedFromTypeError((from as any).type)
  }
}

// Helper to check if a value is a Value expression
function isValue(raw: any): boolean {
  return (
    raw instanceof ValClass ||
    (raw && typeof raw === `object` && `type` in raw && raw.type === `val`)
  )
}

// Helper to unwrap a Value expression or return the value itself
function unwrapValue(value: any): any {
  return isValue(value) ? value.value : value
}

function attachVirtualPropsToSelected(
  selected: any,
  row: Record<string, any>,
): any {
  if (!selected || typeof selected !== `object`) {
    return selected
  }

  let needsMerge = false
  for (const prop of VIRTUAL_PROP_NAMES) {
    if (selected[prop] == null && prop in row) {
      needsMerge = true
      break
    }
  }

  if (!needsMerge) {
    return selected
  }

  for (const prop of VIRTUAL_PROP_NAMES) {
    if (selected[prop] == null && prop in row) {
      selected[prop] = row[prop]
    }
  }

  return selected
}

/**
 * Recursively maps optimized subqueries to their original queries for proper caching.
 * This ensures that when we encounter the same QueryRef object in different contexts,
 * we can find the original query to check the cache.
 */
function mapNestedQueries(
  optimizedQuery: QueryIR,
  originalQuery: QueryIR,
  queryMapping: QueryMapping,
): void {
  mapNestedFromQueries(optimizedQuery.from, originalQuery.from, queryMapping)

  // Map JOIN clauses if they exist
  if (optimizedQuery.join && originalQuery.join) {
    for (
      let i = 0;
      i < optimizedQuery.join.length && i < originalQuery.join.length;
      i++
    ) {
      const optimizedJoin = optimizedQuery.join[i]!
      const originalJoin = originalQuery.join[i]!

      if (
        optimizedJoin.from.type === `queryRef` &&
        originalJoin.from.type === `queryRef`
      ) {
        queryMapping.set(optimizedJoin.from.query, originalJoin.from.query)
        // Recursively map nested queries in joins
        mapNestedQueries(
          optimizedJoin.from.query,
          originalJoin.from.query,
          queryMapping,
        )
      }
    }
  }
}

function getRefFromAlias(
  query: QueryIR,
  alias: string,
): CollectionRef | QueryRef | void {
  for (const source of getFromSources(query.from)) {
    if (source.alias === alias) {
      return source
    }
  }

  for (const join of query.join || []) {
    if (join.from.alias === alias) {
      return join.from
    }
  }
}

function getFromSources(
  from: QueryIR[`from`],
): Array<CollectionRef | QueryRef> {
  if (from.type === `unionFrom`) {
    return from.sources
  }
  if (from.type === `unionAll`) {
    return []
  }
  return [from]
}

function getAllSources(query: QueryIR): Array<CollectionRef | QueryRef> {
  return [
    ...getFromSources(query.from),
    ...(query.join?.map((join) => join.from) ?? []),
  ]
}

function getFirstFromAlias(from: QueryIR[`from`]): string {
  return getFromSources(from)[0]?.alias ?? ``
}

function findProjectedSourceIncludePaths(
  select: Record<string, any>,
  sourceAlias: string,
  sourcePath: Array<string>,
): Array<ProjectedSourceIncludePath> {
  const targetPath = [sourceAlias, ...sourcePath]
  return findProjectedIncludePaths(select, targetPath)
}

function findProjectedResultIncludePaths(
  select: Record<string, any>,
  resultPath: Array<string>,
): Array<ProjectedSourceIncludePath> {
  return findProjectedIncludePaths(select, resultPath)
}

function findProjectedIncludePaths(
  select: Record<string, any>,
  targetPath: Array<string>,
): Array<ProjectedSourceIncludePath> {
  const resultPaths: Array<ProjectedSourceIncludePath> = []

  const visitSelectObject = (
    obj: Record<string, any>,
    prefix: Array<string>,
    guards: Array<ConditionalSelectGuard>,
  ) => {
    for (const [key, value] of Object.entries(obj)) {
      if (key.startsWith(`__SPREAD_SENTINEL__`)) {
        visitSpreadSentinel(key, value, prefix, guards)
        continue
      }
      visitSelectValue(value, [...prefix, key], guards)
    }
  }

  const visitSpreadSentinel = (
    key: string,
    value: any,
    path: Array<string>,
    guards: Array<ConditionalSelectGuard>,
  ) => {
    const rest = key.slice(`__SPREAD_SENTINEL__`.length)
    const splitIndex = rest.lastIndexOf(`__`)
    const pathStr = splitIndex >= 0 ? rest.slice(0, splitIndex) : rest
    const isRefExpr =
      value &&
      typeof value === `object` &&
      `type` in value &&
      value.type === `ref`
    const sourcePath = isRefExpr
      ? (value as PropRef).path
      : pathStr.split(`.`).filter(Boolean)

    if (pathStartsWith(targetPath, sourcePath)) {
      resultPaths.push({
        path: [...path, ...targetPath.slice(sourcePath.length)],
        guards,
      })
    }
  }

  const visitSelectValue = (
    value: any,
    path: Array<string>,
    guards: Array<ConditionalSelectGuard>,
  ) => {
    if (value instanceof PropRef && pathStartsWith(targetPath, value.path)) {
      resultPaths.push({
        path: [...path, ...targetPath.slice(value.path.length)],
        guards,
      })
      return
    }

    if (value instanceof ConditionalSelect) {
      const previousBranchGuards: Array<ConditionalSelectGuard> = []
      for (const branch of value.branches) {
        visitSelectValue(branch.value, path, [
          ...guards,
          ...previousBranchGuards,
          { condition: branch.condition, expected: true },
        ])
        previousBranchGuards.push({
          condition: branch.condition,
          expected: false,
        })
      }
      if (value.defaultValue !== undefined) {
        visitSelectValue(value.defaultValue, path, [
          ...guards,
          ...previousBranchGuards,
        ])
      }
      return
    }

    if (isNestedSelectObject(value)) {
      visitSelectObject(value, path, guards)
    }
  }

  visitSelectObject(select, [], [])
  return resultPaths
}

function pathStartsWith(path: Array<string>, prefix: Array<string>): boolean {
  return (
    prefix.length <= path.length && prefix.every((part, i) => path[i] === part)
  )
}

function mapNestedFromQueries(
  optimizedFrom: QueryIR[`from`],
  originalFrom: QueryIR[`from`],
  queryMapping: QueryMapping,
): void {
  if (optimizedFrom.type === `unionAll` && originalFrom.type === `unionAll`) {
    for (
      let i = 0;
      i < optimizedFrom.queries.length && i < originalFrom.queries.length;
      i++
    ) {
      const optimizedBranch = optimizedFrom.queries[i]!
      const originalBranch = originalFrom.queries[i]!
      queryMapping.set(optimizedBranch, originalBranch)
      mapNestedQueries(optimizedBranch, originalBranch, queryMapping)
    }
    return
  }

  const optimizedSources = getFromSources(optimizedFrom)
  const originalSources = getFromSources(originalFrom)

  for (
    let i = 0;
    i < optimizedSources.length && i < originalSources.length;
    i++
  ) {
    const optimizedSource = optimizedSources[i]!
    const originalSource = originalSources[i]!
    if (
      optimizedSource.type === `queryRef` &&
      originalSource.type === `queryRef`
    ) {
      queryMapping.set(optimizedSource.query, originalSource.query)
      mapNestedQueries(
        optimizedSource.query,
        originalSource.query,
        queryMapping,
      )
    }
  }
}

/**
 * Follows the given reference in a query
 * until its finds the root field the reference points to.
 * @returns The collection, its alias, and the path to the root field in this collection
 */
export function followRef(
  query: QueryIR,
  ref: PropRef<any>,
  collection: Collection,
): { collection: Collection; path: Array<string> } | void {
  if (ref.path.length === 0) {
    return
  }

  if (ref.path.length === 1) {
    // This field should be part of this collection
    const field = ref.path[0]!
    // is it part of the select clause?
    if (query.select) {
      const selectedField = query.select[field]
      if (selectedField && selectedField.type === `ref`) {
        return followRef(query, selectedField, collection)
      }
    }

    // Either this field is not part of the select clause
    // and thus it must be part of the collection itself
    // or it is part of the select but is not a reference
    // so we can stop here and don't have to follow it
    return { collection, path: [field] }
  }

  if (ref.path.length > 1) {
    // This is a nested field
    const [alias, ...rest] = ref.path
    const aliasRef = getRefFromAlias(query, alias!)
    if (!aliasRef) {
      return
    }

    if (aliasRef.type === `queryRef`) {
      return followRef(aliasRef.query, new PropRef(rest), collection)
    } else {
      // This is a reference to a collection
      // we can't follow it further
      // so the field must be on the collection itself
      return { collection: aliasRef.collection, path: rest }
    }
  }
}

/**
 * Walks a Select object to find IncludesSubquery entries.
 * Plain nested objects still reject includes, but ConditionalSelect branches can
 * contain guarded nested includes that are only materialized when the branch
 * condition is true.
 */
function extractIncludesFromSelect(select: Record<string, any>): Array<{
  key: string
  path: Array<string>
  subquery: IncludesSubquery
  guards: Array<ConditionalSelectGuard>
}> {
  const results: Array<{
    key: string
    path: Array<string>
    subquery: IncludesSubquery
    guards: Array<ConditionalSelectGuard>
  }> = []
  for (const [key, value] of Object.entries(select)) {
    if (key.startsWith(`__SPREAD_SENTINEL__`)) continue
    if (value instanceof IncludesSubquery) {
      results.push({
        key: getIncludesRoutingKey([key], results),
        path: [key],
        subquery: value,
        guards: [],
      })
    } else if (value instanceof ConditionalSelect) {
      collectIncludesFromConditionalSelect(value, [key], [], results)
    } else if (isNestedSelectObject(value)) {
      // Check nested objects for IncludesSubquery — not supported yet
      assertNoNestedIncludes(value, key)
    }
  }
  return results
}

function collectIncludesFromConditionalSelect(
  conditional: ConditionalSelect,
  prefixPath: Array<string>,
  guards: Array<ConditionalSelectGuard>,
  results: Array<{
    key: string
    path: Array<string>
    subquery: IncludesSubquery
    guards: Array<ConditionalSelectGuard>
  }>,
): void {
  const previousBranchGuards: Array<ConditionalSelectGuard> = []
  for (const branch of conditional.branches) {
    collectIncludesFromSelectValue(
      branch.value,
      prefixPath,
      [
        ...guards,
        ...previousBranchGuards,
        { condition: branch.condition, expected: true },
      ],
      results,
    )
    previousBranchGuards.push({
      condition: branch.condition,
      expected: false,
    })
  }

  if (conditional.defaultValue !== undefined) {
    collectIncludesFromSelectValue(
      conditional.defaultValue,
      prefixPath,
      [...guards, ...previousBranchGuards],
      results,
    )
  }
}

function collectIncludesFromSelectValue(
  value: any,
  prefixPath: Array<string>,
  guards: Array<ConditionalSelectGuard>,
  results: Array<{
    key: string
    path: Array<string>
    subquery: IncludesSubquery
    guards: Array<ConditionalSelectGuard>
  }>,
): void {
  if (value instanceof IncludesSubquery) {
    const key = getIncludesRoutingKey(prefixPath, results)
    results.push({ key, path: prefixPath, subquery: value, guards })
    return
  }

  if (value instanceof ConditionalSelect) {
    collectIncludesFromConditionalSelect(value, prefixPath, guards, results)
    return
  }

  if (!isNestedSelectObject(value)) {
    return
  }

  for (const [key, child] of Object.entries(value)) {
    if (key.startsWith(`__SPREAD_SENTINEL__`)) continue
    collectIncludesFromSelectValue(child, [...prefixPath, key], guards, results)
  }
}

function getIncludesRoutingKey(
  path: Array<string>,
  entries: Array<{ key: string }>,
): string {
  return getUniqueIncludesRoutingKey(path.join(`.`), entries)
}

function getUniqueIncludesRoutingKey(
  baseKey: string,
  entries: Array<{ key?: string; fieldName?: string }>,
): string {
  const hasKey = (key: string) =>
    entries.some((entry) => (entry.key ?? entry.fieldName) === key)

  if (!hasKey(baseKey)) {
    return baseKey
  }

  let suffix = entries.length
  let key = `${baseKey}#${suffix}`
  while (hasKey(key)) {
    suffix++
    key = `${baseKey}#${suffix}`
  }
  return key
}

/** Check if a value is a nested plain object in a select (not an IR expression node) */
function isNestedSelectObject(value: any): value is Record<string, any> {
  return (
    value != null &&
    typeof value === `object` &&
    !Array.isArray(value) &&
    !isExpressionLike(value)
  )
}

function assertNoNestedIncludes(
  obj: Record<string, any>,
  parentPath: string,
): void {
  for (const [key, value] of Object.entries(obj)) {
    if (key.startsWith(`__SPREAD_SENTINEL__`)) continue
    if (value instanceof IncludesSubquery) {
      throw new Error(
        `Includes subqueries must be at the top level of select(). ` +
          `Found nested includes at "${parentPath}.${key}".`,
      )
    }
    if (isNestedSelectObject(value)) {
      assertNoNestedIncludes(value, `${parentPath}.${key}`)
    }
  }
}

/**
 * Replaces an IncludesSubquery entry in the select object with a null Value placeholder.
 * This ensures processSelect() doesn't encounter it.
 */
function replaceIncludesInSelect(
  select: Record<string, any>,
  path: Array<string>,
): Record<string, any> {
  return replaceIncludesInSelectValue(select, path, new ValClass(null)).value
}

function replaceIncludesInSelectValue(
  value: any,
  path: Array<string>,
  replacement: ValClass,
): { value: any; replaced: boolean } {
  if (path.length === 0) {
    return replaceIncludesValue(value, replacement)
  }

  if (value instanceof ConditionalSelect) {
    return replaceIncludesInConditionalSelect(value, path, replacement)
  }

  if (!isNestedSelectObject(value)) {
    return { value, replaced: false }
  }

  if (path.length === 1) {
    const field = path[0]!
    const result = replaceIncludesValue(value[field], replacement)
    if (!result.replaced) {
      return { value, replaced: false }
    }
    return {
      value: {
        ...value,
        [field]: result.value,
      },
      replaced: true,
    }
  }

  const [head, ...rest] = path
  const result = replaceIncludesInSelectValue(value[head!], rest, replacement)
  if (!result.replaced) {
    return { value, replaced: false }
  }
  return {
    value: {
      ...value,
      [head!]: result.value,
    },
    replaced: true,
  }
}

function replaceIncludesValue(
  value: any,
  replacement: ValClass,
): { value: any; replaced: boolean } {
  if (value instanceof IncludesSubquery) {
    return { value: replacement, replaced: true }
  }

  if (value instanceof ConditionalSelect) {
    return replaceIncludesInConditionalSelect(value, [], replacement)
  }

  return { value, replaced: false }
}

function replaceIncludesInConditionalSelect(
  conditional: ConditionalSelect,
  path: Array<string>,
  replacement: ValClass,
): { value: ConditionalSelect; replaced: boolean } {
  let replaced = false
  const branches = conditional.branches.map((branch) => {
    const result =
      path.length === 0
        ? replaceIncludesValue(branch.value, replacement)
        : replaceIncludesInSelectValue(branch.value, path, replacement)
    if (!result.replaced) {
      return branch
    }
    replaced = true
    return { ...branch, value: result.value }
  })

  let defaultValue = conditional.defaultValue
  if (conditional.defaultValue !== undefined) {
    const result =
      path.length === 0
        ? replaceIncludesValue(conditional.defaultValue, replacement)
        : replaceIncludesInSelectValue(
            conditional.defaultValue,
            path,
            replacement,
          )
    if (result.replaced) {
      replaced = true
      defaultValue = result.value
    }
  }

  if (!replaced) {
    return { value: conditional, replaced: false }
  }

  return {
    value: new ConditionalSelect(branches, defaultValue),
    replaced: true,
  }
}

/**
 * Gets a nested value from an object by path segments.
 * For v1 with single-level correlation fields (e.g., `projectId`), it's just `obj[path[0]]`.
 */
function getNestedValue(obj: any, path: Array<string>): any {
  let value = obj
  for (const segment of path) {
    if (value == null) return value
    value = value[segment]
  }
  return value
}

function matchesConditionalSelectGuards(
  guards: Array<{
    condition: (row: any) => any
    expected: boolean
  }>,
  row: any,
): boolean {
  return guards.every(
    (guard) => isCaseWhenConditionTrue(guard.condition(row)) === guard.expected,
  )
}

export type CompileQueryFn = typeof compileQuery
