UNPKG

4.02 kBPlain TextView Raw
1// deno-lint-ignore-file no-explicit-any
2import * as dntShim from "./_dnt.shims.js";
3
4import { asyncIterToStream, streamToAsyncIter } from 'whatwg-stream-to-async-iter';
5import { concatUint8Arrays } from 'typed-array-utils'
6import { aMap, aJoin, collect, promiseToStream, ForAwaitable } from './iter.js';
7
/**
 * Accepted sources for a text body: a `ReadableStream` of strings, or
 * anything admitted by `ForAwaitable<string>` (declared in `./iter.js`).
 */
export type StreamBodyInit =
  | ReadableStream<string>
  | ForAwaitable<string>;

/**
 * Accepted sources for a binary body: a `ReadableStream` of `Uint8Array`
 * chunks, or anything admitted by `ForAwaitable<Uint8Array>`.
 */
export type ByteStreamBodyInit =
  | ReadableStream<Uint8Array>
  | ForAwaitable<Uint8Array>;
10
/**
 * Normalizes a body source to a stream.
 *
 * A value that already is a `ReadableStream` is returned unchanged; any other
 * value is handed to `asyncIterToStream` for conversion.
 */
const maybeAsyncIterToStream = <T>(x: ForAwaitable<T> | ReadableStream<T>) => {
  if (x instanceof ReadableStream) return x;
  return asyncIterToStream(x);
};
13
/**
 * Normalizes a body source to something iterable with `for await`.
 *
 * A `ReadableStream` is adapted through `streamToAsyncIter`; any other value
 * is returned unchanged.
 */
const maybeStreamToAsyncIter = <T>(x: ForAwaitable<T> | ReadableStream<T>) => {
  if (x instanceof ReadableStream) return streamToAsyncIter(x);
  return x;
};
16
// Truthy when the user agent string mentions Cloudflare Workers, or when no
// `TextEncoderStream` is present on the shimmed global object. Evaluated once,
// at module load.
// FIXME: add exception for newer versions that support streams correctly!?
const isCFWorkers =
  (globalThis.navigator as any)?.userAgent?.includes('Cloudflare-Workers') ||
  !('TextEncoderStream' in dntShim.dntGlobalThis);
20
// Converts a string body source into a stream of UTF-8 encoded bytes.
//
// CF Workers doesn't support non-binary Transform Streams, so when
// `isCFWorkers` is set the encoding is done chunk by chunk inside an async
// iterator; otherwise the source is piped through a `TextEncoderStream`.
// The implementation is picked once, when the module loads.
const stringStreamToByteStream: (body: StreamBodyInit) => ReadableStream<Uint8Array> = isCFWorkers
  ? (body) => {
      const utf8 = new TextEncoder();
      const encodedChunks = aMap(maybeStreamToAsyncIter(body), (text) => utf8.encode(text));
      return asyncIterToStream(encodedChunks);
    }
  : (body) => {
      const textStream = maybeAsyncIterToStream(body);
      return textStream.pipeThrough(new TextEncoderStream());
    };
29
/** Name of the header that the classes in this module default when it is missing. */
const CONTENT_TYPE = 'content-type';

/** Fallback value written to the `content-type` header. */
const OCTET_STREAM = 'application/octet-stream';
32
/**
 * A `Response` whose body is produced from a source of strings.
 *
 * A truthy `body` is converted with `stringStreamToByteStream`; a falsy one
 * is forwarded to `Response` as given. When `init` supplies no `content-type`
 * header, it is set to `application/octet-stream`.
 */
export class StreamResponse extends Response {
  constructor(body?: StreamBodyInit | null, init?: ResponseInit) {
    const payload = body ? stringStreamToByteStream(body) : body;
    super(payload, init);

    if (this.headers.has(CONTENT_TYPE)) return;
    this.headers.set(CONTENT_TYPE, OCTET_STREAM);
  }
}
39
/**
 * A `Response` whose body is produced from a source of `Uint8Array` chunks.
 *
 * A truthy `body` is normalized with `maybeAsyncIterToStream`; a falsy one is
 * forwarded to `Response` as given. When `init` supplies no `content-type`
 * header, it is set to `application/octet-stream`.
 */
export class ByteStreamResponse extends Response {
  constructor(body?: ByteStreamBodyInit | null, init?: ResponseInit) {
    const payload = body ? maybeAsyncIterToStream(body) : body;
    super(payload, init);

    if (this.headers.has(CONTENT_TYPE)) return;
    this.headers.set(CONTENT_TYPE, OCTET_STREAM);
  }
}
46
/**
 * Non-streaming counterpart for string bodies: use this class when streaming
 * response bodies are not wanted. The whole body is joined with `aJoin` and
 * encoded before any of it is released to the network.
 * Note that headers will still be sent immediately.
 *
 * A falsy `body` is forwarded to `Response` as given. When `init` supplies no
 * `content-type` header, it is set to `application/octet-stream`.
 */
export class BufferedStreamResponse extends Response {
  constructor(body?: StreamBodyInit | null, init?: ResponseInit) {
    const buffered = body
      ? promiseToStream(
          aJoin(maybeStreamToAsyncIter(body)).then((text) => new TextEncoder().encode(text)),
        )
      : body;
    super(buffered, init);

    if (!this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
60
/**
 * Non-streaming counterpart for binary bodies: every chunk is gathered with
 * `collect`, the chunks are concatenated into one `Uint8Array`, and only that
 * combined result is exposed as the response body.
 *
 * A falsy `body` is forwarded to `Response` as given. When `init` supplies no
 * `content-type` header, it is set to `application/octet-stream`.
 */
export class BufferedByteStreamResponse extends Response {
  constructor(body?: ByteStreamBodyInit | null, init?: ResponseInit) {
    const buffered = body
      ? promiseToStream(
          collect(maybeStreamToAsyncIter(body)).then((parts) => concatUint8Arrays(...parts)),
        )
      : body;
    super(buffered, init);

    if (!this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
69
// `BufferedResponse` is a second exported name for `BufferedStreamResponse`.
export { BufferedStreamResponse as BufferedResponse };
71
/** `RequestInit` with its `body` member replaced by an optional string body source. */
export type StreamRequestInit = Omit<RequestInit, 'body'> & {
  body?: StreamBodyInit;
};

/** `RequestInit` with its `body` member replaced by an optional binary body source. */
export type ByteStreamRequestInit = Omit<RequestInit, 'body'> & {
  body?: ByteStreamBodyInit;
};
74
/**
 * A `Request` whose body is produced from a source of strings.
 *
 * When `init.body` is truthy it is converted with `stringStreamToByteStream`
 * and, if no `content-type` header was supplied, the header is set to
 * `application/octet-stream`. Without a body the request is built from the
 * remaining `init` members only and its headers are left untouched.
 *
 * @param input URL or request to base this request on, forwarded to `Request`.
 * @param init  Request options; `body` may be any `StreamBodyInit`.
 */
export class StreamRequest extends Request {
  constructor(input: RequestInfo, init?: StreamRequestInit) {
    const { body, ...rest } = init ?? {};
    super(input, {
      // The Fetch Standard requires `duplex: 'half'` whenever the body is a
      // `ReadableStream`; conforming runtimes throw a TypeError without it.
      // It is listed before `...rest` so a caller-supplied `duplex` wins.
      ...(body
        ? { body: stringStreamToByteStream(body), duplex: 'half' as const }
        : {}),
      ...rest,
    });
    if (body && !this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
85
/**
 * A `Request` whose body is produced from a source of `Uint8Array` chunks.
 *
 * When `init.body` is truthy it is normalized with `maybeAsyncIterToStream`
 * and, if no `content-type` header was supplied, the header is set to
 * `application/octet-stream`. Without a body the request is built from the
 * remaining `init` members only and its headers are left untouched.
 *
 * @param input URL or request to base this request on, forwarded to `Request`.
 * @param init  Request options; `body` may be any `ByteStreamBodyInit`.
 */
export class ByteStreamRequest extends Request {
  constructor(input: RequestInfo, init?: ByteStreamRequestInit) {
    const { body, ...rest } = init ?? {};
    super(input, {
      // The Fetch Standard requires `duplex: 'half'` whenever the body is a
      // `ReadableStream`; conforming runtimes throw a TypeError without it.
      // It is listed before `...rest` so a caller-supplied `duplex` wins.
      ...(body
        ? { body: maybeAsyncIterToStream(body), duplex: 'half' as const }
        : {}),
      ...rest,
    });
    if (body && !this.headers.has(CONTENT_TYPE)) {
      this.headers.set(CONTENT_TYPE, OCTET_STREAM);
    }
  }
}
96
97// TODO: BufferedStreamRequest...
98// TODO: BufferedByteStreamRequest...