// UNPKG
//
// 2.87 kB JavaScript View Raw
import { AsyncIterableX } from './asynciterablex';
import { safeRace } from '../util/safeRace';
// Internal states of ReadableStreamAsyncIterable.
// NON_FLOWING: no chunk is known to be available; next() must wait for 'readable' or 'end'.
const NON_FLOWING = 0;
// READABLE: a 'readable' event was observed; next() may call stream.read().
const READABLE = 1;
// ENDED: the stream emitted 'end'; next() reports completion.
const ENDED = 2;
// ERRORED: the stream emitted 'error'; next() rethrows the stored error.
const ERRORED = 3;
/**
 * Async-iterable view over a Node.js readable stream.
 *
 * The instance is its own async iterator: each `next()` call pulls one chunk
 * with `stream.read(size)`, waiting for a 'readable' or 'end' event whenever
 * no chunk is currently available.
 */
export class ReadableStreamAsyncIterable extends AsyncIterableX {
    /**
     * @param {NodeJS.ReadableStream} stream The stream to pull chunks from.
     * @param {number} [size] Default size passed to `stream.read()` when
     *   `next()` is called without an explicit size.
     */
    constructor(stream, size) {
        super();
        this._stream = stream;
        this._defaultSize = size;
        this._state = NON_FLOWING;
        this._error = null;
        // Reject callbacks of the wait promises that are still pending, so
        // that a stream 'error' can fail all of them at once.
        this._rejectFns = new Set();
        const onError = (err) => {
            this._state = ERRORED;
            this._error = err;
            for (const rejectFn of this._rejectFns) {
                rejectFn(err);
            }
        };
        const onEnd = () => {
            this._state = ENDED;
        };
        this._stream.once('error', onError);
        this._stream.once('end', onEnd);
    }

    /**
     * @returns {this} The instance itself, which implements `next()`.
     */
    [Symbol.asyncIterator]() {
        return this;
    }

    /**
     * Produces the next chunk of the stream.
     *
     * @param {number} [size] Size passed to `stream.read()`; defaults to the
     *   size given to the constructor.
     * @returns {Promise<{done: boolean, value: *}>} `{ done: false, value }`
     *   for a chunk, or `{ done: true, value: undefined }` once the stream
     *   has ended.
     * @throws The error emitted by the stream, once it has errored.
     */
    async next(size = this._defaultSize) {
        if (this._state === NON_FLOWING) {
            // Wait until the stream is readable or has ended, then re-evaluate
            // the state. safeRace comes from '../util/safeRace'; presumably it
            // settles like Promise.race — confirm against that module.
            await safeRace([this._waitReadable(), this._waitEnd()]);
            return await this.next(size);
        }
        if (this._state === ENDED) {
            return { done: true, value: undefined };
        }
        if (this._state === ERRORED) {
            throw this._error;
        }
        const value = this._stream.read(size);
        if (value !== null) {
            return { done: false, value };
        }
        else {
            // read() had nothing to return: go back to waiting for an event.
            this._state = NON_FLOWING;
            return await this.next(size);
        }
    }

    /**
     * Creates a promise that resolves on the stream's next 'readable' event
     * (switching the state to READABLE) and rejects if the stream errors
     * first.
     *
     * The listener and the registered reject callback are only released when
     * 'readable' fires.
     *
     * @returns {Promise<void>}
     */
    _waitReadable() {
        return new Promise((resolve, reject) => {
            const onReadable = () => {
                this._state = READABLE;
                this._rejectFns.delete(reject);
                resolve();
            };
            this._rejectFns.add(reject);
            this._stream.once('readable', onReadable);
        });
    }

    /**
     * Returns a promise that resolves when the stream emits 'end' (switching
     * the state to ENDED) and rejects if the stream errors first.
     *
     * The promise is created on first use and cached, so this method
     * registers at most one 'end' listener per instance.
     *
     * @returns {Promise<void>}
     */
    _waitEnd() {
        if (!this._endPromise) {
            this._endPromise = new Promise((resolve, reject) => {
                const onEnd = () => {
                    this._state = ENDED;
                    this._rejectFns.delete(reject);
                    resolve();
                };
                this._rejectFns.add(reject);
                this._stream.once('end', onEnd);
            });
        }
        return this._endPromise;
    }
}
/**
 * Creates a new async-iterable from a Node.js stream.
 *
 * @export
 * @param {NodeJS.ReadableStream} stream The Node.js stream to convert to an async-iterable.
 * @param {number} [size] The size of the buffers for the stream.
 * @returns {(AsyncIterableX<string | Buffer>)} An async-iterable containing data from the stream either in string or Buffer format.
 */
export function fromNodeStream(stream, size) {
    return new ReadableStreamAsyncIterable(stream, size);
}

//# sourceMappingURL=fromnodestream.mjs.map