1 | import { IterableX } from '../iterable/iterablex';
|
2 | import { Readable } from 'stream';
|
/**
 * No-op stand-in used when an iterator does not implement the optional
 * `throw`/`return` method that the stream would otherwise call on teardown.
 *
 * @param {*} _ Ignored.
 * @returns {null} Always `null`.
 */
function done(_) {
  return null;
}
|
/**
 * Node.js `Readable` stream whose chunks are pulled from a synchronous
 * iterator obtained from `source[Symbol.iterator]()`.
 */
export class IterableReadable extends Readable {
  /**
   * @param {Iterable<*>} source Iterable whose items become the stream's chunks.
   * @param {object} [options] Options forwarded unchanged to `Readable`.
   */
  constructor(source, options) {
    super(options);
    // Re-entrancy guard: true while `_pull` is running (and permanently once
    // `_pull` reports that the stream is no longer readable).
    this._pulling = false;
    this._iterator = source[Symbol.iterator]();
    // Only drives the size bookkeeping in `_pull`. It is true when no options
    // are given or when `options.objectMode` is truthy. Note that with no
    // options this flag is true even though `super` received no `objectMode`.
    this._objectMode = !options || !!options.objectMode;
  }

  /**
   * Pulls items from the iterator and pushes them into the stream.
   *
   * Anything thrown while pulling (by the iterator or while pushing) is routed
   * through `destroy(err)` instead of escaping from `_read`.
   *
   * @param {number} size Amount of data requested by the stream machinery.
   */
  _read(size) {
    const it = this._iterator;
    if (!it || this._pulling) {
      return;
    }
    this._pulling = true;
    try {
      this._pulling = this._pull(it, size);
    } catch (err) {
      // Do not leave the guard stuck at `true` when the pull aborted.
      this._pulling = false;
      this.destroy(err);
    }
  }

  /**
   * Releases the iterator and reports the outcome through `cb`.
   *
   * The iterator's `throw` (when `err` is set) or `return` (otherwise) is
   * invoked if present. The callback receives the error raised by that
   * cleanup call if it threw, otherwise the original `err` (or `null`).
   *
   * @param {Error|null} err Error the stream is being destroyed with, if any.
   * @param {function(?Error): void} [cb] Completion callback.
   */
  _destroy(err, cb) {
    const it = this._iterator;
    this._iterator = undefined;
    let failure = err || null;
    try {
      const fn = (it && (err ? it.throw : it.return)) || done;
      fn.call(it, err);
    } catch (cleanupError) {
      // Without a callback there is nowhere to report to: keep throwing.
      if (!cb) {
        throw cleanupError;
      }
      failure = cleanupError;
    }
    if (cb) {
      cb(failure);
    }
  }

  /**
   * Drains the iterator into the stream until the iterator is exhausted, the
   * stream stops being readable, `push` signals back-pressure, or the
   * requested size has been used up.
   *
   * @param {Iterator<*>} it Iterator to pull from.
   * @param {number} [size] Budget: items in object mode, bytes otherwise.
   *   `null`/`undefined` means no budget.
   * @returns {boolean} `true` when the stream is no longer readable.
   */
  _pull(it, size) {
    const objectMode = this._objectMode;
    const bounded = size != null;
    let remaining = size;
    let r;
    while (this.readable && !(r = it.next(remaining)).done) {
      if (bounded) {
        remaining -= objectMode ? 1 : Buffer.byteLength(r.value || '');
      }
      // Only compare against the budget when one was given: `null <= 0` is
      // true in JavaScript and would otherwise stop after a single item.
      if (!this.push(r.value) || (bounded && remaining <= 0)) {
        break;
      }
    }
    if ((r && r.done) || !this.readable) {
      // Signal end-of-stream and give the iterator a chance to clean up.
      this.push(null);
      if (it.return) {
        it.return();
      }
    }
    return !this.readable;
  }
}
|
/**
 * Wraps an iterable in a Node.js readable stream.
 *
 * @param {Iterable<*>} source Iterable to read items from.
 * @param {object} [options] Options forwarded to the `IterableReadable` constructor.
 * @returns {IterableReadable} Stream that emits the items of `source`.
 */
export function toNodeStream(source, options) {
  // Both object-mode and byte-mode callers get the same class; the options
  // object alone decides the mode.
  return new IterableReadable(source, options);
}
|
/**
 * Method form of the stream conversion: wraps the receiver (`this`) in a
 * Node.js readable stream. Must be invoked with `this` bound to an iterable.
 *
 * @this {Iterable<*>}
 * @param {object} [options] Options forwarded to the `IterableReadable` constructor.
 * @returns {IterableReadable} Stream that emits the items of the receiver.
 */
export function toNodeStreamProto(options) {
  // Both object-mode and byte-mode callers get the same class; the options
  // object alone decides the mode.
  return new IterableReadable(this, options);
}
|
// Expose the conversion as an instance method on every IterableX.
Object.assign(IterableX.prototype, { toNodeStream: toNodeStreamProto });
|
65 |
|
66 |
|