{"version":3,"file":"event_source_parse.cjs","names":["IterableReadableStream"],"sources":["../../src/utils/event_source_parse.ts"],"sourcesContent":["/* oxlint-disable prefer-template */\n/* oxlint-disable default-case */\n// Adapted from https://github.com/gfortaine/fetch-event-source/blob/main/src/parse.ts\n// due to a packaging issue in the original.\n// MIT License\nimport { IterableReadableStream } from \"./stream.js\";\n\nexport const EventStreamContentType = \"text/event-stream\";\n\n/**\n * Represents a message sent in an event stream\n * https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Event_stream_format\n */\nexport interface EventSourceMessage {\n  /** The event ID to set the EventSource object's last event ID value. */\n  id: string;\n  /** A string identifying the type of event described. */\n  event: string;\n  /** The event data */\n  data: string;\n  /** The reconnection interval (in milliseconds) to wait before retrying the connection */\n  retry?: number;\n}\n\n/**\n * Converts a ReadableStream into a callback pattern.\n * @param stream The input ReadableStream.\n * @param onChunk A function that will be called on each new byte chunk in the stream.\n * @returns {Promise<void>} A promise that will be resolved when the stream closes.\n */\nexport async function getBytes(\n  // oxlint-disable-next-line @typescript-eslint/no-explicit-any\n  stream: ReadableStream<Uint8Array> | AsyncIterable<any>,\n  onChunk: (arr: Uint8Array, flush?: boolean) => void\n) {\n  // TODO: Use Async iteration for both cases?\n  // oxlint-disable-next-line no-instanceof/no-instanceof\n  if (stream instanceof ReadableStream) {\n    const reader = stream.getReader();\n    // CHANGED: Introduced a \"flush\" mechanism to process potential pending messages when the stream ends.\n    //          This change is essential to ensure that we capture every last piece of information from streams,\n    //          such as those from Azure OpenAI, which may not terminate with a blank line. Without this\n    //          mechanism, we risk ignoring a possibly significant last message.\n    //          See https://github.com/langchain-ai/langchainjs/issues/1299 for details.\n    while (true) {\n      const result = await reader.read();\n      if (result.done) {\n        onChunk(new Uint8Array(), true);\n        break;\n      }\n      onChunk(result.value);\n    }\n  } else {\n    try {\n      // Handle Node.js Readable streams with async iteration\n      for await (const chunk of stream) {\n        onChunk(new Uint8Array(chunk));\n      }\n      onChunk(new Uint8Array(), true);\n      // oxlint-disable-next-line @typescript-eslint/no-explicit-any\n    } catch (e: any) {\n      throw new Error(\n        [\n          \"Parsing event source stream failed.\",\n          \"Ensure your implementation of fetch returns a web or Node readable stream.\",\n          `Error: ${e.message}`,\n        ].join(\"\\n\")\n      );\n    }\n  }\n}\n\nconst enum ControlChars {\n  NewLine = 10,\n  CarriageReturn = 13,\n  Space = 32,\n  Colon = 58,\n}\n\n/**\n * Parses arbitary byte chunks into EventSource line buffers.\n * Each line should be of the format \"field: value\" and ends with \\r, \\n, or \\r\\n.\n * @param onLine A function that will be called on each new EventSource line.\n * @returns A function that should be called for each incoming byte chunk.\n */\nexport function getLines(\n  onLine: (line: Uint8Array, fieldLength: number, flush?: boolean) => void\n) {\n  let buffer: Uint8Array | undefined;\n  let position: number; // current read position\n  let fieldLength: number; // length of the `field` portion of the line\n  let discardTrailingNewline = false;\n\n  // return a function that can process each incoming byte chunk:\n  return function onChunk(arr: Uint8Array, flush?: boolean) {\n    if (flush) {\n      onLine(arr, 0, true);\n      return;\n    }\n\n    if (buffer === undefined) {\n      buffer = arr;\n      position = 0;\n      fieldLength = -1;\n    } else {\n      // we're still parsing the old line. Append the new bytes into buffer:\n      buffer = concat(buffer, arr);\n    }\n\n    const bufLength = buffer.length;\n    let lineStart = 0; // index where the current line starts\n    while (position < bufLength) {\n      if (discardTrailingNewline) {\n        if (buffer[position] === ControlChars.NewLine) {\n          lineStart = ++position; // skip to next char\n        }\n\n        discardTrailingNewline = false;\n      }\n\n      // start looking forward till the end of line:\n      let lineEnd = -1; // index of the \\r or \\n char\n      for (; position < bufLength && lineEnd === -1; ++position) {\n        switch (buffer[position]) {\n          case ControlChars.Colon:\n            if (fieldLength === -1) {\n              // first colon in line\n              fieldLength = position - lineStart;\n            }\n            break;\n          // @ts-expect-error \\r case below should fallthrough to \\n:\n          case ControlChars.CarriageReturn:\n            discardTrailingNewline = true;\n          // oxlint-disable-next-line no-fallthrough\n          case ControlChars.NewLine:\n            lineEnd = position;\n            break;\n        }\n      }\n\n      if (lineEnd === -1) {\n        // We reached the end of the buffer but the line hasn't ended.\n        // Wait for the next arr and then continue parsing:\n        break;\n      }\n\n      // we've reached the line end, send it out:\n      onLine(buffer.subarray(lineStart, lineEnd), fieldLength);\n      lineStart = position; // we're now on the next line\n      fieldLength = -1;\n    }\n\n    if (lineStart === bufLength) {\n      buffer = undefined; // we've finished reading it\n    } else if (lineStart !== 0) {\n      // Create a new view into buffer beginning at lineStart so we don't\n      // need to copy over the previous lines when we get the new arr:\n      buffer = buffer.subarray(lineStart);\n      position -= lineStart;\n    }\n  };\n}\n\n/**\n * Parses line buffers into EventSourceMessages.\n * @param onId A function that will be called on each `id` field.\n * @param onRetry A function that will be called on each `retry` field.\n * @param onMessage A function that will be called on each message.\n * @returns A function that should be called for each incoming line buffer.\n */\nexport function getMessages(\n  onMessage?: (msg: EventSourceMessage) => void,\n  onId?: (id: string) => void,\n  onRetry?: (retry: number) => void\n) {\n  let message = newMessage();\n  const decoder = new TextDecoder();\n\n  // return a function that can process each incoming line buffer:\n  return function onLine(\n    line: Uint8Array,\n    fieldLength: number,\n    flush?: boolean\n  ) {\n    if (flush) {\n      if (!isEmpty(message)) {\n        onMessage?.(message);\n        message = newMessage();\n      }\n      return;\n    }\n\n    if (line.length === 0) {\n      // empty line denotes end of message. Trigger the callback and start a new message:\n      onMessage?.(message);\n      message = newMessage();\n    } else if (fieldLength > 0) {\n      // exclude comments and lines with no values\n      // line is of format \"<field>:<value>\" or \"<field>: <value>\"\n      // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation\n      const field = decoder.decode(line.subarray(0, fieldLength));\n      const valueOffset =\n        fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);\n      const value = decoder.decode(line.subarray(valueOffset));\n\n      switch (field) {\n        case \"data\":\n          // if this message already has data, append the new value to the old.\n          // otherwise, just set to the new value:\n          message.data = message.data ? message.data + \"\\n\" + value : value; // otherwise,\n          break;\n        case \"event\":\n          message.event = value;\n          break;\n        case \"id\":\n          onId?.((message.id = value));\n          break;\n        case \"retry\": {\n          const retry = parseInt(value, 10);\n          if (!Number.isNaN(retry)) {\n            // per spec, ignore non-integers\n            onRetry?.((message.retry = retry));\n          }\n          break;\n        }\n      }\n    }\n  };\n}\n\nfunction concat(a: Uint8Array, b: Uint8Array) {\n  const res = new Uint8Array(a.length + b.length);\n  res.set(a);\n  res.set(b, a.length);\n  return res;\n}\n\nfunction newMessage(): EventSourceMessage {\n  // data, event, and id must be initialized to empty strings:\n  // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation\n  // retry should be initialized to undefined so we return a consistent shape\n  // to the js engine all the time: https://mathiasbynens.be/notes/shapes-ics#takeaways\n  return {\n    data: \"\",\n    event: \"\",\n    id: \"\",\n    retry: undefined,\n  };\n}\n\nexport function convertEventStreamToIterableReadableDataStream(\n  stream: ReadableStream,\n  onMetadataEvent?: (e: unknown) => unknown\n) {\n  const dataStream = new ReadableStream({\n    async start(controller) {\n      const enqueueLine = getMessages((msg) => {\n        if (msg.event === \"error\") {\n          throw new Error(msg.data ?? \"Unspecified event streaming error.\");\n        } else if (msg.event === \"metadata\") {\n          onMetadataEvent?.(msg);\n        } else {\n          if (msg.data) controller.enqueue(msg.data);\n        }\n      });\n      const onLine = (\n        line: Uint8Array,\n        fieldLength: number,\n        flush?: boolean\n      ) => {\n        enqueueLine(line, fieldLength, flush);\n        if (flush) controller.close();\n      };\n      await getBytes(stream, getLines(onLine));\n    },\n  });\n  return IterableReadableStream.fromReadableStream(dataStream);\n}\n\nfunction isEmpty(message: EventSourceMessage): boolean {\n  return (\n    message.data === \"\" &&\n    message.event === \"\" &&\n    message.id === \"\" &&\n    message.retry === undefined\n  );\n}\n"],"mappings":";;;;;;;;;;;AAOA,MAAa,yBAAyB;;;;;;;AAuBtC,eAAsB,SAEpB,QACA,SACA;AAGA,KAAI,kBAAkB,gBAAgB;EACpC,MAAM,SAAS,OAAO,WAAW;AAMjC,SAAO,MAAM;GACX,MAAM,SAAS,MAAM,OAAO,MAAM;AAClC,OAAI,OAAO,MAAM;AACf,YAAQ,IAAI,YAAY,EAAE,KAAK;AAC/B;;AAEF,WAAQ,OAAO,MAAM;;OAGvB,KAAI;AAEF,aAAW,MAAM,SAAS,OACxB,SAAQ,IAAI,WAAW,MAAM,CAAC;AAEhC,UAAQ,IAAI,YAAY,EAAE,KAAK;UAExB,GAAQ;AACf,QAAM,IAAI,MACR;GACE;GACA;GACA,UAAU,EAAE;GACb,CAAC,KAAK,KAAK,CACb;;;AAKP,IAAW,eAAX,yBAAA,cAAA;AACE,cAAA,aAAA,aAAA,MAAA;AACA,cAAA,aAAA,oBAAA,MAAA;AACA,cAAA,aAAA,WAAA,MAAA;AACA,cAAA,aAAA,WAAA,MAAA;;EAJS,gBAAA,EAAA,CAKV;;;;;;;AAQD,SAAgB,SACd,QACA;CACA,IAAI;CACJ,IAAI;CACJ,IAAI;CACJ,IAAI,yBAAyB;AAG7B,QAAO,SAAS,QAAQ,KAAiB,OAAiB;AACxD,MAAI,OAAO;AACT,UAAO,KAAK,GAAG,KAAK;AACpB;;AAGF,MAAI,WAAW,KAAA,GAAW;AACxB,YAAS;AACT,cAAW;AACX,iBAAc;QAGd,UAAS,OAAO,QAAQ,IAAI;EAG9B,MAAM,YAAY,OAAO;EACzB,IAAI,YAAY;AAChB,SAAO,WAAW,WAAW;AAC3B,OAAI,wBAAwB;AAC1B,QAAI,OAAO,cAAc,aAAa,QACpC,aAAY,EAAE;AAGhB,6BAAyB;;GAI3B,IAAI,UAAU;AACd,UAAO,WAAW,aAAa,YAAY,IAAI,EAAE,SAC/C,SAAQ,OAAO,WAAf;IACE,KAAK,aAAa;AAChB,SAAI,gBAAgB,GAElB,eAAc,WAAW;AAE3B;IAEF,KAAK,aAAa,eAChB,0BAAyB;IAE3B,KAAK,aAAa;AAChB,eAAU;AACV;;AAIN,OAAI,YAAY,GAGd;AAIF,UAAO,OAAO,SAAS,WAAW,QAAQ,EAAE,YAAY;AACxD,eAAY;AACZ,iBAAc;;AAGhB,MAAI,cAAc,UAChB,UAAS,KAAA;WACA,cAAc,GAAG;AAG1B,YAAS,OAAO,SAAS,UAAU;AACnC,eAAY;;;;;;;;;;;AAYlB,SAAgB,YACd,WACA,MACA,SACA;CACA,IAAI,UAAU,YAAY;CAC1B,MAAM,UAAU,IAAI,aAAa;AAGjC,QAAO,SAAS,OACd,MACA,aACA,OACA;AACA,MAAI,OAAO;AACT,OAAI,CAAC,QAAQ,QAAQ,EAAE;AACrB,gBAAY,QAAQ;AACpB,cAAU,YAAY;;AAExB;;AAGF,MAAI,KAAK,WAAW,GAAG;AAErB,eAAY,QAAQ;AACpB,aAAU,YAAY;aACb,cAAc,GAAG;GAI1B,MAAM,QAAQ,QAAQ,OAAO,KAAK,SAAS,GAAG,YAAY,CAAC;GAC3D,MAAM,cACJ,eAAe,KAAK,cAAc,OAAO,aAAa,QAAQ,IAAI;GACpE,MAAM,QAAQ,QAAQ,OAAO,KAAK,SAAS,YAAY,CAAC;AAExD,WAAQ,OAAR;IACE,KAAK;AAGH,aAAQ,OAAO,QAAQ,OAAO,QAAQ,OAAO,OAAO,QAAQ;AAC5D;IACF,KAAK;AACH,aAAQ,QAAQ;AAChB;IACF,KAAK;AACH,YAAQ,QAAQ,KAAK,MAAO;AAC5B;IACF,KAAK,SAAS;KACZ,MAAM,QAAQ,SAAS,OAAO,GAAG;AACjC,SAAI,CAAC,OAAO,MAAM,MAAM,CAEtB,WAAW,QAAQ,QAAQ,MAAO;AAEpC;;;;;;AAOV,SAAS,OAAO,GAAe,GAAe;CAC5C,MAAM,MAAM,IAAI,WAAW,EAAE,SAAS,EAAE,OAAO;AAC/C,KAAI,IAAI,EAAE;AACV,KAAI,IAAI,GAAG,EAAE,OAAO;AACpB,QAAO;;AAGT,SAAS,aAAiC;AAKxC,QAAO;EACL,MAAM;EACN,OAAO;EACP,IAAI;EACJ,OAAO,KAAA;EACR;;AAGH,SAAgB,+CACd,QACA,iBACA;CACA,MAAM,aAAa,IAAI,eAAe,EACpC,MAAM,MAAM,YAAY;EACtB,MAAM,cAAc,aAAa,QAAQ;AACvC,OAAI,IAAI,UAAU,QAChB,OAAM,IAAI,MAAM,IAAI,QAAQ,qCAAqC;YACxD,IAAI,UAAU,WACvB,mBAAkB,IAAI;YAElB,IAAI,KAAM,YAAW,QAAQ,IAAI,KAAK;IAE5C;EACF,MAAM,UACJ,MACA,aACA,UACG;AACH,eAAY,MAAM,aAAa,MAAM;AACrC,OAAI,MAAO,YAAW,OAAO;;AAE/B,QAAM,SAAS,QAAQ,SAAS,OAAO,CAAC;IAE3C,CAAC;AACF,QAAOA,qBAAAA,uBAAuB,mBAAmB,WAAW;;AAG9D,SAAS,QAAQ,SAAsC;AACrD,QACE,QAAQ,SAAS,MACjB,QAAQ,UAAU,MAClB,QAAQ,OAAO,MACf,QAAQ,UAAU,KAAA"}