{"version":3,"file":"StreamTools.cjs","names":["readableStrategy: QueuingStrategy<Uint8Array> | undefined","getAsyncCtxSync","buildSseCommitEvent","buildSseRollbackEvent","sseEvent: string","buildSseStreamEvent","iterable: AsyncIterable<string>","chunks: string[]","buildSseRedirectEvent","buildSseSucceededEvent","buildSseFailedEvent","getAsyncCtx","stream: StreamTools"],"sources":["../../src/components/StreamTools.ts"],"sourcesContent":["import { getAsyncCtx, getAsyncCtxSync } from \"./execution/als.ts\";\nimport {\n  buildSseCommitEvent,\n  buildSseFailedEvent,\n  buildSseRedirectEvent,\n  buildSseRollbackEvent,\n  buildSseStreamEvent,\n  buildSseSucceededEvent,\n  type SseResponse,\n} from \"./execution/streaming.ts\";\n\n/**\n * Accepted source types for `stream.pipe()`.\n *\n * - `ReadableStream` — piped directly\n * - `AsyncIterable<string>` — iterated; each yielded value becomes a chunk\n * - `() => AsyncIterable<string>` — factory invoked lazily, then iterated\n */\nexport type PipeSource =\n  | ReadableStream\n  | AsyncIterable<string>\n  | (() => AsyncIterable<string>);\n\n/**\n * The public interface for stream tools available to user code.\n */\nexport interface StreamTools {\n  /**\n   * Push data to the client as an SSE stream event. Fire-and-forget from the\n   * caller's perspective.\n   *\n   * Outside of an Inngest execution context this is a silent no-op (graceful\n   * degradation).\n   */\n  push(data: unknown): void;\n\n  /**\n   * Pipe a source to the client, writing each chunk as an SSE stream event.\n   * Resolves with the concatenated content of all chunks when the source is\n   * fully consumed.\n   *\n   * Accepts a `ReadableStream`, an `AsyncIterable<string>`, or a factory\n   * function that returns an `AsyncIterable<string>` (e.g. an async\n   * generator function).\n   *\n   * Outside of an Inngest execution context this resolves with an empty string.\n   */\n  pipe(source: PipeSource): Promise<string>;\n}\n\n/**\n * Wraps a `TransformStream<Uint8Array>` to provide push/pipe SSE streaming\n * capabilities within an Inngest execution.\n *\n * @internal\n */\nexport class Stream implements StreamTools {\n  private transform: TransformStream<Uint8Array, Uint8Array>;\n  private writer: WritableStreamDefaultWriter<Uint8Array>;\n  private encoder = new TextEncoder();\n  private _activated = false;\n  private _errored = false;\n  private writeChain: Promise<void> = Promise.resolve();\n\n  /**\n   * Optional callback invoked the first time `push` or `pipe` is called.\n   * Used by the execution engine to fire a checkpoint that returns the SSE\n   * Response to the client immediately.\n   */\n  private onActivated?: () => void;\n\n  /**\n   * Optional callback invoked when a write to the underlying stream fails\n   * (e.g. the client disconnected or the transform stream errored). Used by\n   * the execution engine to emit diagnostic logs.\n   */\n  private onWriteError?: (err: unknown) => void;\n\n  constructor(opts?: {\n    onActivated?: () => void;\n    onWriteError?: (err: unknown) => void;\n  }) {\n    this.onActivated = opts?.onActivated;\n    this.onWriteError = opts?.onWriteError;\n\n    let readableStrategy: QueuingStrategy<Uint8Array> | undefined;\n\n    // `CountQueuingStrategy` is not available in some runtimes (e.g. Next.js\n    // Edge), where it may exist as a stub that throws on instantiation. Fall\n    // back to a plain `TransformStream` when it's missing or broken.\n    try {\n      readableStrategy = new CountQueuingStrategy({\n        // Use a generous high water mark so that writes don't block due to\n        // backpressure before the consumer reads.\n        highWaterMark: 1024,\n      });\n    } catch {\n      // Leave `readableStrategy` undefined\n    }\n\n    this.transform = new TransformStream<Uint8Array, Uint8Array>(\n      undefined,\n      undefined,\n      readableStrategy,\n    );\n    this.writer = this.transform.writable.getWriter();\n  }\n\n  /**\n   * Whether `push` or `pipe` has been called at least once.\n   */\n  get activated(): boolean {\n    return this._activated;\n  }\n\n  /**\n   * The readable side of the underlying transform stream. Consumers (i.e. the\n   * HTTP response) read SSE events from here.\n   */\n  get readable(): ReadableStream<Uint8Array> {\n    return this.transform.readable;\n  }\n\n  /**\n   * Resolve the current hashed step ID for stream events. Returns the\n   * executing step's hashed ID (read from ALS), or undefined if outside a step.\n   */\n  private currentHashedStepId(): string | undefined {\n    return getAsyncCtxSync()?.execution?.executingStep?.hashedId;\n  }\n\n  private activate(): void {\n    if (!this._activated) {\n      this._activated = true;\n      this.onActivated?.();\n    }\n  }\n\n  /**\n   * Encode and write an SSE event string to the underlying writer.\n   */\n  private writeEncoded(sseEvent: string): Promise<void> {\n    return this.writer.write(this.encoder.encode(sseEvent));\n  }\n\n  /**\n   * Enqueue a pre-built SSE event string onto the write chain.\n   */\n  private enqueue(sseEvent: string): void {\n    if (this._errored) return;\n\n    this.writeChain = this.writeChain\n      .then(() => this.writeEncoded(sseEvent))\n      .catch((err) => {\n        // Writer errored (e.g. stream closed) — swallow so the chain\n        // doesn't break and subsequent writes fail gracefully.\n        this._errored = true;\n        this.onWriteError?.(err);\n      });\n  }\n\n  /**\n   * Emit an `inngest.commit` SSE event indicating that uncommitted streamed data\n   * should be committed (i.e. will not be rolled back). Internal use only.\n   */\n  commit(hashedStepId: string | null): void {\n    this.enqueue(buildSseCommitEvent(hashedStepId));\n  }\n\n  /**\n   * Emit an `inngest.rollback` SSE event indicating the uncommitted streamed\n   * data should be discarded (e.g. step errored). Internal use only.\n   */\n  rollback(hashedStepId: string | null): void {\n    this.enqueue(buildSseRollbackEvent(hashedStepId));\n  }\n\n  /**\n   * Serialize `data` into an SSE stream event and enqueue it. Returns `false`\n   * if serialization fails (e.g. circular reference) so callers can skip.\n   */\n  private enqueueStreamEvent(data: unknown, hashedStepId?: string): boolean {\n    let sseEvent: string;\n    try {\n      sseEvent = buildSseStreamEvent(data, hashedStepId);\n    } catch {\n      return false;\n    }\n\n    this.enqueue(sseEvent);\n    return true;\n  }\n\n  /**\n   * Write a single SSE stream event containing `data`. The current step's\n   * hashed ID is automatically included as stepId for rollback tracking.\n   */\n  push(data: unknown): void {\n    this.activate();\n    this.enqueueStreamEvent(data, this.currentHashedStepId());\n  }\n\n  /**\n   * Pipe a source to the client, writing each chunk as an SSE stream event.\n   * Returns the concatenated content of all chunks.\n   */\n  async pipe(source: PipeSource): Promise<string> {\n    this.activate();\n\n    let iterable: AsyncIterable<string>;\n    if (source instanceof ReadableStream) {\n      iterable = this.readableToAsyncIterable(source);\n    } else if (typeof source === \"function\") {\n      iterable = source();\n    } else {\n      iterable = source;\n    }\n\n    return this.pipeIterable(iterable);\n  }\n\n  /**\n   * Adapt a ReadableStream into an AsyncIterable<string>. TypeScript's\n   * ReadableStream type doesn't declare Symbol.asyncIterator, so we use the\n   * reader API for type safety.\n   */\n  private async *readableToAsyncIterable(\n    readable: ReadableStream,\n  ): AsyncIterable<string> {\n    const reader = readable.getReader();\n    const decoder = new TextDecoder();\n    try {\n      while (true) {\n        const { done, value } = await reader.read();\n        if (done) break;\n        yield typeof value === \"string\"\n          ? value\n          : decoder.decode(value, { stream: true });\n      }\n      // flush any partially buffered multibyte characters from the decoder\n      const final = decoder.decode();\n      if (final) yield final;\n    } finally {\n      reader.releaseLock();\n    }\n  }\n\n  /**\n   * Core pipe loop: iterate an async iterable, writing each chunk as an SSE\n   * stream event and collecting the concatenated result.\n   */\n  private async pipeIterable(source: AsyncIterable<string>): Promise<string> {\n    const hashedStepId = this.currentHashedStepId();\n    const chunks: string[] = [];\n\n    for await (const chunk of source) {\n      if (this._errored) break;\n\n      chunks.push(chunk);\n\n      if (!this.enqueueStreamEvent(chunk, hashedStepId)) {\n        continue;\n      }\n\n      await this.writeChain;\n    }\n\n    return chunks.join(\"\");\n  }\n\n  /**\n   * Write a redirect info event. Tells the client where to reconnect if the\n   * durable endpoint goes async. Does NOT close the writer — more stream\n   * events may follow before the durable endpoint actually switches to async\n   * mode. Internal use only.\n   */\n  sendRedirectInfo(data: { runId: string; url: string }): void {\n    this.enqueue(buildSseRedirectEvent(data));\n  }\n\n  /**\n   * Write a succeeded result event and close the writer. Internal use only.\n   */\n  closeSucceeded(response: SseResponse): void {\n    let sseEvent: string;\n    try {\n      sseEvent = buildSseSucceededEvent(response);\n    } catch {\n      sseEvent = buildSseFailedEvent(\"Failed to serialize result\");\n    }\n    this.closeWriter(sseEvent);\n  }\n\n  /**\n   * Write a failed result event and close the writer. Internal use only.\n   */\n  closeFailed(error: string): void {\n    this.closeWriter(buildSseFailedEvent(error));\n  }\n\n  /**\n   * Optionally write a final SSE event, then close the writer.\n   */\n  private closeWriter(finalEvent?: string): void {\n    this.writeChain = this.writeChain\n      .then(async () => {\n        if (finalEvent) {\n          await this.writeEncoded(finalEvent);\n        }\n        await this.writer.close();\n      })\n      .catch((err) => {\n        this.onWriteError?.(err);\n      });\n  }\n\n  /**\n   * Close the writer without writing a result event. Used when the durable endpoint goes\n   * async and the real result will arrive on the redirected stream.\n   */\n  end(): void {\n    this.closeWriter();\n  }\n}\n\n/** Synchronous ALS lookup for the stream tools (fast path). */\nconst getStreamToolsSync = (): Stream | undefined => {\n  const ctx = getAsyncCtxSync();\n  return ctx?.execution?.stream;\n};\n\nconst getDeferredStreamTooling = async (): Promise<Stream | undefined> => {\n  const ctx = await getAsyncCtx();\n  return ctx?.execution?.stream;\n};\n\n/**\n * Stream tools that use ALS to resolve the current execution context.\n * Outside an Inngest execution, `push()` is a no-op and `pipe()` resolves immediately.\n */\nexport const stream: StreamTools = {\n  push: (data) => {\n    // Sync fast path: activate the stream before the next microtask tick.\n    const syncStream = getStreamToolsSync();\n    if (syncStream) {\n      syncStream.push(data);\n      return;\n    }\n\n    // Fallback: ALS not yet initialized (first import still resolving).\n    void getDeferredStreamTooling()\n      .then((s) => {\n        s?.push(data);\n      })\n      .catch(() => {\n        // ALS initialization failure — already warned in als.ts.\n        // push() is best-effort, so silently degrade.\n      });\n  },\n  pipe: async (source) => {\n    const syncStream = getStreamToolsSync();\n    if (syncStream) {\n      return syncStream.pipe(source);\n    }\n\n    const s = await getDeferredStreamTooling();\n    if (s) {\n      return s.pipe(source);\n    }\n    return \"\";\n  },\n};\n"],"mappings":";;;;;;;;;;AAwDA,IAAa,SAAb,MAA2C;CACzC,AAAQ;CACR,AAAQ;CACR,AAAQ,UAAU,IAAI,aAAa;CACnC,AAAQ,aAAa;CACrB,AAAQ,WAAW;CACnB,AAAQ,aAA4B,QAAQ,SAAS;;;;;;CAOrD,AAAQ;;;;;;CAOR,AAAQ;CAER,YAAY,MAGT;AACD,OAAK,cAAc,MAAM;AACzB,OAAK,eAAe,MAAM;EAE1B,IAAIA;AAKJ,MAAI;AACF,sBAAmB,IAAI,qBAAqB,EAG1C,eAAe,MAChB,CAAC;UACI;AAIR,OAAK,YAAY,IAAI,gBACnB,QACA,QACA,iBACD;AACD,OAAK,SAAS,KAAK,UAAU,SAAS,WAAW;;;;;CAMnD,IAAI,YAAqB;AACvB,SAAO,KAAK;;;;;;CAOd,IAAI,WAAuC;AACzC,SAAO,KAAK,UAAU;;;;;;CAOxB,AAAQ,sBAA0C;AAChD,SAAOC,6BAAiB,EAAE,WAAW,eAAe;;CAGtD,AAAQ,WAAiB;AACvB,MAAI,CAAC,KAAK,YAAY;AACpB,QAAK,aAAa;AAClB,QAAK,eAAe;;;;;;CAOxB,AAAQ,aAAa,UAAiC;AACpD,SAAO,KAAK,OAAO,MAAM,KAAK,QAAQ,OAAO,SAAS,CAAC;;;;;CAMzD,AAAQ,QAAQ,UAAwB;AACtC,MAAI,KAAK,SAAU;AAEnB,OAAK,aAAa,KAAK,WACpB,WAAW,KAAK,aAAa,SAAS,CAAC,CACvC,OAAO,QAAQ;AAGd,QAAK,WAAW;AAChB,QAAK,eAAe,IAAI;IACxB;;;;;;CAON,OAAO,cAAmC;AACxC,OAAK,QAAQC,sCAAoB,aAAa,CAAC;;;;;;CAOjD,SAAS,cAAmC;AAC1C,OAAK,QAAQC,wCAAsB,aAAa,CAAC;;;;;;CAOnD,AAAQ,mBAAmB,MAAe,cAAgC;EACxE,IAAIC;AACJ,MAAI;AACF,cAAWC,sCAAoB,MAAM,aAAa;UAC5C;AACN,UAAO;;AAGT,OAAK,QAAQ,SAAS;AACtB,SAAO;;;;;;CAOT,KAAK,MAAqB;AACxB,OAAK,UAAU;AACf,OAAK,mBAAmB,MAAM,KAAK,qBAAqB,CAAC;;;;;;CAO3D,MAAM,KAAK,QAAqC;AAC9C,OAAK,UAAU;EAEf,IAAIC;AACJ,MAAI,kBAAkB,eACpB,YAAW,KAAK,wBAAwB,OAAO;WACtC,OAAO,WAAW,WAC3B,YAAW,QAAQ;MAEnB,YAAW;AAGb,SAAO,KAAK,aAAa,SAAS;;;;;;;CAQpC,OAAe,wBACb,UACuB;EACvB,MAAM,SAAS,SAAS,WAAW;EACnC,MAAM,UAAU,IAAI,aAAa;AACjC,MAAI;AACF,UAAO,MAAM;IACX,MAAM,EAAE,MAAM,UAAU,MAAM,OAAO,MAAM;AAC3C,QAAI,KAAM;AACV,UAAM,OAAO,UAAU,WACnB,QACA,QAAQ,OAAO,OAAO,EAAE,QAAQ,MAAM,CAAC;;GAG7C,MAAM,QAAQ,QAAQ,QAAQ;AAC9B,OAAI,MAAO,OAAM;YACT;AACR,UAAO,aAAa;;;;;;;CAQxB,MAAc,aAAa,QAAgD;EACzE,MAAM,eAAe,KAAK,qBAAqB;EAC/C,MAAMC,SAAmB,EAAE;AAE3B,aAAW,MAAM,SAAS,QAAQ;AAChC,OAAI,KAAK,SAAU;AAEnB,UAAO,KAAK,MAAM;AAElB,OAAI,CAAC,KAAK,mBAAmB,OAAO,aAAa,CAC/C;AAGF,SAAM,KAAK;;AAGb,SAAO,OAAO,KAAK,GAAG;;;;;;;;CASxB,iBAAiB,MAA4C;AAC3D,OAAK,QAAQC,wCAAsB,KAAK,CAAC;;;;;CAM3C,eAAe,UAA6B;EAC1C,IAAIJ;AACJ,MAAI;AACF,cAAWK,yCAAuB,SAAS;UACrC;AACN,cAAWC,sCAAoB,6BAA6B;;AAE9D,OAAK,YAAY,SAAS;;;;;CAM5B,YAAY,OAAqB;AAC/B,OAAK,YAAYA,sCAAoB,MAAM,CAAC;;;;;CAM9C,AAAQ,YAAY,YAA2B;AAC7C,OAAK,aAAa,KAAK,WACpB,KAAK,YAAY;AAChB,OAAI,WACF,OAAM,KAAK,aAAa,WAAW;AAErC,SAAM,KAAK,OAAO,OAAO;IACzB,CACD,OAAO,QAAQ;AACd,QAAK,eAAe,IAAI;IACxB;;;;;;CAON,MAAY;AACV,OAAK,aAAa;;;;AAKtB,MAAM,2BAA+C;AAEnD,QADYT,6BAAiB,EACjB,WAAW;;AAGzB,MAAM,2BAA2B,YAAyC;AAExE,SADY,MAAMU,yBAAa,GACnB,WAAW;;;;;;AAOzB,MAAaC,SAAsB;CACjC,OAAO,SAAS;EAEd,MAAM,aAAa,oBAAoB;AACvC,MAAI,YAAY;AACd,cAAW,KAAK,KAAK;AACrB;;AAIF,EAAK,0BAA0B,CAC5B,MAAM,MAAM;AACX,MAAG,KAAK,KAAK;IACb,CACD,YAAY,GAGX;;CAEN,MAAM,OAAO,WAAW;EACtB,MAAM,aAAa,oBAAoB;AACvC,MAAI,WACF,QAAO,WAAW,KAAK,OAAO;EAGhC,MAAM,IAAI,MAAM,0BAA0B;AAC1C,MAAI,EACF,QAAO,EAAE,KAAK,OAAO;AAEvB,SAAO;;CAEV"}