UNPKG

1.74 kBJavaScriptView Raw
1import { Readable } from 'stream';
/**
 * No-op asynchronous finalizer: ignores its argument and resolves to `null`.
 * Serves as a stand-in when an iterator has no `throw`/`return` method to call.
 */
const done = async (_ignored) => {
  return null;
};
/**
 * Node.js `Readable` stream that pulls its chunks from an async iterable.
 *
 * Each `_read` request starts (at most one at a time) an asynchronous pull
 * loop that calls `next()` on the source iterator and pushes the yielded
 * values into the stream until back-pressure, the requested size, or the end
 * of the source stops it.
 */
export class AsyncIterableReadable extends Readable {
  /**
   * @param {AsyncIterable<*>} source - Object exposing `Symbol.asyncIterator`.
   * @param {object} [options] - Passed unchanged to the `Readable` constructor.
   */
  constructor(source, options) {
    super(options);
    // True while a pull loop is in flight (or once the stream is finished).
    this._pulling = false;
    this._iterator = source[Symbol.asyncIterator]();
    // Drives size accounting in `_pull`: count items (object mode) or bytes.
    // NOTE(review): with no options this is `true` while the underlying
    // Readable is built with its own defaults - confirm that is intended.
    this._objectMode = !options || !!options.objectMode;
  }

  /**
   * Starts a pull loop unless one is already running or the iterator is gone.
   *
   * @param {number} size - Amount requested by the stream machinery.
   */
  _read(size) {
    const it = this._iterator;
    if (!it || this._pulling) {
      return;
    }
    this._pulling = true;
    this._pull(it, size).then(
      (finished) => {
        this._pulling = finished;
      },
      (err) => {
        // A failing source must surface as a stream error instead of an
        // unhandled rejection that leaves the stream stuck in "pulling".
        this.destroy(err);
      },
    );
  }

  /**
   * Releases the source iterator: `throw(err)` when destroyed with an error,
   * `return()` otherwise. The callback is always invoked, even when the
   * iterator lacks the method or its finalizer fails.
   *
   * @param {Error|null} err - Error the stream is being destroyed with, if any.
   * @param {function(Error|null): void} cb - Completion callback.
   */
  _destroy(err, cb) {
    const it = this._iterator;
    this._iterator = undefined;
    const finalizer = it && (err ? it.throw : it.return);
    // Wrapped in an async function so a synchronous throw from a non-async
    // finalizer is reported through the same path as a rejection.
    const finalize = async () => {
      if (typeof finalizer === 'function') {
        await finalizer.call(it, err);
      }
    };
    finalize().then(
      // Kept from the original behavior: a finalizer that settles
      // successfully completes the destroy without reporting an error.
      () => cb && cb(null),
      // An async generator's throw() normally rejects with the error it was
      // given; forward it so the stream emits 'error' and still closes.
      (failure) => cb && cb(failure),
    );
  }

  /**
   * Pull loop: reads from the iterator and pushes values until the stream
   * stops being readable, the source is exhausted, `push` signals
   * back-pressure, or the requested size has been satisfied.
   *
   * @param {AsyncIterator<*>} it - Iterator to read from.
   * @param {number} size - Remaining budget (items in object mode, else bytes).
   * @returns {Promise<boolean>} `true` when the stream is no longer readable.
   */
  async _pull(it, size) {
    const objectMode = this._objectMode;
    let r;
    while (this.readable && !(r = await it.next(size)).done) {
      if (size != null) {
        if (objectMode) {
          size -= 1;
        } else {
          size -= Buffer.byteLength(r.value || '');
        }
      }
      if (!this.push(r.value) || size <= 0) {
        break;
      }
    }
    if ((r && r.done) || !this.readable) {
      // Signal end-of-stream and give the iterator a chance to clean up.
      this.push(null);
      if (it.return) {
        await it.return();
      }
    }
    return !this.readable;
  }
}
/**
 * Wraps an async iterable in a Node.js readable stream.
 *
 * @param {AsyncIterable<*>} source - Iterable to read from.
 * @param {object} [options] - Stream options, forwarded to `AsyncIterableReadable`.
 * @returns {AsyncIterableReadable} Stream that yields the iterable's values.
 */
export function toNodeStream(source, options) {
  // The same class is constructed whether or not object mode is requested.
  const stream = new AsyncIterableReadable(source, options);
  return stream;
}
53
54//# sourceMappingURL=tonodestream.mjs.map