UNPKG

2.46 kB · JavaScript · View Raw
1import { AsyncIterableX } from './asynciterablex';
// Internal states of ReadableStreamAsyncIterable.
// NON_FLOWING: must wait for a 'readable' or 'end' event before reading.
const NON_FLOWING = 0;
// READABLE: a 'readable' event was seen; read() may be called.
const READABLE = 1;
// ENDED: the stream emitted 'end'.
const ENDED = 2;
// ERRORED: the stream emitted 'error'.
const ERRORED = 3;
/**
 * Async iterable/iterator that pulls chunks out of an event-emitting stream.
 *
 * The stream must provide `once(event, listener)` and `read(size)`, and is
 * expected to emit 'readable', 'end' and 'error' events. If it also provides
 * `removeListener(event, listener)`, abandoned 'readable' listeners are
 * detached once the stream ends or fails.
 */
export class ReadableStreamAsyncIterable extends AsyncIterableX {
  /**
   * @param {object} stream - Source stream (see class description).
   * @param {number} [size] - Default argument forwarded to `stream.read()`.
   */
  constructor(stream, size) {
    super();
    this._stream = stream;
    this._defaultSize = size;
    this._state = NON_FLOWING;
    this._error = null;
    this._rejectFns = new Set();
    // Pending 'readable' waits: listener -> reject function of its promise.
    this._readableWaits = new Map();
    const onError = (err) => {
      this._state = ERRORED;
      this._error = err;
      // Snapshot first: releasing the waits below removes entries from the set.
      const pendingRejects = [...this._rejectFns];
      this._releaseReadableWaits();
      this._rejectFns.clear();
      for (const rejectFn of pendingRejects) {
        rejectFn(err);
      }
    };
    const onEnd = () => {
      this._state = ENDED;
      // No further reads will happen; drop listeners that would otherwise
      // linger and could move the state back to READABLE.
      this._releaseReadableWaits();
    };
    this._stream['once']('error', onError);
    this._stream['once']('end', onEnd);
  }

  /** @returns {this} This object acts as its own async iterator. */
  [Symbol.asyncIterator]() {
    return this;
  }

  /**
   * Produces the next chunk.
   *
   * @param {number} [size] - Argument forwarded to `stream.read()`; defaults
   *   to the size given to the constructor.
   * @returns {Promise<{done: boolean, value: *}>} `{ done: false, value }` for
   *   a chunk, `{ done: true, value: undefined }` once the stream has ended.
   * @throws The error emitted by the stream, if it has errored.
   */
  async next(size = this._defaultSize) {
    // Loop instead of recursing so repeated empty reads do not chain promises.
    for (;;) {
      if (this._state === NON_FLOWING) {
        await Promise.race([this._waitReadable(), this._waitEnd()]);
        continue;
      }
      if (this._state === ENDED) {
        return { done: true, value: undefined };
      }
      if (this._state === ERRORED) {
        throw this._error;
      }
      const value = this._stream['read'](size);
      if (value !== null) {
        return { done: false, value };
      }
      // Nothing buffered: go back to waiting for 'readable' or 'end'.
      this._state = NON_FLOWING;
    }
  }

  /**
   * Detaches every pending 'readable' listener and forgets its reject
   * function. The associated promises are left unsettled by this method.
   */
  _releaseReadableWaits() {
    const canRemove = typeof this._stream['removeListener'] === 'function';
    for (const [listener, reject] of this._readableWaits) {
      if (canRemove) {
        this._stream['removeListener']('readable', listener);
      }
      this._rejectFns.delete(reject);
    }
    this._readableWaits.clear();
  }

  /**
   * @returns {Promise<void>} Resolves on the next 'readable' event (marking
   *   the state READABLE); rejects if the stream emits 'error' first.
   */
  _waitReadable() {
    return new Promise((resolve, reject) => {
      const onReadable = () => {
        this._state = READABLE;
        this._readableWaits.delete(onReadable);
        this._rejectFns.delete(reject);
        resolve();
      };
      this._readableWaits.set(onReadable, reject);
      this._rejectFns.add(reject);
      this._stream['once']('readable', onReadable);
    });
  }

  /**
   * @returns {Promise<void>} A promise, created once and reused, that resolves
   *   when the stream emits 'end' and rejects if it emits 'error' first.
   */
  _waitEnd() {
    if (!this._endPromise) {
      this._endPromise = new Promise((resolve, reject) => {
        const onEnd = () => {
          this._state = ENDED;
          this._rejectFns.delete(reject);
          resolve();
        };
        this._rejectFns.add(reject);
        this._stream['once']('end', onEnd);
      });
    }
    return this._endPromise;
  }
}
/**
 * Wraps a stream in a ReadableStreamAsyncIterable.
 *
 * @param {object} stream - Stream handed to the ReadableStreamAsyncIterable constructor.
 * @param {number} [size] - Default read size handed to the constructor.
 * @returns {ReadableStreamAsyncIterable} The new async-iterable wrapper.
 */
export function fromNodeStream(stream, size) {
  return new ReadableStreamAsyncIterable(stream, size);
}
79
80//# sourceMappingURL=fromnodestream.mjs.map