1 | import { AsyncIterableX } from './asynciterablex';
|
2 |
|
3 |
|
4 | import '../add/asynciterable-operators/publish';
|
5 | import '../add/asynciterable-operators/todomstream';
|
6 |
|
// Constructor used for `instanceof` checks against shared buffers. When the
// runtime has no `SharedArrayBuffer` global, plain `ArrayBuffer` stands in so
// the check never references an undefined name.
const SharedArrayBuf = typeof SharedArrayBuffer === 'undefined' ? ArrayBuffer : SharedArrayBuffer;
|
/**
 * Async-iterable wrapper around a stream object that exposes `getReader()`.
 * Every call to `[Symbol.asyncIterator]()` acquires a new reader from the
 * wrapped stream.
 */
export class AsyncIterableReadableStream extends AsyncIterableX {
  /**
   * @param {*} _stream - Stream to iterate; stored on `this._stream`.
   */
  constructor(_stream) {
    super();
    this._stream = _stream;
  }

  /**
   * Acquires a reader from the wrapped stream and returns an async generator
   * over the chunks it produces. Reader cancellation and lock release are
   * delegated to `_consumeReader`.
   *
   * @returns {AsyncGenerator<*>} Generator yielding each chunk read.
   */
  [Symbol.asyncIterator]() {
    const source = this._stream;
    const defaultReader = source.getReader();
    const chunks = defaultReaderToAsyncIterator(defaultReader);
    return _consumeReader(source, defaultReader, chunks);
  }
}
|
/**
 * Variant of `AsyncIterableReadableStream` that tries to read through a
 * reader obtained with `{ mode: 'byob' }`, so the consumer can pass a byte
 * count or buffer through `next(value)`.
 */
export class AsyncIterableReadableByteStream extends AsyncIterableReadableStream {
  /**
   * Requests a `byob` reader from the wrapped stream. If that request throws,
   * iteration falls back to the parent class's implementation.
   *
   * @returns {AsyncGenerator<*>} Generator that has already been advanced
   *   once, so the caller's first `next(value)` delivers `value` to the read
   *   loop.
   */
  [Symbol.asyncIterator]() {
    const source = this._stream;
    let byobReader;
    try {
      byobReader = source.getReader({ mode: 'byob' });
    } catch (err) {
      // Requesting a byob reader failed; use the parent's iteration instead.
      return super[Symbol.asyncIterator]();
    }
    const primed = _consumeReader(source, byobReader, byobReaderToAsyncIterator(byobReader));
    // Advance past the generator's initial `yield null` so the first value
    // the consumer sends via `next(value)` is received by the read loop.
    primed.next();
    return primed;
  }
}
|
/**
 * Re-yields everything from `iterator` and cleans up `reader` afterwards.
 *
 * - If the iterator throws, the reader (when present) is cancelled with the
 *   error and the error is NOT rethrown: the generator simply completes.
 * - Otherwise the reader is cancelled without a reason once iteration ends,
 *   including when the consumer stops early.
 * - Finally, if the stream still reports `locked`, the reader's lock is
 *   released on a best-effort basis.
 *
 * @param {*} stream - Stream the reader was acquired from; only `locked` is read.
 * @param {*} reader - Reader exposing `cancel()`, `closed` and `releaseLock()`.
 * @param {AsyncIterable<*>} iterator - Source of the values to re-yield.
 * @returns {AsyncGenerator<*>} Generator yielding the iterator's values.
 */
async function* _consumeReader(stream, reader, iterator) {
  let failed = false;
  try {
    yield* iterator;
  } catch (error) {
    failed = true;
    if (reader) {
      await reader.cancel(error);
    }
  } finally {
    // This `return` sits inside `finally` on purpose: without a reader there
    // is nothing to clean up, and it also replaces any pending completion.
    if (!reader) {
      return;
    }
    if (!failed) {
      await reader.cancel();
    }
    if (stream.locked) {
      try {
        // Attach a no-op handler so a rejection of `closed` is not reported
        // as unhandled, then release the lock.
        reader.closed.catch(() => {});
        reader.releaseLock();
      } catch (ignored) {
        // Best effort: failures while releasing the lock are ignored.
      }
    }
  }
}
|
65 |
|
/**
 * Turns a reader with a promise-returning `read()` into an async generator.
 *
 * @param {*} reader - Object whose `read()` resolves to `{ done, value }`.
 * @returns {AsyncGenerator<*>} Yields each `value` until a result has `done` set.
 */
async function* defaultReaderToAsyncIterator(reader) {
  for (;;) {
    const result = await reader.read();
    if (result.done) {
      break;
    }
    yield result.value;
  }
}
|
72 |
|
/**
 * Async generator driving a BYOB-style reader. The consumer controls each
 * read by passing a byte count or a buffer through `next(value)`.
 *
 * The generator first yields `null` once, so that the value sent with the
 * following `next(value)` becomes the first read request. Each request is
 * handed to `readNext` with an offset of 0.
 *
 * When `readNext` reports `done` but still carries bytes that were gathered
 * before the stream ended, that final partial chunk is yielded instead of
 * being dropped.
 *
 * @param {*} reader - Reader passed through to `readNext`.
 * @returns {AsyncGenerator<Uint8Array|null>} Yields `null` once, then one
 *   chunk per request.
 */
async function* byobReaderToAsyncIterator(reader) {
  let request = yield null;
  for (;;) {
    const result = await readNext(reader, request, 0);
    if (result.done) {
      const tail = result.value;
      // A `done` result may still hold the bytes read before the stream
      // closed; emit them so the end of the stream is not lost.
      if (tail && tail.byteLength > 0) {
        yield tail;
      }
      return;
    }
    request = yield result.value;
  }
}
|
80 |
|
/**
 * Performs one read request against `reader`.
 *
 * @param {*} reader - Reader passed through to `readInto`.
 * @param {number|ArrayBuffer|SharedArrayBuffer|*} bufferOrLen - A byte count
 *   (a fresh `ArrayBuffer` of that size is allocated) or an existing buffer
 *   to fill up to its `byteLength`. Anything else ends the read sequence.
 * @param {number} offset - Byte offset in the buffer at which filling starts.
 * @returns {Promise<{done: boolean, value: (Uint8Array|undefined)}>} The
 *   result of `readInto`, or `{ done: true, value: undefined }` when
 *   `bufferOrLen` is neither a number nor a buffer.
 */
async function readNext(reader, bufferOrLen, offset) {
  if (typeof bufferOrLen === 'number') {
    return await readInto(reader, new ArrayBuffer(bufferOrLen), offset, bufferOrLen);
  }
  const isBuffer = bufferOrLen instanceof ArrayBuffer || bufferOrLen instanceof SharedArrayBuf;
  if (!isBuffer) {
    return { done: true, value: undefined };
  }
  return await readInto(reader, bufferOrLen, offset, bufferOrLen.byteLength);
}
|
98 |
|
/**
 * Reads from `reader` into `buffer` until `size` bytes are filled or the
 * reader reports `done`.
 *
 * Each call to `reader.read(view)` is given a `Uint8Array` over the unfilled
 * part of the buffer. The buffer of the returned view is used for the next
 * read, because the reader may hand back a different buffer object than the
 * one it was given.
 *
 * @param {*} reader - Object whose `read(view)` resolves to `{ done, value }`.
 * @param {ArrayBuffer|SharedArrayBuffer} buffer - Destination buffer.
 * @param {number} offset - Number of bytes already filled.
 * @param {number} size - Total number of bytes wanted.
 * @returns {Promise<{done: boolean, value: (Uint8Array|undefined)}>} A view
 *   over the filled bytes, with `done` as reported by the last read. When the
 *   reader finishes without returning a view (e.g. a cancelled stream),
 *   `value` is `undefined`.
 */
async function readInto(reader, buffer, offset, size) {
  let target = buffer;
  let filled = offset;
  while (filled < size) {
    const { done, value } = await reader.read(new Uint8Array(target, filled, size - filled));
    if (done && value == null) {
      // The reader ended without handing back a view, so there is no buffer
      // left to expose; report completion instead of dereferencing `value`.
      return { done: true, value: undefined };
    }
    target = value.buffer;
    filled += value.byteLength;
    if (done || filled >= size) {
      return { done, value: new Uint8Array(target, 0, filled) };
    }
  }
  // Nothing left to read: the requested range is already filled.
  return { done: false, value: new Uint8Array(target, 0, size) };
}
|
/**
 * Wraps a stream in an async iterable.
 *
 * @param {*} stream - Stream to wrap.
 * @param {{mode?: string}} [options] - Pass `{ mode: 'byob' }` to get the
 *   byte-stream wrapper; any other value selects the default wrapper.
 * @returns {AsyncIterableReadableStream} An `AsyncIterableReadableByteStream`
 *   when `options.mode === 'byob'`, otherwise an `AsyncIterableReadableStream`.
 */
export function fromDOMStream(stream, options) {
  if (options && options.mode === 'byob') {
    return new AsyncIterableReadableByteStream(stream);
  }
  return new AsyncIterableReadableStream(stream);
}
|
114 |
|
115 |
|