UNPKG

3.81 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
/**
 * Constructor used to recognise shared buffers: the global `SharedArrayBuffer`
 * when the runtime defines one, otherwise plain `ArrayBuffer`.
 * @ignore
 */
const SharedArrayBuf = typeof SharedArrayBuffer === 'undefined' ? ArrayBuffer : SharedArrayBuffer;
/**
 * Async-iterable wrapper around a DOM stream. Each call to
 * `[Symbol.asyncIterator]()` acquires a reader from the wrapped stream and
 * yields the chunks that reader produces.
 */
export class AsyncIterableReadableStream extends AsyncIterableX {
    /**
     * @param _stream The stream to iterate; stored as `this._stream`.
     */
    constructor(_stream) {
        super();
        this._stream = _stream;
    }
    /**
     * Acquires a default reader and returns an async iterator over its chunks.
     * `_consumeReader` takes care of cancelling the reader and releasing the
     * lock once iteration ends.
     */
    [Symbol.asyncIterator]() {
        const source = this._stream;
        const defaultReader = source.getReader();
        const chunks = defaultReaderToAsyncIterator(defaultReader);
        return _consumeReader(source, defaultReader, chunks);
    }
}
/**
 * Variant of `AsyncIterableReadableStream` that tries to read through a
 * "bring your own buffer" (BYOB) reader, falling back to the parent's
 * default-reader iteration when such a reader cannot be acquired.
 */
export class AsyncIterableReadableByteStream extends AsyncIterableReadableStream {
    /**
     * Returns an async iterator whose `next(arg)` accepts a buffer or a byte
     * count; that argument is forwarded to the BYOB read helpers.
     */
    [Symbol.asyncIterator]() {
        const source = this._stream;
        let byobReader;
        try {
            byobReader = source.getReader({ mode: 'byob' });
        }
        catch (_err) {
            // Requesting a BYOB reader threw: iterate with the default reader instead.
            return super[Symbol.asyncIterator]();
        }
        const chunks = byobReaderToAsyncIterator(byobReader);
        const primed = _consumeReader(source, byobReader, chunks);
        // Advance once so the generator is parked at its first `yield` and the
        // caller's first `next(arg)` delivers the buffer or bytesToRead.
        primed.next();
        return primed;
    }
}
/**
 * Re-yields everything produced by `iterator`, then tears the reader down.
 *
 * Teardown behaviour:
 * - If delegation throws, the reader is cancelled with that error as the
 *   reason. The error is not rethrown, so iteration simply ends (unless the
 *   cancel call itself rejects).
 * - Otherwise (normal completion or early return) the reader is cancelled
 *   without a reason.
 * - In every case the reader's lock is released while `stream.locked` is
 *   truthy, even when the cancel call above rejects.
 *
 * @param stream The stream that `reader` was acquired from.
 * @param reader The reader to cancel and release once iteration ends.
 * @param iterator The async iterator whose values are forwarded.
 */
async function* _consumeReader(stream, reader, iterator) {
    let threw = false;
    try {
        yield* iterator;
    }
    catch (e) {
        threw = true;
        if (reader) {
            await reader['cancel'](e);
        }
    }
    finally {
        if (!reader) {
            return;
        }
        try {
            if (!threw) {
                await reader['cancel']();
            }
        }
        finally {
            // Runs even if the cancel above rejected, so a failed cancel can
            // no longer leave the stream locked; the rejection still propagates.
            if (stream.locked) {
                try {
                    // Mark a possible `closed` rejection as handled before releasing.
                    reader.closed.catch(() => {
                        /* */
                    });
                    reader.releaseLock();
                }
                catch (e) {
                    /* best-effort release */
                }
            }
        }
    }
}
/**
 * Adapts a reader exposing `read()` into an async generator: yields each
 * result's `value` until a result reports `done`, whose value is not yielded.
 * @ignore
 */
async function* defaultReaderToAsyncIterator(reader) {
    for (;;) {
        const result = await reader.read();
        if (result.done) {
            return;
        }
        yield result.value;
    }
}
/**
 * Adapts a BYOB reader into an async generator driven by the values passed to
 * `next(arg)`. It first yields `null` to receive the initial buffer or byte
 * count, then repeatedly reads via `readNext` and yields each result's value,
 * taking the next buffer or byte count from each subsequent `next(arg)`.
 * Iteration ends when a read reports `done`; that result's value is not yielded.
 * @ignore
 */
async function* byobReaderToAsyncIterator(reader) {
    let request = yield null;
    for (;;) {
        const chunk = await readNext(reader, request, 0);
        if (chunk.done) {
            return;
        }
        request = yield chunk.value;
    }
}
/**
 * Performs one buffered read for the given request.
 *
 * - A number allocates a fresh `ArrayBuffer` of that length and reads that many bytes.
 * - An `ArrayBuffer` (or `SharedArrayBuf`) instance is read into, up to its `byteLength`.
 * - Anything else yields `{ done: true, value: undefined }` without reading.
 * @ignore
 */
async function readNext(reader, bufferOrLen, offset) {
    if (typeof bufferOrLen === 'number') {
        const byteCount = bufferOrLen;
        return await readInto(reader, new ArrayBuffer(byteCount), offset, byteCount);
    }
    const isRawBuffer = bufferOrLen instanceof ArrayBuffer || bufferOrLen instanceof SharedArrayBuf;
    if (!isRawBuffer) {
        return { done: true, value: undefined };
    }
    return await readInto(reader, bufferOrLen, offset, bufferOrLen.byteLength);
}
/**
 * Reads from `reader` into `buffer`, starting at `offset`, until `size` bytes
 * have been filled or the reader reports `done`.
 *
 * @param reader Object whose `read(view)` resolves with `{ done, value }`.
 * @param buffer Backing buffer for the view handed to `read`.
 * @param offset Byte position in `buffer` at which to start writing.
 * @param size Total number of bytes wanted.
 * @returns `{ done, value }` where `value` is a `Uint8Array` over the bytes
 *   filled so far, or `{ done: true, value: undefined }` when the read
 *   resolved without a view.
 * @ignore
 */
async function readInto(reader, buffer, offset, size) {
    let innerOffset = offset;
    if (innerOffset >= size) {
        // Nothing left to fill: hand back the first `size` bytes without reading.
        return { done: false, value: new Uint8Array(buffer, 0, size) };
    }
    const { done, value } = await reader.read(new Uint8Array(buffer, innerOffset, size - innerOffset));
    if (value === undefined || value === null) {
        // A BYOB read can resolve without a view (`{ done: true, value: undefined }`
        // after cancellation). Treat that as end-of-stream rather than
        // dereferencing the missing view.
        return { done: true, value: undefined };
    }
    if ((innerOffset += value.byteLength) < size && !done) {
        // Partial fill: keep reading into the buffer returned with this chunk.
        return await readInto(reader, value.buffer, innerOffset, size);
    }
    return { done, value: new Uint8Array(value.buffer, 0, innerOffset) };
}
/**
 * Wraps a DOM stream in an async-iterable.
 *
 * @export
 * @param {ReadableStream} stream The readable stream to convert to an async-iterable.
 * @param {{ mode: 'byob' }} [options] When `options.mode` is `'byob'`, the byte-stream
 *   wrapper is used; otherwise the default wrapper is used.
 * @returns {AsyncIterableX<any>} An async-iterable over the chunks of the DOM stream.
 */
export function fromDOMStream(stream, options) {
    if (options && options.mode === 'byob') {
        return new AsyncIterableReadableByteStream(stream);
    }
    return new AsyncIterableReadableStream(stream);
}
119
120//# sourceMappingURL=fromdomstream.mjs.map