{"version":3,"file":"utils.cjs","sources":["../../../../src/query/live/utils.ts"],"sourcesContent":["import { MultiSet, serializeValue } from '@tanstack/db-ivm'\nimport { UnsupportedRootScalarSelectError } from '../../errors.js'\nimport { normalizeOrderByPaths } from '../compiler/expressions.js'\nimport { buildQuery, getQueryIR } from '../builder/index.js'\nimport { IncludesSubquery } from '../ir.js'\nimport type { MultiSetArray, RootStreamBuilder } from '@tanstack/db-ivm'\nimport type { Collection } from '../../collection/index.js'\nimport type { ChangeMessage } from '../../types.js'\nimport type { InitialQueryBuilder, QueryBuilder } from '../builder/index.js'\nimport type { Context } from '../builder/types.js'\nimport type { OrderBy, QueryIR } from '../ir.js'\nimport type { OrderByOptimizationInfo } from '../compiler/order-by.js'\n\n/**\n * Helper function to extract collections from a compiled query.\n * Traverses the query IR to find all collection references.\n * Maps collections by their ID (not alias) as expected by the compiler.\n */\nexport function extractCollectionsFromQuery(\n  query: any,\n): Record<string, Collection<any, any, any>> {\n  const collections: Record<string, any> = {}\n\n  // Helper function to recursively extract collections from a query or source\n  function extractFromSource(source: any) {\n    if (source.type === `collectionRef`) {\n      collections[source.collection.id] = source.collection\n    } else if (source.type === `queryRef`) {\n      // Recursively extract from subquery\n      extractFromQuery(source.query)\n    }\n  }\n\n  // Helper function to recursively extract collections from a query\n  function extractFromQuery(q: any) {\n    // Extract from FROM clause\n    if (q.from) {\n      extractFromSource(q.from)\n    }\n\n    // Extract from JOIN clauses\n    if (q.join && Array.isArray(q.join)) {\n      for (const joinClause of q.join) {\n        if (joinClause.from) {\n          extractFromSource(joinClause.from)\n        }\n      }\n    }\n\n    // Extract from SELECT (for IncludesSubquery)\n    if (q.select) {\n      extractFromSelect(q.select)\n    }\n  }\n\n  function extractFromSelect(select: any) {\n    for (const [key, value] of Object.entries(select)) {\n      if (typeof key === `string` && key.startsWith(`__SPREAD_SENTINEL__`)) {\n        continue\n      }\n      if (value instanceof IncludesSubquery) {\n        extractFromQuery(value.query)\n      } else if (isNestedSelectObject(value)) {\n        extractFromSelect(value)\n      }\n    }\n  }\n\n  // Start extraction from the root query\n  extractFromQuery(query)\n\n  return collections\n}\n\n/**\n * Helper function to extract the collection that is referenced in the query's FROM clause.\n * The FROM clause may refer directly to a collection or indirectly to a subquery.\n */\nexport function extractCollectionFromSource(\n  query: any,\n): Collection<any, any, any> {\n  const from = query.from\n\n  if (from.type === `collectionRef`) {\n    return from.collection\n  } else if (from.type === `queryRef`) {\n    // Recursively extract from subquery\n    return extractCollectionFromSource(from.query)\n  }\n\n  throw new Error(\n    `Failed to extract collection. Invalid FROM clause: ${JSON.stringify(query)}`,\n  )\n}\n\n/**\n * Extracts all aliases used for each collection across the entire query tree.\n *\n * Traverses the QueryIR recursively to build a map from collection ID to all aliases\n * that reference that collection. This is essential for self-join support, where the\n * same collection may be referenced multiple times with different aliases.\n *\n * For example, given a query like:\n * ```ts\n * q.from({ employee: employeesCollection })\n *   .join({ manager: employeesCollection }, ({ employee, manager }) =>\n *     eq(employee.managerId, manager.id)\n *   )\n * ```\n *\n * This function would return:\n * ```\n * Map { \"employees\" => Set { \"employee\", \"manager\" } }\n * ```\n *\n * @param query - The query IR to extract aliases from\n * @returns A map from collection ID to the set of all aliases referencing that collection\n */\nexport function extractCollectionAliases(\n  query: QueryIR,\n): Map<string, Set<string>> {\n  const aliasesById = new Map<string, Set<string>>()\n\n  function recordAlias(source: any) {\n    if (!source) return\n\n    if (source.type === `collectionRef`) {\n      const { id } = source.collection\n      const existing = aliasesById.get(id)\n      if (existing) {\n        existing.add(source.alias)\n      } else {\n        aliasesById.set(id, new Set([source.alias]))\n      }\n    } else if (source.type === `queryRef`) {\n      traverse(source.query)\n    }\n  }\n\n  function traverseSelect(select: any) {\n    for (const [key, value] of Object.entries(select)) {\n      if (typeof key === `string` && key.startsWith(`__SPREAD_SENTINEL__`)) {\n        continue\n      }\n      if (value instanceof IncludesSubquery) {\n        traverse(value.query)\n      } else if (isNestedSelectObject(value)) {\n        traverseSelect(value)\n      }\n    }\n  }\n\n  function traverse(q?: QueryIR) {\n    if (!q) return\n\n    recordAlias(q.from)\n\n    if (q.join) {\n      for (const joinClause of q.join) {\n        recordAlias(joinClause.from)\n      }\n    }\n\n    if (q.select) {\n      traverseSelect(q.select)\n    }\n  }\n\n  traverse(query)\n\n  return aliasesById\n}\n\n/**\n * Check if a value is a nested select object (plain object, not an expression)\n */\nfunction isNestedSelectObject(obj: any): boolean {\n  if (obj === null || typeof obj !== `object`) return false\n  if (obj instanceof IncludesSubquery) return false\n  // Expression-like objects have a .type property\n  if (`type` in obj && typeof obj.type === `string`) return false\n  // Ref proxies from spread operations\n  if (obj.__refProxy) return false\n  return true\n}\n\n/**\n * Builds a query IR from a config object that contains either a query builder\n * function or a QueryBuilder instance.\n */\nexport function buildQueryFromConfig<TContext extends Context>(config: {\n  query:\n    | ((q: InitialQueryBuilder) => QueryBuilder<TContext>)\n    | QueryBuilder<TContext>\n  requireObjectResult?: boolean\n}): QueryIR {\n  // Build the query using the provided query builder function or instance\n  const query =\n    typeof config.query === `function`\n      ? buildQuery<TContext>(config.query)\n      : getQueryIR(config.query)\n\n  if (\n    config.requireObjectResult &&\n    query.select &&\n    !isNestedSelectObject(query.select)\n  ) {\n    throw new UnsupportedRootScalarSelectError()\n  }\n\n  return query\n}\n\n/**\n * Helper function to send changes to a D2 input stream.\n * Converts ChangeMessages to D2 MultiSet data and sends to the input.\n *\n * @returns The number of multiset entries sent\n */\nexport function sendChangesToInput(\n  input: RootStreamBuilder<unknown>,\n  changes: Iterable<ChangeMessage>,\n  getKey: (item: ChangeMessage[`value`]) => any,\n): number {\n  const multiSetArray: MultiSetArray<unknown> = []\n  for (const change of changes) {\n    const key = getKey(change.value)\n    if (change.type === `insert`) {\n      multiSetArray.push([[key, change.value], 1])\n    } else if (change.type === `update`) {\n      multiSetArray.push([[key, change.previousValue], -1])\n      multiSetArray.push([[key, change.value], 1])\n    } else {\n      // change.type === `delete`\n      multiSetArray.push([[key, change.value], -1])\n    }\n  }\n\n  if (multiSetArray.length !== 0) {\n    input.sendData(new MultiSet(multiSetArray))\n  }\n\n  return multiSetArray.length\n}\n\n/** Splits updates into a delete of the old value and an insert of the new value */\nexport function* splitUpdates<\n  T extends object = Record<string, unknown>,\n  TKey extends string | number = string | number,\n>(\n  changes: Iterable<ChangeMessage<T, TKey>>,\n): Generator<ChangeMessage<T, TKey>> {\n  for (const change of changes) {\n    if (change.type === `update`) {\n      yield { type: `delete`, key: change.key, value: change.previousValue! }\n      yield { type: `insert`, key: change.key, value: change.value }\n    } else {\n      yield change\n    }\n  }\n}\n\n/**\n * Filter changes to prevent duplicate inserts to a D2 pipeline.\n * Maintains D2 multiplicity at 1 for visible items so that deletes\n * properly reduce multiplicity to 0.\n *\n * Mutates `sentKeys` in place: adds keys on insert, removes on delete.\n */\nexport function filterDuplicateInserts(\n  changes: Array<ChangeMessage<any, string | number>>,\n  sentKeys: Set<string | number>,\n): Array<ChangeMessage<any, string | number>> {\n  const filtered: Array<ChangeMessage<any, string | number>> = []\n  for (const change of changes) {\n    if (change.type === `insert`) {\n      if (sentKeys.has(change.key)) {\n        continue // Skip duplicate\n      }\n      sentKeys.add(change.key)\n    } else if (change.type === `delete`) {\n      sentKeys.delete(change.key)\n    }\n    filtered.push(change)\n  }\n  return filtered\n}\n\n/**\n * Track the biggest value seen in a stream of changes, used for cursor-based\n * pagination in ordered subscriptions. Returns whether the load request key\n * should be reset (allowing another load).\n *\n * @param changes   - changes to process (deletes are skipped)\n * @param current   - the current biggest value (or undefined if none)\n * @param sentKeys  - set of keys already sent to D2 (for new-key detection)\n * @param comparator - orderBy comparator\n * @returns `{ biggest, shouldResetLoadKey }` — the new biggest value and\n *          whether the caller should clear its last-load-request-key\n */\nexport function trackBiggestSentValue(\n  changes: Array<ChangeMessage<any, string | number>>,\n  current: unknown | undefined,\n  sentKeys: Set<string | number>,\n  comparator: (a: any, b: any) => number,\n): { biggest: unknown; shouldResetLoadKey: boolean } {\n  let biggest = current\n  let shouldResetLoadKey = false\n\n  for (const change of changes) {\n    if (change.type === `delete`) continue\n\n    const isNewKey = !sentKeys.has(change.key)\n\n    if (biggest === undefined) {\n      biggest = change.value\n      shouldResetLoadKey = true\n    } else if (comparator(biggest, change.value) < 0) {\n      biggest = change.value\n      shouldResetLoadKey = true\n    } else if (isNewKey) {\n      // New key at same sort position — allow another load if needed\n      shouldResetLoadKey = true\n    }\n  }\n\n  return { biggest, shouldResetLoadKey }\n}\n\n/**\n * Compute orderBy/limit subscription hints for an alias.\n * Returns normalised orderBy and effective limit suitable for passing to\n * `subscribeChanges`, or `undefined` values when the query's orderBy cannot\n * be scoped to the given alias (e.g. cross-collection refs or aggregates).\n */\nexport function computeSubscriptionOrderByHints(\n  query: { orderBy?: OrderBy; limit?: number; offset?: number },\n  alias: string,\n): { orderBy: OrderBy | undefined; limit: number | undefined } {\n  const { orderBy, limit, offset } = query\n  const effectiveLimit =\n    limit !== undefined && offset !== undefined ? limit + offset : limit\n\n  const normalizedOrderBy = orderBy\n    ? normalizeOrderByPaths(orderBy, alias)\n    : undefined\n\n  // Only pass orderBy when it is scoped to this alias and uses simple refs,\n  // to avoid leaking cross-collection paths into backend-specific compilers.\n  const canPassOrderBy =\n    normalizedOrderBy?.every((clause) => {\n      const exp = clause.expression\n      if (exp.type !== `ref`) return false\n      const path = exp.path\n      return Array.isArray(path) && path.length === 1\n    }) ?? false\n\n  return {\n    orderBy: canPassOrderBy ? normalizedOrderBy : undefined,\n    limit: canPassOrderBy ? effectiveLimit : undefined,\n  }\n}\n\n/**\n * Compute the cursor for loading the next batch of ordered data.\n * Extracts values from the biggest sent row and builds the `minValues`\n * array and a deduplication key.\n *\n * @returns `undefined` if the load should be skipped (duplicate request),\n *          otherwise `{ minValues, normalizedOrderBy, loadRequestKey }`.\n */\nexport function computeOrderedLoadCursor(\n  orderByInfo: Pick<\n    OrderByOptimizationInfo,\n    'orderBy' | 'valueExtractorForRawRow' | 'offset'\n  >,\n  biggestSentRow: unknown | undefined,\n  lastLoadRequestKey: string | undefined,\n  alias: string,\n  limit: number,\n):\n  | {\n      minValues: Array<unknown> | undefined\n      normalizedOrderBy: OrderBy\n      loadRequestKey: string\n    }\n  | undefined {\n  const { orderBy, valueExtractorForRawRow, offset } = orderByInfo\n\n  // Extract all orderBy column values from the biggest sent row\n  // For single-column: returns single value, for multi-column: returns array\n  const extractedValues = biggestSentRow\n    ? valueExtractorForRawRow(biggestSentRow as Record<string, unknown>)\n    : undefined\n\n  // Normalize to array format for minValues\n  let minValues: Array<unknown> | undefined\n  if (extractedValues !== undefined) {\n    minValues = Array.isArray(extractedValues)\n      ? extractedValues\n      : [extractedValues]\n  }\n\n  // Deduplicate: skip if we already issued an identical load request\n  const loadRequestKey = serializeValue({\n    minValues: minValues ?? null,\n    offset,\n    limit,\n  })\n  if (lastLoadRequestKey === loadRequestKey) {\n    return undefined\n  }\n\n  const normalizedOrderBy = normalizeOrderByPaths(orderBy, alias)\n\n  return { minValues, normalizedOrderBy, loadRequestKey }\n}\n"],"names":["IncludesSubquery","buildQuery","getQueryIR","UnsupportedRootScalarSelectError","MultiSet","normalizeOrderByPaths","serializeValue"],"mappings":";;;;;;;AAkBO,SAAS,4BACd,OAC2C;AAC3C,QAAM,cAAmC,CAAA;AAGzC,WAAS,kBAAkB,QAAa;AACtC,QAAI,OAAO,SAAS,iBAAiB;AACnC,kBAAY,OAAO,WAAW,EAAE,IAAI,OAAO;AAAA,IAC7C,WAAW,OAAO,SAAS,YAAY;AAErC,uBAAiB,OAAO,KAAK;AAAA,IAC/B;AAAA,EACF;AAGA,WAAS,iBAAiB,GAAQ;AAEhC,QAAI,EAAE,MAAM;AACV,wBAAkB,EAAE,IAAI;AAAA,IAC1B;AAGA,QAAI,EAAE,QAAQ,MAAM,QAAQ,EAAE,IAAI,GAAG;AACnC,iBAAW,cAAc,EAAE,MAAM;AAC/B,YAAI,WAAW,MAAM;AACnB,4BAAkB,WAAW,IAAI;AAAA,QACnC;AAAA,MACF;AAAA,IACF;AAGA,QAAI,EAAE,QAAQ;AACZ,wBAAkB,EAAE,MAAM;AAAA,IAC5B;AAAA,EACF;AAEA,WAAS,kBAAkB,QAAa;AACtC,eAAW,CAAC,KAAK,KAAK,KAAK,OAAO,QAAQ,MAAM,GAAG;AACjD,UAAI,OAAO,QAAQ,YAAY,IAAI,WAAW,qBAAqB,GAAG;AACpE;AAAA,MACF;AACA,UAAI,iBAAiBA,GAAAA,kBAAkB;AACrC,yBAAiB,MAAM,KAAK;AAAA,MAC9B,WAAW,qBAAqB,KAAK,GAAG;AACtC,0BAAkB,KAAK;AAAA,MACzB;AAAA,IACF;AAAA,EACF;AAGA,mBAAiB,KAAK;AAEtB,SAAO;AACT;AAMO,SAAS,4BACd,OAC2B;AAC3B,QAAM,OAAO,MAAM;AAEnB,MAAI,KAAK,SAAS,iBAAiB;AACjC,WAAO,KAAK;AAAA,EACd,WAAW,KAAK,SAAS,YAAY;AAEnC,WAAO,4BAA4B,KAAK,KAAK;AAAA,EAC/C;AAEA,QAAM,IAAI;AAAA,IACR,sDAAsD,KAAK,UAAU,KAAK,CAAC;AAAA,EAAA;AAE/E;AAyBO,SAAS,yBACd,OAC0B;AAC1B,QAAM,kCAAkB,IAAA;AAExB,WAAS,YAAY,QAAa;AAChC,QAAI,CAAC,OAAQ;AAEb,QAAI,OAAO,SAAS,iBAAiB;AACnC,YAAM,EAAE,OAAO,OAAO;AACtB,YAAM,WAAW,YAAY,IAAI,EAAE;AACnC,UAAI,UAAU;AACZ,iBAAS,IAAI,OAAO,KAAK;AAAA,MAC3B,OAAO;AACL,oBAAY,IAAI,IAAI,oBAAI,IAAI,CAAC,OAAO,KAAK,CAAC,CAAC;AAAA,MAC7C;AAAA,IACF,WAAW,OAAO,SAAS,YAAY;AACrC,eAAS,OAAO,KAAK;AAAA,IACvB;AAAA,EACF;AAEA,WAAS,eAAe,QAAa;AACnC,eAAW,CAAC,KAAK,KAAK,KAAK,OAAO,QAAQ,MAAM,GAAG;AACjD,UAAI,OAAO,QAAQ,YAAY,IAAI,WAAW,qBAAqB,GAAG;AACpE;AAAA,MACF;AACA,UAAI,iBAAiBA,GAAAA,kBAAkB;AACrC,iBAAS,MAAM,KAAK;AAAA,MACtB,WAAW,qBAAqB,KAAK,GAAG;AACtC,uBAAe,KAAK;AAAA,MACtB;AAAA,IACF;AAAA,EACF;AAEA,WAAS,SAAS,GAAa;AAC7B,QAAI,CAAC,EAAG;AAER,gBAAY,EAAE,IAAI;AAElB,QAAI,EAAE,MAAM;AACV,iBAAW,cAAc,EAAE,MAAM;AAC/B,oBAAY,WAAW,IAAI;AAAA,MAC7B;AAAA,IACF;AAEA,QAAI,EAAE,QAAQ;AACZ,qBAAe,EAAE,MAAM;AAAA,IACzB;AAAA,EACF;AAEA,WAAS,KAAK;AAEd,SAAO;AACT;AAKA,SAAS,qBAAqB,KAAmB;AAC/C,MAAI,QAAQ,QAAQ,OAAO,QAAQ,SAAU,QAAO;AACpD,MAAI,eAAeA,GAAAA,iBAAkB,QAAO;AAE5C,MAAI,UAAU,OAAO,OAAO,IAAI,SAAS,SAAU,QAAO;AAE1D,MAAI,IAAI,WAAY,QAAO;AAC3B,SAAO;AACT;AAMO,SAAS,qBAA+C,QAKnD;AAEV,QAAM,QACJ,OAAO,OAAO,UAAU,aACpBC,iBAAqB,OAAO,KAAK,IACjCC,iBAAW,OAAO,KAAK;AAE7B,MACE,OAAO,uBACP,MAAM,UACN,CAAC,qBAAqB,MAAM,MAAM,GAClC;AACA,UAAM,IAAIC,OAAAA,iCAAA;AAAA,EACZ;AAEA,SAAO;AACT;AAQO,SAAS,mBACd,OACA,SACA,QACQ;AACR,QAAM,gBAAwC,CAAA;AAC9C,aAAW,UAAU,SAAS;AAC5B,UAAM,MAAM,OAAO,OAAO,KAAK;AAC/B,QAAI,OAAO,SAAS,UAAU;AAC5B,oBAAc,KAAK,CAAC,CAAC,KAAK,OAAO,KAAK,GAAG,CAAC,CAAC;AAAA,IAC7C,WAAW,OAAO,SAAS,UAAU;AACnC,oBAAc,KAAK,CAAC,CAAC,KAAK,OAAO,aAAa,GAAG,EAAE,CAAC;AACpD,oBAAc,KAAK,CAAC,CAAC,KAAK,OAAO,KAAK,GAAG,CAAC,CAAC;AAAA,IAC7C,OAAO;AAEL,oBAAc,KAAK,CAAC,CAAC,KAAK,OAAO,KAAK,GAAG,EAAE,CAAC;AAAA,IAC9C;AAAA,EACF;AAEA,MAAI,cAAc,WAAW,GAAG;AAC9B,UAAM,SAAS,IAAIC,MAAAA,SAAS,aAAa,CAAC;AAAA,EAC5C;AAEA,SAAO,cAAc;AACvB;AAGO,UAAU,aAIf,SACmC;AACnC,aAAW,UAAU,SAAS;AAC5B,QAAI,OAAO,SAAS,UAAU;AAC5B,YAAM,EAAE,MAAM,UAAU,KAAK,OAAO,KAAK,OAAO,OAAO,cAAA;AACvD,YAAM,EAAE,MAAM,UAAU,KAAK,OAAO,KAAK,OAAO,OAAO,MAAA;AAAA,IACzD,OAAO;AACL,YAAM;AAAA,IACR;AAAA,EACF;AACF;AASO,SAAS,uBACd,SACA,UAC4C;AAC5C,QAAM,WAAuD,CAAA;AAC7D,aAAW,UAAU,SAAS;AAC5B,QAAI,OAAO,SAAS,UAAU;AAC5B,UAAI,SAAS,IAAI,OAAO,GAAG,GAAG;AAC5B;AAAA,MACF;AACA,eAAS,IAAI,OAAO,GAAG;AAAA,IACzB,WAAW,OAAO,SAAS,UAAU;AACnC,eAAS,OAAO,OAAO,GAAG;AAAA,IAC5B;AACA,aAAS,KAAK,MAAM;AAAA,EACtB;AACA,SAAO;AACT;AAcO,SAAS,sBACd,SACA,SACA,UACA,YACmD;AACnD,MAAI,UAAU;AACd,MAAI,qBAAqB;AAEzB,aAAW,UAAU,SAAS;AAC5B,QAAI,OAAO,SAAS,SAAU;AAE9B,UAAM,WAAW,CAAC,SAAS,IAAI,OAAO,GAAG;AAEzC,QAAI,YAAY,QAAW;AACzB,gBAAU,OAAO;AACjB,2BAAqB;AAAA,IACvB,WAAW,WAAW,SAAS,OAAO,KAAK,IAAI,GAAG;AAChD,gBAAU,OAAO;AACjB,2BAAqB;AAAA,IACvB,WAAW,UAAU;AAEnB,2BAAqB;AAAA,IACvB;AAAA,EACF;AAEA,SAAO,EAAE,SAAS,mBAAA;AACpB;AAQO,SAAS,gCACd,OACA,OAC6D;AAC7D,QAAM,EAAE,SAAS,OAAO,OAAA,IAAW;AACnC,QAAM,iBACJ,UAAU,UAAa,WAAW,SAAY,QAAQ,SAAS;AAEjE,QAAM,oBAAoB,UACtBC,YAAAA,sBAAsB,SAAS,KAAK,IACpC;AAIJ,QAAM,iBACJ,mBAAmB,MAAM,CAAC,WAAW;AACnC,UAAM,MAAM,OAAO;AACnB,QAAI,IAAI,SAAS,MAAO,QAAO;AAC/B,UAAM,OAAO,IAAI;AACjB,WAAO,MAAM,QAAQ,IAAI,KAAK,KAAK,WAAW;AAAA,EAChD,CAAC,KAAK;AAER,SAAO;AAAA,IACL,SAAS,iBAAiB,oBAAoB;AAAA,IAC9C,OAAO,iBAAiB,iBAAiB;AAAA,EAAA;AAE7C;AAUO,SAAS,yBACd,aAIA,gBACA,oBACA,OACA,OAOY;AACZ,QAAM,EAAE,SAAS,yBAAyB,OAAA,IAAW;AAIrD,QAAM,kBAAkB,iBACpB,wBAAwB,cAAyC,IACjE;AAGJ,MAAI;AACJ,MAAI,oBAAoB,QAAW;AACjC,gBAAY,MAAM,QAAQ,eAAe,IACrC,kBACA,CAAC,eAAe;AAAA,EACtB;AAGA,QAAM,iBAAiBC,MAAAA,eAAe;AAAA,IACpC,WAAW,aAAa;AAAA,IACxB;AAAA,IACA;AAAA,EAAA,CACD;AACD,MAAI,uBAAuB,gBAAgB;AACzC,WAAO;AAAA,EACT;AAEA,QAAM,oBAAoBD,YAAAA,sBAAsB,SAAS,KAAK;AAE9D,SAAO,EAAE,WAAW,mBAAmB,eAAA;AACzC;;;;;;;;;;;"}