UNPKG

1.6 kBJavaScriptView Raw
1import { AsyncIterableX } from './asynciterablex';
2import { wrapWithAbort } from './operators/withabort';
3import { throwIfAborted } from '../aborterror';
4// eslint-disable-next-line @typescript-eslint/no-empty-function
5const NEVER_PROMISE = new Promise(() => { });
6function wrapPromiseWithIndex(promise, index) {
7 return promise.then((value) => ({ value, index }));
8}
9export class MergeAsyncIterable extends AsyncIterableX {
10 constructor(source) {
11 super();
12 this._source = source;
13 }
14 async *[Symbol.asyncIterator](signal) {
15 throwIfAborted(signal);
16 const length = this._source.length;
17 const iterators = new Array(length);
18 const nexts = new Array(length);
19 let active = length;
20 for (let i = 0; i < length; i++) {
21 const iterator = wrapWithAbort(this._source[i], signal)[Symbol.asyncIterator]();
22 iterators[i] = iterator;
23 nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
24 }
25 while (active > 0) {
26 const next = Promise.race(nexts);
27 const { value: { done: done$, value: value$ }, index, } = await next;
28 if (done$) {
29 nexts[index] = NEVER_PROMISE;
30 active--;
31 }
32 else {
33 const iterator$ = iterators[index];
34 nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
35 yield value$;
36 }
37 }
38 }
39}
40export function merge(source, ...args) {
41 return new MergeAsyncIterable([source, ...args]);
42}
43
44//# sourceMappingURL=merge.mjs.map