// UNPKG
//
// 2.05 kB · JavaScript · View Raw
import { AsyncIterableX } from './asynciterablex';
import { identity, identityAsync } from '../internal/identity';
// A promise that never settles: its executor ignores both `resolve` and
// `reject`. It is used as a placeholder in a `Promise.race` list for a slot
// that must never win the race again.
// tslint:disable-next-line:no-empty
const NEVER_PROMISE = new Promise(() => { });
/**
 * Tags the eventual result of `promise` with the slot it belongs to, so that
 * the winner of a `Promise.race` can be traced back to its origin.
 *
 * @param {Promise<*>|*} promise - Promise to wait on. Thenables and plain
 *   values are accepted as well and are adopted via `Promise.resolve`.
 * @param {number} index - Identifier attached to the fulfilled value.
 * @returns {Promise<{value: *, index: number}>} Fulfils with
 *   `{ value, index }`; a rejection of `promise` propagates unchanged.
 */
function wrapPromiseWithIndex(promise, index) {
    // Promise.resolve hands back a native promise untouched, so well-behaved
    // callers see no difference; it only matters for non-promise inputs.
    return Promise.resolve(promise).then((value) => ({ value, index }));
}
/**
 * Async iterable that combines several async-iterable sources.
 *
 * Nothing is produced until every source has delivered at least one value.
 * From then on, each value arriving from any source triggers one emission:
 * `fn` is called with an array holding the most recent value of every source
 * (passed as a single array argument) and its awaited result is yielded.
 * Iteration ends once all sources are done.
 */
export class CombineLatestAsyncIterable extends AsyncIterableX {
    /**
     * @param {Array<AsyncIterable<*>>} sources - Objects exposing
     *   `Symbol.asyncIterator`; one slot in the values array per source.
     * @param {function(Array<*>): *} fn - Selector applied to the array of
     *   latest values; may return a promise.
     */
    constructor(sources, fn) {
        super();
        this._sources = sources;
        this._fn = fn;
    }

    /**
     * @returns {AsyncGenerator<*>} Generator yielding the awaited result of
     *   `fn` for each combined update.
     */
    async *[Symbol.asyncIterator]() {
        const fn = this._fn;
        const length = this._sources.length;
        const iterators = new Array(length);
        const nexts = new Array(length);
        const values = new Array(length);
        const hasValues = new Array(length).fill(false);
        // Tracks which sources reported `done`, so cleanup can skip them.
        const completed = new Array(length).fill(false);
        let hasValueAll = false;
        let active = length;

        try {
            // Start every source and keep exactly one pending `next()` per slot.
            for (let i = 0; i < length; i++) {
                const iterator = this._sources[i][Symbol.asyncIterator]();
                iterators[i] = iterator;
                nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
            }

            while (active > 0) {
                const { value: result, index } = await Promise.race(nexts);
                if (result.done) {
                    // Park the slot on a promise that never settles so it
                    // cannot win a later race.
                    completed[index] = true;
                    nexts[index] = NEVER_PROMISE;
                    active--;
                    continue;
                }

                values[index] = result.value;
                hasValues[index] = true;
                // Request the following value before emitting, so the source
                // keeps progressing while the consumer handles this emission.
                nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);

                // `hasValueAll` latches: once every slot is filled the scan is
                // never repeated. (`identity` is presumed to return its
                // argument, making `every` a truthiness check of the flags.)
                if (hasValueAll || (hasValueAll = hasValues.every(identity))) {
                    // Hand out a copy: `values` keeps being overwritten, and a
                    // pass-through selector would otherwise yield the very
                    // same array object on every emission.
                    yield await fn(values.slice());
                }
            }
        }
        finally {
            // Runs on normal completion, on an error, and when the consumer
            // stops early. Close every source that has not finished by itself.
            for (let i = 0; i < length; i++) {
                const iterator = iterators[i];
                if (completed[i] || iterator == null || typeof iterator.return !== 'function') {
                    continue;
                }
                try {
                    // Deliberately not awaited: a source may still have a
                    // pending `next()` that never settles, and waiting for its
                    // `return()` would then hang this generator forever.
                    void Promise.resolve(iterator.return()).catch(() => { });
                }
                catch (_err) {
                    // Best-effort cleanup: a failing `return()` must not mask
                    // the error (or early exit) that brought us here.
                }
            }
        }
    }
}
/* tslint:enable:max-line-length */
/**
 * Builds a `CombineLatestAsyncIterable` over the given sources.
 *
 * @param {...*} sources - Optionally a selector function first, followed by
 *   the async-iterable sources. When the first argument is not a function,
 *   every argument is treated as a source and `identityAsync` is used as the
 *   selector (presumably passing the array of latest values through as-is).
 * @returns {CombineLatestAsyncIterable} The combined async iterable. Called
 *   with no arguments it has no sources at all.
 */
export function combineLatest(...sources) {
    // Only detach the first argument when it really is a selector. Shifting
    // unconditionally and pushing it back turned a call without arguments
    // into a source list of `[undefined]`, which then failed on iteration.
    const fn = typeof sources[0] === 'function' ? sources.shift() : identityAsync;
    return new CombineLatestAsyncIterable(sources, fn);
}

//# sourceMappingURL=combinelatest.mjs.map