{"version":3,"sources":["../src/workflows/scheduler/scheduler.ts"],"names":["MastraBase","RegisteredLogger","computeNextFireAt"],"mappings":";;;;;;;AAOA,IAAM,eAAA,GAAkB,WAAA;AACxB,IAAM,wBAAA,GAA2B,GAAA;AACjC,IAAM,kBAAA,GAAqB,GAAA;AAC3B,IAAM,4BAAA,GAA+B,CAAA;AAgB9B,IAAM,iBAAA,GAAN,cAAgCA,4BAAA,CAAW;AAAA,EAChD,eAAA;AAAA,EACA,OAAA;AAAA,EACA,OAAA;AAAA,EAEA,eAAA;AAAA,EACA,aAAA;AAAA,EACA,QAAA,GAAW,KAAA;AAAA,EACX,SAAA,GAAY,KAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAQZ,sBAAA,uBAA6B,GAAA,EAAoB;AAAA,EAEjD,WAAA,CAAY;AAAA,IACV,cAAA;AAAA,IACA,MAAA;AAAA,IACA;AAAA,GACF,EAIG;AACD,IAAA,KAAA,CAAM,EAAE,SAAA,EAAWC,kCAAA,CAAiB,QAAA,EAAU,IAAA,EAAM,qBAAqB,CAAA;AACzE,IAAA,IAAA,CAAK,eAAA,GAAkB,cAAA;AACvB,IAAA,IAAA,CAAK,OAAA,GAAU,MAAA;AACf,IAAA,IAAA,CAAK,OAAA,GAAU;AAAA,MACb,GAAG,MAAA;AAAA,MACH,cAAA,EAAgB,QAAQ,cAAA,IAAkB,wBAAA;AAAA,MAC1C,SAAA,EAAW,QAAQ,SAAA,IAAa;AAAA,KAClC;AAAA,EACF;AAAA;AAAA,EAGA,MAAM,KAAA,GAAuB;AAC3B,IAAA,IAAI,KAAK,QAAA,EAAU;AACnB,IAAA,IAAA,CAAK,QAAA,GAAW,IAAA;AAChB,IAAA,IAAA,CAAK,SAAA,GAAY,KAAA;AAIjB,IAAA,IAAA,CAAK,uBAAuB,KAAA,EAAM;AAElC,IAAA,IAAI;AAEF,MAAA,MAAM,KAAK,QAAA,EAAS;AAIpB,MAAA,IAAI,IAAA,CAAK,SAAA,IAAa,CAAC,IAAA,CAAK,QAAA,EAAU;AAEtC,MAAA,IAAA,CAAK,eAAA,GAAkB,YAAY,MAAM;AAKvC,QAAA,KAAK,IAAA,CAAK,QAAA,EAAS,CAAE,KAAA,CAAM,CAAA,GAAA,KAAO;AAChC,UAAA,IAAA,CAAK,OAAO,KAAA,CAAM,gCAAA,EAAkC,EAAE,KAAA,EAAO,KAAK,CAAA;AAAA,QACpE,CAAC,CAAA;AAAA,MACH,CAAA,EAAG,IAAA,CAAK,OAAA,CAAQ,cAAc,CAAA;AAAA,IAChC,SAAS,GAAA,EAAK;AAIZ,MAAA,IAAA,CAAK,QAAA,GAAW,KAAA;AAChB,MAAA,IAAA,CAAK,SAAA,GAAY,KAAA;AACjB,MAAA,MAAM,GAAA;AAAA,IACR;AAAA,EACF;AAAA;AAAA,EAGA,MAAM,IAAA,GAAsB;AAC1B,IAAA,IAAI,CAAC,KAAK,QAAA,EAAU;AACpB,IAAA,IAAA,CAAK,SAAA,GAAY,IAAA;AAEjB,IAAA,IAAI,KAAK,eAAA,EAAiB;AACxB,MAAA,aAAA,CAAc,KAAK,eAAe,CAAA;AAClC,MAAA,IAAA,CAAK,eAAA,GAAkB,MAAA;AAAA,IACzB;AAEA,IAAA,IAAI,KAAK,aAAA,EAAe;AACtB,MAAA,IAAI;AACF,QAAA,MAAM,IAAA,CAAK,aAAA;AAAA,MACb,CAAA,CAAA,MAAQ;AAAA,MAER;AAAA,IACF;AAEA,IAAA,IAAA,CAAK,QAAA,GAAW,KAAA;AAChB,IAAA,IAAA,CAAK,SAAA,GAAY,KAAA;AAAA,EACnB;AAAA;AAAA,EAGA,IAAI,SAAA,GAAqB;AACvB,IAAA,OAAO,IAAA,CAAK,QAAA;AAAA,EACd;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,MAAM,IAAA,GAAsB;AAC1B,IAAA,MAAM,KAAK,QAAA,EAAS;AAAA,EACtB;AAAA;AAAA,EAIA,MAAM,QAAA,GAA0B;AAC9B,IAAA,IAAI,IAAA,CAAK,SAAA,IAAa,IAAA,CAAK,aAAA,EAAe;AAC1C,IAAA,MAAM,OAAA,GAAU,IAAA,CAAK,YAAA,EAAa,CAAE,QAAQ,MAAM;AAChD,MAAA,IAAA,CAAK,aAAA,GAAgB,MAAA;AAAA,IACvB,CAAC,CAAA;AACD,IAAA,IAAA,CAAK,aAAA,GAAgB,OAAA;AACrB,IAAA,MAAM,OAAA;AAAA,EACR;AAAA,EAEA,MAAM,YAAA,GAA8B;AAClC,IAAA,IAAI,GAAA;AACJ,IAAA,IAAI;AACF,MAAA,GAAA,GAAM,MAAM,KAAK,eAAA,CAAgB,gBAAA,CAAiB,KAAK,GAAA,EAAI,EAAG,IAAA,CAAK,OAAA,CAAQ,SAAS,CAAA;AAAA,IACtF,SAAS,GAAA,EAAK;AACZ,MAAA,IAAA,CAAK,OAAO,KAAA,CAAM,8BAAA,EAAgC,EAAE,KAAA,EAAO,KAAK,CAAA;AAChE,MAAA;AAAA,IACF;AAEA,IAAA,KAAA,MAAW,YAAY,GAAA,EAAK;AAC1B,MAAA,IAAI,KAAK,SAAA,EAAW;AACpB,MAAA,MAAM,IAAA,CAAK,cAAc,QAAQ,CAAA;AAAA,IACnC;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAaA,MAAM,sBAAsB,QAAA,EAAsC;AAChE,IAAA,MAAM,SAAA,GAAY,KAAK,OAAA,CAAQ,oBAAA;AAC/B,IAAA,IAAI,CAAC,WAAW,OAAO,IAAA;AACvB,IAAA,IAAI,QAAA,CAAS,MAAA,CAAO,IAAA,KAAS,UAAA,EAAY,OAAO,IAAA;AAEhD,IAAA,MAAM,UAAA,GAAa,SAAS,MAAA,CAAO,UAAA;AACnC,IAAA,IAAI,SAAA,CAAU,UAAU,CAAA,EAAG;AACzB,MAAA,IAAA,CAAK,sBAAA,CAAuB,MAAA,CAAO,QAAA,CAAS,EAAE,CAAA;AAC9C,MAAA,OAAO,IAAA;AAAA,IACT;AAEA,IAAA,MAAM,KAAA,GAAQ,IAAA,CAAK,OAAA,CAAQ,kBAAA,IAAsB,4BAAA;AACjD,IAAA,MAAM,OAAO,IAAA,CAAK,sBAAA,CAAuB,GAAA,CAAI,QAAA,CAAS,EAAE,CAAA,IAAK,CAAA;AAC7D,IAAA,MAAM,OAAO,IAAA,GAAO,CAAA;AAEpB,IAAA,IAAI,OAAO,KAAA,EAAO;AAChB,MAAA,IAAA,CAAK,sBAAA,CAAuB,GAAA,CAAI,QAAA,CAAS,EAAA,EAAI,IAAI,CAAA;AACjD,MAAA,IAAI,SAAS,CAAA,EAAG;AACd,QAAA,IAAA,CAAK,MAAA,CAAO,KAAK,uEAAA,EAAyE;AAAA,UACxF,YAAY,QAAA,CAAS,EAAA;AAAA,UACrB,UAAA;AAAA,UACA,kBAAA,EAAoB;AAAA,SACrB,CAAA;AAAA,MACH;AACA,MAAA,OAAO,KAAA;AAAA,IACT;AAGA,IAAA,IAAA,CAAK,MAAA,CAAO,MAAM,iEAAA,EAAmE;AAAA,MACnF,YAAY,QAAA,CAAS,EAAA;AAAA,MACrB,UAAA;AAAA,MACA,iBAAA,EAAmB;AAAA,KACpB,CAAA;AACD,IAAA,IAAI;AACF,MAAA,MAAM,IAAA,CAAK,eAAA,CAAgB,cAAA,CAAe,QAAA,CAAS,EAAE,CAAA;AAAA,IACvD,SAAS,GAAA,EAAK;AACZ,MAAA,IAAA,CAAK,MAAA,CAAO,MAAM,iCAAA,EAAmC;AAAA,QACnD,YAAY,QAAA,CAAS,EAAA;AAAA,QACrB,UAAA;AAAA,QACA,KAAA,EAAO;AAAA,OACR,CAAA;AAGD,MAAA,OAAO,KAAA;AAAA,IACT;AACA,IAAA,IAAA,CAAK,sBAAA,CAAuB,MAAA,CAAO,QAAA,CAAS,EAAE,CAAA;AAC9C,IAAA,OAAO,KAAA;AAAA,EACT;AAAA,EAEA,MAAM,cAAc,QAAA,EAAmC;AACrD,IAAA,IAAI,CAAE,MAAM,IAAA,CAAK,qBAAA,CAAsB,QAAQ,CAAA,EAAI;AAEnD,IAAA,MAAM,YAAA,GAAe,KAAK,GAAA,EAAI;AAE9B,IAAA,IAAI,aAAA;AACJ,IAAA,IAAI;AACF,MAAA,aAAA,GAAgBC,mCAAA,CAAkB,SAAS,IAAA,EAAM;AAAA,QAC/C,UAAU,QAAA,CAAS,QAAA;AAAA,QACnB,KAAA,EAAO;AAAA,OACR,CAAA;AAAA,IACH,SAAS,GAAA,EAAK;AACZ,MAAA,IAAA,CAAK,MAAA,CAAO,MAAM,+CAAA,EAAiD;AAAA,QACjE,YAAY,QAAA,CAAS,EAAA;AAAA,QACrB,MAAM,QAAA,CAAS,IAAA;AAAA,QACf,KAAA,EAAO;AAAA,OACR,CAAA;AACD,MAAA,IAAA,CAAK,YAAA,CAAa,GAAA,EAAK,QAAA,CAAS,EAAE,CAAA;AAClC,MAAA;AAAA,IACF;AAGA,IAAA,MAAM,QAAQ,CAAA,MAAA,EAAS,QAAA,CAAS,EAAE,CAAA,CAAA,EAAI,SAAS,UAAU,CAAA,CAAA;AAEzD,IAAA,IAAI,OAAA,GAAU,KAAA;AACd,IAAA,IAAI;AACF,MAAA,OAAA,GAAU,MAAM,KAAK,eAAA,CAAgB,sBAAA;AAAA,QACnC,QAAA,CAAS,EAAA;AAAA,QACT,QAAA,CAAS,UAAA;AAAA,QACT,aAAA;AAAA,QACA,YAAA;AAAA,QACA;AAAA,OACF;AAAA,IACF,SAAS,GAAA,EAAK;AACZ,MAAA,IAAA,CAAK,MAAA,CAAO,MAAM,mCAAA,EAAqC;AAAA,QACrD,YAAY,QAAA,CAAS,EAAA;AAAA,QACrB,KAAA;AAAA,QACA,KAAA,EAAO;AAAA,OACR,CAAA;AACD,MAAA,IAAA,CAAK,YAAA,CAAa,GAAA,EAAK,QAAA,CAAS,EAAE,CAAA;AAClC,MAAA;AAAA,IACF;AAEA,IAAA,IAAI,CAAC,OAAA,EAAS;AAGZ,MAAA;AAAA,IACF;AAEA,IAAA,IAAI,aAAA,GAA4C,WAAA;AAChD,IAAA,IAAI,YAAA;AAEJ,IAAA,IAAI;AACF,MAAA,MAAM,IAAA,CAAK,qBAAA,CAAsB,QAAA,EAAU,KAAK,CAAA;AAAA,IAClD,SAAS,GAAA,EAAK;AACZ,MAAA,aAAA,GAAgB,QAAA;AAChB,MAAA,YAAA,GAAe,GAAA,YAAe,KAAA,GAAQ,GAAA,CAAI,OAAA,GAAU,OAAO,GAAG,CAAA;AAC9D,MAAA,IAAA,CAAK,MAAA,CAAO,MAAM,+CAAA,EAAiD;AAAA,QACjE,YAAY,QAAA,CAAS,EAAA;AAAA,QACrB,KAAA;AAAA,QACA,KAAA,EAAO;AAAA,OACR,CAAA;AACD,MAAA,IAAA,CAAK,YAAA,CAAa,GAAA,EAAK,QAAA,CAAS,EAAE,CAAA;AAAA,IACpC;AAEA,IAAA,IAAI;AACF,MAAA,MAAM,IAAA,CAAK,gBAAgB,aAAA,CAAc;AAAA,QACvC,YAAY,QAAA,CAAS,EAAA;AAAA,QACrB,KAAA;AAAA,QACA,iBAAiB,QAAA,CAAS,UAAA;AAAA,QAC1B,YAAA;AAAA,QACA,OAAA,EAAS,aAAA;AAAA,QACT,KAAA,EAAO,YAAA;AAAA,QACP,WAAA,EAAa;AAAA,OACd,CAAA;AAAA,IACH,SAAS,GAAA,EAAK;AACZ,MAAA,IAAA,CAAK,MAAA,CAAO,MAAM,mCAAA,EAAqC;AAAA,QACrD,YAAY,QAAA,CAAS,EAAA;AAAA,QACrB,KAAA;AAAA,QACA,KAAA,EAAO;AAAA,OACR,CAAA;AAAA,IACH;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,YAAA,CAAa,OAAgB,UAAA,EAA0B;AACrD,IAAA,IAAI,CAAC,IAAA,CAAK,OAAA,CAAQ,OAAA,EAAS;AAC3B,IAAA,IAAI;AACF,MAAA,IAAA,CAAK,OAAA,CAAQ,OAAA,CAAQ,KAAA,EAAO,EAAE,YAAY,CAAA;AAAA,IAC5C,SAAS,aAAA,EAAe;AACtB,MAAA,IAAA,CAAK,MAAA,CAAO,MAAM,yCAAA,EAA2C;AAAA,QAC3D,UAAA;AAAA,QACA,KAAA,EAAO;AAAA,OACR,CAAA;AAAA,IACH;AAAA,EACF;AAAA,EAEA,MAAM,qBAAA,CAAsB,QAAA,EAAoB,KAAA,EAA8B;AAC5E,IAAA,IAAI,QAAA,CAAS,MAAA,CAAO,IAAA,KAAS,UAAA,EAAY;AACvC,MAAA,MAAM,IAAI,KAAA,CAAM,CAAA,kCAAA,EAAsC,QAAA,CAAS,MAAA,CAA4B,IAAI,CAAA,CAAE,CAAA;AAAA,IACnG;AAEA,IAAA,MAAM,EAAE,UAAA,EAAY,SAAA,EAAW,YAAA,EAAc,cAAA,KAAmB,QAAA,CAAS,MAAA;AAEzE,IAAA,MAAM,IAAA,CAAK,OAAA,CAAQ,OAAA,CAAQ,eAAA,EAAiB;AAAA,MAC1C,IAAA,EAAM,gBAAA;AAAA,MACN,KAAA;AAAA,MACA,IAAA,EAAM;AAAA,QACJ,UAAA;AAAA,QACA,KAAA;AAAA,QACA,YAAY,EAAE,MAAA,EAAQ,WAAW,MAAA,EAAQ,SAAA,IAAa,EAAC,EAAE;AAAA,QACzD,cAAA,EAAgB,kBAAkB,EAAC;AAAA,QACnC,YAAA,EAAc,gBAAgB;AAAC;AACjC,KACD,CAAA;AAAA,EACH;AACF","file":"chunk-B3HPYVQP.cjs","sourcesContent":["import { MastraBase } from '../../base';\nimport type { PubSub } from '../../events/pubsub';\nimport { RegisteredLogger } from '../../logger/constants';\nimport type { Schedule, ScheduleTrigger, SchedulesStorage } from '../../storage/domains/schedules/base';\nimport { computeNextFireAt } from './cron';\nimport type { WorkflowSchedulerConfig } from './types';\n\nconst TOPIC_WORKFLOWS = 'workflows';\nconst DEFAULT_TICK_INTERVAL_MS = 10_000;\nconst DEFAULT_BATCH_SIZE = 100;\nconst DEFAULT_MISSES_BEFORE_DELETE = 3;\n\n/**\n * Drives cron-based workflow triggers.\n *\n * On each tick the scheduler:\n *  1. Loads schedules whose `nextFireAt <= now` from storage.\n *  2. Computes the next fire time from the cron expression.\n *  3. Atomically advances `nextFireAt` via compare-and-swap. Only one\n *     instance across many polling the same storage can claim a fire.\n *  4. Publishes a `workflow.start` event on the `workflows` pubsub topic.\n *  5. Records the trigger in the schedule's history.\n *\n * The scheduler does **not** execute workflows. The existing\n * `WorkflowEventProcessor` consumes `workflow.start` events and runs them.\n */\nexport class WorkflowScheduler extends MastraBase {\n  #schedulesStore: SchedulesStorage;\n  #pubsub: PubSub;\n  #config: Required<Pick<WorkflowSchedulerConfig, 'tickIntervalMs' | 'batchSize'>> & WorkflowSchedulerConfig;\n\n  #intervalHandle?: ReturnType<typeof setInterval>;\n  #inflightTick?: Promise<void>;\n  #started = false;\n  #stopping = false;\n\n  /**\n   * Per-schedule count of consecutive ticks where the target workflow was\n   * not registered with the host Mastra instance. Reset when the workflow\n   * resolves or the schedule is deleted. Used to ride out deploy/startup\n   * ordering races before reclaiming a ghost row.\n   */\n  #missingWorkflowCounts = new Map<string, number>();\n\n  constructor({\n    schedulesStore,\n    pubsub,\n    config,\n  }: {\n    schedulesStore: SchedulesStorage;\n    pubsub: PubSub;\n    config?: WorkflowSchedulerConfig;\n  }) {\n    super({ component: RegisteredLogger.WORKFLOW, name: 'WorkflowScheduler' });\n    this.#schedulesStore = schedulesStore;\n    this.#pubsub = pubsub;\n    this.#config = {\n      ...config,\n      tickIntervalMs: config?.tickIntervalMs ?? DEFAULT_TICK_INTERVAL_MS,\n      batchSize: config?.batchSize ?? DEFAULT_BATCH_SIZE,\n    };\n  }\n\n  /** Start the periodic tick loop. Runs an immediate tick first. */\n  async start(): Promise<void> {\n    if (this.#started) return;\n    this.#started = true;\n    this.#stopping = false;\n    // Fresh process / fresh grace window — old miss counts shouldn't carry\n    // over into a new start() since the workflow registry may now look\n    // different.\n    this.#missingWorkflowCounts.clear();\n\n    try {\n      // Run one tick immediately so newly-due schedules don't wait the full interval.\n      await this.#runTick();\n\n      // If stop() ran concurrently with the warm-up tick, don't arm a new\n      // interval afterwards — the caller has already asked us to shut down.\n      if (this.#stopping || !this.#started) return;\n\n      this.#intervalHandle = setInterval(() => {\n        // Swallow rejections here so a tick failure can't surface as an\n        // unhandled promise rejection and crash the host process. #processTick\n        // already logs its own errors and notifies onError, so we only need a\n        // belt-and-braces logger.error for anything that escapes.\n        void this.#runTick().catch(err => {\n          this.logger.error('WorkflowScheduler tick crashed', { error: err });\n        });\n      }, this.#config.tickIntervalMs);\n    } catch (err) {\n      // Reset state so a future start() can retry. Without this, a failed\n      // warm-up tick would leave #started=true with no interval armed and\n      // every subsequent start() call would silently no-op.\n      this.#started = false;\n      this.#stopping = false;\n      throw err;\n    }\n  }\n\n  /** Stop the tick loop and wait for any in-flight tick to finish. */\n  async stop(): Promise<void> {\n    if (!this.#started) return;\n    this.#stopping = true;\n\n    if (this.#intervalHandle) {\n      clearInterval(this.#intervalHandle);\n      this.#intervalHandle = undefined;\n    }\n\n    if (this.#inflightTick) {\n      try {\n        await this.#inflightTick;\n      } catch {\n        // tick errors are already logged; swallow during shutdown\n      }\n    }\n\n    this.#started = false;\n    this.#stopping = false;\n  }\n\n  /** True when the scheduler is currently running its tick loop. */\n  get isRunning(): boolean {\n    return this.#started;\n  }\n\n  /**\n   * Run a single tick. Public for tests; production callers should rely\n   * on the interval started by `start()`.\n   */\n  async tick(): Promise<void> {\n    await this.#runTick();\n  }\n\n  // -------- Internals --------\n\n  async #runTick(): Promise<void> {\n    if (this.#stopping || this.#inflightTick) return;\n    const promise = this.#processTick().finally(() => {\n      this.#inflightTick = undefined;\n    });\n    this.#inflightTick = promise;\n    await promise;\n  }\n\n  async #processTick(): Promise<void> {\n    let due: Schedule[];\n    try {\n      due = await this.#schedulesStore.listDueSchedules(Date.now(), this.#config.batchSize);\n    } catch (err) {\n      this.logger.error('Failed to list due schedules', { error: err });\n      return;\n    }\n\n    for (const schedule of due) {\n      if (this.#stopping) break;\n      await this.#fireSchedule(schedule);\n    }\n  }\n\n  /**\n   * Check whether a schedule's target workflow is registered with the host\n   * Mastra instance. Returns `true` if no predicate is configured (we can't\n   * verify, so assume the consumer will reject) or if the workflow resolves.\n   *\n   * When the workflow is missing, we increment an in-memory counter and\n   * delete the schedule after `missesBeforeDelete` consecutive misses. The\n   * grace window protects against deploy/startup ordering races where the\n   * scheduler ticks before workflows finish registering on a fresh process.\n   * Returns `false` to tell `#fireSchedule` to skip publishing for this tick.\n   */\n  async #ensureWorkflowExists(schedule: Schedule): Promise<boolean> {\n    const predicate = this.#config.isWorkflowRegistered;\n    if (!predicate) return true;\n    if (schedule.target.type !== 'workflow') return true;\n\n    const workflowId = schedule.target.workflowId;\n    if (predicate(workflowId)) {\n      this.#missingWorkflowCounts.delete(schedule.id);\n      return true;\n    }\n\n    const limit = this.#config.missesBeforeDelete ?? DEFAULT_MISSES_BEFORE_DELETE;\n    const prev = this.#missingWorkflowCounts.get(schedule.id) ?? 0;\n    const next = prev + 1;\n\n    if (next < limit) {\n      this.#missingWorkflowCounts.set(schedule.id, next);\n      if (prev === 0) {\n        this.logger.warn('Schedule target workflow is not registered; skipping until it appears', {\n          scheduleId: schedule.id,\n          workflowId,\n          missesBeforeDelete: limit,\n        });\n      }\n      return false;\n    }\n\n    // Hit the grace limit — reclaim the row.\n    this.logger.error('Deleting schedule whose target workflow has not been registered', {\n      scheduleId: schedule.id,\n      workflowId,\n      consecutiveMisses: next,\n    });\n    try {\n      await this.#schedulesStore.deleteSchedule(schedule.id);\n    } catch (err) {\n      this.logger.error('Failed to delete ghost schedule', {\n        scheduleId: schedule.id,\n        workflowId,\n        error: err,\n      });\n      // Keep the counter so we try again next tick rather than reset and\n      // start the grace window over.\n      return false;\n    }\n    this.#missingWorkflowCounts.delete(schedule.id);\n    return false;\n  }\n\n  async #fireSchedule(schedule: Schedule): Promise<void> {\n    if (!(await this.#ensureWorkflowExists(schedule))) return;\n\n    const actualFireAt = Date.now();\n\n    let newNextFireAt: number;\n    try {\n      newNextFireAt = computeNextFireAt(schedule.cron, {\n        timezone: schedule.timezone,\n        after: actualFireAt,\n      });\n    } catch (err) {\n      this.logger.error('Failed to compute next fire time for schedule', {\n        scheduleId: schedule.id,\n        cron: schedule.cron,\n        error: err,\n      });\n      this.#notifyError(err, schedule.id);\n      return;\n    }\n\n    // Deterministic runId so concurrent ticks across processes derive the same id.\n    const runId = `sched_${schedule.id}_${schedule.nextFireAt}`;\n\n    let claimed = false;\n    try {\n      claimed = await this.#schedulesStore.updateScheduleNextFire(\n        schedule.id,\n        schedule.nextFireAt,\n        newNextFireAt,\n        actualFireAt,\n        runId,\n      );\n    } catch (err) {\n      this.logger.error('Failed to claim due schedule fire', {\n        scheduleId: schedule.id,\n        runId,\n        error: err,\n      });\n      this.#notifyError(err, schedule.id);\n      return;\n    }\n\n    if (!claimed) {\n      // Another instance won the race, the row was paused/disabled, or the\n      // expected nextFireAt no longer matches. Skip publishing.\n      return;\n    }\n\n    let triggerStatus: ScheduleTrigger['outcome'] = 'published';\n    let triggerError: string | undefined;\n\n    try {\n      await this.#publishWorkflowStart(schedule, runId);\n    } catch (err) {\n      triggerStatus = 'failed';\n      triggerError = err instanceof Error ? err.message : String(err);\n      this.logger.error('Failed to publish workflow.start for schedule', {\n        scheduleId: schedule.id,\n        runId,\n        error: err,\n      });\n      this.#notifyError(err, schedule.id);\n    }\n\n    try {\n      await this.#schedulesStore.recordTrigger({\n        scheduleId: schedule.id,\n        runId,\n        scheduledFireAt: schedule.nextFireAt,\n        actualFireAt,\n        outcome: triggerStatus,\n        error: triggerError,\n        triggerKind: 'schedule-fire',\n      });\n    } catch (err) {\n      this.logger.error('Failed to record schedule trigger', {\n        scheduleId: schedule.id,\n        runId,\n        error: err,\n      });\n    }\n  }\n\n  /**\n   * Invoke the user-supplied onError hook in isolation. A throwing hook\n   * must not abort the scheduler tick loop, so we swallow + log any error\n   * the callback itself raises.\n   */\n  #notifyError(error: unknown, scheduleId: string): void {\n    if (!this.#config.onError) return;\n    try {\n      this.#config.onError(error, { scheduleId });\n    } catch (callbackError) {\n      this.logger.error('WorkflowScheduler onError handler threw', {\n        scheduleId,\n        error: callbackError,\n      });\n    }\n  }\n\n  async #publishWorkflowStart(schedule: Schedule, runId: string): Promise<void> {\n    if (schedule.target.type !== 'workflow') {\n      throw new Error(`Unsupported schedule target type: ${(schedule.target as { type: string }).type}`);\n    }\n\n    const { workflowId, inputData, initialState, requestContext } = schedule.target;\n\n    await this.#pubsub.publish(TOPIC_WORKFLOWS, {\n      type: 'workflow.start',\n      runId,\n      data: {\n        workflowId,\n        runId,\n        prevResult: { status: 'success', output: inputData ?? {} },\n        requestContext: requestContext ?? {},\n        initialState: initialState ?? {},\n      },\n    });\n  }\n}\n"]}