1 | import { AsyncIterableX } from './asynciterablex';
|
2 |
|
// Constructor used for shared-buffer `instanceof` checks; resolves to plain
// ArrayBuffer when the runtime does not define SharedArrayBuffer.
const SharedArrayBuf = typeof SharedArrayBuffer === 'undefined' ? ArrayBuffer : SharedArrayBuffer;
|
/**
 * Async-iterable wrapper around a stream object that exposes `getReader()`
 * (presumably a WHATWG ReadableStream - confirm against callers).
 *
 * Each iteration acquires a reader from the wrapped stream and yields the
 * values produced by `reader.read()` until the reader reports `done`.
 */
export class AsyncIterableReadableStream extends AsyncIterableX {
  /**
   * @param {object} _stream - Source stream; stored as-is and asked for a
   *   reader when iteration starts.
   */
  constructor(_stream) {
    super();
    this._stream = _stream;
  }

  /**
   * Acquires a default reader and returns an async generator over its chunks.
   * Cancelling the reader and releasing the lock is handled by `_consumeReader`.
   *
   * @returns {AsyncGenerator} Generator yielding the reader's values.
   */
  [Symbol.asyncIterator]() {
    const source = this._stream;
    const defaultReader = source.getReader();
    const chunks = defaultReaderToAsyncIterator(defaultReader);
    return _consumeReader(source, defaultReader, chunks);
  }
}
|
/**
 * Variant of AsyncIterableReadableStream that tries to read through a
 * "bring your own buffer" reader (`getReader({ mode: 'byob' })`).
 *
 * If requesting the BYOB reader throws, iteration falls back to the parent
 * class's default-reader behavior.
 */
export class AsyncIterableReadableByteStream extends AsyncIterableReadableStream {
  /**
   * @returns {AsyncGenerator} Generator whose `next(arg)` argument is forwarded
   *   to the BYOB read loop as the requested byte count or target buffer.
   */
  [Symbol.asyncIterator]() {
    const source = this._stream;
    let byobReader;
    try {
      byobReader = source.getReader({ mode: 'byob' });
    } catch (_unsupported) {
      // The stream refused a BYOB reader; use the default-reader iterator instead.
      return super[Symbol.asyncIterator]();
    }
    const chunks = byobReaderToAsyncIterator(byobReader);
    const primed = _consumeReader(source, byobReader, chunks);
    // Advance the generator to its initial `yield null` so that the first value
    // the consumer passes to next() is received as the first read request.
    primed.next();
    return primed;
  }
}
|
/**
 * Delegates to `iterator` and guarantees reader cleanup once iteration ends,
 * whether it completes, is returned early by the consumer, or throws.
 *
 * - If iteration throws, the reader is cancelled with that error as the reason
 *   and the error itself is not rethrown (a rejection from `cancel` still
 *   propagates).
 * - Otherwise the reader is cancelled without a reason.
 * - In both cases, if the stream still reports `locked`, the lock is released
 *   on a best-effort basis.
 *
 * @param {{ locked: boolean }} stream - Stream the reader was acquired from.
 * @param {object} reader - Reader exposing `cancel()`, `closed` and `releaseLock()`.
 * @param {AsyncIterable} iterator - Source of the values to yield.
 * @returns {AsyncGenerator} Generator yielding everything `iterator` yields.
 */
async function* _consumeReader(stream, reader, iterator) {
  let threw = false;
  try {
    yield* iterator;
  } catch (e) {
    threw = true;
    if (reader) {
      await reader.cancel(e);
    }
  } finally {
    if (reader) {
      try {
        if (!threw) {
          await reader.cancel();
        }
      } finally {
        // Release the lock even when cancel() rejects; otherwise the stream
        // would stay locked and no new reader could ever be acquired.
        if (stream.locked) {
          try {
            // Attach a no-op handler so a rejected `closed` promise does not
            // surface as an unhandled rejection.
            reader.closed.catch(() => {});
            reader.releaseLock();
          } catch (e) {
            // Best-effort: releasing the lock must never mask the outcome of
            // the iteration itself.
          }
        }
      }
    }
  }
}
|
61 |
|
/**
 * Turns a reader with a zero-argument `read()` into an async generator.
 *
 * @param {{ read: function(): Promise<{ done: boolean, value: * }> }} reader
 * @returns {AsyncGenerator} Yields each `value` until a result reports `done`.
 */
async function* defaultReaderToAsyncIterator(reader) {
  for (;;) {
    const result = await reader.read();
    if (result.done) {
      return;
    }
    yield result.value;
  }
}
|
68 |
|
/**
 * Async generator driving a BYOB reader on demand.
 *
 * The first `yield` produces `null` and exists only to receive the first
 * request. Every value the consumer passes to `next(arg)` is forwarded to
 * `readNext` as either a byte count or a buffer to fill, and the resulting
 * bytes are yielded back.
 *
 * When the read reports `done` but still carries bytes (the source ended
 * part-way through filling the request), those trailing bytes are yielded as a
 * final chunk before the generator finishes, so data is not silently dropped.
 *
 * @param {object} reader - BYOB reader passed through to `readNext`.
 * @returns {AsyncGenerator} Yields `null` first, then the chunks that were read.
 */
async function* byobReaderToAsyncIterator(reader) {
  let request = yield null;
  for (;;) {
    const result = await readNext(reader, request, 0);
    if (result.done) {
      const tail = result.value;
      // Flush bytes read before the source ended; an empty or missing value
      // means there is nothing left to deliver.
      if (tail && tail.byteLength > 0) {
        yield tail;
      }
      return;
    }
    request = yield result.value;
  }
}
|
76 |
|
/**
 * Resolves the consumer's request into a buffer plus size and reads into it.
 *
 * @param {object} reader - Reader handed to `readInto`.
 * @param {number|ArrayBuffer|SharedArrayBuffer|*} bufferOrLen - A byte count
 *   (a fresh ArrayBuffer of that size is allocated) or an existing buffer to
 *   fill up to its `byteLength`. Any other value ends the read.
 * @param {number} offset - Byte offset at which filling starts.
 * @returns {Promise<{ done: boolean, value: (Uint8Array|undefined) }>}
 *   The `readInto` result, or `{ done: true, value: undefined }` when
 *   `bufferOrLen` is neither a number nor a recognized buffer.
 */
async function readNext(reader, bufferOrLen, offset) {
  if (typeof bufferOrLen === 'number') {
    const fresh = new ArrayBuffer(bufferOrLen);
    return await readInto(reader, fresh, offset, bufferOrLen);
  }
  if (bufferOrLen instanceof ArrayBuffer || bufferOrLen instanceof SharedArrayBuf) {
    return await readInto(reader, bufferOrLen, offset, bufferOrLen.byteLength);
  }
  // Unrecognized request: signal completion without touching the reader.
  return { done: true, value: undefined };
}
|
94 |
|
/**
 * Reads from `reader` until `size` bytes of the buffer are filled or the
 * reader reports `done`.
 *
 * Each `reader.read(view)` call is given a view over the still-unfilled tail of
 * the buffer. The buffer of the returned view is used for the next round,
 * since a reader may hand back a different buffer than the one it was given.
 *
 * @param {object} reader - Reader whose `read(view)` resolves to `{ done, value }`.
 * @param {ArrayBufferLike} buffer - Buffer to fill.
 * @param {number} offset - Number of bytes already filled.
 * @param {number} size - Total number of bytes wanted.
 * @returns {Promise<{ done: boolean, value: (Uint8Array|undefined) }>}
 *   `value` views the filled prefix of the buffer; it is `undefined` when the
 *   reader finished without returning a view (e.g. the stream was cancelled).
 */
async function readInto(reader, buffer, offset, size) {
  if (offset >= size) {
    // Nothing left to read; hand back the already-filled region.
    return { done: false, value: new Uint8Array(buffer, 0, size) };
  }
  let target = buffer;
  let filled = offset;
  for (;;) {
    const { done, value } = await reader.read(new Uint8Array(target, filled, size - filled));
    if (done && value == null) {
      // A cancelled stream resolves reads with `value: undefined`. There is no
      // view to measure and the buffer passed in may no longer be usable, so
      // report completion without data instead of dereferencing `value`.
      return { done: true, value: undefined };
    }
    filled += value.byteLength;
    if (done || filled >= size) {
      return { done, value: new Uint8Array(value.buffer, 0, filled) };
    }
    target = value.buffer;
  }
}
|
/**
 * Wraps a stream in an async iterable.
 *
 * @param {object} stream - Stream to wrap.
 * @param {{ mode?: string }} [options] - Pass `{ mode: 'byob' }` to get the
 *   byte-stream wrapper; anything else (or nothing) selects the default wrapper.
 * @returns {AsyncIterableReadableStream|AsyncIterableReadableByteStream}
 */
export function fromDOMStream(stream, options) {
  if (options && options.mode === 'byob') {
    return new AsyncIterableReadableByteStream(stream);
  }
  return new AsyncIterableReadableStream(stream);
}
|
111 |
|
112 |
|