{"version":3,"file":"buffer.cjs","names":["headers: Record<string, string>","headerKeys","FlushResponse","expBackoff"],"sources":["../../../src/components/connect/buffer.ts"],"sourcesContent":["import { headerKeys } from \"../../helpers/consts.ts\";\nimport type { Logger } from \"../../middleware/logger.ts\";\nimport { FlushResponse } from \"../../proto/src/components/connect/protobuf/connect.ts\";\nimport { expBackoff } from \"./util.ts\";\n\nexport class MessageBuffer {\n  private buffered: Record<string, Uint8Array> = {};\n  private pending: Record<string, Uint8Array> = {};\n  private getApiBaseUrl: () => Promise<string>;\n  private logger: Logger;\n  private envName: string | undefined;\n\n  constructor({\n    envName,\n    getApiBaseUrl,\n    logger,\n  }: {\n    envName: string | undefined;\n    getApiBaseUrl: () => Promise<string>;\n    logger: Logger;\n  }) {\n    this.envName = envName;\n    this.getApiBaseUrl = getApiBaseUrl;\n    this.logger = logger;\n  }\n\n  public append(requestId: string, responseBytes: Uint8Array) {\n    this.buffered[requestId] = responseBytes;\n    delete this.pending[requestId];\n  }\n\n  public addPending(\n    requestId: string,\n    responseBytes: Uint8Array,\n    deadline: number,\n  ) {\n    this.pending[requestId] = responseBytes;\n    setTimeout(() => {\n      if (this.pending[requestId]) {\n        this.logger.warn({ requestId }, \"Message not acknowledged in time\");\n        this.append(requestId, this.pending[requestId]!);\n      }\n    }, deadline);\n  }\n\n  public acknowledgePending(requestId: string) {\n    delete this.pending[requestId];\n  }\n\n  private async sendFlushRequest(\n    hashedSigningKey: string | undefined,\n    responseBytes: Uint8Array,\n  ) {\n    const headers: Record<string, string> = {\n      \"Content-Type\": \"application/protobuf\",\n      ...(hashedSigningKey\n        ? { Authorization: `Bearer ${hashedSigningKey}` }\n        : {}),\n    };\n\n    if (this.envName) {\n      headers[headerKeys.Environment] = this.envName;\n    }\n\n    // protobuf's `finish()` is typed as `Uint8Array<ArrayBufferLike>` (could be\n    // SharedArrayBuffer-backed), but it actually creates a regular ArrayBuffer.\n    // Cast to satisfy fetch's stricter type requirement.\n    // const body = responseBytes as Uint8Array<ArrayBuffer>;\n    if (!isUnsharedArrayBuffer(responseBytes)) {\n      throw new Error(\"Unreachable: response bytes are not an ArrayBuffer\");\n    }\n\n    const resp = await fetch(\n      // refactor this to a more universal spot\n      new URL(\"/v0/connect/flush\", await this.getApiBaseUrl()),\n      {\n        method: \"POST\",\n        body: responseBytes,\n        headers: headers,\n      },\n    );\n\n    if (!resp.ok) {\n      this.logger.error(\n        { body: await resp.text(), status: resp.status },\n        \"Failed to flush messages\",\n      );\n      throw new Error(\"Failed to flush messages\");\n    }\n\n    const flushResp = FlushResponse.decode(\n      new Uint8Array(await resp.arrayBuffer()),\n    );\n\n    return flushResp;\n  }\n\n  public async flush(hashedSigningKey: string | undefined) {\n    if (Object.keys(this.buffered).length === 0) {\n      return;\n    }\n\n    this.logger.info(\n      { count: Object.keys(this.buffered).length },\n      \"Flushing messages\",\n    );\n\n    const maxAttempts = 5;\n\n    for (let attempt = 0; attempt < maxAttempts; attempt++) {\n      for (const [requestId, v] of Object.entries(this.buffered)) {\n        try {\n          await this.sendFlushRequest(hashedSigningKey, v);\n          delete this.buffered[requestId];\n        } catch (err) {\n          this.logger.warn({ err, requestId }, \"Failed to flush message\");\n          break;\n        }\n      }\n\n      if (Object.keys(this.buffered).length === 0) {\n        return;\n      }\n\n      await new Promise((resolve) => setTimeout(resolve, expBackoff(attempt)));\n    }\n\n    this.logger.error(\n      { maxAttempts },\n      \"Failed to flush messages after max attempts\",\n    );\n  }\n}\n\nfunction isUnsharedArrayBuffer(\n  value: Uint8Array<ArrayBufferLike>,\n): value is Uint8Array<ArrayBuffer> {\n  if (typeof SharedArrayBuffer === \"undefined\") {\n    // `SharedArrayBuffer` may not exist at runtime. Some runtimes removed it\n    // for security reasons (Spectre-like attacks).\n    //\n    // If it doesn't exist then we know value is an `ArrayBuffer`.\n    return true;\n  }\n\n  return value.buffer instanceof ArrayBuffer;\n}\n\n/**\n * Throws an error if the value is not an unshared ArrayBuffer. This should be\n * safe because we shouldn't be using `SharedArrayBuffer` at runtime, but our\n * protobuf types have `Uint8Array` as the return type (no generic), which\n * effectively defaults to a union of `ArrayBuffer` and `SharedArrayBuffer`.\n */\nexport function ensureUnsharedArrayBuffer(\n  value: Uint8Array<ArrayBufferLike>,\n): Uint8Array<ArrayBuffer> {\n  if (!isUnsharedArrayBuffer(value)) {\n    throw new Error(\"Unreachable: response bytes are not an ArrayBuffer\");\n  }\n  return value;\n}\n"],"mappings":";;;;;AAKA,IAAa,gBAAb,MAA2B;CACzB,AAAQ,WAAuC,EAAE;CACjD,AAAQ,UAAsC,EAAE;CAChD,AAAQ;CACR,AAAQ;CACR,AAAQ;CAER,YAAY,EACV,SACA,eACA,UAKC;AACD,OAAK,UAAU;AACf,OAAK,gBAAgB;AACrB,OAAK,SAAS;;CAGhB,AAAO,OAAO,WAAmB,eAA2B;AAC1D,OAAK,SAAS,aAAa;AAC3B,SAAO,KAAK,QAAQ;;CAGtB,AAAO,WACL,WACA,eACA,UACA;AACA,OAAK,QAAQ,aAAa;AAC1B,mBAAiB;AACf,OAAI,KAAK,QAAQ,YAAY;AAC3B,SAAK,OAAO,KAAK,EAAE,WAAW,EAAE,mCAAmC;AACnE,SAAK,OAAO,WAAW,KAAK,QAAQ,WAAY;;KAEjD,SAAS;;CAGd,AAAO,mBAAmB,WAAmB;AAC3C,SAAO,KAAK,QAAQ;;CAGtB,MAAc,iBACZ,kBACA,eACA;EACA,MAAMA,UAAkC;GACtC,gBAAgB;GAChB,GAAI,mBACA,EAAE,eAAe,UAAU,oBAAoB,GAC/C,EAAE;GACP;AAED,MAAI,KAAK,QACP,SAAQC,0BAAW,eAAe,KAAK;AAOzC,MAAI,CAAC,sBAAsB,cAAc,CACvC,OAAM,IAAI,MAAM,qDAAqD;EAGvE,MAAM,OAAO,MAAM,MAEjB,IAAI,IAAI,qBAAqB,MAAM,KAAK,eAAe,CAAC,EACxD;GACE,QAAQ;GACR,MAAM;GACG;GACV,CACF;AAED,MAAI,CAAC,KAAK,IAAI;AACZ,QAAK,OAAO,MACV;IAAE,MAAM,MAAM,KAAK,MAAM;IAAE,QAAQ,KAAK;IAAQ,EAChD,2BACD;AACD,SAAM,IAAI,MAAM,2BAA2B;;AAO7C,SAJkBC,8BAAc,OAC9B,IAAI,WAAW,MAAM,KAAK,aAAa,CAAC,CACzC;;CAKH,MAAa,MAAM,kBAAsC;AACvD,MAAI,OAAO,KAAK,KAAK,SAAS,CAAC,WAAW,EACxC;AAGF,OAAK,OAAO,KACV,EAAE,OAAO,OAAO,KAAK,KAAK,SAAS,CAAC,QAAQ,EAC5C,oBACD;EAED,MAAM,cAAc;AAEpB,OAAK,IAAI,UAAU,GAAG,UAAU,aAAa,WAAW;AACtD,QAAK,MAAM,CAAC,WAAW,MAAM,OAAO,QAAQ,KAAK,SAAS,CACxD,KAAI;AACF,UAAM,KAAK,iBAAiB,kBAAkB,EAAE;AAChD,WAAO,KAAK,SAAS;YACd,KAAK;AACZ,SAAK,OAAO,KAAK;KAAE;KAAK;KAAW,EAAE,0BAA0B;AAC/D;;AAIJ,OAAI,OAAO,KAAK,KAAK,SAAS,CAAC,WAAW,EACxC;AAGF,SAAM,IAAI,SAAS,YAAY,WAAW,SAASC,wBAAW,QAAQ,CAAC,CAAC;;AAG1E,OAAK,OAAO,MACV,EAAE,aAAa,EACf,8CACD;;;AAIL,SAAS,sBACP,OACkC;AAClC,KAAI,OAAO,sBAAsB,YAK/B,QAAO;AAGT,QAAO,MAAM,kBAAkB;;;;;;;;AASjC,SAAgB,0BACd,OACyB;AACzB,KAAI,CAAC,sBAAsB,MAAM,CAC/B,OAAM,IAAI,MAAM,qDAAqD;AAEvE,QAAO"}