UNPKG

2 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
2import { identity } from '../util/identity';
3import { wrapWithAbort } from './operators/withabort';
4import { throwIfAborted } from '../aborterror';
/**
 * A promise that never settles. It is stored in the slot of a finished
 * source so that `Promise.race` can never pick that slot again.
 */
const NEVER_PROMISE = new Promise(() => undefined);
/**
 * Tags the eventual result of `promise` with the position of the source it
 * came from, so the winner of a `Promise.race` can be traced to its source.
 *
 * @param {*} promise - A promise, thenable, or plain value. Non-promise input
 *   is normalised with `Promise.resolve`, matching what `await` would accept.
 * @param {number} index - Position of the originating source.
 * @returns {Promise<{value: *, index: number}>} Resolves to the settled value
 *   paired with `index`; rejects with the same reason if `promise` rejects.
 */
function wrapPromiseWithIndex(promise, index) {
  // Promise.resolve returns a native promise unchanged, so conforming
  // iterators see no extra microtask; it only rescues plain/thenable results.
  return Promise.resolve(promise).then((value) => ({ value, index }));
}
/**
 * Async iterable that combines several async-iterable sources. Once every
 * source has produced at least one value, each new value from any source
 * yields an array holding the most recent value of every source, in source
 * order.
 */
export class CombineLatestAsyncIterable extends AsyncIterableX {
  /**
   * @param {Array} sources - The async iterables to combine. The array is
   *   stored by reference, not copied.
   */
  constructor(sources) {
    super();
    this._sources = sources;
  }

  /**
   * Iterates the combined sequence.
   *
   * Iteration ends when every source has reported `done`. A finished source
   * keeps contributing its last value. If a source finishes without ever
   * producing a value, nothing is yielded, but the remaining sources are
   * still consumed until they finish. A rejection from any pending `next()`
   * propagates to the consumer.
   *
   * @param {AbortSignal} [signal] - Passed to `throwIfAborted` before any
   *   source is started, and to `wrapWithAbort` for every source.
   * @yields {Array} A fresh snapshot of the latest value from each source.
   */
  async *[Symbol.asyncIterator](signal) {
    throwIfAborted(signal);
    const length = this._sources.length;
    const iterators = new Array(length);
    const nexts = new Array(length);
    let hasValueAll = false;
    const values = new Array(length);
    const hasValues = new Array(length);
    let active = length;
    hasValues.fill(false);

    // Start every source and request its first value right away.
    for (let i = 0; i < length; i++) {
      const iterator = wrapWithAbort(this._sources[i], signal)[Symbol.asyncIterator]();
      iterators[i] = iterator;
      nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
    }

    // NOTE(review): source iterators are never return()ed when the consumer
    // stops early or a source rejects — confirm whether cleanup is needed.
    while (active > 0) {
      const {
        value: { value: value$, done: done$ },
        index,
      } = await Promise.race(nexts);

      if (done$) {
        // Park a never-settling promise so this slot cannot win the race again.
        nexts[index] = NEVER_PROMISE;
        active--;
      } else {
        values[index] = value$;
        hasValues[index] = true;
        // Request the next value before yielding so the source keeps going.
        nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
        // The `every` scan stops being run once all sources have reported.
        if (hasValueAll || (hasValueAll = hasValues.every(identity))) {
          // Yield a copy: `values` is mutated on later iterations, so handing
          // out the shared array would rewrite results the consumer already has.
          yield values.slice();
        }
      }
    }
  }
}
/**
 * Builds a `CombineLatestAsyncIterable` over the given sources.
 *
 * @param {...*} sources - The sources to combine, passed on as one array.
 * @returns {CombineLatestAsyncIterable} The combined iterable.
 */
export function combineLatest(...sources) {
  const combined = new CombineLatestAsyncIterable(sources);
  return combined;
}

//# sourceMappingURL=combinelatest.mjs.map