{"version":3,"file":"client.cjs","names":["baseHeaders: Record<string, string>","resp: Response | undefined","redirectUrl: string | undefined","eagerResponse: Promise<Response | undefined> | undefined","iterSse","parseSseEvent","redirectRes: Response | undefined"],"sources":["../../../src/experimental/durable-endpoints/client.ts"],"sourcesContent":["/**\n * Entrypoint file for client-side Durable Endpoint utilities.\n */\n\nimport {\n  iterSse,\n  parseSseEvent,\n  type SseEvent,\n} from \"../../components/execution/streaming.ts\";\n\ninterface FetchDurableEndpointOptions {\n  /** Fetch function. */\n  fetch?: typeof globalThis.fetch;\n\n  /** Options passed to the fetch function. */\n  fetchOpts?: RequestInit;\n\n  /** Called when run metadata is received (e.g. run ID). */\n  onMetadata?: (args: { runId: string }) => void;\n\n  /**\n   * Called for each streamed chunk. Should be considered uncommitted until a\n   * commit or rollback event is received.\n   */\n  onData?: (args: { data: unknown; hashedStepId: string | null }) => void;\n\n  /**\n   * Called when uncommitted stream data should be rolled back, since a retry\n   * will happen.\n   */\n  onRollback?: (args: { hashedStepId: string | null }) => void;\n\n  /**\n   * Called when uncommitted stream data should be committed, since it can no longer be\n   * rolled back.\n   */\n  onCommit?: (args: { hashedStepId: string | null }) => void;\n}\n\n/**\n * Fetch a durable endpoint URL and consume its SSE stream, dispatching\n * lifecycle callbacks (metadata, data, commit, rollback) as\n * events arrive. Returns the final `Response` reconstructed from the\n * terminal `inngest.response` SSE event.\n *\n * If the server does not respond with `text/event-stream`, the raw\n * `Response` is returned as-is (non-streaming path).\n */\nexport async function fetchWithStream(\n  url: string,\n  opts?: FetchDurableEndpointOptions,\n): Promise<Response> {\n  const fetchFn = opts?.fetch ?? globalThis.fetch;\n\n  const baseHeaders: Record<string, string> = {};\n  if (opts?.fetchOpts?.headers) {\n    new Headers(opts.fetchOpts.headers).forEach((value, key) => {\n      baseHeaders[key] = value;\n    });\n  }\n\n  const initialRes = await fetchFn(url, {\n    ...opts?.fetchOpts,\n    headers: {\n      ...baseHeaders,\n      Accept: \"text/event-stream\",\n    },\n  });\n\n  const contentType = initialRes.headers.get(\"content-type\") ?? \"\";\n  if (!contentType.includes(\"text/event-stream\")) {\n    return initialRes;\n  }\n\n  if (!initialRes.body) {\n    throw new Error(\"No response body\");\n  }\n\n  let resp: Response | undefined;\n\n  const source = iterSseFollowRedirects(\n    initialRes.body,\n    fetchFn,\n    opts?.fetchOpts?.signal ?? undefined,\n  );\n\n  outer: for await (const sseEvent of source) {\n    switch (sseEvent.type) {\n      case \"inngest.stream\": {\n        opts?.onData?.({\n          data: sseEvent.data,\n          hashedStepId: sseEvent.hashedStepId ?? null,\n        });\n        break;\n      }\n      case \"inngest.commit\": {\n        opts?.onCommit?.({ hashedStepId: sseEvent.hashedStepId });\n        break;\n      }\n      case \"inngest.rollback\": {\n        opts?.onRollback?.({ hashedStepId: sseEvent.hashedStepId });\n        break;\n      }\n      case \"inngest.response\": {\n        resp = new Response(sseEvent.response.body, {\n          status: sseEvent.response.statusCode,\n          headers: sseEvent.response.headers,\n        });\n\n        break outer;\n      }\n      case \"inngest.metadata\": {\n        opts?.onMetadata?.({ runId: sseEvent.runId });\n        break;\n      }\n      default:\n        break;\n    }\n  }\n\n  if (!resp) {\n    throw new Error(\"No response\");\n  }\n\n  return resp;\n}\n\n/**\n * Async generator that yields parsed SSE events from an already-fetched\n * response body, following `inngest.redirect_info` redirects.\n *\n * When a redirect event arrives, the redirect URL is fetched eagerly in the\n * background so the connection is already established by the time the direct\n * stream closes. This minimizes the window for late-joiner data loss.\n */\nasync function* iterSseFollowRedirects(\n  body: ReadableStream<Uint8Array>,\n  fetchFn: typeof globalThis.fetch,\n  signal?: AbortSignal,\n): AsyncGenerator<SseEvent> {\n  const fetchOpts = { headers: { Accept: \"text/event-stream\" }, signal };\n  let redirectUrl: string | undefined;\n  let eagerResponse: Promise<Response | undefined> | undefined;\n\n  try {\n    for await (const raw of iterSse(body)) {\n      const sseEvent = parseSseEvent(raw);\n      if (!sseEvent) continue;\n\n      if (sseEvent.type === \"inngest.redirect_info\") {\n        redirectUrl = sseEvent.url;\n\n        // Start the redirect connection immediately (once only).\n        if (sseEvent.url && !eagerResponse) {\n          eagerResponse = fetchFn(sseEvent.url, fetchOpts).catch(\n            () => undefined,\n          );\n        }\n\n        yield sseEvent;\n        continue;\n      }\n\n      yield sseEvent;\n    }\n\n    if (!redirectUrl) return;\n\n    let redirectRes: Response | undefined;\n\n    if (eagerResponse) {\n      const eager = await eagerResponse;\n      if (eager?.ok && eager.body) {\n        redirectRes = eager;\n      } else {\n        await eager?.body?.cancel();\n      }\n      eagerResponse = undefined;\n    }\n\n    if (!redirectRes) {\n      if (signal?.aborted) {\n        throw (\n          signal.reason ??\n          new DOMException(\"The operation was aborted.\", \"AbortError\")\n        );\n      }\n\n      const fallback = await fetchFn(redirectUrl, fetchOpts);\n      if (!fallback.ok) {\n        throw new Error(\n          `Stream request failed: ${fallback.status} ${fallback.statusText}`,\n        );\n      }\n      if (!fallback.body) {\n        throw new Error(\"No response body\");\n      }\n      redirectRes = fallback;\n    }\n\n    for await (const raw of iterSse(redirectRes.body!)) {\n      const sseEvent = parseSseEvent(raw);\n      if (!sseEvent) continue;\n      yield sseEvent;\n    }\n  } finally {\n    // Cancel any unconsumed eager response body to release the connection.\n    if (eagerResponse) {\n      void eagerResponse.then((r) => r?.body?.cancel()).catch(() => {});\n    }\n  }\n}\n"],"mappings":";;;;;;;;;;;;;;;AAgDA,eAAsB,gBACpB,KACA,MACmB;CACnB,MAAM,UAAU,MAAM,SAAS,WAAW;CAE1C,MAAMA,cAAsC,EAAE;AAC9C,KAAI,MAAM,WAAW,QACnB,KAAI,QAAQ,KAAK,UAAU,QAAQ,CAAC,SAAS,OAAO,QAAQ;AAC1D,cAAY,OAAO;GACnB;CAGJ,MAAM,aAAa,MAAM,QAAQ,KAAK;EACpC,GAAG,MAAM;EACT,SAAS;GACP,GAAG;GACH,QAAQ;GACT;EACF,CAAC;AAGF,KAAI,EADgB,WAAW,QAAQ,IAAI,eAAe,IAAI,IAC7C,SAAS,oBAAoB,CAC5C,QAAO;AAGT,KAAI,CAAC,WAAW,KACd,OAAM,IAAI,MAAM,mBAAmB;CAGrC,IAAIC;CAEJ,MAAM,SAAS,uBACb,WAAW,MACX,SACA,MAAM,WAAW,UAAU,OAC5B;AAED,OAAO,YAAW,MAAM,YAAY,OAClC,SAAQ,SAAS,MAAjB;EACE,KAAK;AACH,SAAM,SAAS;IACb,MAAM,SAAS;IACf,cAAc,SAAS,gBAAgB;IACxC,CAAC;AACF;EAEF,KAAK;AACH,SAAM,WAAW,EAAE,cAAc,SAAS,cAAc,CAAC;AACzD;EAEF,KAAK;AACH,SAAM,aAAa,EAAE,cAAc,SAAS,cAAc,CAAC;AAC3D;EAEF,KAAK;AACH,UAAO,IAAI,SAAS,SAAS,SAAS,MAAM;IAC1C,QAAQ,SAAS,SAAS;IAC1B,SAAS,SAAS,SAAS;IAC5B,CAAC;AAEF,SAAM;EAER,KAAK;AACH,SAAM,aAAa,EAAE,OAAO,SAAS,OAAO,CAAC;AAC7C;EAEF,QACE;;AAIN,KAAI,CAAC,KACH,OAAM,IAAI,MAAM,cAAc;AAGhC,QAAO;;;;;;;;;;AAWT,gBAAgB,uBACd,MACA,SACA,QAC0B;CAC1B,MAAM,YAAY;EAAE,SAAS,EAAE,QAAQ,qBAAqB;EAAE;EAAQ;CACtE,IAAIC;CACJ,IAAIC;AAEJ,KAAI;AACF,aAAW,MAAM,OAAOC,0BAAQ,KAAK,EAAE;GACrC,MAAM,WAAWC,gCAAc,IAAI;AACnC,OAAI,CAAC,SAAU;AAEf,OAAI,SAAS,SAAS,yBAAyB;AAC7C,kBAAc,SAAS;AAGvB,QAAI,SAAS,OAAO,CAAC,cACnB,iBAAgB,QAAQ,SAAS,KAAK,UAAU,CAAC,YACzC,OACP;AAGH,UAAM;AACN;;AAGF,SAAM;;AAGR,MAAI,CAAC,YAAa;EAElB,IAAIC;AAEJ,MAAI,eAAe;GACjB,MAAM,QAAQ,MAAM;AACpB,OAAI,OAAO,MAAM,MAAM,KACrB,eAAc;OAEd,OAAM,OAAO,MAAM,QAAQ;AAE7B,mBAAgB;;AAGlB,MAAI,CAAC,aAAa;AAChB,OAAI,QAAQ,QACV,OACE,OAAO,UACP,IAAI,aAAa,8BAA8B,aAAa;GAIhE,MAAM,WAAW,MAAM,QAAQ,aAAa,UAAU;AACtD,OAAI,CAAC,SAAS,GACZ,OAAM,IAAI,MACR,0BAA0B,SAAS,OAAO,GAAG,SAAS,aACvD;AAEH,OAAI,CAAC,SAAS,KACZ,OAAM,IAAI,MAAM,mBAAmB;AAErC,iBAAc;;AAGhB,aAAW,MAAM,OAAOF,0BAAQ,YAAY,KAAM,EAAE;GAClD,MAAM,WAAWC,gCAAc,IAAI;AACnC,OAAI,CAAC,SAAU;AACf,SAAM;;WAEA;AAER,MAAI,cACF,CAAK,cAAc,MAAM,MAAM,GAAG,MAAM,QAAQ,CAAC,CAAC,YAAY,GAAG"}