UNPKG

1.94 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
2import { wrapWithAbort } from './operators/withabort';
3import { throwIfAborted } from '../aborterror';
4import { safeRace } from '../util/safeRace';
5function wrapPromiseWithIndex(promise, index) {
6 return promise.then((value) => ({ value, index }));
7}
8export class RaceAsyncIterable extends AsyncIterableX {
9 constructor(sources) {
10 super();
11 this._sources = sources;
12 }
13 async *[Symbol.asyncIterator](signal) {
14 throwIfAborted(signal);
15 const sources = this._sources;
16 const length = sources.length;
17 const iterators = new Array(length);
18 const nexts = new Array(length);
19 for (let i = 0; i < length; i++) {
20 const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator]();
21 iterators[i] = iterator;
22 nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
23 }
24 const next = safeRace(nexts);
25 const { value: next$, index } = await next;
26 if (!next$.done) {
27 yield next$.value;
28 }
29 const iterator$ = iterators[index];
30 // Cancel/finish other iterators
31 for (let i = 0; i < length; i++) {
32 if (i === index) {
33 continue;
34 }
35 const otherIterator = iterators[i];
36 if (otherIterator.return) {
37 otherIterator.return();
38 }
39 }
40 let nextItem;
41 while (!(nextItem = await iterator$.next()).done) {
42 yield nextItem.value;
43 }
44 }
45}
46/**
47 * Propagates the async sequence that reacts first.
48 *
49 * @export
50 * @param {...AsyncIterable<T>[]} sources The source sequences.
51 * @return {AsyncIterable<T>} An async sequence that surfaces either of the given sequences, whichever reacted first.
52 */
53export function race(...sources) {
54 return new RaceAsyncIterable(sources);
55}
56
57//# sourceMappingURL=race.mjs.map