1 | import { AsyncSink } from './../asyncsink';
|
2 | import { bindCallback } from '../internal/bindcallback';
|
3 | import { identityAsync } from '../internal/identity';
|
4 | import { toLength } from '../internal/tolength';
|
5 | import { isArrayLike, isIterable, isAsyncIterable, isReadableNodeStream, isWritableNodeStream } from '../internal/isiterable';
|
6 |
|
7 |
|
8 |
|
9 | export class AsyncIterableX {
|
10 | async forEach(projection, thisArg) {
|
11 | const fn = bindCallback(projection, thisArg, 2);
|
12 | let i = 0;
|
13 | for await (let item of this) {
|
14 | await fn(item, i++);
|
15 | }
|
16 | }
|
17 | pipe(...args) {
|
18 | let i = -1;
|
19 | let n = args.length;
|
20 | let acc = this;
|
21 | let as = AsyncIterableX.as;
|
22 | while (++i < n) {
|
23 | acc = as(args[i](acc));
|
24 | }
|
25 | return acc;
|
26 | }
|
27 | tee() {
|
28 | return this._getDOMStream().tee();
|
29 | }
|
30 | pipeTo(writable, options) {
|
31 | return this._getDOMStream().pipeTo(writable, options);
|
32 | }
|
33 | pipeThrough(duplex, options) {
|
34 | return this._getDOMStream().pipeThrough(duplex, options);
|
35 | }
|
36 | _getDOMStream() {
|
37 | return this._DOMStream || (this._DOMStream = this.publish().toDOMStream());
|
38 | }
|
39 |
|
40 | static as(source) {
|
41 |
|
42 | if (source instanceof AsyncIterableX) {
|
43 | return source;
|
44 | }
|
45 | if (typeof source === 'string') {
|
46 | return new OfAsyncIterable([source]);
|
47 | }
|
48 | if (isIterable(source) || isAsyncIterable(source)) {
|
49 | return new FromAsyncIterable(source, identityAsync);
|
50 | }
|
51 | if (isPromise(source)) {
|
52 | return new FromPromiseIterable(source, identityAsync);
|
53 | }
|
54 | if (isObservable(source)) {
|
55 | return new FromObservableAsyncIterable(source, identityAsync);
|
56 | }
|
57 | if (isArrayLike(source)) {
|
58 | return new FromArrayIterable(source, identityAsync);
|
59 | }
|
60 | return new OfAsyncIterable([source]);
|
61 |
|
62 | }
|
63 |
|
64 | static from(source, selector = identityAsync, thisArg) {
|
65 | const fn = bindCallback(selector, thisArg, 2);
|
66 |
|
67 | if (isIterable(source) || isAsyncIterable(source)) {
|
68 | return new FromAsyncIterable(source, fn);
|
69 | }
|
70 | if (isPromise(source)) {
|
71 | return new FromPromiseIterable(source, fn);
|
72 | }
|
73 | if (isObservable(source)) {
|
74 | return new FromObservableAsyncIterable(source, fn);
|
75 | }
|
76 | if (isArrayLike(source)) {
|
77 | return new FromArrayIterable(source, fn);
|
78 | }
|
79 | throw new TypeError('Input type not supported');
|
80 |
|
81 | }
|
82 |
|
83 | static of(...args) {
|
84 |
|
85 | return new OfAsyncIterable(args);
|
86 | }
|
87 | }
|
88 | class FromArrayIterable extends AsyncIterableX {
|
89 | constructor(source, selector) {
|
90 | super();
|
91 | this._source = source;
|
92 | this._selector = selector;
|
93 | }
|
94 | async *[Symbol.asyncIterator]() {
|
95 | let i = 0;
|
96 | const length = toLength(this._source.length);
|
97 | while (i < length) {
|
98 | yield await this._selector(this._source[i], i++);
|
99 | }
|
100 | }
|
101 | }
|
102 | class FromAsyncIterable extends AsyncIterableX {
|
103 | constructor(source, selector) {
|
104 | super();
|
105 | this._source = source;
|
106 | this._selector = selector;
|
107 | }
|
108 | async *[Symbol.asyncIterator]() {
|
109 | let i = 0;
|
110 | for await (let item of this._source) {
|
111 | yield await this._selector(item, i++);
|
112 | }
|
113 | }
|
114 | }
|
115 | class FromPromiseIterable extends AsyncIterableX {
|
116 | constructor(source, selector) {
|
117 | super();
|
118 | this._source = source;
|
119 | this._selector = selector;
|
120 | }
|
121 | async *[Symbol.asyncIterator]() {
|
122 | const item = await this._source;
|
123 | yield await this._selector(item, 0);
|
124 | }
|
125 | }
|
126 | class FromObservableAsyncIterable extends AsyncIterableX {
|
127 | constructor(observable, selector) {
|
128 | super();
|
129 | this._observable = observable;
|
130 | this._selector = selector;
|
131 | }
|
132 | async *[Symbol.asyncIterator]() {
|
133 | const sink = new AsyncSink();
|
134 | const subscription = this._observable.subscribe({
|
135 | next(value) {
|
136 | sink.write(value);
|
137 | },
|
138 | error(err) {
|
139 | sink.error(err);
|
140 | },
|
141 | complete() {
|
142 | sink.end();
|
143 | }
|
144 | });
|
145 | let i = 0;
|
146 | try {
|
147 | for (let next; !(next = await sink.next()).done;) {
|
148 | yield await this._selector(next.value, i++);
|
149 | }
|
150 | }
|
151 | finally {
|
152 | subscription.unsubscribe();
|
153 | }
|
154 | }
|
155 | }
|
156 | function isPromise(x) {
|
157 | return x != null && Object(x) === x && typeof x['then'] === 'function';
|
158 | }
|
159 | function isObservable(x) {
|
160 | return x != null && Object(x) === x && typeof x['subscribe'] === 'function';
|
161 | }
|
162 | class OfAsyncIterable extends AsyncIterableX {
|
163 | constructor(args) {
|
164 | super();
|
165 | this._args = args;
|
166 | }
|
167 | async *[Symbol.asyncIterator]() {
|
168 | for (let item of this._args) {
|
169 | yield item;
|
170 | }
|
171 | }
|
172 | }
|
173 | try {
|
174 | (isBrowser => {
|
175 | if (isBrowser) {
|
176 | return;
|
177 | }
|
178 | const as = AsyncIterableX.as;
|
179 | AsyncIterableX.prototype.pipe = nodePipe;
|
180 | const readableOpts = (x, opts = x._writableState || { objectMode: true }) => opts;
|
181 | function nodePipe(...args) {
|
182 | let i = -1;
|
183 | let end;
|
184 | let n = args.length;
|
185 | let prev = this;
|
186 | let next;
|
187 | while (++i < n) {
|
188 | next = args[i];
|
189 | if (typeof next === 'function') {
|
190 | prev = as(next(prev));
|
191 | }
|
192 | else if (isWritableNodeStream(next)) {
|
193 | ({ end = true } = args[i + 1] || {});
|
194 |
|
195 | return isReadableNodeStream(prev) ? prev.pipe(next, { end }) :
|
196 | prev.toNodeStream(readableOpts(next)).pipe(next, { end });
|
197 | }
|
198 | }
|
199 | return prev;
|
200 | }
|
201 | })(typeof window === 'object' && typeof document === 'object' && document.nodeType === 9);
|
202 | }
|
203 | catch (e) {
|
204 |
|
205 | }
|
206 |
|
207 |
|