{"version":3,"file":"TransactionExecutor.cjs","sources":["../../../src/executor/TransactionExecutor.ts"],"sourcesContent":["import { createTransaction } from '@tanstack/db'\nimport { DefaultRetryPolicy } from '../retry/RetryPolicy'\nimport { NonRetriableError } from '../types'\nimport { withNestedSpan } from '../telemetry/tracer'\nimport type { KeyScheduler } from './KeyScheduler'\nimport type { OutboxManager } from '../outbox/OutboxManager'\nimport type {\n  OfflineConfig,\n  OfflineTransaction,\n  TransactionSignaler,\n} from '../types'\n\nconst HANDLED_EXECUTION_ERROR = Symbol(`HandledExecutionError`)\n\nexport class TransactionExecutor {\n  private scheduler: KeyScheduler\n  private outbox: OutboxManager\n  private config: OfflineConfig\n  private retryPolicy: DefaultRetryPolicy\n  private isExecuting = false\n  private executionPromise: Promise<void> | null = null\n  private offlineExecutor: TransactionSignaler\n  private retryTimer: ReturnType<typeof setTimeout> | null = null\n\n  constructor(\n    scheduler: KeyScheduler,\n    outbox: OutboxManager,\n    config: OfflineConfig,\n    offlineExecutor: TransactionSignaler,\n  ) {\n    this.scheduler = scheduler\n    this.outbox = outbox\n    this.config = config\n    this.retryPolicy = new DefaultRetryPolicy(\n      Number.POSITIVE_INFINITY,\n      config.jitter ?? true,\n    )\n    this.offlineExecutor = offlineExecutor\n  }\n\n  async execute(transaction: OfflineTransaction): Promise<void> {\n    this.scheduler.schedule(transaction)\n    await this.executeAll()\n  }\n\n  async executeAll(): Promise<void> {\n    if (this.isExecuting) {\n      return this.executionPromise!\n    }\n\n    this.isExecuting = true\n    this.executionPromise = this.runExecution()\n\n    try {\n      await this.executionPromise\n    } finally {\n      this.isExecuting = false\n      this.executionPromise = null\n    }\n  }\n\n  private async runExecution(): Promise<void> {\n    while (this.scheduler.getPendingCount() > 0) {\n      if (!this.isOnline()) {\n        break\n      }\n\n      const transaction = this.scheduler.getNext()\n\n      if (!transaction) {\n        break\n      }\n\n      await this.executeTransaction(transaction)\n    }\n\n    // Schedule next retry after execution completes\n    this.scheduleNextRetry()\n  }\n\n  private async executeTransaction(\n    transaction: OfflineTransaction,\n  ): Promise<void> {\n    try {\n      await withNestedSpan(\n        `transaction.execute`,\n        {\n          'transaction.id': transaction.id,\n          'transaction.mutationFnName': transaction.mutationFnName,\n          'transaction.retryCount': transaction.retryCount,\n          'transaction.keyCount': transaction.keys.length,\n        },\n        async (span) => {\n          this.scheduler.markStarted(transaction)\n\n          if (transaction.retryCount > 0) {\n            span.setAttribute(`retry.attempt`, transaction.retryCount)\n          }\n\n          try {\n            const result = await this.runMutationFn(transaction)\n\n            this.scheduler.markCompleted(transaction)\n            await this.outbox.remove(transaction.id)\n\n            span.setAttribute(`result`, `success`)\n            this.offlineExecutor.resolveTransaction(transaction.id, result)\n          } catch (error) {\n            const err =\n              error instanceof Error ? error : new Error(String(error))\n\n            span.setAttribute(`result`, `error`)\n\n            await this.handleError(transaction, err)\n            ;(err as any)[HANDLED_EXECUTION_ERROR] = true\n            throw err\n          }\n        },\n      )\n    } catch (error) {\n      if (\n        error instanceof Error &&\n        (error as any)[HANDLED_EXECUTION_ERROR] === true\n      ) {\n        return\n      }\n\n      throw error\n    }\n  }\n\n  private async runMutationFn(transaction: OfflineTransaction): Promise<void> {\n    const mutationFn = this.config.mutationFns[transaction.mutationFnName]\n\n    if (!mutationFn) {\n      const errorMessage = `Unknown mutation function: ${transaction.mutationFnName}`\n\n      if (this.config.onUnknownMutationFn) {\n        this.config.onUnknownMutationFn(transaction.mutationFnName, transaction)\n      }\n\n      throw new NonRetriableError(errorMessage)\n    }\n\n    // Mutations are already PendingMutation objects with collections attached\n    // from the deserializer, so we can use them directly\n    const transactionWithMutations = {\n      id: transaction.id,\n      mutations: transaction.mutations,\n      metadata: transaction.metadata ?? {},\n    }\n\n    await mutationFn({\n      transaction: transactionWithMutations as any,\n      idempotencyKey: transaction.idempotencyKey,\n    })\n  }\n\n  private async handleError(\n    transaction: OfflineTransaction,\n    error: Error,\n  ): Promise<void> {\n    return withNestedSpan(\n      `transaction.handleError`,\n      {\n        'transaction.id': transaction.id,\n        'error.name': error.name,\n        'error.message': error.message,\n      },\n      async (span) => {\n        const shouldRetry = this.retryPolicy.shouldRetry(\n          error,\n          transaction.retryCount,\n        )\n\n        span.setAttribute(`shouldRetry`, shouldRetry)\n\n        if (!shouldRetry) {\n          this.scheduler.markCompleted(transaction)\n          await this.outbox.remove(transaction.id)\n          console.warn(\n            `Transaction ${transaction.id} failed permanently:`,\n            error,\n          )\n\n          span.setAttribute(`result`, `permanent_failure`)\n          // Signal permanent failure to the waiting transaction\n          this.offlineExecutor.rejectTransaction(transaction.id, error)\n          return\n        }\n\n        const delay = Math.max(\n          0,\n          this.retryPolicy.calculateDelay(transaction.retryCount),\n        )\n        const updatedTransaction: OfflineTransaction = {\n          ...transaction,\n          retryCount: transaction.retryCount + 1,\n          nextAttemptAt: Date.now() + delay,\n          lastError: {\n            name: error.name,\n            message: error.message,\n            stack: error.stack,\n          },\n        }\n\n        span.setAttribute(`retryDelay`, delay)\n        span.setAttribute(`nextRetryCount`, updatedTransaction.retryCount)\n\n        this.scheduler.markFailed(transaction)\n        this.scheduler.updateTransaction(updatedTransaction)\n\n        try {\n          await this.outbox.update(transaction.id, updatedTransaction)\n          span.setAttribute(`result`, `scheduled_retry`)\n        } catch (persistError) {\n          span.recordException(persistError as Error)\n          span.setAttribute(`result`, `persist_failed`)\n          throw persistError\n        }\n\n        // Schedule retry timer\n        this.scheduleNextRetry()\n      },\n    )\n  }\n\n  async loadPendingTransactions(): Promise<void> {\n    const transactions = await this.outbox.getAll()\n    let filteredTransactions = transactions\n\n    if (this.config.beforeRetry) {\n      filteredTransactions = this.config.beforeRetry(transactions)\n    }\n\n    for (const transaction of filteredTransactions) {\n      this.scheduler.schedule(transaction)\n    }\n\n    // Restore optimistic state for loaded transactions\n    // This ensures the UI shows the optimistic data while transactions are pending\n    this.restoreOptimisticState(filteredTransactions)\n\n    // Reset retry delays for all loaded transactions so they can run immediately\n    this.resetRetryDelays()\n\n    // Schedule retry timer for loaded transactions\n    this.scheduleNextRetry()\n\n    const removedTransactions = transactions.filter(\n      (tx) => !filteredTransactions.some((filtered) => filtered.id === tx.id),\n    )\n\n    if (removedTransactions.length > 0) {\n      await this.outbox.removeMany(removedTransactions.map((tx) => tx.id))\n    }\n  }\n\n  /**\n   * Restore optimistic state from loaded transactions.\n   * Creates internal transactions to hold the mutations so the collection's\n   * state manager can show optimistic data while waiting for sync.\n   */\n  private restoreOptimisticState(\n    transactions: Array<OfflineTransaction>,\n  ): void {\n    for (const offlineTx of transactions) {\n      if (offlineTx.mutations.length === 0) {\n        continue\n      }\n\n      try {\n        // Create a restoration transaction that holds mutations for optimistic state display.\n        // It will never commit - the real mutation is handled by the offline executor.\n        const restorationTx = createTransaction({\n          id: offlineTx.id,\n          autoCommit: false,\n          mutationFn: async () => {},\n        })\n\n        // Prevent unhandled promise rejection when cleanup calls rollback()\n        // We don't care about this promise - it's just for holding mutations\n        restorationTx.isPersisted.promise.catch(() => {\n          // Intentionally ignored - restoration transactions are cleaned up\n          // via cleanupRestorationTransaction, not through normal commit flow\n        })\n\n        restorationTx.applyMutations(offlineTx.mutations)\n\n        // Register with each affected collection's state manager\n        const touchedCollections = new Set<string>()\n        for (const mutation of offlineTx.mutations) {\n          // Defensive check for corrupted deserialized data\n          // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition\n          if (!mutation.collection) {\n            continue\n          }\n          const collectionId = mutation.collection.id\n          if (touchedCollections.has(collectionId)) {\n            continue\n          }\n          touchedCollections.add(collectionId)\n\n          mutation.collection._state.transactions.set(\n            restorationTx.id,\n            restorationTx,\n          )\n          mutation.collection._state.recomputeOptimisticState(true)\n        }\n\n        this.offlineExecutor.registerRestorationTransaction(\n          offlineTx.id,\n          restorationTx,\n        )\n      } catch (error) {\n        console.warn(\n          `Failed to restore optimistic state for transaction ${offlineTx.id}:`,\n          error,\n        )\n      }\n    }\n  }\n\n  clear(): void {\n    this.scheduler.clear()\n    this.clearRetryTimer()\n  }\n\n  getPendingCount(): number {\n    return this.scheduler.getPendingCount()\n  }\n\n  private scheduleNextRetry(): void {\n    // Clear existing timer\n    this.clearRetryTimer()\n\n    if (!this.isOnline()) {\n      return\n    }\n\n    // Find the earliest retry time among pending transactions\n    const earliestRetryTime = this.getEarliestRetryTime()\n\n    if (earliestRetryTime === null) {\n      return // No transactions pending retry\n    }\n\n    const delay = Math.max(0, earliestRetryTime - Date.now())\n\n    this.retryTimer = setTimeout(() => {\n      this.executeAll().catch((error) => {\n        console.warn(`Failed to execute retry batch:`, error)\n      })\n    }, delay)\n  }\n\n  private getEarliestRetryTime(): number | null {\n    const allTransactions = this.scheduler.getAllPendingTransactions()\n\n    if (allTransactions.length === 0) {\n      return null\n    }\n\n    return Math.min(...allTransactions.map((tx) => tx.nextAttemptAt))\n  }\n\n  private clearRetryTimer(): void {\n    if (this.retryTimer) {\n      clearTimeout(this.retryTimer)\n      this.retryTimer = null\n    }\n  }\n\n  private isOnline(): boolean {\n    return this.offlineExecutor.isOnline()\n  }\n\n  getRunningCount(): number {\n    return this.scheduler.getRunningCount()\n  }\n\n  resetRetryDelays(): void {\n    const allTransactions = this.scheduler.getAllPendingTransactions()\n    const updatedTransactions = allTransactions.map((transaction) => ({\n      ...transaction,\n      nextAttemptAt: Date.now(),\n    }))\n\n    this.scheduler.updateTransactions(updatedTransactions)\n  }\n}\n"],"names":["DefaultRetryPolicy","withNestedSpan","NonRetriableError","createTransaction"],"mappings":";;;;;;AAYA,MAAM,iDAAiC,uBAAuB;AAEvD,MAAM,oBAAoB;AAAA,EAU/B,YACE,WACA,QACA,QACA,iBACA;AAVF,SAAQ,cAAc;AACtB,SAAQ,mBAAyC;AAEjD,SAAQ,aAAmD;AAQzD,SAAK,YAAY;AACjB,SAAK,SAAS;AACd,SAAK,SAAS;AACd,SAAK,cAAc,IAAIA,YAAAA;AAAAA,MACrB,OAAO;AAAA,MACP,OAAO,UAAU;AAAA,IAAA;AAEnB,SAAK,kBAAkB;AAAA,EACzB;AAAA,EAEA,MAAM,QAAQ,aAAgD;AAC5D,SAAK,UAAU,SAAS,WAAW;AACnC,UAAM,KAAK,WAAA;AAAA,EACb;AAAA,EAEA,MAAM,aAA4B;AAChC,QAAI,KAAK,aAAa;AACpB,aAAO,KAAK;AAAA,IACd;AAEA,SAAK,cAAc;AACnB,SAAK,mBAAmB,KAAK,aAAA;AAE7B,QAAI;AACF,YAAM,KAAK;AAAA,IACb,UAAA;AACE,WAAK,cAAc;AACnB,WAAK,mBAAmB;AAAA,IAC1B;AAAA,EACF;AAAA,EAEA,MAAc,eAA8B;AAC1C,WAAO,KAAK,UAAU,gBAAA,IAAoB,GAAG;AAC3C,UAAI,CAAC,KAAK,YAAY;AACpB;AAAA,MACF;AAEA,YAAM,cAAc,KAAK,UAAU,QAAA;AAEnC,UAAI,CAAC,aAAa;AAChB;AAAA,MACF;AAEA,YAAM,KAAK,mBAAmB,WAAW;AAAA,IAC3C;AAGA,SAAK,kBAAA;AAAA,EACP;AAAA,EAEA,MAAc,mBACZ,aACe;AACf,QAAI;AACF,YAAMC,OAAAA;AAAAA,QACJ;AAAA,QACA;AAAA,UACE,kBAAkB,YAAY;AAAA,UAC9B,8BAA8B,YAAY;AAAA,UAC1C,0BAA0B,YAAY;AAAA,UACtC,wBAAwB,YAAY,KAAK;AAAA,QAAA;AAAA,QAE3C,OAAO,SAAS;AACd,eAAK,UAAU,YAAY,WAAW;AAEtC,cAAI,YAAY,aAAa,GAAG;AAC9B,iBAAK,aAAa,iBAAiB,YAAY,UAAU;AAAA,UAC3D;AAEA,cAAI;AACF,kBAAM,SAAS,MAAM,KAAK,cAAc,WAAW;AAEnD,iBAAK,UAAU,cAAc,WAAW;AACxC,kBAAM,KAAK,OAAO,OAAO,YAAY,EAAE;AAEvC,iBAAK,aAAa,UAAU,SAAS;AACrC,iBAAK,gBAAgB,mBAAmB,YAAY,IAAI,MAAM;AAAA,UAChE,SAAS,OAAO;AACd,kBAAM,MACJ,iBAAiB,QAAQ,QAAQ,IAAI,MAAM,OAAO,KAAK,CAAC;AAE1D,iBAAK,aAAa,UAAU,OAAO;AAEnC,kBAAM,KAAK,YAAY,aAAa,GAAG;AACrC,gBAAY,uBAAuB,IAAI;AACzC,kBAAM;AAAA,UACR;AAAA,QACF;AAAA,MAAA;AAAA,IAEJ,SAAS,OAAO;AACd,UACE,iBAAiB,SAChB,MAAc,uBAAuB,MAAM,MAC5C;AACA;AAAA,MACF;AAEA,YAAM;AAAA,IACR;AAAA,EACF;AAAA,EAEA,MAAc,cAAc,aAAgD;AAC1E,UAAM,aAAa,KAAK,OAAO,YAAY,YAAY,cAAc;AAErE,QAAI,CAAC,YAAY;AACf,YAAM,eAAe,8BAA8B,YAAY,cAAc;AAE7E,UAAI,KAAK,OAAO,qBAAqB;AACnC,aAAK,OAAO,oBAAoB,YAAY,gBAAgB,WAAW;AAAA,MACzE;AAEA,YAAM,IAAIC,MAAAA,kBAAkB,YAAY;AAAA,IAC1C;AAIA,UAAM,2BAA2B;AAAA,MAC/B,IAAI,YAAY;AAAA,MAChB,WAAW,YAAY;AAAA,MACvB,UAAU,YAAY,YAAY,CAAA;AAAA,IAAC;AAGrC,UAAM,WAAW;AAAA,MACf,aAAa;AAAA,MACb,gBAAgB,YAAY;AAAA,IAAA,CAC7B;AAAA,EACH;AAAA,EAEA,MAAc,YACZ,aACA,OACe;AACf,WAAOD,OAAAA;AAAAA,MACL;AAAA,MACA;AAAA,QACE,kBAAkB,YAAY;AAAA,QAC9B,cAAc,MAAM;AAAA,QACpB,iBAAiB,MAAM;AAAA,MAAA;AAAA,MAEzB,OAAO,SAAS;AACd,cAAM,cAAc,KAAK,YAAY;AAAA,UACnC;AAAA,UACA,YAAY;AAAA,QAAA;AAGd,aAAK,aAAa,eAAe,WAAW;AAE5C,YAAI,CAAC,aAAa;AAChB,eAAK,UAAU,cAAc,WAAW;AACxC,gBAAM,KAAK,OAAO,OAAO,YAAY,EAAE;AACvC,kBAAQ;AAAA,YACN,eAAe,YAAY,EAAE;AAAA,YAC7B;AAAA,UAAA;AAGF,eAAK,aAAa,UAAU,mBAAmB;AAE/C,eAAK,gBAAgB,kBAAkB,YAAY,IAAI,KAAK;AAC5D;AAAA,QACF;AAEA,cAAM,QAAQ,KAAK;AAAA,UACjB;AAAA,UACA,KAAK,YAAY,eAAe,YAAY,UAAU;AAAA,QAAA;AAExD,cAAM,qBAAyC;AAAA,UAC7C,GAAG;AAAA,UACH,YAAY,YAAY,aAAa;AAAA,UACrC,eAAe,KAAK,IAAA,IAAQ;AAAA,UAC5B,WAAW;AAAA,YACT,MAAM,MAAM;AAAA,YACZ,SAAS,MAAM;AAAA,YACf,OAAO,MAAM;AAAA,UAAA;AAAA,QACf;AAGF,aAAK,aAAa,cAAc,KAAK;AACrC,aAAK,aAAa,kBAAkB,mBAAmB,UAAU;AAEjE,aAAK,UAAU,WAAW,WAAW;AACrC,aAAK,UAAU,kBAAkB,kBAAkB;AAEnD,YAAI;AACF,gBAAM,KAAK,OAAO,OAAO,YAAY,IAAI,kBAAkB;AAC3D,eAAK,aAAa,UAAU,iBAAiB;AAAA,QAC/C,SAAS,cAAc;AACrB,eAAK,gBAAgB,YAAqB;AAC1C,eAAK,aAAa,UAAU,gBAAgB;AAC5C,gBAAM;AAAA,QACR;AAGA,aAAK,kBAAA;AAAA,MACP;AAAA,IAAA;AAAA,EAEJ;AAAA,EAEA,MAAM,0BAAyC;AAC7C,UAAM,eAAe,MAAM,KAAK,OAAO,OAAA;AACvC,QAAI,uBAAuB;AAE3B,QAAI,KAAK,OAAO,aAAa;AAC3B,6BAAuB,KAAK,OAAO,YAAY,YAAY;AAAA,IAC7D;AAEA,eAAW,eAAe,sBAAsB;AAC9C,WAAK,UAAU,SAAS,WAAW;AAAA,IACrC;AAIA,SAAK,uBAAuB,oBAAoB;AAGhD,SAAK,iBAAA;AAGL,SAAK,kBAAA;AAEL,UAAM,sBAAsB,aAAa;AAAA,MACvC,CAAC,OAAO,CAAC,qBAAqB,KAAK,CAAC,aAAa,SAAS,OAAO,GAAG,EAAE;AAAA,IAAA;AAGxE,QAAI,oBAAoB,SAAS,GAAG;AAClC,YAAM,KAAK,OAAO,WAAW,oBAAoB,IAAI,CAAC,OAAO,GAAG,EAAE,CAAC;AAAA,IACrE;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOQ,uBACN,cACM;AACN,eAAW,aAAa,cAAc;AACpC,UAAI,UAAU,UAAU,WAAW,GAAG;AACpC;AAAA,MACF;AAEA,UAAI;AAGF,cAAM,gBAAgBE,GAAAA,kBAAkB;AAAA,UACtC,IAAI,UAAU;AAAA,UACd,YAAY;AAAA,UACZ,YAAY,YAAY;AAAA,UAAC;AAAA,QAAA,CAC1B;AAID,sBAAc,YAAY,QAAQ,MAAM,MAAM;AAAA,QAG9C,CAAC;AAED,sBAAc,eAAe,UAAU,SAAS;AAGhD,cAAM,yCAAyB,IAAA;AAC/B,mBAAW,YAAY,UAAU,WAAW;AAG1C,cAAI,CAAC,SAAS,YAAY;AACxB;AAAA,UACF;AACA,gBAAM,eAAe,SAAS,WAAW;AACzC,cAAI,mBAAmB,IAAI,YAAY,GAAG;AACxC;AAAA,UACF;AACA,6BAAmB,IAAI,YAAY;AAEnC,mBAAS,WAAW,OAAO,aAAa;AAAA,YACtC,cAAc;AAAA,YACd;AAAA,UAAA;AAEF,mBAAS,WAAW,OAAO,yBAAyB,IAAI;AAAA,QAC1D;AAEA,aAAK,gBAAgB;AAAA,UACnB,UAAU;AAAA,UACV;AAAA,QAAA;AAAA,MAEJ,SAAS,OAAO;AACd,gBAAQ;AAAA,UACN,sDAAsD,UAAU,EAAE;AAAA,UAClE;AAAA,QAAA;AAAA,MAEJ;AAAA,IACF;AAAA,EACF;AAAA,EAEA,QAAc;AACZ,SAAK,UAAU,MAAA;AACf,SAAK,gBAAA;AAAA,EACP;AAAA,EAEA,kBAA0B;AACxB,WAAO,KAAK,UAAU,gBAAA;AAAA,EACxB;AAAA,EAEQ,oBAA0B;AAEhC,SAAK,gBAAA;AAEL,QAAI,CAAC,KAAK,YAAY;AACpB;AAAA,IACF;AAGA,UAAM,oBAAoB,KAAK,qBAAA;AAE/B,QAAI,sBAAsB,MAAM;AAC9B;AAAA,IACF;AAEA,UAAM,QAAQ,KAAK,IAAI,GAAG,oBAAoB,KAAK,KAAK;AAExD,SAAK,aAAa,WAAW,MAAM;AACjC,WAAK,WAAA,EAAa,MAAM,CAAC,UAAU;AACjC,gBAAQ,KAAK,kCAAkC,KAAK;AAAA,MACtD,CAAC;AAAA,IACH,GAAG,KAAK;AAAA,EACV;AAAA,EAEQ,uBAAsC;AAC5C,UAAM,kBAAkB,KAAK,UAAU,0BAAA;AAEvC,QAAI,gBAAgB,WAAW,GAAG;AAChC,aAAO;AAAA,IACT;AAEA,WAAO,KAAK,IAAI,GAAG,gBAAgB,IAAI,CAAC,OAAO,GAAG,aAAa,CAAC;AAAA,EAClE;AAAA,EAEQ,kBAAwB;AAC9B,QAAI,KAAK,YAAY;AACnB,mBAAa,KAAK,UAAU;AAC5B,WAAK,aAAa;AAAA,IACpB;AAAA,EACF;AAAA,EAEQ,WAAoB;AAC1B,WAAO,KAAK,gBAAgB,SAAA;AAAA,EAC9B;AAAA,EAEA,kBAA0B;AACxB,WAAO,KAAK,UAAU,gBAAA;AAAA,EACxB;AAAA,EAEA,mBAAyB;AACvB,UAAM,kBAAkB,KAAK,UAAU,0BAAA;AACvC,UAAM,sBAAsB,gBAAgB,IAAI,CAAC,iBAAiB;AAAA,MAChE,GAAG;AAAA,MACH,eAAe,KAAK,IAAA;AAAA,IAAI,EACxB;AAEF,SAAK,UAAU,mBAAmB,mBAAmB;AAAA,EACvD;AACF;;"}