{"version":3,"file":"changes.cjs","sources":["../../../src/collection/changes.ts"],"sourcesContent":["import { NegativeActiveSubscribersError } from '../errors'\nimport {\n  createSingleRowRefProxy,\n  toExpression,\n} from '../query/builder/ref-proxy.js'\nimport { CollectionSubscription } from './subscription.js'\nimport type { StandardSchemaV1 } from '@standard-schema/spec'\nimport type { ChangeMessage, SubscribeChangesOptions } from '../types'\nimport type { CollectionLifecycleManager } from './lifecycle.js'\nimport type { CollectionSyncManager } from './sync.js'\nimport type { CollectionEventsManager } from './events.js'\nimport type { CollectionImpl } from './index.js'\nimport type { CollectionStateManager } from './state.js'\nimport type { WithVirtualProps } from '../virtual-props.js'\n\nexport class CollectionChangesManager<\n  TOutput extends object = Record<string, unknown>,\n  TKey extends string | number = string | number,\n  TSchema extends StandardSchemaV1 = StandardSchemaV1,\n  TInput extends object = TOutput,\n> {\n  private lifecycle!: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>\n  private sync!: CollectionSyncManager<TOutput, TKey, TSchema, TInput>\n  private events!: CollectionEventsManager\n  private collection!: CollectionImpl<TOutput, TKey, any, TSchema, TInput>\n  private state!: CollectionStateManager<TOutput, TKey, TSchema, TInput>\n\n  public activeSubscribersCount = 0\n  public changeSubscriptions = new Set<CollectionSubscription>()\n  public batchedEvents: Array<ChangeMessage<TOutput, TKey>> = []\n  public shouldBatchEvents = false\n\n  /**\n   * Creates a new CollectionChangesManager instance\n   */\n  constructor() {}\n\n  public setDeps(deps: {\n    lifecycle: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>\n    sync: CollectionSyncManager<TOutput, TKey, TSchema, TInput>\n    events: CollectionEventsManager\n    collection: CollectionImpl<TOutput, TKey, any, TSchema, TInput>\n    state: CollectionStateManager<TOutput, TKey, TSchema, TInput>\n  }) {\n    this.lifecycle = deps.lifecycle\n    this.sync = deps.sync\n    this.events = deps.events\n    this.collection = deps.collection\n    this.state = deps.state\n  }\n\n  /**\n   * Emit an empty ready event to notify subscribers that the collection is ready\n   * This bypasses the normal empty array check in emitEvents\n   */\n  public emitEmptyReadyEvent(): void {\n    // Emit empty array directly to all subscribers\n    for (const subscription of this.changeSubscriptions) {\n      subscription.emitEvents([])\n    }\n  }\n\n  /**\n   * Enriches a change message with virtual properties ($synced, $origin, $key, $collectionId).\n   * Uses the \"add-if-missing\" pattern to preserve virtual properties from upstream collections.\n   */\n  private enrichChangeWithVirtualProps(\n    change: ChangeMessage<TOutput, TKey>,\n  ): ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey> {\n    return this.state.enrichChangeMessage(change)\n  }\n\n  /**\n   * Emit events either immediately or batch them for later emission\n   */\n  public emitEvents(\n    changes: Array<ChangeMessage<TOutput, TKey>>,\n    forceEmit = false,\n  ): void {\n    // Skip batching for user actions (forceEmit=true) to keep UI responsive\n    if (this.shouldBatchEvents && !forceEmit) {\n      // Add events to the batch\n      this.batchedEvents.push(...changes)\n      return\n    }\n\n    // Either we're not batching, or we're forcing emission (user action or ending batch cycle)\n    let rawEvents = changes\n\n    if (forceEmit) {\n      // Force emit is used to end a batch (e.g. after a sync commit). Combine any\n      // buffered optimistic events with the final changes so subscribers see the\n      // whole picture, even if the sync diff is empty.\n      if (this.batchedEvents.length > 0) {\n        rawEvents = [...this.batchedEvents, ...changes]\n      }\n      this.batchedEvents = []\n      this.shouldBatchEvents = false\n    }\n\n    if (rawEvents.length === 0) {\n      return\n    }\n\n    // Enrich all change messages with virtual properties\n    // This uses the \"add-if-missing\" pattern to preserve pass-through semantics\n    const enrichedEvents: Array<\n      ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey>\n    > = rawEvents.map((change) => this.enrichChangeWithVirtualProps(change))\n\n    // Emit to all listeners\n    for (const subscription of this.changeSubscriptions) {\n      subscription.emitEvents(enrichedEvents)\n    }\n  }\n\n  /**\n   * Subscribe to changes in the collection\n   */\n  public subscribeChanges(\n    callback: (\n      changes: Array<ChangeMessage<WithVirtualProps<TOutput, TKey>>>,\n    ) => void,\n    options: SubscribeChangesOptions<TOutput, TKey> = {},\n  ): CollectionSubscription {\n    // Start sync and track subscriber\n    this.addSubscriber()\n\n    // Compile where callback to whereExpression if provided\n    if (options.where && options.whereExpression) {\n      throw new Error(\n        `Cannot specify both 'where' and 'whereExpression' options. Use one or the other.`,\n      )\n    }\n\n    const { where, ...opts } = options\n    let whereExpression = opts.whereExpression\n    if (where) {\n      const proxy = createSingleRowRefProxy<WithVirtualProps<TOutput, TKey>>()\n      const result = where(proxy)\n      whereExpression = toExpression(result)\n    }\n\n    const subscription = new CollectionSubscription(this.collection, callback, {\n      ...opts,\n      whereExpression,\n      onUnsubscribe: () => {\n        this.removeSubscriber()\n        this.changeSubscriptions.delete(subscription)\n      },\n    })\n\n    // Register status listener BEFORE requesting snapshot to avoid race condition.\n    // This ensures the listener catches all status transitions, even if the\n    // loadSubset promise resolves synchronously or very quickly.\n    if (options.onStatusChange) {\n      subscription.on(`status:change`, options.onStatusChange)\n    }\n\n    if (options.includeInitialState) {\n      subscription.requestSnapshot({\n        trackLoadSubsetPromise: false,\n        orderBy: options.orderBy,\n        limit: options.limit,\n        onLoadSubsetResult: options.onLoadSubsetResult,\n      })\n    } else if (options.includeInitialState === false) {\n      // When explicitly set to false (not just undefined), mark all state as \"seen\"\n      // so that all future changes (including deletes) pass through unfiltered.\n      subscription.markAllStateAsSeen()\n    }\n\n    // Add to batched listeners\n    this.changeSubscriptions.add(subscription)\n\n    return subscription\n  }\n\n  /**\n   * Increment the active subscribers count and start sync if needed\n   */\n  private addSubscriber(): void {\n    const previousSubscriberCount = this.activeSubscribersCount\n    this.activeSubscribersCount++\n    this.lifecycle.cancelGCTimer()\n\n    // Start sync if collection was cleaned up\n    if (\n      this.lifecycle.status === `cleaned-up` ||\n      this.lifecycle.status === `idle`\n    ) {\n      this.sync.startSync()\n    }\n\n    this.events.emitSubscribersChange(\n      this.activeSubscribersCount,\n      previousSubscriberCount,\n    )\n  }\n\n  /**\n   * Decrement the active subscribers count and start GC timer if needed\n   */\n  private removeSubscriber(): void {\n    const previousSubscriberCount = this.activeSubscribersCount\n    this.activeSubscribersCount--\n\n    if (this.activeSubscribersCount === 0) {\n      this.lifecycle.startGCTimer()\n    } else if (this.activeSubscribersCount < 0) {\n      throw new NegativeActiveSubscribersError()\n    }\n\n    this.events.emitSubscribersChange(\n      this.activeSubscribersCount,\n      previousSubscriberCount,\n    )\n  }\n\n  /**\n   * Clean up the collection by stopping sync and clearing data\n   * This can be called manually or automatically by garbage collection\n   */\n  public cleanup(): void {\n    this.batchedEvents = []\n    this.shouldBatchEvents = false\n  }\n}\n"],"names":["subscription","createSingleRowRefProxy","toExpression","CollectionSubscription","NegativeActiveSubscribersError"],"mappings":";;;;;AAeO,MAAM,yBAKX;AAAA;AAAA;AAAA;AAAA,EAeA,cAAc;AARd,SAAO,yBAAyB;AAChC,SAAO,0CAA0B,IAAA;AACjC,SAAO,gBAAqD,CAAA;AAC5D,SAAO,oBAAoB;AAAA,EAKZ;AAAA,EAER,QAAQ,MAMZ;AACD,SAAK,YAAY,KAAK;AACtB,SAAK,OAAO,KAAK;AACjB,SAAK,SAAS,KAAK;AACnB,SAAK,aAAa,KAAK;AACvB,SAAK,QAAQ,KAAK;AAAA,EACpB;AAAA;AAAA;AAAA;AAAA;AAAA,EAMO,sBAA4B;AAEjC,eAAWA,iBAAgB,KAAK,qBAAqB;AACnD,MAAAA,cAAa,WAAW,EAAE;AAAA,IAC5B;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA,EAMQ,6BACN,QACsD;AACtD,WAAO,KAAK,MAAM,oBAAoB,MAAM;AAAA,EAC9C;AAAA;AAAA;AAAA;AAAA,EAKO,WACL,SACA,YAAY,OACN;AAEN,QAAI,KAAK,qBAAqB,CAAC,WAAW;AAExC,WAAK,cAAc,KAAK,GAAG,OAAO;AAClC;AAAA,IACF;AAGA,QAAI,YAAY;AAEhB,QAAI,WAAW;AAIb,UAAI,KAAK,cAAc,SAAS,GAAG;AACjC,oBAAY,CAAC,GAAG,KAAK,eAAe,GAAG,OAAO;AAAA,MAChD;AACA,WAAK,gBAAgB,CAAA;AACrB,WAAK,oBAAoB;AAAA,IAC3B;AAEA,QAAI,UAAU,WAAW,GAAG;AAC1B;AAAA,IACF;AAIA,UAAM,iBAEF,UAAU,IAAI,CAAC,WAAW,KAAK,6BAA6B,MAAM,CAAC;AAGvE,eAAWA,iBAAgB,KAAK,qBAAqB;AACnD,MAAAA,cAAa,WAAW,cAAc;AAAA,IACxC;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKO,iBACL,UAGA,UAAkD,IAC1B;AAExB,SAAK,cAAA;AAGL,QAAI,QAAQ,SAAS,QAAQ,iBAAiB;AAC5C,YAAM,IAAI;AAAA,QACR;AAAA,MAAA;AAAA,IAEJ;AAEA,UAAM,EAAE,OAAO,GAAG,KAAA,IAAS;AAC3B,QAAI,kBAAkB,KAAK;AAC3B,QAAI,OAAO;AACT,YAAM,QAAQC,SAAAA,wBAAA;AACd,YAAM,SAAS,MAAM,KAAK;AAC1B,wBAAkBC,SAAAA,aAAa,MAAM;AAAA,IACvC;AAEA,UAAMF,iBAAe,IAAIG,aAAAA,uBAAuB,KAAK,YAAY,UAAU;AAAA,MACzE,GAAG;AAAA,MACH;AAAA,MACA,eAAe,MAAM;AACnB,aAAK,iBAAA;AACL,aAAK,oBAAoB,OAAOH,cAAY;AAAA,MAC9C;AAAA,IAAA,CACD;AAKD,QAAI,QAAQ,gBAAgB;AAC1BA,qBAAa,GAAG,iBAAiB,QAAQ,cAAc;AAAA,IACzD;AAEA,QAAI,QAAQ,qBAAqB;AAC/BA,qBAAa,gBAAgB;AAAA,QAC3B,wBAAwB;AAAA,QACxB,SAAS,QAAQ;AAAA,QACjB,OAAO,QAAQ;AAAA,QACf,oBAAoB,QAAQ;AAAA,MAAA,CAC7B;AAAA,IACH,WAAW,QAAQ,wBAAwB,OAAO;AAGhDA,qBAAa,mBAAA;AAAA,IACf;AAGA,SAAK,oBAAoB,IAAIA,cAAY;AAEzC,WAAOA;AAAAA,EACT;AAAA;AAAA;AAAA;AAAA,EAKQ,gBAAsB;AAC5B,UAAM,0BAA0B,KAAK;AACrC,SAAK;AACL,SAAK,UAAU,cAAA;AAGf,QACE,KAAK,UAAU,WAAW,gBAC1B,KAAK,UAAU,WAAW,QAC1B;AACA,WAAK,KAAK,UAAA;AAAA,IACZ;AAEA,SAAK,OAAO;AAAA,MACV,KAAK;AAAA,MACL;AAAA,IAAA;AAAA,EAEJ;AAAA;AAAA;AAAA;AAAA,EAKQ,mBAAyB;AAC/B,UAAM,0BAA0B,KAAK;AACrC,SAAK;AAEL,QAAI,KAAK,2BAA2B,GAAG;AACrC,WAAK,UAAU,aAAA;AAAA,IACjB,WAAW,KAAK,yBAAyB,GAAG;AAC1C,YAAM,IAAII,OAAAA,+BAAA;AAAA,IACZ;AAEA,SAAK,OAAO;AAAA,MACV,KAAK;AAAA,MACL;AAAA,IAAA;AAAA,EAEJ;AAAA;AAAA;AAAA;AAAA;AAAA,EAMO,UAAgB;AACrB,SAAK,gBAAgB,CAAA;AACrB,SAAK,oBAAoB;AAAA,EAC3B;AACF;;"}