UNPKG

3.6 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
2// To work around circular-dependency hell, these need to be on
3// the AsyncIterable prototype for tee, pipeTo, and pipeThrough
4import '../add/asynciterable-operators/publish';
5import '../add/asynciterable-operators/todomstream';
/**
 * Constructor used to recognise shared buffers. Resolves to the global
 * `SharedArrayBuffer` when the runtime defines it, and to `ArrayBuffer`
 * otherwise so `instanceof` checks against it stay valid.
 * @ignore
 */
const SharedArrayBuf = typeof SharedArrayBuffer === 'undefined' ? ArrayBuffer : SharedArrayBuffer;
/**
 * Async-iterable wrapper around a DOM-style readable stream. Each iteration
 * acquires a default reader from the wrapped stream and yields the chunks
 * that reader produces.
 */
export class AsyncIterableReadableStream extends AsyncIterableX {
  /**
   * @param _stream The stream to wrap; it must expose `getReader()`.
   */
  constructor(_stream) {
    super();
    this._stream = _stream;
  }

  /**
   * Locks the stream with a default reader and returns an async iterator
   * over its chunks. Reader cleanup is delegated to `_consumeReader`.
   */
  [Symbol.asyncIterator]() {
    const source = this._stream;
    const reader = source.getReader();
    const chunks = defaultReaderToAsyncIterator(reader);
    return _consumeReader(source, reader, chunks);
  }
}
/**
 * Variant of `AsyncIterableReadableStream` that tries to read through a
 * BYOB ("bring your own buffer") reader, so the consumer can pass a buffer or
 * a byte count to `next()`.
 */
export class AsyncIterableReadableByteStream extends AsyncIterableReadableStream {
  /**
   * Returns an async iterator backed by a BYOB reader. When the stream
   * refuses to hand out a BYOB reader (`getReader` throws), iteration falls
   * back to the default-reader behaviour of the parent class.
   */
  [Symbol.asyncIterator]() {
    const source = this._stream;
    let byobReader;
    try {
      byobReader = source.getReader({ mode: 'byob' });
    } catch (e) {
      // Best effort: any failure to obtain a BYOB reader means "use the default reader".
      return super[Symbol.asyncIterator]();
    }
    const iterator = _consumeReader(source, byobReader, byobReaderToAsyncIterator(byobReader));
    // Advance once so the generator is parked at its first `yield` and the
    // caller's first `next(bufferOrLength)` argument is actually received.
    iterator.next();
    return iterator;
  }
}
/**
 * Re-yields everything produced by `iterator` and then tears the reader down.
 *
 * - If `iterator` throws, the error is handed to `reader.cancel(error)` and is
 *   not rethrown; the generator then completes normally.
 * - If it did not throw, `reader.cancel()` is called with no reason.
 * - If the stream still reports `locked`, the reader's lock is released;
 *   failures while releasing are ignored.
 *
 * @param stream The stream the reader was obtained from (only `locked` is read).
 * @param reader The reader to cancel/release; when falsy no cleanup is attempted.
 * @param iterator The async iterator whose values are forwarded.
 */
async function* _consumeReader(stream, reader, iterator) {
  let failed = false;
  try {
    yield* iterator;
  } catch (error) {
    failed = true;
    if (reader) {
      await reader.cancel(error);
    }
  } finally {
    // NOTE: this early return lives in `finally` on purpose to keep the
    // original completion semantics when no reader was supplied.
    if (!reader) {
      return;
    }
    if (failed === false) {
      await reader.cancel();
    }
    if (stream.locked) {
      try {
        // Attach a no-op handler so a rejected `closed` promise is not
        // reported as unhandled once the lock is released.
        const ignore = () => {
          /* */
        };
        reader.closed.catch(ignore);
        reader.releaseLock();
      } catch (releaseError) {
        /* releasing is best effort */
      }
    }
  }
}
/**
 * Adapts a default stream reader to an async generator: repeatedly awaits
 * `reader.read()` and yields each result's `value` until a result reports
 * `done`.
 * @ignore
 */
async function* defaultReaderToAsyncIterator(reader) {
  for (;;) {
    const result = await reader.read();
    if (result.done) {
      return;
    }
    yield result.value;
  }
}
/**
 * Adapts a BYOB reader to an async generator driven by the consumer.
 *
 * The first `yield` produces `null` and exists only to receive the first
 * request; after that, every value passed to `next()` (a buffer or a byte
 * count) is forwarded to `readNext`, and the bytes read are yielded back.
 * Iteration ends when `readNext` reports `done`.
 * @ignore
 */
async function* byobReaderToAsyncIterator(reader) {
  let request = yield null;
  for (;;) {
    const result = await readNext(reader, request, 0);
    if (result.done) {
      return;
    }
    request = yield result.value;
  }
}
/**
 * Resolves the consumer's request into a destination buffer and a byte count,
 * then delegates the actual reading to `readInto`.
 *
 * @param reader The BYOB reader to read from.
 * @param bufferOrLen Either a byte count (a fresh `ArrayBuffer` of that size is
 *   allocated) or an `ArrayBuffer` / shared buffer to fill completely.
 * @param offset Byte position in the buffer at which reading starts.
 * @returns The result of `readInto`, or `{ done: true, value: undefined }`
 *   when `bufferOrLen` is neither a number nor a recognised buffer.
 * @ignore
 */
async function readNext(reader, bufferOrLen, offset) {
  if (typeof bufferOrLen === 'number') {
    // The requested size is passed through as given by the caller.
    return await readInto(reader, new ArrayBuffer(bufferOrLen), offset, bufferOrLen);
  }
  if (bufferOrLen instanceof ArrayBuffer || bufferOrLen instanceof SharedArrayBuf) {
    return await readInto(reader, bufferOrLen, offset, bufferOrLen.byteLength);
  }
  return { done: true, value: undefined };
}
/**
 * Reads from a BYOB reader into `buffer` until `size` bytes are filled or the
 * reader reports `done`.
 *
 * Each `reader.read(view)` may hand back a different underlying buffer, so the
 * buffer of the returned view is used for the next read and for the result.
 *
 * @param reader The BYOB reader; `read(view)` must resolve to `{ done, value }`.
 * @param buffer The destination buffer.
 * @param offset Number of bytes already filled at the start of `buffer`.
 * @param size Total number of bytes wanted.
 * @returns `{ done, value }` where `value` is a `Uint8Array` over the filled
 *   bytes `[0, filled)`. If the reader resolves with `done: true` and no view
 *   (as happens when the stream is cancelled), `value` is `undefined`.
 * @ignore
 */
async function readInto(reader, buffer, offset, size) {
  if (offset >= size) {
    return { done: false, value: new Uint8Array(buffer, 0, size) };
  }
  let target = buffer;
  let filled = offset;
  for (;;) {
    const { done, value } = await reader.read(new Uint8Array(target, filled, size - filled));
    if (done && value === undefined) {
      // A cancelled stream resolves reads without a view; there is nothing to
      // measure or wrap, so report end-of-stream instead of throwing.
      return { done, value: undefined };
    }
    filled += value.byteLength;
    target = value.buffer;
    if (done || filled >= size) {
      return { done, value: new Uint8Array(target, 0, filled) };
    }
  }
}
/**
 * Wraps a DOM-style readable stream in an async iterable.
 *
 * @param stream The stream to wrap.
 * @param options Optional settings; when `options.mode` is `'byob'` the
 *   byte-stream wrapper is used, otherwise the default wrapper.
 * @returns An `AsyncIterableReadableByteStream` for BYOB mode, or an
 *   `AsyncIterableReadableStream` in every other case.
 */
export function fromDOMStream(stream, options) {
  if (options && options.mode === 'byob') {
    return new AsyncIterableReadableByteStream(stream);
  }
  return new AsyncIterableReadableStream(stream);
}
114
115//# sourceMappingURL=fromdomstream.mjs.map