{"version":3,"file":"scheduler.cjs","sources":["../../src/scheduler.ts"],"sourcesContent":["/**\n * Identifier used to scope scheduled work. Maps to a transaction id for live queries.\n */\nexport type SchedulerContextId = string | symbol\n\n/**\n * Options for {@link Scheduler.schedule}. Jobs are identified by `jobId` within a context\n * and may declare dependencies.\n */\ninterface ScheduleOptions {\n  contextId?: SchedulerContextId\n  jobId: unknown\n  dependencies?: Iterable<unknown>\n  run: () => void\n}\n\n/**\n * State per context. Queue preserves order, jobs hold run functions, dependencies track\n * prerequisites, and completed records which jobs have run during the current flush.\n */\ninterface SchedulerContextState {\n  queue: Array<unknown>\n  jobs: Map<unknown, () => void>\n  dependencies: Map<unknown, Set<unknown>>\n  completed: Set<unknown>\n}\n\ninterface PendingAwareJob {\n  hasPendingGraphRun: (contextId: SchedulerContextId) => boolean\n}\n\nfunction isPendingAwareJob(dep: any): dep is PendingAwareJob {\n  return (\n    typeof dep === `object` &&\n    dep !== null &&\n    typeof dep.hasPendingGraphRun === `function`\n  )\n}\n\n/**\n * Scoped scheduler that coalesces work by context and job.\n *\n * - **context** (e.g. transaction id) defines the batching boundary; work is queued until flushed.\n * - **job id** deduplicates work within a context; scheduling the same job replaces the previous run function.\n * - Without a context id, work executes immediately.\n *\n * Callers manage their own state; the scheduler only orchestrates execution order.\n */\nexport class Scheduler {\n  private contexts = new Map<SchedulerContextId, SchedulerContextState>()\n  private clearListeners = new Set<(contextId: SchedulerContextId) => void>()\n\n  /**\n   * Get or create the state bucket for a context.\n   */\n  private getOrCreateContext(\n    contextId: SchedulerContextId,\n  ): SchedulerContextState {\n    let context = this.contexts.get(contextId)\n    if (!context) {\n      context = {\n        queue: [],\n        jobs: new Map(),\n        dependencies: new Map(),\n        completed: new Set(),\n      }\n      this.contexts.set(contextId, context)\n    }\n    return context\n  }\n\n  /**\n   * Schedule work. Without a context id, executes immediately.\n   * Otherwise queues the job to be flushed once dependencies are satisfied.\n   * Scheduling the same jobId again replaces the previous run function.\n   */\n  schedule({ contextId, jobId, dependencies, run }: ScheduleOptions): void {\n    if (typeof contextId === `undefined`) {\n      run()\n      return\n    }\n\n    const context = this.getOrCreateContext(contextId)\n\n    // If this is a new job, add it to the queue\n    if (!context.jobs.has(jobId)) {\n      context.queue.push(jobId)\n    }\n\n    // Store or replace the run function\n    context.jobs.set(jobId, run)\n\n    // Update dependencies\n    if (dependencies) {\n      const depSet = new Set<unknown>(dependencies)\n      depSet.delete(jobId)\n      context.dependencies.set(jobId, depSet)\n    } else if (!context.dependencies.has(jobId)) {\n      context.dependencies.set(jobId, new Set())\n    }\n\n    // Clear completion status since we're rescheduling\n    context.completed.delete(jobId)\n  }\n\n  /**\n   * Flush all queued work for a context. Jobs with unmet dependencies are retried.\n   * Throws if a pass completes without running any job (dependency cycle).\n   */\n  flush(contextId: SchedulerContextId): void {\n    const context = this.contexts.get(contextId)\n    if (!context) return\n\n    const { queue, jobs, dependencies, completed } = context\n\n    while (queue.length > 0) {\n      let ranThisPass = false\n      const jobsThisPass = queue.length\n\n      for (let i = 0; i < jobsThisPass; i++) {\n        const jobId = queue.shift()!\n        const run = jobs.get(jobId)\n        if (!run) {\n          dependencies.delete(jobId)\n          completed.delete(jobId)\n          continue\n        }\n\n        const deps = dependencies.get(jobId)\n        let ready = !deps\n        if (deps) {\n          ready = true\n          for (const dep of deps) {\n            if (dep === jobId) continue\n\n            const depHasPending =\n              isPendingAwareJob(dep) && dep.hasPendingGraphRun(contextId)\n\n            // Treat dependencies as blocking if the dep has a pending run in this\n            // context or if it's enqueued and not yet complete. If the dep is\n            // neither pending nor enqueued, consider it satisfied to avoid deadlocks\n            // on lazy sources that never schedule work.\n            if (\n              (jobs.has(dep) && !completed.has(dep)) ||\n              (!jobs.has(dep) && depHasPending)\n            ) {\n              ready = false\n              break\n            }\n          }\n        }\n\n        if (ready) {\n          jobs.delete(jobId)\n          dependencies.delete(jobId)\n          // Run the job. If it throws, we don't mark it complete, allowing the\n          // error to propagate while maintaining scheduler state consistency.\n          run()\n          completed.add(jobId)\n          ranThisPass = true\n        } else {\n          queue.push(jobId)\n        }\n      }\n\n      if (!ranThisPass) {\n        throw new Error(\n          `Scheduler detected unresolved dependencies for context ${String(\n            contextId,\n          )}.`,\n        )\n      }\n    }\n\n    this.contexts.delete(contextId)\n  }\n\n  /**\n   * Flush all contexts with pending work. Useful during tear-down.\n   */\n  flushAll(): void {\n    for (const contextId of Array.from(this.contexts.keys())) {\n      this.flush(contextId)\n    }\n  }\n\n  /** Clear all scheduled jobs for a context. */\n  clear(contextId: SchedulerContextId): void {\n    this.contexts.delete(contextId)\n    // Notify listeners that this context was cleared\n    this.clearListeners.forEach((listener) => listener(contextId))\n  }\n\n  /** Register a listener to be notified when a context is cleared. */\n  onClear(listener: (contextId: SchedulerContextId) => void): () => void {\n    this.clearListeners.add(listener)\n    return () => this.clearListeners.delete(listener)\n  }\n\n  /** Check if a context has pending jobs. */\n  hasPendingJobs(contextId: SchedulerContextId): boolean {\n    const context = this.contexts.get(contextId)\n    return !!context && context.jobs.size > 0\n  }\n\n  /** Remove a single job from a context and clean up its dependencies. */\n  clearJob(contextId: SchedulerContextId, jobId: unknown): void {\n    const context = this.contexts.get(contextId)\n    if (!context) return\n\n    context.jobs.delete(jobId)\n    context.dependencies.delete(jobId)\n    context.completed.delete(jobId)\n    context.queue = context.queue.filter((id) => id !== jobId)\n\n    if (context.jobs.size === 0) {\n      this.contexts.delete(contextId)\n    }\n  }\n}\n\nexport const transactionScopedScheduler = new Scheduler()\n"],"names":[],"mappings":";;AA+BA,SAAS,kBAAkB,KAAkC;AAC3D,SACE,OAAO,QAAQ,YACf,QAAQ,QACR,OAAO,IAAI,uBAAuB;AAEtC;AAWO,MAAM,UAAU;AAAA,EAAhB,cAAA;AACL,SAAQ,+BAAe,IAAA;AACvB,SAAQ,qCAAqB,IAAA;AAAA,EAA6C;AAAA;AAAA;AAAA;AAAA,EAKlE,mBACN,WACuB;AACvB,QAAI,UAAU,KAAK,SAAS,IAAI,SAAS;AACzC,QAAI,CAAC,SAAS;AACZ,gBAAU;AAAA,QACR,OAAO,CAAA;AAAA,QACP,0BAAU,IAAA;AAAA,QACV,kCAAkB,IAAA;AAAA,QAClB,+BAAe,IAAA;AAAA,MAAI;AAErB,WAAK,SAAS,IAAI,WAAW,OAAO;AAAA,IACtC;AACA,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,SAAS,EAAE,WAAW,OAAO,cAAc,OAA8B;AACvE,QAAI,OAAO,cAAc,aAAa;AACpC,UAAA;AACA;AAAA,IACF;AAEA,UAAM,UAAU,KAAK,mBAAmB,SAAS;AAGjD,QAAI,CAAC,QAAQ,KAAK,IAAI,KAAK,GAAG;AAC5B,cAAQ,MAAM,KAAK,KAAK;AAAA,IAC1B;AAGA,YAAQ,KAAK,IAAI,OAAO,GAAG;AAG3B,QAAI,cAAc;AAChB,YAAM,SAAS,IAAI,IAAa,YAAY;AAC5C,aAAO,OAAO,KAAK;AACnB,cAAQ,aAAa,IAAI,OAAO,MAAM;AAAA,IACxC,WAAW,CAAC,QAAQ,aAAa,IAAI,KAAK,GAAG;AAC3C,cAAQ,aAAa,IAAI,OAAO,oBAAI,KAAK;AAAA,IAC3C;AAGA,YAAQ,UAAU,OAAO,KAAK;AAAA,EAChC;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,MAAM,WAAqC;AACzC,UAAM,UAAU,KAAK,SAAS,IAAI,SAAS;AAC3C,QAAI,CAAC,QAAS;AAEd,UAAM,EAAE,OAAO,MAAM,cAAc,cAAc;AAEjD,WAAO,MAAM,SAAS,GAAG;AACvB,UAAI,cAAc;AAClB,YAAM,eAAe,MAAM;AAE3B,eAAS,IAAI,GAAG,IAAI,cAAc,KAAK;AACrC,cAAM,QAAQ,MAAM,MAAA;AACpB,cAAM,MAAM,KAAK,IAAI,KAAK;AAC1B,YAAI,CAAC,KAAK;AACR,uBAAa,OAAO,KAAK;AACzB,oBAAU,OAAO,KAAK;AACtB;AAAA,QACF;AAEA,cAAM,OAAO,aAAa,IAAI,KAAK;AACnC,YAAI,QAAQ,CAAC;AACb,YAAI,MAAM;AACR,kBAAQ;AACR,qBAAW,OAAO,MAAM;AACtB,gBAAI,QAAQ,MAAO;AAEnB,kBAAM,gBACJ,kBAAkB,GAAG,KAAK,IAAI,mBAAmB,SAAS;AAM5D,gBACG,KAAK,IAAI,GAAG,KAAK,CAAC,UAAU,IAAI,GAAG,KACnC,CAAC,KAAK,IAAI,GAAG,KAAK,eACnB;AACA,sBAAQ;AACR;AAAA,YACF;AAAA,UACF;AAAA,QACF;AAEA,YAAI,OAAO;AACT,eAAK,OAAO,KAAK;AACjB,uBAAa,OAAO,KAAK;AAGzB,cAAA;AACA,oBAAU,IAAI,KAAK;AACnB,wBAAc;AAAA,QAChB,OAAO;AACL,gBAAM,KAAK,KAAK;AAAA,QAClB;AAAA,MACF;AAEA,UAAI,CAAC,aAAa;AAChB,cAAM,IAAI;AAAA,UACR,0DAA0D;AAAA,YACxD;AAAA,UAAA,CACD;AAAA,QAAA;AAAA,MAEL;AAAA,IACF;AAEA,SAAK,SAAS,OAAO,SAAS;AAAA,EAChC;AAAA;AAAA;AAAA;AAAA,EAKA,WAAiB;AACf,eAAW,aAAa,MAAM,KAAK,KAAK,SAAS,KAAA,CAAM,GAAG;AACxD,WAAK,MAAM,SAAS;AAAA,IACtB;AAAA,EACF;AAAA;AAAA,EAGA,MAAM,WAAqC;AACzC,SAAK,SAAS,OAAO,SAAS;AAE9B,SAAK,eAAe,QAAQ,CAAC,aAAa,SAAS,SAAS,CAAC;AAAA,EAC/D;AAAA;AAAA,EAGA,QAAQ,UAA+D;AACrE,SAAK,eAAe,IAAI,QAAQ;AAChC,WAAO,MAAM,KAAK,eAAe,OAAO,QAAQ;AAAA,EAClD;AAAA;AAAA,EAGA,eAAe,WAAwC;AACrD,UAAM,UAAU,KAAK,SAAS,IAAI,SAAS;AAC3C,WAAO,CAAC,CAAC,WAAW,QAAQ,KAAK,OAAO;AAAA,EAC1C;AAAA;AAAA,EAGA,SAAS,WAA+B,OAAsB;AAC5D,UAAM,UAAU,KAAK,SAAS,IAAI,SAAS;AAC3C,QAAI,CAAC,QAAS;AAEd,YAAQ,KAAK,OAAO,KAAK;AACzB,YAAQ,aAAa,OAAO,KAAK;AACjC,YAAQ,UAAU,OAAO,KAAK;AAC9B,YAAQ,QAAQ,QAAQ,MAAM,OAAO,CAAC,OAAO,OAAO,KAAK;AAEzD,QAAI,QAAQ,KAAK,SAAS,GAAG;AAC3B,WAAK,SAAS,OAAO,SAAS;AAAA,IAChC;AAAA,EACF;AACF;AAEO,MAAM,6BAA6B,IAAI,UAAA;;;"}