UNPKG

2.15 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
2import { Readable } from 'stream';
/**
 * Fallback cleanup routine used when an iterator has no `return`/`throw`
 * method to call: accepts (and ignores) one argument and resolves to `null`.
 */
async function done(_) {
  return null;
}
/**
 * Node.js `Readable` stream whose chunks are pulled on demand from an
 * async iterable.
 *
 * Values produced by the source iterator are pushed into the stream until
 * the stream signals back-pressure, the requested size is exhausted, or the
 * iterator completes. Failures of the source iterator destroy the stream
 * with the corresponding error instead of surfacing as unhandled rejections.
 */
export class AsyncIterableReadable extends Readable {
  /**
   * @param {AsyncIterable<any>} source - Iterable to drain; its
   *   `Symbol.asyncIterator` method is invoked immediately.
   * @param {object} [options] - Options forwarded unchanged to `Readable`.
   */
  constructor(source, options) {
    super(options);
    this._pulling = false;
    this._iterator = source[Symbol.asyncIterator]();
    // Controls how `_pull` accounts for the requested size: one unit per
    // item (true) or the byte length of each chunk (false).
    // NOTE(review): when `options` is omitted this flag is true even though
    // the base Readable is constructed without `objectMode` — confirm that
    // this default is intended.
    this._objectMode = !options || !!options.objectMode;
  }

  /**
   * Starts a pull from the source iterator unless one is already running or
   * the stream has been destroyed (no iterator left).
   *
   * @param {number} size - Amount of data requested by the stream machinery.
   */
  _read(size) {
    const it = this._iterator;
    if (!it || this._pulling) {
      return;
    }
    this._pulling = true;
    this._pull(it, size).then(
      (finished) => {
        this._pulling = finished;
      },
      // A rejected `next()`/`return()` must tear the stream down; otherwise
      // the error would be an unhandled rejection and the consumer would
      // wait forever.
      (err) => {
        this.destroy(err);
      },
    );
  }

  /**
   * Releases the source iterator when the stream is destroyed: calls
   * `throw(err)` when destroyed with an error, `return()` otherwise.
   *
   * The callback is always invoked, including when the iterator's cleanup
   * promise rejects (async generators reject from `throw(err)`), in which
   * case the rejection reason is forwarded to the callback.
   *
   * @param {Error|null|undefined} err - Reason the stream is being destroyed.
   * @param {(error?: Error|null) => void} cb - Completion callback.
   */
  _destroy(err, cb) {
    const it = this._iterator;
    this._iterator = undefined;
    const fn = (it && (err ? it.throw : it.return)) || done;
    Promise.resolve(fn.call(it, err)).then(
      () => cb && cb(null),
      (reason) => cb && cb(reason),
    );
  }

  /**
   * Pushes values from the iterator into the stream until back-pressure,
   * the size budget, or the end of the source stops the loop.
   *
   * @param {AsyncIterator<any>} it - Iterator to pull from.
   * @param {number} size - Budget for this pull (items in object mode,
   *   bytes otherwise).
   * @returns {Promise<boolean>} `true` when the stream is no longer
   *   readable, which keeps `_pulling` set and prevents further pulls.
   */
  // eslint-disable-next-line complexity
  async _pull(it, size) {
    const objectMode = this._objectMode;
    let remaining = size;
    let r;
    while (this.readable && !(r = await it.next(remaining)).done) {
      if (remaining != null) {
        remaining -= objectMode ? 1 : Buffer.byteLength(r.value || '');
      }
      // Stop on back-pressure (`push` returned false) or once the requested
      // amount has been delivered.
      if (!this.push(r.value) || remaining <= 0) {
        break;
      }
    }
    if ((r && r.done) || !this.readable) {
      // Signal end-of-stream and give the iterator a chance to clean up.
      this.push(null);
      if (it.return) {
        await it.return();
      }
    }
    return !this.readable;
  }
}
/**
 * Wraps an async iterable in a Node.js readable stream.
 *
 * @param {AsyncIterable<any>} source - Iterable to expose as a stream.
 * @param {object} [options] - Options handed to the stream constructor.
 * @returns {AsyncIterableReadable} Stream that pulls from `source`.
 */
export function toNodeStream(source, options) {
  // The same stream class is used whether or not object mode is requested.
  const stream = new AsyncIterableReadable(source, options);
  return stream;
}
/**
 * Method form of `toNodeStream`: wraps the receiver (`this`) in a Node.js
 * readable stream.
 *
 * @this {AsyncIterable<any>}
 * @param {object} [options] - Options handed to the stream constructor.
 * @returns {AsyncIterableReadable} Stream that pulls from the receiver.
 */
export function toNodeStreamProto(options) {
  // The same stream class is used whether or not object mode is requested.
  const stream = new AsyncIterableReadable(this, options);
  return stream;
}
// Make the conversion available as a method on AsyncIterableX instances.
Object.assign(AsyncIterableX.prototype, { toNodeStream: toNodeStreamProto });
62
63//# sourceMappingURL=tonodestream.mjs.map