{"version":3,"sources":["../src/events/caching-pubsub.ts"],"names":["PubSub"],"mappings":";;;;;AA6CO,IAAM,aAAA,GAAN,cAA4BA,wBAAA,CAAO;AAAA,EAMxC,WAAA,CACmB,KAAA,EACA,KAAA,EACjB,OAAA,GAAgC,EAAC,EACjC;AACA,IAAA,KAAA,EAAM;AAJW,IAAA,IAAA,CAAA,KAAA,GAAA,KAAA;AACA,IAAA,IAAA,CAAA,KAAA,GAAA,KAAA;AAIjB,IAAA,IAAA,CAAK,SAAA,GAAY,QAAQ,SAAA,IAAa,SAAA;AACtC,IAAA,IAAA,CAAK,SAAS,OAAA,CAAQ,MAAA;AAAA,EACxB;AAAA,EAPmB,KAAA;AAAA,EACA,KAAA;AAAA,EAPF,SAAA;AAAA,EACA,MAAA;AAAA;AAAA,EAET,WAAA,uBAAkB,GAAA,EAAkC;AAAA;AAAA;AAAA;AAAA,EAepD,QAAA,CAAS,SAAiB,KAAA,EAAsB;AACtD,IAAA,IAAI,KAAK,MAAA,EAAQ;AACf,MAAA,IAAA,CAAK,MAAA,CAAO,KAAA,CAAM,OAAA,EAAS,KAAK,CAAA;AAAA,IAClC,CAAA,MAAO;AACL,MAAA,OAAA,CAAQ,KAAA,CAAM,SAAS,KAAK,CAAA;AAAA,IAC9B;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKQ,YAAY,KAAA,EAAuB;AACzC,IAAA,OAAO,CAAA,EAAG,IAAA,CAAK,SAAS,CAAA,EAAG,KAAK,CAAA,CAAA;AAAA,EAClC;AAAA;AAAA;AAAA;AAAA,EAKQ,cAAc,KAAA,EAAuB;AAC3C,IAAA,OAAO,CAAA,EAAG,IAAA,CAAK,SAAS,CAAA,EAAG,KAAK,CAAA,QAAA,CAAA;AAAA,EAClC;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EASA,MAAM,OAAA,CAAQ,KAAA,EAAe,KAAA,EAAiE;AAC5F,IAAA,MAAM,QAAA,GAAW,IAAA,CAAK,WAAA,CAAY,KAAK,CAAA;AACvC,IAAA,MAAM,UAAA,GAAa,IAAA,CAAK,aAAA,CAAc,KAAK,CAAA;AAE3C,IAAA,IAAI,KAAA,GAAQ,CAAA;AACZ,IAAA,IAAI,WAAA,GAAc,KAAA;AAClB,IAAA,IAAI;AAEF,MAAA,KAAA,GAAS,MAAM,IAAA,CAAK,KAAA,CAAM,SAAA,CAAU,UAAU,CAAA,GAAK,CAAA;AAAA,IACrD,SAAS,KAAA,EAAO;AACd,MAAA,IAAA,CAAK,QAAA,CAAS,CAAA,gDAAA,EAAmD,KAAK,CAAA,CAAA,EAAI,KAAK,CAAA;AAC/E,MAAA,WAAA,GAAc,IAAA;AAAA,IAChB;AAEA,IAAA,MAAM,SAAA,GAAmB;AAAA,MACvB,GAAG,KAAA;AAAA,MACH,EAAA,EAAI,OAAO,UAAA,EAAW;AAAA,MACtB,SAAA,sBAAe,IAAA,EAAK;AAAA,MACpB;AAAA,KACF;AAEA,IAAA,IAAI,CAAC,WAAA,EAAa;AAChB,MAAA,IAAI;AAEF,QAAA,MAAM,IAAA,CAAK,KAAA,CAAM,QAAA,CAAS,QAAA,EAAU,SAAS,CAAA;AAAA,MAC/C,SAAS,KAAA,EAAO;AACd,QAAA,IAAA,CAAK,QAAA,CAAS,CAAA,0CAAA,EAA6C,KAAK,CAAA,CAAA,EAAI,KAAK,CAAA;AAAA,MAC3E;AAAA,IACF;AAGA,IAAA,MAAM,IAAA,CAAK,KAAA,CAAM,OAAA,CAAQ,KAAA,EAAO,SAAS,CAAA;AAAA,EAC3C;AAAA;AAAA;AAAA;AAAA,EAKA,MAAM,SAAA,CAAU,KAAA,EAAe,EAAA,EAAmB,OAAA,EAA2C;AAC3F,IAAA,MAAM,IAAA,CAAK,KAAA,CAAM,SAAA,CAAU,KAAA,EAAO,IAAI,OAAO,CAAA;AAAA,EAC/C;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAaA,MAAM,mBAAA,CAAoB,KAAA,EAAe,EAAA,EAAkC;AAIzE,IAAA,IAAI,IAAA,uBAA+B,GAAA,EAAY;AAI/C,IAAA,MAAM,SAAA,GAA2B,CAAC,KAAA,EAAO,GAAA,KAAQ;AAC/C,MAAA,IAAI,IAAA,EAAM;AACR,QAAA,IAAI,CAAC,IAAA,CAAK,GAAA,CAAI,KAAA,CAAM,EAAE,CAAA,EAAG;AACvB,UAAA,IAAA,CAAK,GAAA,CAAI,MAAM,EAAE,CAAA;AACjB,UAAA,EAAA,CAAG,OAAO,GAAG,CAAA;AAAA,QACf;AAAA,MACF,CAAA,MAAO;AACL,QAAA,EAAA,CAAG,OAAO,GAAG,CAAA;AAAA,MACf;AAAA,IACF,CAAA;AAGA,IAAA,IAAA,CAAK,WAAA,CAAY,GAAA,CAAI,EAAA,EAAI,SAAS,CAAA;AAClC,IAAA,MAAM,IAAA,CAAK,KAAA,CAAM,SAAA,CAAU,KAAA,EAAO,SAAS,CAAA;AAG3C,IAAA,MAAM,OAAA,GAAU,MAAM,IAAA,CAAK,UAAA,CAAW,KAAK,CAAA;AAC3C,IAAA,KAAA,MAAW,SAAS,OAAA,EAAS;AAC3B,MAAA,IAAI,CAAC,IAAA,CAAM,GAAA,CAAI,KAAA,CAAM,EAAE,CAAA,EAAG;AACxB,QAAA,IAAA,CAAM,GAAA,CAAI,MAAM,EAAE,CAAA;AAClB,QAAA,EAAA,CAAG,KAAK,CAAA;AAAA,MACV;AAAA,IACF;AAIA,IAAA,IAAA,GAAO,IAAA;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAUA,MAAM,mBAAA,CAAoB,KAAA,EAAe,MAAA,EAAgB,EAAA,EAAkC;AAEzF,IAAA,IAAI,IAAA,uBAA+B,GAAA,EAAY;AAI/C,IAAA,MAAM,SAAA,GAA2B,CAAC,KAAA,EAAO,GAAA,KAAQ;AAC/C,MAAA,IAAI,IAAA,EAAM;AACR,QAAA,IAAI,CAAC,IAAA,CAAK,GAAA,CAAI,KAAA,CAAM,EAAE,CAAA,EAAG;AACvB,UAAA,IAAA,CAAK,GAAA,CAAI,MAAM,EAAE,CAAA;AACjB,UAAA,EAAA,CAAG,OAAO,GAAG,CAAA;AAAA,QACf;AAAA,MACF,CAAA,MAAO;AACL,QAAA,EAAA,CAAG,OAAO,GAAG,CAAA;AAAA,MACf;AAAA,IACF,CAAA;AAGA,IAAA,IAAA,CAAK,WAAA,CAAY,GAAA,CAAI,EAAA,EAAI,SAAS,CAAA;AAClC,IAAA,MAAM,IAAA,CAAK,KAAA,CAAM,SAAA,CAAU,KAAA,EAAO,SAAS,CAAA;AAG3C,IAAA,MAAM,OAAA,GAAU,MAAM,IAAA,CAAK,UAAA,CAAW,OAAO,MAAM,CAAA;AACnD,IAAA,KAAA,MAAW,SAAS,OAAA,EAAS;AAC3B,MAAA,IAAI,CAAC,IAAA,CAAM,GAAA,CAAI,KAAA,CAAM,EAAE,CAAA,EAAG;AACxB,QAAA,IAAA,CAAM,GAAA,CAAI,MAAM,EAAE,CAAA;AAClB,QAAA,EAAA,CAAG,KAAK,CAAA;AAAA,MACV;AAAA,IACF;AAGA,IAAA,IAAA,GAAO,IAAA;AAAA,EACT;AAAA;AAAA;AAAA;AAAA,EAKA,MAAM,WAAA,CAAY,KAAA,EAAe,EAAA,EAAkC;AACjE,IAAA,MAAM,SAAA,GAAY,IAAA,CAAK,WAAA,CAAY,GAAA,CAAI,EAAE,CAAA,IAAK,EAAA;AAC9C,IAAA,IAAA,CAAK,WAAA,CAAY,OAAO,EAAE,CAAA;AAC1B,IAAA,MAAM,IAAA,CAAK,KAAA,CAAM,WAAA,CAAY,KAAA,EAAO,SAAS,CAAA;AAAA,EAC/C;AAAA;AAAA;AAAA;AAAA,EAKA,MAAM,UAAA,CAAW,KAAA,EAAe,MAAA,GAAiB,CAAA,EAAqB;AACpE,IAAA,MAAM,QAAA,GAAW,IAAA,CAAK,WAAA,CAAY,KAAK,CAAA;AACvC,IAAA,MAAM,SAAS,MAAM,IAAA,CAAK,KAAA,CAAM,UAAA,CAAW,UAAU,MAAM,CAAA;AAC3D,IAAA,OAAO,MAAA;AAAA,EACT;AAAA;AAAA;AAAA;AAAA,EAKA,MAAM,KAAA,GAAuB;AAC3B,IAAA,MAAM,IAAA,CAAK,MAAM,KAAA,EAAM;AAAA,EACzB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,MAAM,WAAW,KAAA,EAA8B;AAC7C,IAAA,MAAM,QAAA,GAAW,IAAA,CAAK,WAAA,CAAY,KAAK,CAAA;AACvC,IAAA,MAAM,UAAA,GAAa,IAAA,CAAK,aAAA,CAAc,KAAK,CAAA;AAC3C,IAAA,MAAM,OAAA,CAAQ,GAAA,CAAI,CAAC,IAAA,CAAK,KAAA,CAAM,MAAA,CAAO,QAAQ,CAAA,EAAG,IAAA,CAAK,KAAA,CAAM,MAAA,CAAO,UAAU,CAAC,CAAC,CAAA;AAAA,EAChF;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,QAAA,GAAmB;AACjB,IAAA,OAAO,IAAA,CAAK,KAAA;AAAA,EACd;AACF;AAcO,SAAS,WAAA,CAAY,MAAA,EAAgB,KAAA,EAA0B,OAAA,EAA+C;AACnH,EAAA,OAAO,IAAI,aAAA,CAAc,MAAA,EAAQ,KAAA,EAAO,OAAO,CAAA;AACjD","file":"chunk-VEYVZLLD.cjs","sourcesContent":["import type { MastraServerCache } from '../cache/base';\nimport type { IMastraLogger } from '../logger';\nimport { PubSub } from './pubsub';\nimport type { Event, EventCallback, SubscribeOptions } from './types';\n\n/**\n * Options for CachingPubSub\n */\nexport interface CachingPubSubOptions {\n  /**\n   * Optional prefix for cache keys to namespace events.\n   * Defaults to 'pubsub:'.\n   */\n  keyPrefix?: string;\n  /**\n   * Optional logger for structured logging.\n   * Falls back to console.error if not provided.\n   */\n  logger?: IMastraLogger;\n}\n\n/**\n * A PubSub decorator that adds event caching and replay capabilities.\n *\n * Wraps any PubSub implementation and uses MastraServerCache to:\n * - Cache all published events per topic\n * - Enable replay of cached events for late subscribers\n *\n * This enables resumable streams - clients can disconnect and reconnect\n * without missing events.\n *\n * @example\n * ```typescript\n * import { EventEmitterPubSub, CachingPubSub } from '@mastra/core/events';\n * import { InMemoryServerCache } from '@mastra/core/cache';\n *\n * const cache = new InMemoryServerCache();\n * const pubsub = new CachingPubSub(new EventEmitterPubSub(), cache);\n *\n * // Subscribe with replay - receives cached events first, then live\n * await pubsub.subscribeWithReplay('my-topic', (event) => {\n *   console.log(event);\n * });\n * ```\n */\nexport class CachingPubSub extends PubSub {\n  private readonly keyPrefix: string;\n  private readonly logger?: IMastraLogger;\n  /** Maps original callbacks to their wrapped versions for proper unsubscribe */\n  private callbackMap = new Map<EventCallback, EventCallback>();\n\n  constructor(\n    private readonly inner: PubSub,\n    private readonly cache: MastraServerCache,\n    options: CachingPubSubOptions = {},\n  ) {\n    super();\n    this.keyPrefix = options.keyPrefix ?? 'pubsub:';\n    this.logger = options.logger;\n  }\n\n  /**\n   * Log an error message using the configured logger or console.error.\n   */\n  private logError(message: string, error: unknown): void {\n    if (this.logger) {\n      this.logger.error(message, error);\n    } else {\n      console.error(message, error);\n    }\n  }\n\n  /**\n   * Get the cache key for a topic's event list\n   */\n  private getCacheKey(topic: string): string {\n    return `${this.keyPrefix}${topic}`;\n  }\n\n  /**\n   * Get the cache key for a topic's index counter\n   */\n  private getCounterKey(topic: string): string {\n    return `${this.keyPrefix}${topic}:counter`;\n  }\n\n  /**\n   * Publish an event to a topic.\n   * The event is cached with a sequential index before being published to the inner PubSub.\n   *\n   * Uses atomic increment for index assignment to prevent race conditions\n   * when multiple events are published concurrently.\n   */\n  async publish(topic: string, event: Omit<Event, 'id' | 'createdAt' | 'index'>): Promise<void> {\n    const cacheKey = this.getCacheKey(topic);\n    const counterKey = this.getCounterKey(topic);\n\n    let index = 0;\n    let indexFailed = false;\n    try {\n      // Atomically get next index (increment returns value after incrementing, so subtract 1 for 0-based index)\n      index = (await this.cache.increment(counterKey)) - 1;\n    } catch (error) {\n      this.logError(`[CachingPubSub] Failed to increment counter for ${topic}`, error);\n      indexFailed = true;\n    }\n\n    const fullEvent: Event = {\n      ...event,\n      id: crypto.randomUUID(),\n      createdAt: new Date(),\n      index,\n    };\n\n    if (!indexFailed) {\n      try {\n        // Cache BEFORE live publish so late-joining observers never miss events\n        await this.cache.listPush(cacheKey, fullEvent);\n      } catch (error) {\n        this.logError(`[CachingPubSub] Failed to cache event for ${topic}`, error);\n      }\n    }\n\n    // Always publish to inner PubSub — cache failure must not block live delivery\n    await this.inner.publish(topic, fullEvent);\n  }\n\n  /**\n   * Subscribe to live events on a topic (no replay).\n   */\n  async subscribe(topic: string, cb: EventCallback, options?: SubscribeOptions): Promise<void> {\n    await this.inner.subscribe(topic, cb, options);\n  }\n\n  /**\n   * Subscribe to a topic with automatic replay of cached events.\n   *\n   * Order of operations:\n   * 1. Subscribe to live events FIRST (to avoid missing events during replay)\n   * 2. Fetch and replay cached history\n   * 3. Deduplicate events at the boundary using event IDs\n   *\n   * Each subscriber gets its own deduplication set to ensure\n   * multiple subscribers can independently receive all events.\n   */\n  async subscribeWithReplay(topic: string, cb: EventCallback): Promise<void> {\n    // Each subscriber gets its own seen set for deduplication\n    // This prevents the same event from being delivered twice to THIS subscriber\n    // (once via cache replay and once via live subscription)\n    let seen: Set<string> | null = new Set<string>();\n\n    // Wrap callback to deduplicate events during replay/live overlap.\n    // After replay completes, seen is nulled out and the wrapper becomes a passthrough.\n    const wrappedCb: EventCallback = (event, ack) => {\n      if (seen) {\n        if (!seen.has(event.id)) {\n          seen.add(event.id);\n          cb(event, ack);\n        }\n      } else {\n        cb(event, ack);\n      }\n    };\n\n    // 1. Subscribe to live events FIRST to avoid race condition\n    this.callbackMap.set(cb, wrappedCb);\n    await this.inner.subscribe(topic, wrappedCb);\n\n    // 2. Fetch and replay cached history\n    const history = await this.getHistory(topic);\n    for (const event of history) {\n      if (!seen!.has(event.id)) {\n        seen!.add(event.id);\n        cb(event);\n      }\n    }\n\n    // Deduplication only needed during replay/live overlap — null out to free memory\n    // and skip unnecessary has/add for all subsequent live events\n    seen = null;\n  }\n\n  /**\n   * Subscribe to a topic with replay starting from a specific index.\n   * More efficient than full replay when the client knows their last position.\n   *\n   * @param topic - The topic to subscribe to\n   * @param offset - Start replaying from this index (0-based)\n   * @param cb - Callback invoked for each event\n   */\n  async subscribeFromOffset(topic: string, offset: number, cb: EventCallback): Promise<void> {\n    // Each subscriber gets its own seen set for deduplication\n    let seen: Set<string> | null = new Set<string>();\n\n    // Wrap callback to deduplicate events during replay/live overlap.\n    // After replay completes, seen is nulled out and the wrapper becomes a passthrough.\n    const wrappedCb: EventCallback = (event, ack) => {\n      if (seen) {\n        if (!seen.has(event.id)) {\n          seen.add(event.id);\n          cb(event, ack);\n        }\n      } else {\n        cb(event, ack);\n      }\n    };\n\n    // 1. Subscribe to live events FIRST to avoid race condition\n    this.callbackMap.set(cb, wrappedCb);\n    await this.inner.subscribe(topic, wrappedCb);\n\n    // 2. Fetch and replay cached history FROM the specified index\n    const history = await this.getHistory(topic, offset);\n    for (const event of history) {\n      if (!seen!.has(event.id)) {\n        seen!.add(event.id);\n        cb(event);\n      }\n    }\n\n    // Deduplication only needed during replay/live overlap — null out to free memory\n    seen = null;\n  }\n\n  /**\n   * Unsubscribe from a topic.\n   */\n  async unsubscribe(topic: string, cb: EventCallback): Promise<void> {\n    const wrappedCb = this.callbackMap.get(cb) ?? cb;\n    this.callbackMap.delete(cb);\n    await this.inner.unsubscribe(topic, wrappedCb);\n  }\n\n  /**\n   * Get historical events for a topic from cache.\n   */\n  async getHistory(topic: string, offset: number = 0): Promise<Event[]> {\n    const cacheKey = this.getCacheKey(topic);\n    const events = await this.cache.listFromTo(cacheKey, offset);\n    return events as Event[];\n  }\n\n  /**\n   * Flush any pending operations.\n   */\n  async flush(): Promise<void> {\n    await this.inner.flush();\n  }\n\n  /**\n   * Clear cached events for a specific topic.\n   * Call this when a stream completes to free memory.\n   * Also clears the index counter.\n   */\n  async clearTopic(topic: string): Promise<void> {\n    const cacheKey = this.getCacheKey(topic);\n    const counterKey = this.getCounterKey(topic);\n    await Promise.all([this.cache.delete(cacheKey), this.cache.delete(counterKey)]);\n  }\n\n  /**\n   * Get the inner PubSub instance.\n   * Useful for accessing implementation-specific methods like close().\n   */\n  getInner(): PubSub {\n    return this.inner;\n  }\n}\n\n/**\n * Factory function to wrap a PubSub with caching capabilities.\n *\n * @example\n * ```typescript\n * import { withCaching, EventEmitterPubSub } from '@mastra/core/events';\n * import { InMemoryServerCache } from '@mastra/core/cache';\n *\n * const cache = new InMemoryServerCache();\n * const pubsub = withCaching(new EventEmitterPubSub(), cache);\n * ```\n */\nexport function withCaching(pubsub: PubSub, cache: MastraServerCache, options?: CachingPubSubOptions): CachingPubSub {\n  return new CachingPubSub(pubsub, cache, options);\n}\n"]}