UNPKG

2.04 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
2import { identity } from '../util/identity';
3import { wrapWithAbort } from './operators/withabort';
4import { throwIfAborted } from '../aborterror';
5import { safeRace } from '../util/safeRace';
6// eslint-disable-next-line @typescript-eslint/no-empty-function
7const NEVER_PROMISE = new Promise(() => { });
8function wrapPromiseWithIndex(promise, index) {
9 return promise.then((value) => ({ value, index }));
10}
11export class CombineLatestAsyncIterable extends AsyncIterableX {
12 constructor(sources) {
13 super();
14 this._sources = sources;
15 }
16 async *[Symbol.asyncIterator](signal) {
17 throwIfAborted(signal);
18 const length = this._sources.length;
19 const iterators = new Array(length);
20 const nexts = new Array(length);
21 let hasValueAll = false;
22 const values = new Array(length);
23 const hasValues = new Array(length);
24 let active = length;
25 hasValues.fill(false);
26 for (let i = 0; i < length; i++) {
27 const iterator = wrapWithAbort(this._sources[i], signal)[Symbol.asyncIterator]();
28 iterators[i] = iterator;
29 nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
30 }
31 while (active > 0) {
32 const next = safeRace(nexts);
33 const { value: { value: value$, done: done$ }, index, } = await next;
34 if (done$) {
35 nexts[index] = NEVER_PROMISE;
36 active--;
37 }
38 else {
39 values[index] = value$;
40 hasValues[index] = true;
41 const iterator$ = iterators[index];
42 nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
43 if (hasValueAll || (hasValueAll = hasValues.every(identity))) {
44 yield values;
45 }
46 }
47 }
48 }
49}
50export function combineLatest(...sources) {
51 return new CombineLatestAsyncIterable(sources);
52}
53
54//# sourceMappingURL=combinelatest.mjs.map