UNPKG

1.97 kBJavaScriptView Raw
1import { identity } from '../util/identity';
2import { wrapWithAbort } from './operators/withabort';
3// eslint-disable-next-line @typescript-eslint/no-empty-function
4const NEVER_PROMISE = new Promise(() => { });
5function wrapPromiseWithIndex(promise, index) {
6 return promise.then((value) => ({ value, index }));
7}
8/**
9 * Runs all specified async-iterable sequences in parallel and collects their last elements.
10 *
11 * @export
12 * @template T The type of the elements in the source sequences.
13 * @param {...any[]} sources Async-iterable sequence to collect the last elements for.
14 * @returns {(Promise<T[] | undefined>)} An async-iterable sequence with an array of all the last elements of all sequences.
15 */
16export async function forkJoin(...sources) {
17 let signal = sources.shift();
18 if (!(signal instanceof AbortSignal)) {
19 sources.unshift(signal);
20 signal = undefined;
21 }
22 const length = sources.length;
23 const iterators = new Array(length);
24 const nexts = new Array(length);
25 let active = length;
26 const values = new Array(length);
27 const hasValues = new Array(length);
28 hasValues.fill(false);
29 for (let i = 0; i < length; i++) {
30 const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator]();
31 iterators[i] = iterator;
32 nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
33 }
34 while (active > 0) {
35 const next = Promise.race(nexts);
36 const { value: next$, index } = await next;
37 if (next$.done) {
38 nexts[index] = NEVER_PROMISE;
39 active--;
40 }
41 else {
42 const iterator$ = iterators[index];
43 nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
44 hasValues[index] = true;
45 values[index] = next$.value;
46 }
47 }
48 if (hasValues.length > 0 && hasValues.every(identity)) {
49 return values;
50 }
51 return undefined;
52}
53
54//# sourceMappingURL=forkjoin.mjs.map