{"version":3,"file":"subscription.cjs","sources":["../../../src/collection/subscription.ts"],"sourcesContent":["import { ensureIndexForExpression } from '../indexes/auto-index.js'\nimport { and, eq, gte, lt } from '../query/builder/functions.js'\nimport { PropRef, Value } from '../query/ir.js'\nimport { EventEmitter } from '../event-emitter.js'\nimport { compileExpression } from '../query/compiler/evaluators.js'\nimport { buildCursor } from '../utils/cursor.js'\nimport {\n  createFilterFunctionFromExpression,\n  createFilteredCallback,\n} from './change-events.js'\nimport type { BasicExpression, OrderBy } from '../query/ir.js'\nimport type { IndexInterface } from '../indexes/base-index.js'\nimport type {\n  ChangeMessage,\n  LoadSubsetOptions,\n  Subscription,\n  SubscriptionEvents,\n  SubscriptionStatus,\n  SubscriptionUnsubscribedEvent,\n} from '../types.js'\nimport type { CollectionImpl } from './index.js'\n\ntype RequestSnapshotOptions = {\n  where?: BasicExpression<boolean>\n  optimizedOnly?: boolean\n  trackLoadSubsetPromise?: boolean\n  /** Optional orderBy to pass to loadSubset for backend optimization */\n  orderBy?: OrderBy\n  /** Optional limit to pass to loadSubset for backend optimization */\n  limit?: number\n  /** Callback that receives the raw loadSubset result for external tracking */\n  onLoadSubsetResult?: (result: Promise<void> | true) => void\n}\n\ntype RequestLimitedSnapshotOptions = {\n  orderBy: OrderBy\n  limit: number\n  /** All column values for cursor (first value used for local index, all values for sync layer) */\n  minValues?: Array<unknown>\n  /** Row offset for offset-based pagination (passed to sync layer) */\n  offset?: number\n  /** Whether to track the loadSubset promise on this subscription (default: true) */\n  trackLoadSubsetPromise?: boolean\n  /** Callback that receives the raw loadSubset result for external tracking */\n  onLoadSubsetResult?: (result: Promise<void> | true) => void\n}\n\ntype CollectionSubscriptionOptions = {\n  includeInitialState?: boolean\n  /** Pre-compiled expression for filtering changes */\n  whereExpression?: BasicExpression<boolean>\n  /** Callback to call when the subscription is unsubscribed */\n  onUnsubscribe?: (event: SubscriptionUnsubscribedEvent) => void\n}\n\nexport class CollectionSubscription\n  extends EventEmitter<SubscriptionEvents>\n  implements Subscription\n{\n  private loadedInitialState = false\n\n  // Flag to skip filtering in filterAndFlipChanges.\n  // This is separate from loadedInitialState because we want to allow\n  // requestSnapshot to still work even when filtering is skipped.\n  private skipFiltering = false\n\n  // Flag to indicate that we have sent at least 1 snapshot.\n  // While `snapshotSent` is false we filter out all changes from subscription to the collection.\n  private snapshotSent = false\n\n  /**\n   * Track all loadSubset calls made by this subscription so we can unload them on cleanup.\n   * We store the exact LoadSubsetOptions we passed to loadSubset to ensure symmetric unload.\n   */\n  private loadedSubsets: Array<LoadSubsetOptions> = []\n\n  // Keep track of the keys we've sent (needed for join and orderBy optimizations)\n  private sentKeys = new Set<string | number>()\n\n  // Track the count of rows sent via requestLimitedSnapshot for offset-based pagination\n  private limitedSnapshotRowCount = 0\n\n  // Track the last key sent via requestLimitedSnapshot for cursor-based pagination\n  private lastSentKey: string | number | undefined\n\n  private filteredCallback: (changes: Array<ChangeMessage<any, any>>) => void\n\n  private orderByIndex: IndexInterface<string | number> | undefined\n\n  // Status tracking\n  private _status: SubscriptionStatus = `ready`\n  private pendingLoadSubsetPromises: Set<Promise<void>> = new Set()\n\n  // Cleanup function for truncate event listener\n  private truncateCleanup: (() => void) | undefined\n\n  // Truncate buffering state\n  // When a truncate occurs, we buffer changes until all loadSubset refetches complete\n  // This prevents a flash of missing content between deletes and new inserts\n  private isBufferingForTruncate = false\n  private truncateBuffer: Array<Array<ChangeMessage<any, any>>> = []\n  private pendingTruncateRefetches: Set<Promise<void>> = new Set()\n\n  public get status(): SubscriptionStatus {\n    return this._status\n  }\n\n  constructor(\n    private collection: CollectionImpl<any, any, any, any, any>,\n    private callback: (changes: Array<ChangeMessage<any, any>>) => void,\n    private options: CollectionSubscriptionOptions,\n  ) {\n    super()\n    if (options.onUnsubscribe) {\n      this.on(`unsubscribed`, (event) => options.onUnsubscribe!(event))\n    }\n\n    // Auto-index for where expressions if enabled\n    if (options.whereExpression) {\n      ensureIndexForExpression(options.whereExpression, this.collection)\n    }\n\n    const callbackWithSentKeysTracking = (\n      changes: Array<ChangeMessage<any, any>>,\n    ) => {\n      callback(changes)\n      this.trackSentKeys(changes)\n    }\n\n    this.callback = callbackWithSentKeysTracking\n\n    // Create a filtered callback if where clause is provided\n    this.filteredCallback = options.whereExpression\n      ? createFilteredCallback(this.callback, options)\n      : this.callback\n\n    // Listen for truncate events to re-request data after must-refetch\n    // When a truncate happens (e.g., from a 409 must-refetch), all collection data is cleared.\n    // We need to re-request all previously loaded subsets to repopulate the data.\n    this.truncateCleanup = this.collection.on(`truncate`, () => {\n      this.handleTruncate()\n    })\n  }\n\n  /**\n   * Handle collection truncate event by resetting state and re-requesting subsets.\n   * This is called when the sync layer receives a must-refetch and clears all data.\n   *\n   * To prevent a flash of missing content, we buffer all changes (deletes from truncate\n   * and inserts from refetch) until all loadSubset promises resolve, then emit them together.\n   */\n  private handleTruncate() {\n    // Copy the loaded subsets before clearing (we'll re-request them)\n    const subsetsToReload = [...this.loadedSubsets]\n\n    // Only buffer if there's an actual loadSubset handler that can do async work.\n    // Without a loadSubset handler, there's nothing to re-request and no reason to buffer.\n    // This prevents unnecessary buffering in eager sync mode or when loadSubset isn't implemented.\n    const hasLoadSubsetHandler = this.collection._sync.syncLoadSubsetFn !== null\n\n    // If there are no subsets to reload OR no loadSubset handler, just reset state\n    if (subsetsToReload.length === 0 || !hasLoadSubsetHandler) {\n      this.snapshotSent = false\n      this.loadedInitialState = false\n      this.limitedSnapshotRowCount = 0\n      this.lastSentKey = undefined\n      this.loadedSubsets = []\n      return\n    }\n\n    // Start buffering BEFORE we receive the delete events from the truncate commit\n    // This ensures we capture both the deletes and subsequent inserts\n    this.isBufferingForTruncate = true\n    this.truncateBuffer = []\n    this.pendingTruncateRefetches.clear()\n\n    // Reset snapshot/pagination tracking state\n    // Note: We don't need to populate sentKeys here because filterAndFlipChanges\n    // will skip the delete filter when isBufferingForTruncate is true\n    this.snapshotSent = false\n    this.loadedInitialState = false\n    this.limitedSnapshotRowCount = 0\n    this.lastSentKey = undefined\n\n    // Clear the loadedSubsets array since we're re-requesting fresh\n    this.loadedSubsets = []\n\n    // Defer the loadSubset calls to a microtask so the truncate commit's delete events\n    // are buffered BEFORE the loadSubset calls potentially trigger nested commits.\n    // This ensures correct event ordering: deletes first, then inserts.\n    queueMicrotask(() => {\n      // Check if we were unsubscribed while waiting\n      if (!this.isBufferingForTruncate) {\n        return\n      }\n\n      // Re-request all previously loaded subsets and track their promises\n      for (const options of subsetsToReload) {\n        const syncResult = this.collection._sync.loadSubset(options)\n\n        // Track this loadSubset call so we can unload it later\n        this.loadedSubsets.push(options)\n        this.trackLoadSubsetPromise(syncResult)\n\n        // Track the promise for buffer flushing\n        if (syncResult instanceof Promise) {\n          this.pendingTruncateRefetches.add(syncResult)\n          syncResult\n            .catch(() => {\n              // Ignore errors - we still want to flush the buffer even if some requests fail\n            })\n            .finally(() => {\n              this.pendingTruncateRefetches.delete(syncResult)\n              this.checkTruncateRefetchComplete()\n            })\n        }\n      }\n\n      // If all loadSubset calls were synchronous (returned true), flush now\n      // At this point, delete events have already been buffered from the truncate commit\n      if (this.pendingTruncateRefetches.size === 0) {\n        this.flushTruncateBuffer()\n      }\n    })\n  }\n\n  /**\n   * Check if all truncate refetch promises have completed and flush buffer if so\n   */\n  private checkTruncateRefetchComplete() {\n    if (\n      this.pendingTruncateRefetches.size === 0 &&\n      this.isBufferingForTruncate\n    ) {\n      this.flushTruncateBuffer()\n    }\n  }\n\n  /**\n   * Flush the truncate buffer, emitting all buffered changes to the callback\n   */\n  private flushTruncateBuffer() {\n    this.isBufferingForTruncate = false\n\n    // Flatten all buffered changes into a single array for atomic emission\n    // This ensures consumers see all truncate changes (deletes + inserts) in one callback\n    const merged = this.truncateBuffer.flat()\n    if (merged.length > 0) {\n      this.filteredCallback(merged)\n    }\n\n    this.truncateBuffer = []\n  }\n\n  setOrderByIndex(index: IndexInterface<any>) {\n    this.orderByIndex = index\n  }\n\n  /**\n   * Check if an orderBy index has been set for this subscription\n   */\n  hasOrderByIndex(): boolean {\n    return this.orderByIndex !== undefined\n  }\n\n  /**\n   * Set subscription status and emit events if changed\n   */\n  private setStatus(newStatus: SubscriptionStatus) {\n    if (this._status === newStatus) {\n      return // No change\n    }\n\n    const previousStatus = this._status\n    this._status = newStatus\n\n    // Emit status:change event\n    this.emitInner(`status:change`, {\n      type: `status:change`,\n      subscription: this,\n      previousStatus,\n      status: newStatus,\n    })\n\n    // Emit specific status event\n    const eventKey: `status:${SubscriptionStatus}` = `status:${newStatus}`\n    this.emitInner(eventKey, {\n      type: eventKey,\n      subscription: this,\n      previousStatus,\n      status: newStatus,\n    } as SubscriptionEvents[typeof eventKey])\n  }\n\n  /**\n   * Track a loadSubset promise and manage loading status\n   */\n  private trackLoadSubsetPromise(syncResult: Promise<void> | true) {\n    // Track the promise if it's actually a promise (async work)\n    if (syncResult instanceof Promise) {\n      this.pendingLoadSubsetPromises.add(syncResult)\n      this.setStatus(`loadingSubset`)\n\n      syncResult.finally(() => {\n        this.pendingLoadSubsetPromises.delete(syncResult)\n        if (this.pendingLoadSubsetPromises.size === 0) {\n          this.setStatus(`ready`)\n        }\n      })\n    }\n  }\n\n  hasLoadedInitialState() {\n    return this.loadedInitialState\n  }\n\n  hasSentAtLeastOneSnapshot() {\n    return this.snapshotSent\n  }\n\n  emitEvents(changes: Array<ChangeMessage<any, any>>) {\n    const newChanges = this.filterAndFlipChanges(changes)\n\n    if (this.isBufferingForTruncate) {\n      // Buffer the changes instead of emitting immediately\n      // This prevents a flash of missing content during truncate/refetch\n      if (newChanges.length > 0) {\n        this.truncateBuffer.push(newChanges)\n      }\n    } else {\n      this.filteredCallback(newChanges)\n    }\n  }\n\n  /**\n   * Sends the snapshot to the callback.\n   * Returns a boolean indicating if it succeeded.\n   * It can only fail if there is no index to fulfill the request\n   * and the optimizedOnly option is set to true,\n   * or, the entire state was already loaded.\n   */\n  requestSnapshot(opts?: RequestSnapshotOptions): boolean {\n    if (this.loadedInitialState) {\n      // Subscription was deoptimized so we already sent the entire initial state\n      return false\n    }\n\n    const stateOpts: RequestSnapshotOptions = {\n      where: this.options.whereExpression,\n      optimizedOnly: opts?.optimizedOnly ?? false,\n    }\n\n    if (opts) {\n      if (`where` in opts) {\n        const snapshotWhereExp = opts.where\n        if (stateOpts.where) {\n          // Combine the two where expressions\n          const subWhereExp = stateOpts.where\n          const combinedWhereExp = and(subWhereExp, snapshotWhereExp)\n          stateOpts.where = combinedWhereExp\n        } else {\n          stateOpts.where = snapshotWhereExp\n        }\n      }\n    } else {\n      // No options provided so it's loading the entire initial state\n      this.loadedInitialState = true\n    }\n\n    // Request the sync layer to load more data\n    // don't await it, we will load the data into the collection when it comes in\n    const loadOptions: LoadSubsetOptions = {\n      where: stateOpts.where,\n      subscription: this,\n      // Include orderBy and limit if provided so sync layer can optimize the query\n      orderBy: opts?.orderBy,\n      limit: opts?.limit,\n    }\n    const syncResult = this.collection._sync.loadSubset(loadOptions)\n\n    // Pass the raw loadSubset result to the caller for external tracking\n    opts?.onLoadSubsetResult?.(syncResult)\n\n    // Track this loadSubset call so we can unload it later\n    this.loadedSubsets.push(loadOptions)\n\n    const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true\n    if (trackLoadSubsetPromise) {\n      this.trackLoadSubsetPromise(syncResult)\n    }\n\n    // Also load data immediately from the collection\n    const snapshot = this.collection.currentStateAsChanges(stateOpts)\n\n    if (snapshot === undefined) {\n      // Couldn't load from indexes\n      return false\n    }\n\n    // Only send changes that have not been sent yet\n    const filteredSnapshot = snapshot.filter(\n      (change) => !this.sentKeys.has(change.key),\n    )\n\n    // Add keys to sentKeys BEFORE calling callback to prevent race condition.\n    // If a change event arrives while the callback is executing, it will see\n    // the keys already in sentKeys and filter out duplicates correctly.\n    for (const change of filteredSnapshot) {\n      this.sentKeys.add(change.key)\n    }\n\n    this.snapshotSent = true\n    this.callback(filteredSnapshot)\n    return true\n  }\n\n  /**\n   * Sends a snapshot that fulfills the `where` clause and all rows are bigger or equal to the cursor.\n   * Requires a range index to be set with `setOrderByIndex` prior to calling this method.\n   * It uses that range index to load the items in the order of the index.\n   *\n   * For multi-column orderBy:\n   * - Uses first value from `minValues` for LOCAL index operations (wide bounds, ensures no missed rows)\n   * - Uses all `minValues` to build a precise composite cursor for SYNC layer loadSubset\n   *\n   * Note 1: it may load more rows than the provided LIMIT because it loads all values equal to the first cursor value + limit values greater.\n   *         This is needed to ensure that it does not accidentally skip duplicate values when the limit falls in the middle of some duplicated values.\n   * Note 2: it does not send keys that have already been sent before.\n   */\n  requestLimitedSnapshot({\n    orderBy,\n    limit,\n    minValues,\n    offset,\n    trackLoadSubsetPromise: shouldTrackLoadSubsetPromise = true,\n    onLoadSubsetResult,\n  }: RequestLimitedSnapshotOptions) {\n    if (!limit) throw new Error(`limit is required`)\n\n    if (!this.orderByIndex) {\n      throw new Error(\n        `Ordered snapshot was requested but no index was found. You have to call setOrderByIndex before requesting an ordered snapshot.`,\n      )\n    }\n\n    // Check if minValues has a first element (regardless of its value)\n    // This distinguishes between \"no min value provided\" vs \"min value is undefined\"\n    const hasMinValue = minValues !== undefined && minValues.length > 0\n    // Derive first column value from minValues (used for local index operations)\n    const minValue = minValues?.[0]\n    // Cast for index operations (index expects string | number)\n    const minValueForIndex = minValue as string | number | undefined\n\n    const index = this.orderByIndex\n    const where = this.options.whereExpression\n    const whereFilterFn = where\n      ? createFilterFunctionFromExpression(where)\n      : undefined\n\n    const filterFn = (key: string | number | undefined): boolean => {\n      if (key !== undefined && this.sentKeys.has(key)) {\n        return false\n      }\n\n      const value = this.collection.get(key)\n      if (value === undefined) {\n        return false\n      }\n\n      return whereFilterFn?.(value) ?? true\n    }\n\n    let biggestObservedValue = minValueForIndex\n    const changes: Array<ChangeMessage<any, string | number>> = []\n\n    // If we have a minValue we need to handle the case\n    // where there might be duplicate values equal to minValue that we need to include\n    // because we can have data like this: [1, 2, 3, 3, 3, 4, 5]\n    // so if minValue is 3 then the previous snapshot may not have included all 3s\n    // e.g. if it was offset 0 and limit 3 it would only have loaded the first 3\n    //      so we load all rows equal to minValue first, to be sure we don't skip any duplicate values\n    //\n    // For multi-column orderBy, we use the first column value for index operations (wide bounds)\n    // This may load some duplicates but ensures we never miss any rows.\n    let keys: Array<string | number> = []\n    if (hasMinValue) {\n      // First, get all items with the same FIRST COLUMN value as minValue\n      // This provides wide bounds for the local index\n      const { expression } = orderBy[0]!\n      const allRowsWithMinValue = this.collection.currentStateAsChanges({\n        where: eq(expression, new Value(minValueForIndex)),\n      })\n\n      if (allRowsWithMinValue) {\n        const keysWithMinValue = allRowsWithMinValue\n          .map((change) => change.key)\n          .filter((key) => !this.sentKeys.has(key) && filterFn(key))\n\n        // Add items with the minValue first\n        keys.push(...keysWithMinValue)\n\n        // Then get items greater than minValue\n        const keysGreaterThanMin = index.take(\n          limit - keys.length,\n          minValueForIndex!,\n          filterFn,\n        )\n        keys.push(...keysGreaterThanMin)\n      } else {\n        keys = index.take(limit, minValueForIndex!, filterFn)\n      }\n    } else {\n      // No min value provided, start from the beginning\n      keys = index.takeFromStart(limit, filterFn)\n    }\n\n    const valuesNeeded = () => Math.max(limit - changes.length, 0)\n    const collectionExhausted = () => keys.length === 0\n\n    // Create a value extractor for the orderBy field to properly track the biggest indexed value\n    const orderByExpression = orderBy[0]!.expression\n    const valueExtractor =\n      orderByExpression.type === `ref`\n        ? compileExpression(new PropRef(orderByExpression.path), true)\n        : null\n\n    while (valuesNeeded() > 0 && !collectionExhausted()) {\n      const insertedKeys = new Set<string | number>() // Track keys we add to `changes` in this iteration\n\n      for (const key of keys) {\n        const value = this.collection.get(key)!\n        changes.push({\n          type: `insert`,\n          key,\n          value,\n        })\n        // Extract the indexed value (e.g., salary) from the row, not the full row\n        // This is needed for index.take() to work correctly with the BTree comparator\n        biggestObservedValue = valueExtractor ? valueExtractor(value) : value\n        insertedKeys.add(key) // Track this key\n      }\n\n      keys = index.take(valuesNeeded(), biggestObservedValue!, filterFn)\n    }\n\n    // Track row count for offset-based pagination (before sending to callback)\n    // Use the current count as the offset for this load\n    const currentOffset = this.limitedSnapshotRowCount\n\n    // Add keys to sentKeys BEFORE calling callback to prevent race condition.\n    // If a change event arrives while the callback is executing, it will see\n    // the keys already in sentKeys and filter out duplicates correctly.\n    for (const change of changes) {\n      this.sentKeys.add(change.key)\n    }\n\n    this.callback(changes)\n\n    // Update the row count and last key after sending (for next call's offset/cursor)\n    this.limitedSnapshotRowCount += changes.length\n    if (changes.length > 0) {\n      this.lastSentKey = changes[changes.length - 1]!.key\n    }\n\n    // Build cursor expressions for sync layer loadSubset\n    // The cursor expressions are separate from the main where clause\n    // so the sync layer can choose cursor-based or offset-based pagination\n    let cursorExpressions:\n      | {\n          whereFrom: BasicExpression<boolean>\n          whereCurrent: BasicExpression<boolean>\n          lastKey?: string | number\n        }\n      | undefined\n\n    if (minValues !== undefined && minValues.length > 0) {\n      const whereFromCursor = buildCursor(orderBy, minValues)\n\n      if (whereFromCursor) {\n        const { expression } = orderBy[0]!\n        const cursorMinValue = minValues[0]\n\n        // Build the whereCurrent expression for the first orderBy column\n        // For Date values, we need to handle precision differences between JS (ms) and backends (μs)\n        // A JS Date represents a 1ms range, so we query for all values within that range\n        let whereCurrentCursor: BasicExpression<boolean>\n        if (cursorMinValue instanceof Date) {\n          const cursorMinValuePlus1ms = new Date(cursorMinValue.getTime() + 1)\n          whereCurrentCursor = and(\n            gte(expression, new Value(cursorMinValue)),\n            lt(expression, new Value(cursorMinValuePlus1ms)),\n          )\n        } else {\n          whereCurrentCursor = eq(expression, new Value(cursorMinValue))\n        }\n\n        cursorExpressions = {\n          whereFrom: whereFromCursor,\n          whereCurrent: whereCurrentCursor,\n          lastKey: this.lastSentKey,\n        }\n      }\n    }\n\n    // Request the sync layer to load more data\n    // don't await it, we will load the data into the collection when it comes in\n    // Note: `where` does NOT include cursor expressions - they are passed separately\n    // The sync layer can choose to use cursor-based or offset-based pagination\n    const loadOptions: LoadSubsetOptions = {\n      where, // Main filter only, no cursor\n      limit,\n      orderBy,\n      cursor: cursorExpressions, // Cursor expressions passed separately\n      offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset\n      subscription: this,\n    }\n    const syncResult = this.collection._sync.loadSubset(loadOptions)\n\n    // Pass the raw loadSubset result to the caller for external tracking\n    onLoadSubsetResult?.(syncResult)\n\n    // Track this loadSubset call\n    this.loadedSubsets.push(loadOptions)\n    if (shouldTrackLoadSubsetPromise) {\n      this.trackLoadSubsetPromise(syncResult)\n    }\n  }\n\n  // TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function\n  //       and that that also works properly (i.e. does not skip duplicate values)\n\n  /**\n   * Filters and flips changes for keys that have not been sent yet.\n   * Deletes are filtered out for keys that have not been sent yet.\n   * Updates are flipped into inserts for keys that have not been sent yet.\n   * Duplicate inserts are filtered out to prevent D2 multiplicity > 1.\n   */\n  private filterAndFlipChanges(changes: Array<ChangeMessage<any, any>>) {\n    if (this.loadedInitialState || this.skipFiltering) {\n      // We loaded the entire initial state or filtering is explicitly skipped\n      // so no need to filter or flip changes\n      return changes\n    }\n\n    // When buffering for truncate, we need all changes (including deletes) to pass through.\n    // This is important because:\n    // 1. If loadedInitialState was previously true, sentKeys will be empty\n    //    (trackSentKeys early-returns when loadedInitialState is true)\n    // 2. The truncate deletes are for keys that WERE sent to the subscriber\n    // 3. We're collecting all changes atomically, so filtering doesn't make sense\n    const skipDeleteFilter = this.isBufferingForTruncate\n\n    const newChanges = []\n    for (const change of changes) {\n      let newChange = change\n      const keyInSentKeys = this.sentKeys.has(change.key)\n\n      if (!keyInSentKeys) {\n        if (change.type === `update`) {\n          newChange = { ...change, type: `insert`, previousValue: undefined }\n        } else if (change.type === `delete`) {\n          // Filter out deletes for keys that have not been sent,\n          // UNLESS we're buffering for truncate (where all deletes should pass through)\n          if (!skipDeleteFilter) {\n            continue\n          }\n        }\n        this.sentKeys.add(change.key)\n      } else {\n        // Key was already sent - handle based on change type\n        if (change.type === `insert`) {\n          // Filter out duplicate inserts - the key was already inserted.\n          // This prevents D2 multiplicity from going above 1, which would\n          // cause deletes to not properly remove items (multiplicity would\n          // go from 2 to 1 instead of 1 to 0).\n          continue\n        } else if (change.type === `delete`) {\n          // Remove from sentKeys so future inserts for this key are allowed\n          // (e.g., after truncate + reinsert)\n          this.sentKeys.delete(change.key)\n        }\n      }\n      newChanges.push(newChange)\n    }\n    return newChanges\n  }\n\n  private trackSentKeys(changes: Array<ChangeMessage<any, string | number>>) {\n    if (this.loadedInitialState || this.skipFiltering) {\n      // No need to track sent keys if we loaded the entire state or filtering is skipped.\n      // Since filtering won't be applied, all keys are effectively \"observed\".\n      return\n    }\n\n    for (const change of changes) {\n      if (change.type === `delete`) {\n        this.sentKeys.delete(change.key)\n      } else {\n        this.sentKeys.add(change.key)\n      }\n    }\n\n    // Keep the limited snapshot offset in sync with keys we've actually sent.\n    // This matters when loadSubset resolves asynchronously and requestLimitedSnapshot\n    // didn't have local rows to count yet.\n    if (this.orderByIndex) {\n      this.limitedSnapshotRowCount = Math.max(\n        this.limitedSnapshotRowCount,\n        this.sentKeys.size,\n      )\n    }\n  }\n\n  /**\n   * Mark that the subscription should not filter any changes.\n   * This is used when includeInitialState is explicitly set to false,\n   * meaning the caller doesn't want initial state but does want ALL future changes.\n   */\n  markAllStateAsSeen() {\n    this.skipFiltering = true\n  }\n\n  unsubscribe() {\n    // Clean up truncate event listener\n    this.truncateCleanup?.()\n    this.truncateCleanup = undefined\n\n    // Clean up truncate buffer state\n    this.isBufferingForTruncate = false\n    this.truncateBuffer = []\n    this.pendingTruncateRefetches.clear()\n\n    // Unload all subsets that this subscription loaded\n    // We pass the exact same LoadSubsetOptions we used for loadSubset\n    for (const options of this.loadedSubsets) {\n      this.collection._sync.unloadSubset(options)\n    }\n    this.loadedSubsets = []\n\n    this.emitInner(`unsubscribed`, {\n      type: `unsubscribed`,\n      subscription: this,\n    })\n    // Clear all event listeners to prevent memory leaks\n    this.clearListeners()\n  }\n}\n"],"names":["EventEmitter","ensureIndexForExpression","createFilteredCallback","and","createFilterFunctionFromExpression","eq","Value","compileExpression","PropRef","buildCursor","gte","lt"],"mappings":";;;;;;;;;AAuDO,MAAM,+BACHA,aAAAA,aAEV;AAAA,EAiDE,YACU,YACA,UACA,SACR;AACA,UAAA;AAJQ,SAAA,aAAA;AACA,SAAA,WAAA;AACA,SAAA,UAAA;AAnDV,SAAQ,qBAAqB;AAK7B,SAAQ,gBAAgB;AAIxB,SAAQ,eAAe;AAMvB,SAAQ,gBAA0C,CAAA;AAGlD,SAAQ,+BAAe,IAAA;AAGvB,SAAQ,0BAA0B;AAUlC,SAAQ,UAA8B;AACtC,SAAQ,gDAAoD,IAAA;AAQ5D,SAAQ,yBAAyB;AACjC,SAAQ,iBAAwD,CAAA;AAChE,SAAQ,+CAAmD,IAAA;AAYzD,QAAI,QAAQ,eAAe;AACzB,WAAK,GAAG,gBAAgB,CAAC,UAAU,QAAQ,cAAe,KAAK,CAAC;AAAA,IAClE;AAGA,QAAI,QAAQ,iBAAiB;AAC3BC,gBAAAA,yBAAyB,QAAQ,iBAAiB,KAAK,UAAU;AAAA,IACnE;AAEA,UAAM,+BAA+B,CACnC,YACG;AACH,eAAS,OAAO;AAChB,WAAK,cAAc,OAAO;AAAA,IAC5B;AAEA,SAAK,WAAW;AAGhB,SAAK,mBAAmB,QAAQ,kBAC5BC,aAAAA,uBAAuB,KAAK,UAAU,OAAO,IAC7C,KAAK;AAKT,SAAK,kBAAkB,KAAK,WAAW,GAAG,YAAY,MAAM;AAC1D,WAAK,eAAA;AAAA,IACP,CAAC;AAAA,EACH;AAAA,EAvCA,IAAW,SAA6B;AACtC,WAAO,KAAK;AAAA,EACd;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EA8CQ,iBAAiB;AAEvB,UAAM,kBAAkB,CAAC,GAAG,KAAK,aAAa;AAK9C,UAAM,uBAAuB,KAAK,WAAW,MAAM,qBAAqB;AAGxE,QAAI,gBAAgB,WAAW,KAAK,CAAC,sBAAsB;AACzD,WAAK,eAAe;AACpB,WAAK,qBAAqB;AAC1B,WAAK,0BAA0B;AAC/B,WAAK,cAAc;AACnB,WAAK,gBAAgB,CAAA;AACrB;AAAA,IACF;AAIA,SAAK,yBAAyB;AAC9B,SAAK,iBAAiB,CAAA;AACtB,SAAK,yBAAyB,MAAA;AAK9B,SAAK,eAAe;AACpB,SAAK,qBAAqB;AAC1B,SAAK,0BAA0B;AAC/B,SAAK,cAAc;AAGnB,SAAK,gBAAgB,CAAA;AAKrB,mBAAe,MAAM;AAEnB,UAAI,CAAC,KAAK,wBAAwB;AAChC;AAAA,MACF;AAGA,iBAAW,WAAW,iBAAiB;AACrC,cAAM,aAAa,KAAK,WAAW,MAAM,WAAW,OAAO;AAG3D,aAAK,cAAc,KAAK,OAAO;AAC/B,aAAK,uBAAuB,UAAU;AAGtC,YAAI,sBAAsB,SAAS;AACjC,eAAK,yBAAyB,IAAI,UAAU;AAC5C,qBACG,MAAM,MAAM;AAAA,UAEb,CAAC,EACA,QAAQ,MAAM;AACb,iBAAK,yBAAyB,OAAO,UAAU;AAC/C,iBAAK,6BAAA;AAAA,UACP,CAAC;AAAA,QACL;AAAA,MACF;AAIA,UAAI,KAAK,yBAAyB,SAAS,GAAG;AAC5C,aAAK,oBAAA;AAAA,MACP;AAAA,IACF,CAAC;AAAA,EACH;AAAA;AAAA;AAAA;AAAA,EAKQ,+BAA+B;AACrC,QACE,KAAK,yBAAyB,SAAS,KACvC,KAAK,wBACL;AACA,WAAK,oBAAA;AAAA,IACP;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKQ,sBAAsB;AAC5B,SAAK,yBAAyB;AAI9B,UAAM,SAAS,KAAK,eAAe,KAAA;AACnC,QAAI,OAAO,SAAS,GAAG;AACrB,WAAK,iBAAiB,MAAM;AAAA,IAC9B;AAEA,SAAK,iBAAiB,CAAA;AAAA,EACxB;AAAA,EAEA,gBAAgB,OAA4B;AAC1C,SAAK,eAAe;AAAA,EACtB;AAAA;AAAA;AAAA;AAAA,EAKA,kBAA2B;AACzB,WAAO,KAAK,iBAAiB;AAAA,EAC/B;AAAA;AAAA;AAAA;AAAA,EAKQ,UAAU,WAA+B;AAC/C,QAAI,KAAK,YAAY,WAAW;AAC9B;AAAA,IACF;AAEA,UAAM,iBAAiB,KAAK;AAC5B,SAAK,UAAU;AAGf,SAAK,UAAU,iBAAiB;AAAA,MAC9B,MAAM;AAAA,MACN,cAAc;AAAA,MACd;AAAA,MACA,QAAQ;AAAA,IAAA,CACT;AAGD,UAAM,WAA2C,UAAU,SAAS;AACpE,SAAK,UAAU,UAAU;AAAA,MACvB,MAAM;AAAA,MACN,cAAc;AAAA,MACd;AAAA,MACA,QAAQ;AAAA,IAAA,CAC8B;AAAA,EAC1C;AAAA;AAAA;AAAA;AAAA,EAKQ,uBAAuB,YAAkC;AAE/D,QAAI,sBAAsB,SAAS;AACjC,WAAK,0BAA0B,IAAI,UAAU;AAC7C,WAAK,UAAU,eAAe;AAE9B,iBAAW,QAAQ,MAAM;AACvB,aAAK,0BAA0B,OAAO,UAAU;AAChD,YAAI,KAAK,0BAA0B,SAAS,GAAG;AAC7C,eAAK,UAAU,OAAO;AAAA,QACxB;AAAA,MACF,CAAC;AAAA,IACH;AAAA,EACF;AAAA,EAEA,wBAAwB;AACtB,WAAO,KAAK;AAAA,EACd;AAAA,EAEA,4BAA4B;AAC1B,WAAO,KAAK;AAAA,EACd;AAAA,EAEA,WAAW,SAAyC;AAClD,UAAM,aAAa,KAAK,qBAAqB,OAAO;AAEpD,QAAI,KAAK,wBAAwB;AAG/B,UAAI,WAAW,SAAS,GAAG;AACzB,aAAK,eAAe,KAAK,UAAU;AAAA,MACrC;AAAA,IACF,OAAO;AACL,WAAK,iBAAiB,UAAU;AAAA,IAClC;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,gBAAgB,MAAwC;AACtD,QAAI,KAAK,oBAAoB;AAE3B,aAAO;AAAA,IACT;AAEA,UAAM,YAAoC;AAAA,MACxC,OAAO,KAAK,QAAQ;AAAA,MACpB,eAAe,MAAM,iBAAiB;AAAA,IAAA;AAGxC,QAAI,MAAM;AACR,UAAI,WAAW,MAAM;AACnB,cAAM,mBAAmB,KAAK;AAC9B,YAAI,UAAU,OAAO;AAEnB,gBAAM,cAAc,UAAU;AAC9B,gBAAM,mBAAmBC,UAAAA,IAAI,aAAa,gBAAgB;AAC1D,oBAAU,QAAQ;AAAA,QACpB,OAAO;AACL,oBAAU,QAAQ;AAAA,QACpB;AAAA,MACF;AAAA,IACF,OAAO;AAEL,WAAK,qBAAqB;AAAA,IAC5B;AAIA,UAAM,cAAiC;AAAA,MACrC,OAAO,UAAU;AAAA,MACjB,cAAc;AAAA;AAAA,MAEd,SAAS,MAAM;AAAA,MACf,OAAO,MAAM;AAAA,IAAA;AAEf,UAAM,aAAa,KAAK,WAAW,MAAM,WAAW,WAAW;AAG/D,UAAM,qBAAqB,UAAU;AAGrC,SAAK,cAAc,KAAK,WAAW;AAEnC,UAAM,yBAAyB,MAAM,0BAA0B;AAC/D,QAAI,wBAAwB;AAC1B,WAAK,uBAAuB,UAAU;AAAA,IACxC;AAGA,UAAM,WAAW,KAAK,WAAW,sBAAsB,SAAS;AAEhE,QAAI,aAAa,QAAW;AAE1B,aAAO;AAAA,IACT;AAGA,UAAM,mBAAmB,SAAS;AAAA,MAChC,CAAC,WAAW,CAAC,KAAK,SAAS,IAAI,OAAO,GAAG;AAAA,IAAA;AAM3C,eAAW,UAAU,kBAAkB;AACrC,WAAK,SAAS,IAAI,OAAO,GAAG;AAAA,IAC9B;AAEA,SAAK,eAAe;AACpB,SAAK,SAAS,gBAAgB;AAC9B,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAeA,uBAAuB;AAAA,IACrB;AAAA,IACA;AAAA,IACA;AAAA,IACA;AAAA,IACA,wBAAwB,+BAA+B;AAAA,IACvD;AAAA,EAAA,GACgC;AAChC,QAAI,CAAC,MAAO,OAAM,IAAI,MAAM,mBAAmB;AAE/C,QAAI,CAAC,KAAK,cAAc;AACtB,YAAM,IAAI;AAAA,QACR;AAAA,MAAA;AAAA,IAEJ;AAIA,UAAM,cAAc,cAAc,UAAa,UAAU,SAAS;AAElE,UAAM,WAAW,YAAY,CAAC;AAE9B,UAAM,mBAAmB;AAEzB,UAAM,QAAQ,KAAK;AACnB,UAAM,QAAQ,KAAK,QAAQ;AAC3B,UAAM,gBAAgB,QAClBC,gDAAmC,KAAK,IACxC;AAEJ,UAAM,WAAW,CAAC,QAA8C;AAC9D,UAAI,QAAQ,UAAa,KAAK,SAAS,IAAI,GAAG,GAAG;AAC/C,eAAO;AAAA,MACT;AAEA,YAAM,QAAQ,KAAK,WAAW,IAAI,GAAG;AACrC,UAAI,UAAU,QAAW;AACvB,eAAO;AAAA,MACT;AAEA,aAAO,gBAAgB,KAAK,KAAK;AAAA,IACnC;AAEA,QAAI,uBAAuB;AAC3B,UAAM,UAAsD,CAAA;AAW5D,QAAI,OAA+B,CAAA;AACnC,QAAI,aAAa;AAGf,YAAM,EAAE,WAAA,IAAe,QAAQ,CAAC;AAChC,YAAM,sBAAsB,KAAK,WAAW,sBAAsB;AAAA,QAChE,OAAOC,UAAAA,GAAG,YAAY,IAAIC,GAAAA,MAAM,gBAAgB,CAAC;AAAA,MAAA,CAClD;AAED,UAAI,qBAAqB;AACvB,cAAM,mBAAmB,oBACtB,IAAI,CAAC,WAAW,OAAO,GAAG,EAC1B,OAAO,CAAC,QAAQ,CAAC,KAAK,SAAS,IAAI,GAAG,KAAK,SAAS,GAAG,CAAC;AAG3D,aAAK,KAAK,GAAG,gBAAgB;AAG7B,cAAM,qBAAqB,MAAM;AAAA,UAC/B,QAAQ,KAAK;AAAA,UACb;AAAA,UACA;AAAA,QAAA;AAEF,aAAK,KAAK,GAAG,kBAAkB;AAAA,MACjC,OAAO;AACL,eAAO,MAAM,KAAK,OAAO,kBAAmB,QAAQ;AAAA,MACtD;AAAA,IACF,OAAO;AAEL,aAAO,MAAM,cAAc,OAAO,QAAQ;AAAA,IAC5C;AAEA,UAAM,eAAe,MAAM,KAAK,IAAI,QAAQ,QAAQ,QAAQ,CAAC;AAC7D,UAAM,sBAAsB,MAAM,KAAK,WAAW;AAGlD,UAAM,oBAAoB,QAAQ,CAAC,EAAG;AACtC,UAAM,iBACJ,kBAAkB,SAAS,QACvBC,WAAAA,kBAAkB,IAAIC,GAAAA,QAAQ,kBAAkB,IAAI,GAAG,IAAI,IAC3D;AAEN,WAAO,aAAA,IAAiB,KAAK,CAAC,uBAAuB;AACnD,YAAM,mCAAmB,IAAA;AAEzB,iBAAW,OAAO,MAAM;AACtB,cAAM,QAAQ,KAAK,WAAW,IAAI,GAAG;AACrC,gBAAQ,KAAK;AAAA,UACX,MAAM;AAAA,UACN;AAAA,UACA;AAAA,QAAA,CACD;AAGD,+BAAuB,iBAAiB,eAAe,KAAK,IAAI;AAChE,qBAAa,IAAI,GAAG;AAAA,MACtB;AAEA,aAAO,MAAM,KAAK,aAAA,GAAgB,sBAAuB,QAAQ;AAAA,IACnE;AAIA,UAAM,gBAAgB,KAAK;AAK3B,eAAW,UAAU,SAAS;AAC5B,WAAK,SAAS,IAAI,OAAO,GAAG;AAAA,IAC9B;AAEA,SAAK,SAAS,OAAO;AAGrB,SAAK,2BAA2B,QAAQ;AACxC,QAAI,QAAQ,SAAS,GAAG;AACtB,WAAK,cAAc,QAAQ,QAAQ,SAAS,CAAC,EAAG;AAAA,IAClD;AAKA,QAAI;AAQJ,QAAI,cAAc,UAAa,UAAU,SAAS,GAAG;AACnD,YAAM,kBAAkBC,OAAAA,YAAY,SAAS,SAAS;AAEtD,UAAI,iBAAiB;AACnB,cAAM,EAAE,WAAA,IAAe,QAAQ,CAAC;AAChC,cAAM,iBAAiB,UAAU,CAAC;AAKlC,YAAI;AACJ,YAAI,0BAA0B,MAAM;AAClC,gBAAM,wBAAwB,IAAI,KAAK,eAAe,QAAA,IAAY,CAAC;AACnE,+BAAqBN,UAAAA;AAAAA,YACnBO,UAAAA,IAAI,YAAY,IAAIJ,GAAAA,MAAM,cAAc,CAAC;AAAA,YACzCK,UAAAA,GAAG,YAAY,IAAIL,GAAAA,MAAM,qBAAqB,CAAC;AAAA,UAAA;AAAA,QAEnD,OAAO;AACL,+BAAqBD,UAAAA,GAAG,YAAY,IAAIC,GAAAA,MAAM,cAAc,CAAC;AAAA,QAC/D;AAEA,4BAAoB;AAAA,UAClB,WAAW;AAAA,UACX,cAAc;AAAA,UACd,SAAS,KAAK;AAAA,QAAA;AAAA,MAElB;AAAA,IACF;AAMA,UAAM,cAAiC;AAAA,MACrC;AAAA;AAAA,MACA;AAAA,MACA;AAAA,MACA,QAAQ;AAAA;AAAA,MACR,QAAQ,UAAU;AAAA;AAAA,MAClB,cAAc;AAAA,IAAA;AAEhB,UAAM,aAAa,KAAK,WAAW,MAAM,WAAW,WAAW;AAG/D,yBAAqB,UAAU;AAG/B,SAAK,cAAc,KAAK,WAAW;AACnC,QAAI,8BAA8B;AAChC,WAAK,uBAAuB,UAAU;AAAA,IACxC;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWQ,qBAAqB,SAAyC;AACpE,QAAI,KAAK,sBAAsB,KAAK,eAAe;AAGjD,aAAO;AAAA,IACT;AAQA,UAAM,mBAAmB,KAAK;AAE9B,UAAM,aAAa,CAAA;AACnB,eAAW,UAAU,SAAS;AAC5B,UAAI,YAAY;AAChB,YAAM,gBAAgB,KAAK,SAAS,IAAI,OAAO,GAAG;AAElD,UAAI,CAAC,eAAe;AAClB,YAAI,OAAO,SAAS,UAAU;AAC5B,sBAAY,EAAE,GAAG,QAAQ,MAAM,UAAU,eAAe,OAAA;AAAA,QAC1D,WAAW,OAAO,SAAS,UAAU;AAGnC,cAAI,CAAC,kBAAkB;AACrB;AAAA,UACF;AAAA,QACF;AACA,aAAK,SAAS,IAAI,OAAO,GAAG;AAAA,MAC9B,OAAO;AAEL,YAAI,OAAO,SAAS,UAAU;AAK5B;AAAA,QACF,WAAW,OAAO,SAAS,UAAU;AAGnC,eAAK,SAAS,OAAO,OAAO,GAAG;AAAA,QACjC;AAAA,MACF;AACA,iBAAW,KAAK,SAAS;AAAA,IAC3B;AACA,WAAO;AAAA,EACT;AAAA,EAEQ,cAAc,SAAqD;AACzE,QAAI,KAAK,sBAAsB,KAAK,eAAe;AAGjD;AAAA,IACF;AAEA,eAAW,UAAU,SAAS;AAC5B,UAAI,OAAO,SAAS,UAAU;AAC5B,aAAK,SAAS,OAAO,OAAO,GAAG;AAAA,MACjC,OAAO;AACL,aAAK,SAAS,IAAI,OAAO,GAAG;AAAA,MAC9B;AAAA,IACF;AAKA,QAAI,KAAK,cAAc;AACrB,WAAK,0BAA0B,KAAK;AAAA,QAClC,KAAK;AAAA,QACL,KAAK,SAAS;AAAA,MAAA;AAAA,IAElB;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,qBAAqB;AACnB,SAAK,gBAAgB;AAAA,EACvB;AAAA,EAEA,cAAc;AAEZ,SAAK,kBAAA;AACL,SAAK,kBAAkB;AAGvB,SAAK,yBAAyB;AAC9B,SAAK,iBAAiB,CAAA;AACtB,SAAK,yBAAyB,MAAA;AAI9B,eAAW,WAAW,KAAK,eAAe;AACxC,WAAK,WAAW,MAAM,aAAa,OAAO;AAAA,IAC5C;AACA,SAAK,gBAAgB,CAAA;AAErB,SAAK,UAAU,gBAAgB;AAAA,MAC7B,MAAM;AAAA,MACN,cAAc;AAAA,IAAA,CACf;AAED,SAAK,eAAA;AAAA,EACP;AACF;;"}