1 |
|
2 | import * as dntShim from "./_dnt.shims.js";
|
3 |
|
4 | import { asyncIterToStream, streamToAsyncIter } from 'whatwg-stream-to-async-iter';
|
5 | import { concatUint8Arrays } from 'typed-array-utils'
|
6 | import { aMap, aJoin, collect, promiseToStream, ForAwaitable } from './iter.js';
|
7 |
|
/** A body made of string chunks: either a `ReadableStream` or anything usable with `for await`. */
export type StreamBodyInit = ReadableStream<string> | ForAwaitable<string>;

/** A body made of `Uint8Array` chunks: either a `ReadableStream` or anything usable with `for await`. */
export type ByteStreamBodyInit = ReadableStream<Uint8Array> | ForAwaitable<Uint8Array>;
|
10 |
|
/**
 * Normalizes the input to a `ReadableStream`.
 *
 * A value that already is a `ReadableStream` is returned untouched; anything
 * else is handed to `asyncIterToStream`.
 */
const maybeAsyncIterToStream = <T>(x: ForAwaitable<T> | ReadableStream<T>) => {
  if (x instanceof ReadableStream) {
    return x;
  }
  return asyncIterToStream(x);
};
|
13 |
|
/**
 * Normalizes the input to something iterable with `for await`.
 *
 * A `ReadableStream` is converted via `streamToAsyncIter`; any other value is
 * returned untouched.
 */
const maybeStreamToAsyncIter = <T>(x: ForAwaitable<T> | ReadableStream<T>) => {
  if (x instanceof ReadableStream) {
    return streamToAsyncIter(x);
  }
  return x;
};
|
16 |
|
17 |
|
/**
 * Truthy when the user agent string mentions `Cloudflare-Workers`, or when the
 * (shimmed) global object exposes no `TextEncoderStream`.
 *
 * Despite the name, this is effectively a "must avoid `TextEncoderStream`"
 * flag: it selects the manual `TextEncoder` code path below.
 */
const isCFWorkers =
  (globalThis.navigator as { userAgent?: string } | undefined)?.userAgent?.includes('Cloudflare-Workers') ||
  !('TextEncoderStream' in dntShim.dntGlobalThis);
|
20 |
|
21 |
|
22 |
|
/** Signature shared by both string-to-byte conversion strategies. */
type StringToByteStream = (body: StreamBodyInit) => ReadableStream<Uint8Array>;

/** Encodes every string chunk with a single shared `TextEncoder` instance. */
const encodeWithTextEncoder: StringToByteStream = (body) => {
  const utf8 = new TextEncoder();
  const chunks = maybeStreamToAsyncIter(body);
  return asyncIterToStream(aMap(chunks, (chunk) => utf8.encode(chunk)));
};

/** Pipes the string stream through a fresh `TextEncoderStream`. */
const encodeWithTextEncoderStream: StringToByteStream = (body) =>
  maybeAsyncIterToStream(body).pipeThrough(new TextEncoderStream());

/**
 * Converts a body of string chunks into a `ReadableStream` of bytes.
 *
 * The strategy is chosen once, at module load: the manual `TextEncoder` path
 * when `isCFWorkers` is truthy, the `TextEncoderStream` path otherwise.
 */
const stringStreamToByteStream: StringToByteStream = isCFWorkers
  ? encodeWithTextEncoder
  : encodeWithTextEncoderStream;
|
29 |
|
/** Name of the header that the classes in this module fill in when it is absent. */
const CONTENT_TYPE = 'content-type';

/** Fallback media type written to the `content-type` header. */
const OCTET_STREAM = 'application/octet-stream';
|
32 |
|
/**
 * A `Response` whose body is supplied as a stream (or async iterable) of
 * strings. The strings are converted to bytes by `stringStreamToByteStream`.
 *
 * When `init` does not provide a `content-type` header, it is set to
 * `application/octet-stream`.
 */
export class StreamResponse extends Response {
  /**
   * @param body String chunks to send; a falsy value is forwarded to `Response` unchanged.
   * @param init Regular `Response` options.
   */
  constructor(body?: StreamBodyInit | null, init?: ResponseInit) {
    const byteStream = body ? stringStreamToByteStream(body) : body;
    super(byteStream, init);
    if (!this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
|
39 |
|
/**
 * A `Response` whose body is supplied as a stream (or async iterable) of
 * `Uint8Array` chunks.
 *
 * When `init` does not provide a `content-type` header, it is set to
 * `application/octet-stream`.
 */
export class ByteStreamResponse extends Response {
  /**
   * @param body Byte chunks to send; a falsy value is forwarded to `Response` unchanged.
   * @param init Regular `Response` options.
   */
  constructor(body?: ByteStreamBodyInit | null, init?: ResponseInit) {
    const byteStream = body ? maybeAsyncIterToStream(body) : body;
    super(byteStream, init);
    if (!this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
|
46 |
|
47 |
|
48 |
|
49 |
|
50 |
|
51 |
|
/**
 * A `Response` that joins all string chunks of the body via `aJoin`, encodes
 * the joined string with a `TextEncoder`, and passes the resulting promise to
 * `promiseToStream` (presumably a stream yielding the bytes once the promise
 * settles; confirm against `./iter.js`).
 *
 * When `init` does not provide a `content-type` header, it is set to
 * `application/octet-stream`.
 */
export class BufferedStreamResponse extends Response {
  /**
   * @param body String chunks to buffer; a falsy value is forwarded to `Response` unchanged.
   * @param init Regular `Response` options.
   */
  constructor(body?: StreamBodyInit | null, init?: ResponseInit) {
    const buffered = body
      ? promiseToStream(
          aJoin(maybeStreamToAsyncIter(body)).then((text) => new TextEncoder().encode(text)),
        )
      : body;
    super(buffered, init);
    if (!this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
|
60 |
|
/**
 * A `Response` that gathers all byte chunks of the body via `collect`, merges
 * them with `concatUint8Arrays`, and passes the resulting promise to
 * `promiseToStream` (presumably a stream yielding the bytes once the promise
 * settles; confirm against `./iter.js`).
 *
 * When `init` does not provide a `content-type` header, it is set to
 * `application/octet-stream`.
 */
export class BufferedByteStreamResponse extends Response {
  /**
   * @param body Byte chunks to buffer; a falsy value is forwarded to `Response` unchanged.
   * @param init Regular `Response` options.
   */
  constructor(body?: ByteStreamBodyInit | null, init?: ResponseInit) {
    const buffered = body
      ? promiseToStream(
          collect(maybeStreamToAsyncIter(body)).then((parts) => concatUint8Arrays(...parts)),
        )
      : body;
    super(buffered, init);
    if (!this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
|
69 |
|
/** `BufferedResponse` is an additional export name for `BufferedStreamResponse`. */
export { BufferedStreamResponse as BufferedResponse };
|
71 |
|
/** `RequestInit` with `body` replaced by an optional stream (or async iterable) of strings. */
export type StreamRequestInit = Omit<RequestInit, 'body'> & {
  body?: StreamBodyInit;
};

/** `RequestInit` with `body` replaced by an optional stream (or async iterable) of `Uint8Array` chunks. */
export type ByteStreamRequestInit = Omit<RequestInit, 'body'> & {
  body?: ByteStreamBodyInit;
};
|
74 |
|
/**
 * A `Request` whose body is supplied as a stream (or async iterable) of
 * strings. The strings are converted to bytes by `stringStreamToByteStream`.
 *
 * When a body is given and no `content-type` header is present, the header is
 * set to `application/octet-stream`.
 */
export class StreamRequest extends Request {
  /**
   * @param input URL or existing request, as accepted by `Request`.
   * @param init Request options; `body` may be a string stream or async iterable.
   */
  constructor(input: RequestInfo, init?: StreamRequestInit) {
    const { body, ...rest } = init ?? {};
    super(input, {
      // A `ReadableStream` body must be accompanied by `duplex: 'half'`;
      // Node.js (undici) and Chromium throw a TypeError otherwise. Runtimes
      // that do not know the option ignore it. It is spread before `rest` so
      // an explicit caller-supplied value still takes precedence.
      ...(body ? { body: stringStreamToByteStream(body), duplex: 'half' as const } : {}),
      ...rest,
    });
    if (body && !this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
|
85 |
|
/**
 * A `Request` whose body is supplied as a stream (or async iterable) of
 * `Uint8Array` chunks.
 *
 * When a body is given and no `content-type` header is present, the header is
 * set to `application/octet-stream`.
 */
export class ByteStreamRequest extends Request {
  /**
   * @param input URL or existing request, as accepted by `Request`.
   * @param init Request options; `body` may be a byte stream or async iterable.
   */
  constructor(input: RequestInfo, init?: ByteStreamRequestInit) {
    const { body, ...rest } = init ?? {};
    super(input, {
      // A `ReadableStream` body must be accompanied by `duplex: 'half'`;
      // Node.js (undici) and Chromium throw a TypeError otherwise. Runtimes
      // that do not know the option ignore it. It is spread before `rest` so
      // an explicit caller-supplied value still takes precedence.
      ...(body ? { body: maybeAsyncIterToStream(body), duplex: 'half' as const } : {}),
      ...rest,
    });
    if (body && !this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
|
96 |
|
97 |
|
98 |
|