UNPKG

6.38 kBJavaScriptView Raw
1import { AsyncSink } from './../asyncsink';
2import { bindCallback } from '../internal/bindcallback';
3import { identityAsync } from '../internal/identity';
4import { toLength } from '../internal/tolength';
5import { isArrayLike, isIterable, isAsyncIterable, isReadableNodeStream, isWritableNodeStream } from '../internal/isiterable';
/**
 * Base class for every operation that supports [Symbol.asyncIterator].
 */
export class AsyncIterableX {
    /**
     * Consumes the sequence, invoking (and awaiting) the callback once per element.
     *
     * @param projection Callback receiving `(item, index)`; it is first passed
     *   through `bindCallback` together with `thisArg`.
     * @param thisArg Value handed to `bindCallback` alongside the callback.
     * @returns A promise that settles once iteration has finished.
     */
    async forEach(projection, thisArg) {
        const callback = bindCallback(projection, thisArg, 2);
        let index = 0;
        for await (const element of this) {
            await callback(element, index);
            index += 1;
        }
    }
    /**
     * Threads this sequence through each operator in turn. The result of every
     * operator is normalised with `AsyncIterableX.as` before being handed to the
     * next one.
     *
     * @param args Operator functions, applied left to right.
     * @returns The final wrapped sequence, or `this` when no operators are given.
     */
    pipe(...args) {
        const wrap = AsyncIterableX.as;
        const count = args.length;
        let current = this;
        for (let index = 0; index < count; index += 1) {
            // Invoked as a member of `args`, exactly like the original call form.
            current = wrap(args[index](current));
        }
        return current;
    }
    /** Delegates to `tee()` on the cached DOM stream. */
    tee() {
        const stream = this._getDOMStream();
        return stream.tee();
    }
    /** Delegates to `pipeTo(writable, options)` on the cached DOM stream. */
    pipeTo(writable, options) {
        const stream = this._getDOMStream();
        return stream.pipeTo(writable, options);
    }
    /** Delegates to `pipeThrough(duplex, options)` on the cached DOM stream. */
    pipeThrough(duplex, options) {
        const stream = this._getDOMStream();
        return stream.pipeThrough(duplex, options);
    }
    /**
     * Returns the DOM stream for this sequence, creating it on first use via
     * `this.publish().toDOMStream()` and caching it on `_DOMStream`.
     * NOTE(review): `publish` and `toDOMStream` are not defined in this file;
     * presumably they are attached elsewhere.
     */
    _getDOMStream() {
        if (!this._DOMStream) {
            this._DOMStream = this.publish().toDOMStream();
        }
        return this._DOMStream;
    }
    /**
     * Coerces an arbitrary value into an AsyncIterableX.
     *
     * Checks run in this order: existing AsyncIterableX (returned as is), string
     * (single-element sequence), iterable or async-iterable, promise-like,
     * observable-like, array-like. Any other value becomes a single-element
     * sequence.
     *
     * @nocollapse
     */
    static as(source) {
        if (source instanceof AsyncIterableX) {
            return source;
        }
        let wrapped;
        if (typeof source === 'string') {
            // Strings are treated as one value, not as a sequence of characters.
            wrapped = new OfAsyncIterable([source]);
        }
        else if (isIterable(source) || isAsyncIterable(source)) {
            wrapped = new FromAsyncIterable(source, identityAsync);
        }
        else if (isPromise(source)) {
            wrapped = new FromPromiseIterable(source, identityAsync);
        }
        else if (isObservable(source)) {
            wrapped = new FromObservableAsyncIterable(source, identityAsync);
        }
        else if (isArrayLike(source)) {
            wrapped = new FromArrayIterable(source, identityAsync);
        }
        else {
            wrapped = new OfAsyncIterable([source]);
        }
        return wrapped;
    }
    /**
     * Builds an AsyncIterableX from an iterable, async-iterable, promise-like,
     * observable-like or array-like source, projecting every element through
     * `selector`.
     *
     * @param source The input to convert.
     * @param selector Projection receiving `(value, index)`; defaults to `identityAsync`.
     * @param thisArg Value handed to `bindCallback` alongside the selector.
     * @throws {TypeError} When `source` matches none of the supported shapes.
     *
     * @nocollapse
     */
    static from(source, selector = identityAsync, thisArg) {
        const project = bindCallback(selector, thisArg, 2);
        let wrapped;
        if (isIterable(source) || isAsyncIterable(source)) {
            wrapped = new FromAsyncIterable(source, project);
        }
        else if (isPromise(source)) {
            wrapped = new FromPromiseIterable(source, project);
        }
        else if (isObservable(source)) {
            wrapped = new FromObservableAsyncIterable(source, project);
        }
        else if (isArrayLike(source)) {
            wrapped = new FromArrayIterable(source, project);
        }
        else {
            throw new TypeError('Input type not supported');
        }
        return wrapped;
    }
    /**
     * Creates a sequence that yields the given arguments in order.
     *
     * @nocollapse
     */
    static of(...args) {
        const sequence = new OfAsyncIterable(args);
        return sequence;
    }
}
/**
 * Async sequence over an array-like source. Elements are read by index and
 * passed through the selector together with their position.
 */
class FromArrayIterable extends AsyncIterableX {
    /**
     * @param source Array-like value exposing `length` and indexed elements.
     * @param selector Projection invoked as `selector(value, index)`.
     */
    constructor(source, selector) {
        super();
        this._source = source;
        this._selector = selector;
    }
    /** Yields the awaited projection of each indexed element, in order. */
    async *[Symbol.asyncIterator]() {
        // The length is captured once, before iteration starts.
        const count = toLength(this._source.length);
        for (let index = 0; index < count; index += 1) {
            yield await this._selector(this._source[index], index);
        }
    }
}
/**
 * Async sequence over any source consumable with `for await`, projecting each
 * element through the selector together with its position.
 */
class FromAsyncIterable extends AsyncIterableX {
    /**
     * @param source Value iterated with `for await ... of`.
     * @param selector Projection invoked as `selector(value, index)`.
     */
    constructor(source, selector) {
        super();
        this._source = source;
        this._selector = selector;
    }
    /** Yields the awaited projection of every source element, in order. */
    async *[Symbol.asyncIterator]() {
        let index = 0;
        for await (const element of this._source) {
            const projected = await this._selector(element, index);
            index += 1;
            yield projected;
        }
    }
}
/**
 * Single-element async sequence produced from a promise-like source.
 */
class FromPromiseIterable extends AsyncIterableX {
    /**
     * @param source Value that is awaited to obtain the single element.
     * @param selector Projection invoked as `selector(value, 0)`.
     */
    constructor(source, selector) {
        super();
        this._source = source;
        this._selector = selector;
    }
    /** Awaits the source, projects it with index 0 and yields the result. */
    async *[Symbol.asyncIterator]() {
        const resolved = await this._source;
        const projected = await this._selector(resolved, 0);
        yield projected;
    }
}
/**
 * Async sequence fed by an observable. Notifications are written into an
 * AsyncSink and read back from it one at a time by the iterator.
 */
class FromObservableAsyncIterable extends AsyncIterableX {
    /**
     * @param observable Object exposing `subscribe(observer)`.
     * @param selector Projection invoked as `selector(value, index)`.
     */
    constructor(observable, selector) {
        super();
        this._observable = observable;
        this._selector = selector;
    }
    /**
     * Subscribes to the observable, yields the awaited projection of every value
     * read from the sink, and always unsubscribes when iteration ends.
     */
    async *[Symbol.asyncIterator]() {
        const sink = new AsyncSink();
        // Forward each observer notification to the matching sink operation.
        const observer = {
            next: (value) => {
                sink.write(value);
            },
            error: (err) => {
                sink.error(err);
            },
            complete: () => {
                sink.end();
            }
        };
        const subscription = this._observable.subscribe(observer);
        let index = 0;
        try {
            let result = await sink.next();
            while (!result.done) {
                yield await this._selector(result.value, index);
                index += 1;
                result = await sink.next();
            }
        }
        finally {
            // Runs on completion, on error, and when the consumer stops early.
            subscription.unsubscribe();
        }
    }
}
/**
 * Reports whether `x` is promise-like: an object or function whose `then`
 * property is a function.
 *
 * @param x Value to inspect.
 * @returns {boolean} `true` for thenables, otherwise `false`.
 */
function isPromise(x) {
    if (x == null) {
        return false;
    }
    // Primitives are boxed by Object(), so the identity check rejects them.
    if (Object(x) !== x) {
        return false;
    }
    return typeof x['then'] === 'function';
}
/**
 * Reports whether `x` is observable-like: an object or function whose
 * `subscribe` property is a function.
 *
 * @param x Value to inspect.
 * @returns {boolean} `true` when `x` can be subscribed to, otherwise `false`.
 */
function isObservable(x) {
    if (x == null) {
        return false;
    }
    // Primitives are boxed by Object(), so the identity check rejects them.
    if (Object(x) !== x) {
        return false;
    }
    return typeof x['subscribe'] === 'function';
}
/**
 * Async sequence that yields a fixed collection of values in order.
 */
class OfAsyncIterable extends AsyncIterableX {
    /**
     * @param args Iterable collection of the values to emit.
     */
    constructor(args) {
        super();
        this._args = args;
    }
    /** Yields every stored value, one at a time. */
    async *[Symbol.asyncIterator]() {
        const values = this._args;
        for (const value of values) {
            yield value;
        }
    }
}
// Outside a browser (no `window` plus a `document` whose nodeType is 9), replace
// AsyncIterableX.prototype.pipe with a variant that also accepts writable Node
// streams. Setup is best-effort: any error raised here is deliberately ignored.
try {
    const isBrowser = typeof window === 'object' && typeof document === 'object' && document.nodeType === 9;
    if (!isBrowser) {
        // Captured once so later reassignment of AsyncIterableX.as has no effect here.
        const as = AsyncIterableX.as;
        /**
         * Applies operator functions left to right. On reaching the first writable
         * Node stream, pipes the sequence into it and returns the result of that
         * `pipe` call; the argument following the stream may carry an `end` option
         * (defaults to true). Arguments that are neither are skipped.
         */
        const nodePipe = function nodePipe(...args) {
            const count = args.length;
            let upstream = this;
            for (let index = 0; index < count; index += 1) {
                const stage = args[index];
                if (typeof stage === 'function') {
                    upstream = as(stage(upstream));
                    continue;
                }
                if (isWritableNodeStream(stage)) {
                    const { end = true } = args[index + 1] || {};
                    // Non-stream sources are converted first, reusing the target's
                    // writable state as options, or object mode when it has none.
                    const readable = isReadableNodeStream(upstream)
                        ? upstream
                        : upstream.toNodeStream(stage._writableState || { objectMode: true });
                    return readable.pipe(stage, { end });
                }
            }
            return upstream;
        };
        AsyncIterableX.prototype.pipe = nodePipe;
    }
}
catch (e) {
    /* intentionally ignored: the default pipe implementation stays in place */
}
206
207//# sourceMappingURL=asynciterablex.mjs.map