UNPKG

2.01 kBJavaScriptView Raw
1import { identity } from '../util/identity';
2import { wrapWithAbort } from './operators/withabort';
3import { safeRace } from '../util/safeRace';
// Shared promise that never settles. It is used as a placeholder so that a
// slot holding it can never win a race against other pending promises.
const NEVER_PROMISE = new Promise(function neverSettle() {
    // Intentionally empty: neither resolve nor reject is ever invoked.
});
/**
 * Pairs the fulfilment value of a promise with the supplied index.
 *
 * @param {Promise<any>} promise Promise whose fulfilment value is wrapped.
 * @param {number} index Index to attach to the fulfilment value.
 * @returns {Promise<{ value: any, index: number }>} A promise for the `{ value, index }` pair;
 * a rejection of `promise` propagates unchanged.
 */
function wrapPromiseWithIndex(promise, index) {
    const attachIndex = (value) => {
        return { value, index };
    };
    return promise.then(attachIndex);
}
/**
 * Runs all specified async-iterable sequences in parallel and collects their last elements.
 *
 * An AbortSignal may optionally be passed as the first argument; when present it is
 * removed from the source list and handed to `wrapWithAbort` together with every source.
 *
 * @export
 * @template T The type of the elements in the source sequences.
 * @param {...any[]} sources Async-iterable sequences to collect the last elements for,
 * optionally preceded by an AbortSignal.
 * @returns {(Promise<T[] | undefined>)} A promise for an array holding the last element of
 * each sequence (in source order), or `undefined` when no sources were given or at least
 * one sequence completed without producing an element.
 */
export async function forkJoin(...sources) {
    // Strip the leading argument only when it really is an AbortSignal. Shifting first and
    // pushing a non-signal back turned an empty argument list into `[undefined]`, which
    // then failed while obtaining an iterator instead of resolving to `undefined`.
    const signal = sources[0] instanceof AbortSignal ? sources.shift() : undefined;
    const length = sources.length;
    if (length === 0) {
        // Nothing to join: there are no last elements to report.
        return undefined;
    }
    const iterators = new Array(length);
    const nexts = new Array(length);
    const values = new Array(length);
    const hasValues = new Array(length).fill(false);
    let active = length;
    // Start every source immediately so they all advance concurrently.
    for (let i = 0; i < length; i++) {
        const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator]();
        iterators[i] = iterator;
        nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
    }
    while (active > 0) {
        const { value: next$, index } = await safeRace(nexts);
        if (next$.done) {
            // Park the finished slot on a promise that never settles so it cannot win again.
            nexts[index] = NEVER_PROMISE;
            active--;
        }
        else {
            // Remember the most recent element and request the following one.
            nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
            hasValues[index] = true;
            values[index] = next$.value;
        }
    }
    // Every source must have produced at least one element for the result to be defined.
    return hasValues.every(identity) ? values : undefined;
}
54
55//# sourceMappingURL=forkjoin.mjs.map