UNPKG

1.64 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
2import { wrapWithAbort } from './operators/withabort';
3import { throwIfAborted } from '../aborterror';
4import { safeRace } from '../util/safeRace';
5// eslint-disable-next-line @typescript-eslint/no-empty-function
6const NEVER_PROMISE = new Promise(() => { });
7function wrapPromiseWithIndex(promise, index) {
8 return promise.then((value) => ({ value, index }));
9}
10export class MergeAsyncIterable extends AsyncIterableX {
11 constructor(source) {
12 super();
13 this._source = source;
14 }
15 async *[Symbol.asyncIterator](signal) {
16 throwIfAborted(signal);
17 const length = this._source.length;
18 const iterators = new Array(length);
19 const nexts = new Array(length);
20 let active = length;
21 for (let i = 0; i < length; i++) {
22 const iterator = wrapWithAbort(this._source[i], signal)[Symbol.asyncIterator]();
23 iterators[i] = iterator;
24 nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
25 }
26 while (active > 0) {
27 const next = safeRace(nexts);
28 const { value: { done: done$, value: value$ }, index, } = await next;
29 if (done$) {
30 nexts[index] = NEVER_PROMISE;
31 active--;
32 }
33 else {
34 const iterator$ = iterators[index];
35 nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
36 yield value$;
37 }
38 }
39 }
40}
41export function merge(source, ...args) {
42 return new MergeAsyncIterable([source, ...args]);
43}
44
45//# sourceMappingURL=merge.mjs.map