UNPKG

28.9 kBJavaScriptView Raw
/**
 * Groups the values of an iterable into arrays holding `size` items each.
 * The last array may be shorter when the source ends in the middle of a group.
 * Works with anything `for await` accepts.
 */
async function* _batch(size, iterable) {
  let group = [];
  for await (const item of iterable) {
    group.push(item);
    if (group.length !== size) {
      continue;
    }
    yield group;
    group = [];
  }
  // Flush whatever is left over once the source is exhausted.
  if (group.length !== 0) {
    yield group;
  }
}
/**
 * Synchronous counterpart of `_batch`: groups the values of a sync iterable
 * into arrays of `size` items, with a possibly shorter final array.
 */
function* _syncBatch(size, iterable) {
  let group = [];
  for (const item of iterable) {
    group.push(item);
    if (group.length !== size) {
      continue;
    }
    yield group;
    group = [];
  }
  // Flush whatever is left over once the source is exhausted.
  if (group.length !== 0) {
    yield group;
  }
}
/**
 * Batches an iterable into arrays of `size` items.
 * Returns a curried function when `iterable` is omitted; otherwise picks the
 * async or sync implementation based on whether the input exposes
 * `Symbol.asyncIterator`.
 */
function batch(size, iterable) {
  if (iterable === undefined) {
    return (curriedIterable) => batch(size, curriedIterable);
  }
  return iterable[Symbol.asyncIterator]
    ? _batch(size, iterable)
    : _syncBatch(size, iterable);
}
36
// Sentinel the timer promise resolves with, so a race winner can be recognised.
const TIMEOUT = Symbol('TIMEOUT');
/**
 * Starts a timer of `duration` ms.
 * Returns a pair: a promise that resolves with `TIMEOUT` when the timer fires,
 * and a function that cancels the underlying timeout.
 */
const createTimer = (duration) => {
  let handle;
  const expired = new Promise((resolve) => {
    handle = setTimeout(() => resolve(TIMEOUT), duration);
  });
  const cancel = () => {
    clearTimeout(handle);
  };
  return [expired, cancel];
};
// Like `batch` but flushes early if the `timeout` is reached
// NOTE: The strategy is to only hold onto a single item for a maximum of `timeout` ms.
/**
 * Batches an async iterable into arrays of up to `size` items, emitting a
 * partial batch when `timeout` ms pass after the first item of that batch
 * arrived.
 *
 * The running timer is always cancelled when the generator finishes, whether
 * the source ended, the source rejected, or the consumer stopped early
 * (e.g. `break` inside `for await`), so no timeout is left pending.
 *
 * @param size number of items that completes a batch
 * @param timeout maximum time in ms to hold a non-empty, incomplete batch
 * @param iterable source exposing `Symbol.asyncIterator`
 */
async function* _batchWithTimeout(size, timeout, iterable) {
  const iterator = iterable[Symbol.asyncIterator]();
  let batchData = [];
  let timer;
  let clearTimer;
  const deleteTimer = () => {
    if (clearTimer) {
      clearTimer();
      clearTimer = undefined;
    }
    timer = undefined;
  };
  const startTimer = () => {
    deleteTimer();
    [timer, clearTimer] = createTimer(timeout);
  };
  let pendingData = iterator.next();
  try {
    while (true) {
      // Only race against the timer while a batch is being held.
      const res = await (timer ? Promise.race([pendingData, timer]) : pendingData);
      if (res === TIMEOUT || res.done) {
        // Flush early (before we reach the batch size)
        if (batchData.length) {
          yield batchData;
          batchData = [];
        }
        deleteTimer();
        // And exit appropriately
        if (res !== TIMEOUT) {
          // done
          break;
        }
        continue;
      }
      // Fetch next item early doors (before we potentially yield)
      pendingData = iterator.next();
      // Then handle the value
      batchData.push(res.value);
      if (batchData.length === 1) {
        // Start timer once we have at least 1 item ready to go
        startTimer();
      }
      if (batchData.length === size) {
        yield batchData;
        batchData = [];
        deleteTimer();
      }
    }
  } finally {
    // Runs on normal completion, on errors and on early consumer exit, so the
    // timeout can never keep running after the generator is finished.
    deleteTimer();
  }
}
/**
 * Like `batch`, but also flushes a partial batch once `timeout` ms elapse.
 * Returns a curried function when `iterable` is omitted.
 * Sync iterables and an infinite timeout make the timer irrelevant, so those
 * cases are delegated to plain `batch`.
 */
function batchWithTimeout(size, timeout, iterable) {
  if (iterable === undefined) {
    return (curriedIterable) => batchWithTimeout(size, timeout, curriedIterable);
  }
  const needsTimer = iterable[Symbol.asyncIterator] && timeout !== Infinity;
  if (!needsTimer) {
    return batch(size, iterable);
  }
  return _batchWithTimeout(size, timeout, iterable);
}
110
/**
 * Returns an iterator for `iterable`.
 *
 * Resolution order: a value that already has a `next()` method is returned
 * as-is; otherwise its `Symbol.iterator` factory is used, then its
 * `Symbol.asyncIterator` factory.
 *
 * @param iterable an iterator, a sync iterable or an async iterable
 * @returns the iterator obtained from `iterable`
 * @throws {TypeError} when the value is `null`/`undefined` or implements none
 *   of the protocols
 */
function getIterator(iterable) {
  // Guard first so null/undefined get the descriptive protocol error instead
  // of an unrelated property-access TypeError.
  if (iterable !== null && iterable !== undefined) {
    if (typeof iterable.next === 'function') {
      return iterable;
    }
    if (typeof iterable[Symbol.iterator] === 'function') {
      return iterable[Symbol.iterator]();
    }
    if (typeof iterable[Symbol.asyncIterator] === 'function') {
      return iterable[Symbol.asyncIterator]();
    }
  }
  throw new TypeError('"values" does not to conform to any of the iterator or iterable protocols');
}
123
/**
 * Creates a promise together with the functions that settle it.
 * Returns `{ promise, reject, resolve }`.
 */
function defer() {
  const deferred = { promise: undefined, reject: undefined, resolve: undefined };
  // The executor runs synchronously, so both settlers are set before returning.
  deferred.promise = new Promise((resolveFunc, rejectFunc) => {
    deferred.resolve = resolveFunc;
    deferred.reject = rejectFunc;
  });
  return deferred;
}
137
/**
 * Wraps `iterable` in an async iterator that reads ahead, keeping up to `size`
 * results queued. A source error is queued like a value and surfaces to the
 * reader that reaches it.
 */
function _buffer(size, iterable) {
  const source = getIterator(iterable);
  const buffered = []; // `{ value }` or `{ error }` entries waiting for a reader
  const waiting = []; // deferreds of readers that arrived before a result did
  let isReading = false;
  let isEnded = false;

  // Hands queued results to waiting readers, then ends the rest once finished.
  function fulfillReadQueue() {
    while (waiting.length > 0 && buffered.length > 0) {
      const reader = waiting.shift();
      const { error, value } = buffered.shift();
      if (error) {
        reader.reject(error);
      }
      else {
        reader.resolve({ done: false, value });
      }
    }
    while (waiting.length > 0 && isEnded) {
      const { resolve } = waiting.shift();
      resolve({ done: true, value: undefined });
    }
  }

  // Pulls one item from the source, then calls itself again (not awaited).
  async function fillQueue() {
    if (isEnded || isReading || buffered.length >= size) {
      return;
    }
    isReading = true;
    try {
      const { done, value } = await source.next();
      if (done) {
        isEnded = true;
      }
      else {
        buffered.push({ value });
      }
    }
    catch (error) {
      isEnded = true;
      buffered.push({ error });
    }
    fulfillReadQueue();
    isReading = false;
    fillQueue();
  }

  async function next() {
    if (buffered.length > 0) {
      const { error, value } = buffered.shift();
      if (error) {
        throw error;
      }
      fillQueue();
      return { done: false, value };
    }
    if (isEnded) {
      return { done: true, value: undefined };
    }
    const deferred = defer();
    waiting.push(deferred);
    fillQueue();
    return deferred.promise;
  }

  const asyncIterableIterator = {
    next,
    [Symbol.asyncIterator]: () => asyncIterableIterator,
  };
  return asyncIterableIterator;
}
/**
 * Sync read-ahead buffer: holds back up to `size` values from `iterable`,
 * yielding the oldest one only once the queue grows past `size`.
 * A source error is deferred until the held values have been yielded.
 */
function* syncBuffer(size, iterable) {
  const held = [];
  let failure;
  try {
    for (const item of iterable) {
      held.push(item);
      if (held.length > size) {
        yield held.shift();
      }
    }
  }
  catch (error) {
    failure = error;
  }
  // Drain what is still held before reporting a deferred failure.
  for (const item of held) {
    yield item;
  }
  if (failure) {
    throw failure;
  }
}
/**
 * Buffers up to `size` items read ahead from `iterable`.
 * Returns a curried function when `iterable` is omitted; a `size` of `0`
 * hands the input back untouched. Async iterables get the async buffer,
 * everything else the sync one.
 */
function buffer(size, iterable) {
  if (iterable === undefined) {
    return (curriedIterable) => buffer(size, curriedIterable);
  }
  if (size === 0) {
    return iterable;
  }
  return iterable[Symbol.asyncIterator]
    ? _buffer(size, iterable)
    : syncBuffer(size, iterable);
}
245
/**
 * Reads `iterable` to the end with `for await` and resolves with an array of
 * every value, in order.
 */
async function _collect(iterable) {
  const gathered = [];
  for await (const item of iterable) {
    gathered.push(item);
  }
  return gathered;
}
/**
 * Collects every value of `iterable` into an array.
 * Async iterables yield a promise of the array; sync iterables yield the
 * array directly.
 */
function collect(iterable) {
  const isAsync = iterable[Symbol.asyncIterator];
  if (!isAsync) {
    return Array.from(iterable);
  }
  return _collect(iterable);
}
259
/**
 * Yields all values of each iterable in `iterables`, one iterable after the
 * other. Sync and async members can be mixed.
 */
async function* _concat(iterables) {
  for await (const member of iterables) {
    yield* member;
  }
}
/**
 * Yields all values of each sync iterable in `iterables`, one iterable after
 * the other.
 */
function* _syncConcat(iterables) {
  for (const member of iterables) {
    yield* member;
  }
}
/**
 * Concatenates the given iterables into a single one.
 * The result is an async iterable as soon as any input defines
 * `Symbol.asyncIterator`; otherwise it is a sync iterable.
 */
function concat(...iterables) {
  const anyAsync = iterables.some((itr) => itr[Symbol.asyncIterator] !== undefined);
  return anyAsync ? _concat(iterables) : _syncConcat(iterables);
}
279
/**
 * Drains `iterable` with `for await`, discarding every value. The returned
 * promise settles when the source ends or rejects.
 */
async function _consume(iterable) {
  for await (const _ignored of iterable) {
    // values are intentionally discarded
  }
}
/**
 * Runs `iterable` to completion, discarding every value.
 * Async iterables return a promise; sync iterables are drained immediately
 * and nothing is returned.
 */
function consume(iterable) {
  if (iterable[Symbol.asyncIterator]) {
    return _consume(iterable);
  }
  for (const _ignored of iterable) {
    // values are intentionally discarded
  }
}
293
/**
 * Skips the first `count` values of `iterable` and yields the rest.
 * Each yielded value is awaited first.
 */
async function* _drop(count, iterable) {
  let dropped = 0;
  for await (const item of iterable) {
    if (dropped < count) {
      dropped += 1;
    }
    else {
      yield await item;
    }
  }
}
/**
 * Skips the first `count` values of a sync iterable and yields the rest.
 */
function* _syncDrop(count, iterable) {
  let dropped = 0;
  for (const item of iterable) {
    if (dropped < count) {
      dropped += 1;
    }
    else {
      yield item;
    }
  }
}
/**
 * Skips the first `count` values of `iterable`.
 * Returns a curried function when `iterable` is omitted; otherwise picks the
 * async or sync implementation based on `Symbol.asyncIterator`.
 */
function drop(count, iterable) {
  if (iterable === undefined) {
    return (curriedIterable) => drop(count, curriedIterable);
  }
  return iterable[Symbol.asyncIterator]
    ? _drop(count, iterable)
    : _syncDrop(count, iterable);
}
323
/**
 * Yields only the values for which `filterFunc` resolves to a truthy result.
 * The predicate may be sync or async.
 */
async function* _filter(filterFunc, iterable) {
  for await (const item of iterable) {
    const keep = await filterFunc(item);
    if (keep) {
      yield item;
    }
  }
}
/**
 * Filters `iterable` with `filterFunc`, always producing an async iterable.
 * Returns a curried function when `iterable` is omitted.
 */
function filter(filterFunc, iterable) {
  if (iterable !== undefined) {
    return _filter(filterFunc, iterable);
  }
  return (curriedIterable) => _filter(filterFunc, curriedIterable);
}
337
/**
 * Returns a new iterator by pulling every item out of `iterable` (and all its sub iterables) and yielding them depth-first. Each item is checked for the sync and async iterable interfaces and is iterated when one is present. Strings are never iterated, because iterating a string yields strings again and would recurse forever. Errors from the source `iterable` are raised immediately.

*note*: Typescript doesn't have recursive types but you can nest iterables as deep as you like.

```ts
import { flatten } from 'streaming-iterables'

for await (const item of flatten([1, 2, [3, [4, 5], 6]])) {
  console.log(item)
}
// 1
// 2
// 3
// 4
// 5
// 6
```
 */
async function* flatten(iterable) {
  for await (const item of iterable) {
    const isNestedIterable = item
      && typeof item !== 'string'
      && (item[Symbol.iterator] || item[Symbol.asyncIterator]);
    if (!isNestedIterable) {
      yield item;
      continue;
    }
    // Recurse so arbitrarily deep nesting comes out depth-first.
    yield* flatten(item);
  }
}
367
/**
 * Applies `func` to every value of `iterable` and yields the awaited result.
 * Items are processed one at a time, in order.
 */
async function* _map(func, iterable) {
  for await (const item of iterable) {
    const mapped = await func(item);
    yield mapped;
  }
}
/**
 * Maps `iterable` through `func`, always producing an async iterable.
 * Returns a curried function when `iterable` is omitted.
 */
function map(func, iterable) {
  if (iterable !== undefined) {
    return _map(func, iterable);
  }
  return (curriedIterable) => _map(func, curriedIterable);
}
379
/**
 * Maps `iterable` through `func`, flattens the results and drops `null` and
 * `undefined` values from the output.
 * Returns a curried function when `iterable` is omitted.
 */
function flatMap(func, iterable) {
  if (iterable === undefined) {
    return (curriedIterable) => flatMap(func, curriedIterable);
  }
  const isPresent = (item) => item !== undefined && item !== null;
  const mapped = map(func, iterable);
  return filter(isPresent, flatten(mapped));
}
386
/**
 * Concurrent transform whose results may be async iterables.
 *
 * Reads from `iterable` and runs `func` on up to `concurrency` items at once
 * (in-flight work plus queued results count toward the limit). A result that
 * exposes `Symbol.asyncIterator` is drained into the output queue item by
 * item; any other result is queued as-is. Output order follows completion,
 * not input order. The first error is delivered after in-flight work settles
 * and queued results have been handed out.
 */
function _flatTransform(concurrency, func, iterable) {
  const source = getIterator(iterable);
  const results = []; // finished values waiting for a reader
  const readers = []; // deferreds of readers waiting for a value
  let isEnded = false;
  let isReading = false;
  let inflight = 0;
  let failure = null;

  // Hands results to waiting readers; once all work is finished, ends or fails the rest.
  function fulfillReadQueue() {
    while (readers.length > 0 && results.length > 0) {
      const { resolve } = readers.shift();
      const value = results.shift();
      resolve({ done: false, value });
    }
    while (readers.length > 0 && inflight === 0 && isEnded) {
      const { resolve, reject } = readers.shift();
      if (failure) {
        reject(failure);
        failure = null;
      }
      else {
        resolve({ done: true, value: undefined });
      }
    }
  }

  // Pulls one source item if there is capacity, then calls itself again.
  async function fillQueue() {
    if (isEnded) {
      fulfillReadQueue();
      return;
    }
    if (isReading || inflight + results.length >= concurrency) {
      return;
    }
    isReading = true;
    inflight++;
    try {
      const { done, value } = await source.next();
      if (done) {
        isEnded = true;
        inflight--;
        fulfillReadQueue();
      }
      else {
        // Not awaited: the mapping runs concurrently with further reads.
        mapAndQueue(value);
      }
    }
    catch (error) {
      isEnded = true;
      inflight--;
      failure = error;
      fulfillReadQueue();
    }
    isReading = false;
    fillQueue();
  }

  // Runs `func` for one item and queues whatever it produces.
  async function mapAndQueue(itrValue) {
    try {
      const produced = await func(itrValue);
      if (produced && produced[Symbol.asyncIterator]) {
        for await (const asyncVal of produced) {
          results.push(asyncVal);
        }
      }
      else {
        results.push(produced);
      }
    }
    catch (error) {
      isEnded = true;
      failure = error;
    }
    inflight--;
    fulfillReadQueue();
    fillQueue();
  }

  async function next() {
    if (results.length === 0) {
      const deferred = defer();
      readers.push(deferred);
      fillQueue();
      return deferred.promise;
    }
    const value = results.shift();
    fillQueue();
    return { done: false, value };
  }

  const asyncIterableIterator = {
    next,
    [Symbol.asyncIterator]: () => asyncIterableIterator,
  };
  return asyncIterableIterator;
}
/**
 * Concurrent flat-map: transforms `iterable` with `func` at the given
 * `concurrency`, flattens the results and drops `null`/`undefined` values.
 * Supports currying: omit `func` and/or `iterable` to get a function that
 * accepts the missing arguments later.
 */
function flatTransform(concurrency, func, iterable) {
  if (func === undefined) {
    return (curriedFunc, curriedIterable) => {
      if (curriedIterable) {
        return flatTransform(concurrency, curriedFunc, curriedIterable);
      }
      return flatTransform(concurrency, curriedFunc);
    };
  }
  if (iterable === undefined) {
    return (curriedIterable) => flatTransform(concurrency, func, curriedIterable);
  }
  const isPresent = (item) => item !== undefined && item !== null;
  return filter(isPresent, flatten(_flatTransform(concurrency, func, iterable)));
}
493
/**
 * Resolves (with no value) the next time `stream` emits `'readable'`.
 */
async function onceReadable(stream) {
  return new Promise((resolve) => {
    const onReadable = () => {
      resolve();
    };
    stream.once('readable', onReadable);
  });
}
/**
 * Turns a readable stream into an async generator of its chunks.
 * Reads until `read()` returns `null`, then either stops (the stream's
 * `_readableState.ended` flag is set) or waits for the next `'readable'`
 * event and tries again.
 */
async function* _fromStream(stream) {
  for (;;) {
    const chunk = stream.read();
    if (chunk === null) {
      if (stream._readableState.ended) {
        return;
      }
      await onceReadable(stream);
      continue;
    }
    yield chunk;
  }
}
/**
 * Wraps the stream in an async iterator or returns the stream if it already is an async iterator.

*note*: Since Node 10, streams are already async iterators. This function may be used to ensure compatibility with older versions of Node.

```ts
import { fromStream } from 'streaming-iterables'
import { createReadStream } from 'fs'

const pokeLog = fromStream(createReadStream('./pokedex-operating-system.log'))

for await (const pokeData of pokeLog) {
  console.log(pokeData) // Buffer(...)
}
```
 * @deprecated This method is deprecated since Node 10 is out of LTS. It may be removed in an upcoming major release.
 */
function fromStream(stream) {
  const alreadyAsyncIterable = typeof stream[Symbol.asyncIterator] === 'function';
  return alreadyAsyncIterable ? stream : _fromStream(stream);
}
537
/**
 * Combine multiple iterators into a single iterable. Reads one item off each iterable in order, round after round, until they are all exhausted. If you care less about order and want them faster see `parallelMerge()`.
 */
async function* merge(...iterables) {
  const active = new Set(iterables.map(getIterator));
  while (active.size > 0) {
    // One round: take a single value from every source that is still active.
    for (const source of active) {
      const step = await source.next();
      if (step.done) {
        active.delete(source);
        continue;
      }
      yield step.value;
    }
  }
}
555
/**
 * Calls `firstFn` with no arguments, then feeds its result through each of
 * `fns` in order and returns the final result.
 */
function pipeline(firstFn, ...fns) {
  return fns.reduce((previous, func) => func(previous), firstFn());
}
563
/**
 * Maps `iterable` through `func` with several calls in flight, yielding the
 * results in input order.
 *
 * Each `func` call is wrapped in an object so the buffering stages pass the
 * pending result along without awaiting it. After a call fails, no further
 * values are yielded, the source stops being read, and the error is thrown
 * once the already-started work has been drained.
 */
async function* _parallelMap(concurrency, func, iterable) {
  let transformError = null;
  // Wrap the result so downstream stages do not await the promise themselves.
  const wrapFunc = (value) => ({ value: func(value) });
  // Stops pulling from the source after a transform has failed.
  const stopOnError = async function* (source) {
    for await (const value of source) {
      if (transformError) {
        return;
      }
      yield value;
    }
  };
  const output = pipeline(() => iterable, buffer(1), stopOnError, map(wrapFunc), buffer(concurrency - 1));
  const results = getIterator(output);
  for (let step = await results.next(); !step.done; step = await results.next()) {
    try {
      const resolved = await step.value.value;
      if (!transformError) {
        yield resolved;
      }
    }
    catch (error) {
      transformError = error;
    }
  }
  if (transformError) {
    throw transformError;
  }
}
/**
 * Maps `iterable` through `func` running up to `concurrency` calls at once,
 * keeping input order. A concurrency of `1` is served by the plain `map`.
 * Supports currying: omit `func` and/or `iterable` to get a function that
 * accepts the missing arguments later.
 */
function parallelMap(concurrency, func, iterable) {
  if (func === undefined) {
    return (curriedFunc, curriedIterable) => parallelMap(concurrency, curriedFunc, curriedIterable);
  }
  if (iterable === undefined) {
    return (curriedIterable) => parallelMap(concurrency, func, curriedIterable);
  }
  return concurrency === 1
    ? map(func, iterable)
    : _parallelMap(concurrency, func, iterable);
}
610
/**
 * Like `parallelMap`, but flattens the mapped results and drops
 * `null`/`undefined` values from the output.
 * Supports currying: omit `func` and/or `iterable` to get a function that
 * accepts the missing arguments later.
 */
function parallelFlatMap(concurrency, func, iterable) {
  if (func === undefined) {
    return (curriedFunc, curriedIterable) => {
      if (curriedIterable) {
        return parallelFlatMap(concurrency, curriedFunc, curriedIterable);
      }
      return parallelFlatMap(concurrency, curriedFunc);
    };
  }
  if (iterable === undefined) {
    return (curriedIterable) => parallelFlatMap(concurrency, func, curriedIterable);
  }
  const isPresent = (item) => item !== undefined && item !== null;
  return filter(isPresent, flatten(parallelMap(concurrency, func, iterable)));
}
622
/**
 * Combine multiple iterators into a single iterable. Reads one item off of every iterable and yields them as they resolve. This is useful for pulling items out of a collection of iterables as soon as they're available. Errors from `iterables` are raised immediately.

```ts
import { parallelMerge } from 'streaming-iterables'
import { getPokemon, getTransformer } from 'iterable-pokedex'

// pokemon are much faster to load btw
const heros = parallelMerge(getPokemon(), getTransformer())
for await (const hero of heros) {
  console.log(hero)
}
// charmander
// bulbasaur
// megatron
// pikachu
// eevee
// bumblebee
// jazz
```
 */
async function* parallelMerge(...iterables) {
  const sources = iterables.map(getIterator);
  const inFlight = new Set(); // pending `next()` calls
  const ready = new Map(); // source -> value that arrived and awaits yielding
  let failure = null;
  let rejectWaiter = null;
  let resolveWaiter = null;

  const notifyError = (err) => {
    failure = err;
    if (rejectWaiter) {
      rejectWaiter(err);
    }
  };
  const notifyDone = (value) => {
    if (resolveWaiter) {
      resolveWaiter(value);
    }
  };
  // Settles once a value is ready or a source has failed.
  const waitForQueue = () => new Promise((resolve, reject) => {
    if (failure) {
      reject(failure);
    }
    if (ready.size > 0) {
      return resolve();
    }
    resolveWaiter = resolve;
    rejectWaiter = reject;
  });
  // Requests the next value of `source` and records it when it arrives.
  const queueNext = (source) => {
    const pending = Promise.resolve(source.next()).then(async ({ done, value }) => {
      if (!done) {
        ready.set(source, value);
      }
      inFlight.delete(pending);
    });
    inFlight.add(pending);
    pending.then(notifyDone, notifyError);
  };

  sources.forEach((source) => {
    queueNext(source);
  });

  while (true) {
    // We technically don't have to check `ready.size` as the for loop should have emptied it
    // However I haven't yet found specs verifying that behavior, only tests
    // the guard in waitForQueue() checking for values is in place for the same reason
    if (inFlight.size === 0 && ready.size === 0) {
      return;
    }
    await waitForQueue();
    for (const [source, value] of ready) {
      ready.delete(source);
      yield value;
      queueNext(source);
    }
  }
}
700
/**
 * Folds `iterable` into a single value: starting from `start`, each item is
 * combined with the accumulator via `func` (which may be async). Resolves
 * with the final accumulator.
 */
async function _reduce(func, start, iterable) {
  let accumulator = start;
  for await (const item of iterable) {
    accumulator = await func(accumulator, item);
  }
  return accumulator;
}
/**
 * Reduces `iterable` to a single value with `func`, starting from `start`.
 * Supports currying: with `start` undefined it returns a function taking
 * `(start, iterable?)`; with only `iterable` missing it returns a function
 * taking the iterable.
 */
function reduce(func, start, iterable) {
  if (start === undefined) {
    return (curriedStart, curriedIterable) => {
      if (curriedIterable) {
        return _reduce(func, curriedStart, curriedIterable);
      }
      return reduce(func, curriedStart);
    };
  }
  if (iterable === undefined) {
    return (curriedIterable) => reduce(func, start, curriedIterable);
  }
  return _reduce(func, start, iterable);
}
717
/**
 * Yields at most the first `count` values of `iterable`, awaiting each value
 * before yielding it, then stops reading from the source.
 *
 * A `count` of zero or less yields nothing and never touches the source.
 *
 * @param count maximum number of values to yield
 * @param iterable source accepted by `for await`
 */
async function* _take(count, iterable) {
  // Without this guard the loop below would yield one item before checking the limit.
  if (count <= 0) {
    return;
  }
  let taken = 0;
  for await (const val of iterable) {
    yield await val;
    taken++;
    if (taken >= count) {
      break;
    }
  }
}
/**
 * Yields at most the first `count` values of a sync iterable, then stops
 * reading from the source.
 *
 * A `count` of zero or less yields nothing and never touches the source.
 *
 * @param count maximum number of values to yield
 * @param iterable sync iterable source
 */
function* _syncTake(count, iterable) {
  // Without this guard the loop below would yield one item before checking the limit.
  if (count <= 0) {
    return;
  }
  let taken = 0;
  for (const val of iterable) {
    yield val;
    taken++;
    if (taken >= count) {
      break;
    }
  }
}
/**
 * Limits `iterable` to its first `count` values.
 * Returns a curried function when `iterable` is omitted; otherwise picks the
 * async or sync implementation based on `Symbol.asyncIterator`.
 */
function take(count, iterable) {
  if (iterable === undefined) {
    return (curriedIterable) => take(count, curriedIterable);
  }
  return iterable[Symbol.asyncIterator]
    ? _take(count, iterable)
    : _syncTake(count, iterable);
}
747
/**
 * Calls `func` with every value (awaiting it) and then yields the value
 * unchanged. The result of `func` is ignored.
 */
async function* _asyncTap(func, iterable) {
  for await (const item of iterable) {
    await func(item);
    yield item;
  }
}
/**
 * Runs `func` on every value of `iterable` for its side effects and passes
 * the values through unchanged, as an async iterable.
 * Returns a curried function when `iterable` is omitted.
 */
function tap(func, iterable) {
  if (iterable !== undefined) {
    return _asyncTap(func, iterable);
  }
  return (curriedIterable) => _asyncTap(func, curriedIterable);
}
760
// Resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => {
  setTimeout(resolve, ms);
});
/**
 * Lets at most `limit` values through per `interval` ms window.
 * Arguments are validated eagerly, before the returned generator is iterated.
 *
 * @throws {TypeError} when `limit` is not finite or not greater than 0, or
 *   when `interval` is not finite
 */
function _throttle(limit, interval, iterable) {
  if (!Number.isFinite(limit)) {
    throw new TypeError('Expected `limit` to be a finite number');
  }
  if (limit <= 0) {
    throw new TypeError('Expected `limit` to be greater than 0');
  }
  if (!Number.isFinite(interval)) {
    throw new TypeError('Expected `interval` to be a finite number');
  }
  return (async function* __throttle() {
    let sent = 0;
    let time;
    for await (const item of iterable) {
      if (sent >= limit) {
        // Window is full: wait out what remains of the interval, unless it
        // already passed while the previous values were being consumed.
        const elapsedMs = Date.now() - (time || 0);
        const waitFor = interval - elapsedMs;
        if (waitFor > 0) {
          await sleep(waitFor);
        }
        time = Date.now();
        sent = 1;
        yield item;
        continue;
      }
      // The window starts with the first value that goes through.
      if (typeof time === 'undefined') {
        time = Date.now();
      }
      sent++;
      yield item;
    }
  })();
}
/**
 * Throttles `iterable` to at most `limit` values per `interval` ms.
 * Returns a curried function when `iterable` is omitted.
 */
function throttle(limit, interval, iterable) {
  if (iterable !== undefined) {
    return _throttle(limit, interval, iterable);
  }
  return (curriedIterable) => _throttle(limit, interval, curriedIterable);
}
803
/**
 * Adds two `[seconds, nanoseconds]` tuples and returns the sum as a new
 * tuple, carrying whole seconds out of the nanosecond field.
 */
function addTime(a, b) {
  const NANOS_PER_SECOND = 1000000000;
  const [secondsA, nanosA] = a;
  const [secondsB, nanosB] = b;
  let seconds = secondsA + secondsB;
  let nanoseconds = nanosA + nanosB;
  if (nanoseconds >= NANOS_PER_SECOND) {
    const remainder = nanoseconds % NANOS_PER_SECOND;
    seconds += (nanoseconds - remainder) / NANOS_PER_SECOND;
    nanoseconds = remainder;
  }
  return [seconds, nanoseconds];
}
/**
 * Passes the values of an async iterable through while measuring, with
 * `process.hrtime`, how long each `next()` call on the source takes.
 * `config.progress(delta, total)` is called after every `next()` (including
 * the final one) and `config.total(total)` once the source is done; both are
 * optional. The source's return value is forwarded.
 */
async function* _asyncTime(config, iterable) {
  const source = iterable[Symbol.asyncIterator]();
  let total = [0, 0];
  for (;;) {
    const startedAt = process.hrtime();
    const step = await source.next();
    const delta = process.hrtime(startedAt);
    total = addTime(total, delta);
    if (config.progress) {
      config.progress(delta, total);
    }
    if (!step.done) {
      yield step.value;
      continue;
    }
    if (config.total) {
      config.total(total);
    }
    return step.value;
  }
}
/**
 * Passes the values of a sync iterable through while measuring, with
 * `process.hrtime`, how long each `next()` call on the source takes.
 * `config.progress(delta, total)` is called after every `next()` (including
 * the final one) and `config.total(total)` once the source is done; both are
 * optional. The source's return value is forwarded.
 */
function* _syncTime(config, iterable) {
  const source = iterable[Symbol.iterator]();
  let total = [0, 0];
  for (;;) {
    const startedAt = process.hrtime();
    const step = source.next();
    const delta = process.hrtime(startedAt);
    total = addTime(total, delta);
    if (config.progress) {
      config.progress(delta, total);
    }
    if (!step.done) {
      yield step.value;
      continue;
    }
    if (config.total) {
      config.total(total);
    }
    return step.value;
  }
}
/**
 * Measures how long each item of `iterable` takes to be produced, reporting
 * through the optional `config.progress` and `config.total` callbacks.
 * Returns a curried function when `iterable` is omitted; otherwise picks the
 * async or sync implementation based on `Symbol.asyncIterator`.
 */
function time(config = {}, iterable) {
  if (iterable === undefined) {
    return (curriedIterable) => time(config, curriedIterable);
  }
  const isAsync = iterable[Symbol.asyncIterator] !== undefined;
  return isAsync ? _asyncTime(config, iterable) : _syncTime(config, iterable);
}
865
/**
 * Concurrent map over `iterable`.
 *
 * Reads from the source and runs `func` on up to `concurrency` items at once
 * (in-flight work plus queued results count toward the limit). Results are
 * delivered in completion order, not input order. The first error, from the
 * source or from `func`, is delivered after in-flight work settles and queued
 * results have been handed out.
 */
function _transform(concurrency, func, iterable) {
  const source = getIterator(iterable);
  const results = []; // finished values waiting for a reader
  const readers = []; // deferreds of readers waiting for a value
  let isEnded = false;
  let isReading = false;
  let inflight = 0;
  let failure = null;

  // Hands results to waiting readers; once all work is finished, ends or fails the rest.
  function fulfillReadQueue() {
    while (readers.length > 0 && results.length > 0) {
      const { resolve } = readers.shift();
      const value = results.shift();
      resolve({ done: false, value });
    }
    while (readers.length > 0 && inflight === 0 && isEnded) {
      const { resolve, reject } = readers.shift();
      if (failure) {
        reject(failure);
        failure = null;
      }
      else {
        resolve({ done: true, value: undefined });
      }
    }
  }

  // Pulls one source item if there is capacity, then calls itself again.
  async function fillQueue() {
    if (isEnded) {
      fulfillReadQueue();
      return;
    }
    if (isReading || inflight + results.length >= concurrency) {
      return;
    }
    isReading = true;
    inflight++;
    try {
      const { done, value } = await source.next();
      if (done) {
        isEnded = true;
        inflight--;
        fulfillReadQueue();
      }
      else {
        // Not awaited: the mapping runs concurrently with further reads.
        mapAndQueue(value);
      }
    }
    catch (error) {
      isEnded = true;
      inflight--;
      failure = error;
      fulfillReadQueue();
    }
    isReading = false;
    fillQueue();
  }

  // Runs `func` for one item and queues its result.
  async function mapAndQueue(itrValue) {
    try {
      const mapped = await func(itrValue);
      results.push(mapped);
    }
    catch (error) {
      isEnded = true;
      failure = error;
    }
    inflight--;
    fulfillReadQueue();
    fillQueue();
  }

  async function next() {
    if (results.length === 0) {
      const deferred = defer();
      readers.push(deferred);
      fillQueue();
      return deferred.promise;
    }
    const value = results.shift();
    fillQueue();
    return { done: false, value };
  }

  const asyncIterableIterator = {
    next,
    [Symbol.asyncIterator]: () => asyncIterableIterator,
  };
  return asyncIterableIterator;
}
/**
 * Maps `iterable` through `func` with up to `concurrency` calls in flight,
 * yielding results as they complete.
 * Supports currying: omit `func` and/or `iterable` to get a function that
 * accepts the missing arguments later.
 */
function transform(concurrency, func, iterable) {
  if (func === undefined) {
    return (curriedFunc, curriedIterable) => {
      if (curriedIterable) {
        return transform(concurrency, curriedFunc, curriedIterable);
      }
      return transform(concurrency, curriedFunc);
    };
  }
  if (iterable === undefined) {
    return (curriedIterable) => transform(concurrency, func, curriedIterable);
  }
  return _transform(concurrency, func, iterable);
}
965
/**
 * Writes every value of `iterable` to `stream`, waiting for `'drain'`
 * whenever `stream.write()` returns `false`.
 *
 * The listeners this function attaches to the stream are always removed
 * before it settles, whether it finishes normally, the stream errors, or the
 * iterable itself throws. The stream is not ended.
 *
 * @param stream writable stream (needs `write`, `once`, `removeListener`)
 * @param iterable sync or async iterable of values to write
 * @returns promise that resolves once every value has been written
 * @throws the stream's `'error'` or the error thrown by `iterable`
 */
async function _writeToStream(stream, iterable) {
  let lastError = null;
  let errCb = null;
  let drainCb = null;
  const notifyError = (err) => {
    lastError = err;
    if (errCb) {
      errCb(err);
    }
  };
  const notifyDrain = () => {
    if (drainCb) {
      drainCb();
    }
  };
  const cleanup = () => {
    stream.removeListener('error', notifyError);
    stream.removeListener('drain', notifyDrain);
  };
  const waitForDrain = () => new Promise((resolve, reject) => {
    if (lastError) {
      return reject(lastError);
    }
    stream.once('drain', notifyDrain);
    drainCb = resolve;
    errCb = reject;
  });
  stream.once('error', notifyError);
  try {
    for await (const value of iterable) {
      if (stream.write(value) === false) {
        await waitForDrain();
      }
      if (lastError) {
        break;
      }
    }
  }
  finally {
    // Also runs when the iterable throws or the drain wait rejects, so no
    // listener is left behind on the stream.
    cleanup();
  }
  if (lastError) {
    throw lastError;
  }
}
/**
 * Writes all values of `iterable` to `stream`, respecting backpressure.
 * Returns a curried function when `iterable` is omitted.
 */
function writeToStream(stream, iterable) {
  if (iterable !== undefined) {
    return _writeToStream(stream, iterable);
  }
  return (curriedIterable) => _writeToStream(stream, curriedIterable);
}
1013
1014export { batch, batchWithTimeout, buffer, collect, concat, consume, drop, filter, flatMap, flatTransform, flatten, fromStream, getIterator, map, merge, parallelFlatMap, parallelMap, parallelMerge, pipeline, reduce, take, tap, throttle, time, transform, writeToStream };