1 | import { Readable } from 'stream';
|
2 | const done = async (_) => null;
|
3 | export class AsyncIterableReadable extends Readable {
|
4 | constructor(source, options) {
|
5 | super(options);
|
6 | this._pulling = false;
|
7 | this._objectMode = true;
|
8 | this._iterator = source[Symbol.asyncIterator]();
|
9 | this._objectMode = !options || !!options.objectMode;
|
10 | }
|
11 | _read(size) {
|
12 | const it = this._iterator;
|
13 | if (it && !this._pulling && (this._pulling = true)) {
|
14 | Promise.resolve(this._pull(it, size)).then(p => (this._pulling = p));
|
15 | }
|
16 | }
|
17 | _destroy(err, cb) {
|
18 | const it = this._iterator;
|
19 | this._iterator = undefined;
|
20 | const fn = (it && (err ? it.throw : it.return)) || done;
|
21 | fn.call(it, err).then(() => cb && cb(null));
|
22 | }
|
23 | async _pull(it, size) {
|
24 | const objectMode = this._objectMode;
|
25 | let r;
|
26 | while (this.readable && !(r = await it.next(size)).done) {
|
27 | if (size != null) {
|
28 | if (objectMode) {
|
29 | size -= 1;
|
30 | }
|
31 | else {
|
32 | size -= Buffer.byteLength(r.value || '');
|
33 | }
|
34 | }
|
35 | if (!this.push(r.value) || size <= 0) {
|
36 | break;
|
37 | }
|
38 | }
|
39 | if ((r && r.done) || !this.readable) {
|
40 | this.push(null);
|
41 | if (it.return) {
|
42 | await it.return();
|
43 | }
|
44 | }
|
45 | return !this.readable;
|
46 | }
|
47 | }
|
48 | export function toNodeStream(source, options) {
|
49 | return !options || options.objectMode === true
|
50 | ? new AsyncIterableReadable(source, options)
|
51 | : new AsyncIterableReadable(source, options);
|
52 | }
|
53 |
|
54 |
|