UNPKG

33.8 kBJavaScriptView Raw
1(function (global, factory) {
2 typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports) :
3 typeof define === 'function' && define.amd ? define(['exports'], factory) :
4 (global = typeof globalThis !== 'undefined' ? globalThis : global || self, factory(global.streamingIterables = {}));
5})(this, (function (exports) { 'use strict';
6
7 async function* _batch(size, iterable) {
8 let dataBatch = [];
9 for await (const data of iterable) {
10 dataBatch.push(data);
11 if (dataBatch.length === size) {
12 yield dataBatch;
13 dataBatch = [];
14 }
15 }
16 if (dataBatch.length > 0) {
17 yield dataBatch;
18 }
19 }
20 function* _syncBatch(size, iterable) {
21 let dataBatch = [];
22 for (const data of iterable) {
23 dataBatch.push(data);
24 if (dataBatch.length === size) {
25 yield dataBatch;
26 dataBatch = [];
27 }
28 }
29 if (dataBatch.length > 0) {
30 yield dataBatch;
31 }
32 }
33 function batch(size, iterable) {
34 if (iterable === undefined) {
35 return curriedIterable => batch(size, curriedIterable);
36 }
37 if (iterable[Symbol.asyncIterator]) {
38 return _batch(size, iterable);
39 }
40 return _syncBatch(size, iterable);
41 }
42
43 const TIMEOUT = Symbol('TIMEOUT');
44 const createTimer = (duration) => {
45 let timeoutId;
46 return [
47 new Promise(resolve => {
48 timeoutId = setTimeout(() => resolve(TIMEOUT), duration);
49 }),
50 () => {
51 clearTimeout(timeoutId);
52 },
53 ];
54 };
55 // Like `batch` but flushes early if the `timeout` is reached
56 // NOTE: The strategy is to only hold onto a single item for a maximum of `timeout` ms.
57 async function* _batchWithTimeout(size, timeout, iterable) {
58 const iterator = iterable[Symbol.asyncIterator]();
59 let pendingData;
60 let batchData = [];
61 let timer;
62 let clearTimer;
63 const startTimer = () => {
64 deleteTimer();
65 [timer, clearTimer] = createTimer(timeout);
66 };
67 const deleteTimer = () => {
68 if (clearTimer) {
69 clearTimer();
70 }
71 timer = undefined;
72 };
73 pendingData = iterator.next();
74 while (true) {
75 const res = await (timer ? Promise.race([pendingData, timer]) : pendingData);
76 if (res === TIMEOUT || res.done) {
77 // Flush early (before we reach the batch size)
78 if (batchData.length) {
79 yield batchData;
80 batchData = [];
81 }
82 deleteTimer();
83 // And exit appropriately
84 if (res !== TIMEOUT) {
85 // done
86 break;
87 }
88 continue;
89 }
90 // Fetch next item early doors (before we potentially yield)
91 pendingData = iterator.next();
92 // Then handle the value
93 batchData.push(res.value);
94 if (batchData.length === 1) {
95 // Start timer once we have at least 1 item ready to go
96 startTimer();
97 }
98 if (batchData.length === size) {
99 yield batchData;
100 batchData = [];
101 deleteTimer();
102 continue;
103 }
104 }
105 }
106 function batchWithTimeout(size, timeout, iterable) {
107 if (iterable === undefined) {
108 return curriedIterable => batchWithTimeout(size, timeout, curriedIterable);
109 }
110 if (iterable[Symbol.asyncIterator] && timeout !== Infinity) {
111 return _batchWithTimeout(size, timeout, iterable);
112 }
113 // For sync iterables or an infinite timeout, the timeout is irrelevant so just fallback to regular `batch`.
114 return batch(size, iterable);
115 }
116
117 function getIterator(iterable) {
118 if (typeof iterable.next === 'function') {
119 return iterable;
120 }
121 if (typeof iterable[Symbol.iterator] === 'function') {
122 return iterable[Symbol.iterator]();
123 }
124 if (typeof iterable[Symbol.asyncIterator] === 'function') {
125 return iterable[Symbol.asyncIterator]();
126 }
127 throw new TypeError('"values" does not to conform to any of the iterator or iterable protocols');
128 }
129
130 function defer() {
131 let reject;
132 let resolve;
133 const promise = new Promise((resolveFunc, rejectFunc) => {
134 resolve = resolveFunc;
135 reject = rejectFunc;
136 });
137 return {
138 promise,
139 reject,
140 resolve,
141 };
142 }
143
144 function _buffer(size, iterable) {
145 const iterator = getIterator(iterable);
146 const resultQueue = [];
147 const readQueue = [];
148 let reading = false;
149 let ended = false;
150 function fulfillReadQueue() {
151 while (readQueue.length > 0 && resultQueue.length > 0) {
152 const readDeferred = readQueue.shift();
153 const { error, value } = resultQueue.shift();
154 if (error) {
155 readDeferred.reject(error);
156 }
157 else {
158 readDeferred.resolve({ done: false, value });
159 }
160 }
161 while (readQueue.length > 0 && ended) {
162 const { resolve } = readQueue.shift();
163 resolve({ done: true, value: undefined });
164 }
165 }
166 async function fillQueue() {
167 if (ended) {
168 return;
169 }
170 if (reading) {
171 return;
172 }
173 if (resultQueue.length >= size) {
174 return;
175 }
176 reading = true;
177 try {
178 const { done, value } = await iterator.next();
179 if (done) {
180 ended = true;
181 }
182 else {
183 resultQueue.push({ value });
184 }
185 }
186 catch (error) {
187 ended = true;
188 resultQueue.push({ error });
189 }
190 fulfillReadQueue();
191 reading = false;
192 fillQueue();
193 }
194 async function next() {
195 if (resultQueue.length > 0) {
196 const { error, value } = resultQueue.shift();
197 if (error) {
198 throw error;
199 }
200 fillQueue();
201 return { done: false, value };
202 }
203 if (ended) {
204 return { done: true, value: undefined }; // stupid ts
205 }
206 const deferred = defer();
207 readQueue.push(deferred);
208 fillQueue();
209 return deferred.promise;
210 }
211 const asyncIterableIterator = {
212 next,
213 [Symbol.asyncIterator]: () => asyncIterableIterator,
214 };
215 return asyncIterableIterator;
216 }
217 function* syncBuffer(size, iterable) {
218 const valueQueue = [];
219 let e;
220 try {
221 for (const value of iterable) {
222 valueQueue.push(value);
223 if (valueQueue.length <= size) {
224 continue;
225 }
226 yield valueQueue.shift();
227 }
228 }
229 catch (error) {
230 e = error;
231 }
232 for (const value of valueQueue) {
233 yield value;
234 }
235 if (e) {
236 throw e;
237 }
238 }
239 function buffer(size, iterable) {
240 if (iterable === undefined) {
241 return curriedIterable => buffer(size, curriedIterable);
242 }
243 if (size === 0) {
244 return iterable;
245 }
246 if (iterable[Symbol.asyncIterator]) {
247 return _buffer(size, iterable);
248 }
249 return syncBuffer(size, iterable);
250 }
251
252 async function _collect(iterable) {
253 const values = [];
254 for await (const value of iterable) {
255 values.push(value);
256 }
257 return values;
258 }
259 function collect(iterable) {
260 if (iterable[Symbol.asyncIterator]) {
261 return _collect(iterable);
262 }
263 return Array.from(iterable);
264 }
265
266 async function* _concat(iterables) {
267 for await (const iterable of iterables) {
268 yield* iterable;
269 }
270 }
271 function* _syncConcat(iterables) {
272 for (const iterable of iterables) {
273 yield* iterable;
274 }
275 }
276 function concat(...iterables) {
277 const hasAnyAsync = iterables.find(itr => itr[Symbol.asyncIterator] !== undefined);
278 if (hasAnyAsync) {
279 return _concat(iterables);
280 }
281 else {
282 return _syncConcat(iterables);
283 }
284 }
285
286 async function _consume(iterable) {
287 for await (const _val of iterable) {
288 // do nothing
289 }
290 }
291 function consume(iterable) {
292 if (iterable[Symbol.asyncIterator]) {
293 return _consume(iterable);
294 }
295 for (const _val of iterable) {
296 // do nothing
297 }
298 }
299
300 async function* _drop(count, iterable) {
301 let skipped = 0;
302 for await (const val of iterable) {
303 if (skipped < count) {
304 skipped++;
305 continue;
306 }
307 yield await val;
308 }
309 }
310 function* _syncDrop(count, iterable) {
311 let skipped = 0;
312 for (const val of iterable) {
313 if (skipped < count) {
314 skipped++;
315 continue;
316 }
317 yield val;
318 }
319 }
320 function drop(count, iterable) {
321 if (iterable === undefined) {
322 return curriedIterable => drop(count, curriedIterable);
323 }
324 if (iterable[Symbol.asyncIterator]) {
325 return _drop(count, iterable);
326 }
327 return _syncDrop(count, iterable);
328 }
329
330 async function* _filter(filterFunc, iterable) {
331 for await (const data of iterable) {
332 if (await filterFunc(data)) {
333 yield data;
334 }
335 }
336 }
337 function filter(filterFunc, iterable) {
338 if (iterable === undefined) {
339 return (curriedIterable) => _filter(filterFunc, curriedIterable);
340 }
341 return _filter(filterFunc, iterable);
342 }
343
344 /**
345 * Returns a new iterator by pulling every item out of `iterable` (and all its sub iterables) and yielding them depth-first. Checks for the iterable interfaces and iterates it if it exists. If the value is a string it is not iterated as that ends up in an infinite loop. Errors from the source `iterable` are raised immediately.
346
347 *note*: Typescript doesn't have recursive types but you can nest iterables as deep as you like.
348
349 ```ts
350 import { flatten } from 'streaming-iterables'
351
352 for await (const item of flatten([1, 2, [3, [4, 5], 6])) {
353 console.log(item)
354 }
355 // 1
356 // 2
357 // 3
358 // 4
359 // 5
360 // 6
361 ```
362 */
363 async function* flatten(iterable) {
364 for await (const maybeItr of iterable) {
365 if (maybeItr && typeof maybeItr !== 'string' && (maybeItr[Symbol.iterator] || maybeItr[Symbol.asyncIterator])) {
366 yield* flatten(maybeItr);
367 }
368 else {
369 yield maybeItr;
370 }
371 }
372 }
373
374 async function* _map(func, iterable) {
375 for await (const val of iterable) {
376 yield await func(val);
377 }
378 }
379 function map(func, iterable) {
380 if (iterable === undefined) {
381 return curriedIterable => _map(func, curriedIterable);
382 }
383 return _map(func, iterable);
384 }
385
386 function flatMap(func, iterable) {
387 if (iterable === undefined) {
388 return curriedIterable => flatMap(func, curriedIterable);
389 }
390 return filter(i => i !== undefined && i !== null, flatten(map(func, iterable)));
391 }
392
393 function _flatTransform(concurrency, func, iterable) {
394 const iterator = getIterator(iterable);
395 const resultQueue = [];
396 const readQueue = [];
397 let ended = false;
398 let reading = false;
399 let inflightCount = 0;
400 let lastError = null;
401 function fulfillReadQueue() {
402 while (readQueue.length > 0 && resultQueue.length > 0) {
403 const { resolve } = readQueue.shift();
404 const value = resultQueue.shift();
405 resolve({ done: false, value });
406 }
407 while (readQueue.length > 0 && inflightCount === 0 && ended) {
408 const { resolve, reject } = readQueue.shift();
409 if (lastError) {
410 reject(lastError);
411 lastError = null;
412 }
413 else {
414 resolve({ done: true, value: undefined });
415 }
416 }
417 }
418 async function fillQueue() {
419 if (ended) {
420 fulfillReadQueue();
421 return;
422 }
423 if (reading) {
424 return;
425 }
426 if (inflightCount + resultQueue.length >= concurrency) {
427 return;
428 }
429 reading = true;
430 inflightCount++;
431 try {
432 const { done, value } = await iterator.next();
433 if (done) {
434 ended = true;
435 inflightCount--;
436 fulfillReadQueue();
437 }
438 else {
439 mapAndQueue(value);
440 }
441 }
442 catch (error) {
443 ended = true;
444 inflightCount--;
445 lastError = error;
446 fulfillReadQueue();
447 }
448 reading = false;
449 fillQueue();
450 }
451 async function mapAndQueue(itrValue) {
452 try {
453 const value = await func(itrValue);
454 if (value && value[Symbol.asyncIterator]) {
455 for await (const asyncVal of value) {
456 resultQueue.push(asyncVal);
457 }
458 }
459 else {
460 resultQueue.push(value);
461 }
462 }
463 catch (error) {
464 ended = true;
465 lastError = error;
466 }
467 inflightCount--;
468 fulfillReadQueue();
469 fillQueue();
470 }
471 async function next() {
472 if (resultQueue.length === 0) {
473 const deferred = defer();
474 readQueue.push(deferred);
475 fillQueue();
476 return deferred.promise;
477 }
478 const value = resultQueue.shift();
479 fillQueue();
480 return { done: false, value };
481 }
482 const asyncIterableIterator = {
483 next,
484 [Symbol.asyncIterator]: () => asyncIterableIterator,
485 };
486 return asyncIterableIterator;
487 }
488 function flatTransform(concurrency, func, iterable) {
489 if (func === undefined) {
490 return (curriedFunc, curriedIterable) => curriedIterable
491 ? flatTransform(concurrency, curriedFunc, curriedIterable)
492 : flatTransform(concurrency, curriedFunc);
493 }
494 if (iterable === undefined) {
495 return (curriedIterable) => flatTransform(concurrency, func, curriedIterable);
496 }
497 return filter(i => i !== undefined && i !== null, flatten(_flatTransform(concurrency, func, iterable)));
498 }
499
500 async function onceReadable(stream) {
501 return new Promise(resolve => {
502 stream.once('readable', () => {
503 resolve();
504 });
505 });
506 }
507 async function* _fromStream(stream) {
508 while (true) {
509 const data = stream.read();
510 if (data !== null) {
511 yield data;
512 continue;
513 }
514 if (stream._readableState.ended) {
515 break;
516 }
517 await onceReadable(stream);
518 }
519 }
520 /**
521 * Wraps the stream in an async iterator or returns the stream if it already is an async iterator.
522
523 *note*: Since Node 10, streams already async iterators. This function may be used to ensure compatibility with older versions of Node.
524
525 ```ts
526 import { fromStream } from 'streaming-iterables'
527 import { createReadStream } from 'fs'
528
529 const pokeLog = fromStream(createReadStream('./pokedex-operating-system.log'))
530
531 for await (const pokeData of pokeLog) {
532 console.log(pokeData) // Buffer(...)
533 }
534 ```
535 * @deprecated This method is deprecated since, node 10 is out of LTS. It may be removed in an upcoming major release.
536 */
537 function fromStream(stream) {
538 if (typeof stream[Symbol.asyncIterator] === 'function') {
539 return stream;
540 }
541 return _fromStream(stream);
542 }
543
544 /**
545 * Combine multiple iterators into a single iterable. Reads one item off each iterable in order repeatedly until they are all exhausted. If you care less about order and want them faster see `parallelMerge()`.
546 */
547 async function* merge(...iterables) {
548 const sources = new Set(iterables.map(getIterator));
549 while (sources.size > 0) {
550 for (const iterator of sources) {
551 const nextVal = await iterator.next();
552 if (nextVal.done) {
553 sources.delete(iterator);
554 }
555 else {
556 yield nextVal.value;
557 }
558 }
559 }
560 }
561
562 function pipeline(firstFn, ...fns) {
563 let previousFn = firstFn();
564 for (const func of fns) {
565 previousFn = func(previousFn);
566 }
567 return previousFn;
568 }
569
570 async function* _parallelMap(concurrency, func, iterable) {
571 let transformError = null;
572 const wrapFunc = value => ({
573 value: func(value),
574 });
575 const stopOnError = async function* (source) {
576 for await (const value of source) {
577 if (transformError) {
578 return;
579 }
580 yield value;
581 }
582 };
583 const output = pipeline(() => iterable, buffer(1), stopOnError, map(wrapFunc), buffer(concurrency - 1));
584 const itr = getIterator(output);
585 while (true) {
586 const { value, done } = await itr.next();
587 if (done) {
588 break;
589 }
590 try {
591 const val = await value.value;
592 if (!transformError) {
593 yield val;
594 }
595 }
596 catch (error) {
597 transformError = error;
598 }
599 }
600 if (transformError) {
601 throw transformError;
602 }
603 }
604 function parallelMap(concurrency, func, iterable) {
605 if (func === undefined) {
606 return (curriedFunc, curriedIterable) => parallelMap(concurrency, curriedFunc, curriedIterable);
607 }
608 if (iterable === undefined) {
609 return curriedIterable => parallelMap(concurrency, func, curriedIterable);
610 }
611 if (concurrency === 1) {
612 return map(func, iterable);
613 }
614 return _parallelMap(concurrency, func, iterable);
615 }
616
617 function parallelFlatMap(concurrency, func, iterable) {
618 if (func === undefined) {
619 return (curriedFunc, curriedIterable) => curriedIterable
620 ? parallelFlatMap(concurrency, curriedFunc, curriedIterable)
621 : parallelFlatMap(concurrency, curriedFunc);
622 }
623 if (iterable === undefined) {
624 return (curriedIterable) => parallelFlatMap(concurrency, func, curriedIterable);
625 }
626 return filter(i => i !== undefined && i !== null, flatten(parallelMap(concurrency, func, iterable)));
627 }
628
629 /**
630 *Combine multiple iterators into a single iterable. Reads one item off of every iterable and yields them as they resolve. This is useful for pulling items out of a collection of iterables as soon as they're available. Errors `iterables` are raised immediately.
631
632 ```ts
633 import { parallelMerge } from 'streaming-iterables'
634 import { getPokemon, getTransformer } from 'iterable-pokedex'
635
636 // pokemon are much faster to load btw
637 const heros = parallelMerge(getPokemon(), getTransformer())
638 for await (const hero of heros) {
639 console.log(hero)
640 }
641 // charmander
642 // bulbasaur
643 // megatron
644 // pikachu
645 // eevee
646 // bumblebee
647 // jazz
648 ```
649 */
650 async function* parallelMerge(...iterables) {
651 const inputs = iterables.map(getIterator);
652 const concurrentWork = new Set();
653 const values = new Map();
654 let lastError = null;
655 let errCb = null;
656 let valueCb = null;
657 const notifyError = err => {
658 lastError = err;
659 if (errCb) {
660 errCb(err);
661 }
662 };
663 const notifyDone = value => {
664 if (valueCb) {
665 valueCb(value);
666 }
667 };
668 const waitForQueue = () => new Promise((resolve, reject) => {
669 if (lastError) {
670 reject(lastError);
671 }
672 if (values.size > 0) {
673 return resolve();
674 }
675 valueCb = resolve;
676 errCb = reject;
677 });
678 const queueNext = input => {
679 const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => {
680 if (!done) {
681 values.set(input, value);
682 }
683 concurrentWork.delete(nextVal);
684 });
685 concurrentWork.add(nextVal);
686 nextVal.then(notifyDone, notifyError);
687 };
688 for (const input of inputs) {
689 queueNext(input);
690 }
691 while (true) {
692 // We technically don't have to check `values.size` as the for loop should have emptied it
693 // However I haven't yet found specs verifying that behavior, only tests
694 // the guard in waitForQueue() checking for values is in place for the same reason
695 if (concurrentWork.size === 0 && values.size === 0) {
696 return;
697 }
698 await waitForQueue();
699 for (const [input, value] of values) {
700 values.delete(input);
701 yield value;
702 queueNext(input);
703 }
704 }
705 }
706
707 async function _reduce(func, start, iterable) {
708 let value = start;
709 for await (const nextItem of iterable) {
710 value = await func(value, nextItem);
711 }
712 return value;
713 }
714 function reduce(func, start, iterable) {
715 if (start === undefined) {
716 return (curriedStart, curriedIterable) => curriedIterable ? _reduce(func, curriedStart, curriedIterable) : reduce(func, curriedStart);
717 }
718 if (iterable === undefined) {
719 return (curriedIterable) => reduce(func, start, curriedIterable);
720 }
721 return _reduce(func, start, iterable);
722 }
723
724 async function* _take(count, iterable) {
725 let taken = 0;
726 for await (const val of iterable) {
727 yield await val;
728 taken++;
729 if (taken >= count) {
730 break;
731 }
732 }
733 }
734 function* _syncTake(count, iterable) {
735 let taken = 0;
736 for (const val of iterable) {
737 yield val;
738 taken++;
739 if (taken >= count) {
740 break;
741 }
742 }
743 }
744 function take(count, iterable) {
745 if (iterable === undefined) {
746 return curriedIterable => take(count, curriedIterable);
747 }
748 if (iterable[Symbol.asyncIterator]) {
749 return _take(count, iterable);
750 }
751 return _syncTake(count, iterable);
752 }
753
754 async function* _asyncTap(func, iterable) {
755 for await (const val of iterable) {
756 await func(val);
757 yield val;
758 }
759 }
760 function tap(func, iterable) {
761 if (iterable === undefined) {
762 return (curriedIterable) => _asyncTap(func, curriedIterable);
763 }
764 return _asyncTap(func, iterable);
765 }
766
767 const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
768 function _throttle(limit, interval, iterable) {
769 if (!Number.isFinite(limit)) {
770 throw new TypeError('Expected `limit` to be a finite number');
771 }
772 if (limit <= 0) {
773 throw new TypeError('Expected `limit` to be greater than 0');
774 }
775 if (!Number.isFinite(interval)) {
776 throw new TypeError('Expected `interval` to be a finite number');
777 }
778 return (async function* __throttle() {
779 let sent = 0;
780 let time;
781 for await (const val of iterable) {
782 if (sent < limit) {
783 if (typeof time === 'undefined') {
784 time = Date.now();
785 }
786 sent++;
787 yield val;
788 continue;
789 }
790 // Only wait if the interval hasn't already passed while we were
791 // yielding the previous values.
792 const elapsedMs = Date.now() - (time || 0);
793 const waitFor = interval - elapsedMs;
794 if (waitFor > 0) {
795 await sleep(waitFor);
796 }
797 time = Date.now();
798 sent = 1;
799 yield val;
800 }
801 })();
802 }
803 function throttle(limit, interval, iterable) {
804 if (iterable === undefined) {
805 return (curriedIterable) => _throttle(limit, interval, curriedIterable);
806 }
807 return _throttle(limit, interval, iterable);
808 }
809
810 function addTime(a, b) {
811 let seconds = a[0] + b[0];
812 let nanoseconds = a[1] + b[1];
813 if (nanoseconds >= 1000000000) {
814 const remainder = nanoseconds % 1000000000;
815 seconds += (nanoseconds - remainder) / 1000000000;
816 nanoseconds = remainder;
817 }
818 return [seconds, nanoseconds];
819 }
820 async function* _asyncTime(config, iterable) {
821 const itr = iterable[Symbol.asyncIterator]();
822 let total = [0, 0];
823 while (true) {
824 const start = process.hrtime();
825 const { value, done } = await itr.next();
826 const delta = process.hrtime(start);
827 total = addTime(total, delta);
828 if (config.progress) {
829 config.progress(delta, total);
830 }
831 if (done) {
832 if (config.total) {
833 config.total(total);
834 }
835 return value;
836 }
837 yield value;
838 }
839 }
840 function* _syncTime(config, iterable) {
841 const itr = iterable[Symbol.iterator]();
842 let total = [0, 0];
843 while (true) {
844 const start = process.hrtime();
845 const { value, done } = itr.next();
846 const delta = process.hrtime(start);
847 total = addTime(total, delta);
848 if (config.progress) {
849 config.progress(delta, total);
850 }
851 if (done) {
852 if (config.total) {
853 config.total(total);
854 }
855 return value;
856 }
857 yield value;
858 }
859 }
860 function time(config = {}, iterable) {
861 if (iterable === undefined) {
862 return curriedIterable => time(config, curriedIterable);
863 }
864 if (iterable[Symbol.asyncIterator] !== undefined) {
865 return _asyncTime(config, iterable);
866 }
867 else {
868 return _syncTime(config, iterable);
869 }
870 }
871
872 function _transform(concurrency, func, iterable) {
873 const iterator = getIterator(iterable);
874 const resultQueue = [];
875 const readQueue = [];
876 let ended = false;
877 let reading = false;
878 let inflightCount = 0;
879 let lastError = null;
880 function fulfillReadQueue() {
881 while (readQueue.length > 0 && resultQueue.length > 0) {
882 const { resolve } = readQueue.shift();
883 const value = resultQueue.shift();
884 resolve({ done: false, value });
885 }
886 while (readQueue.length > 0 && inflightCount === 0 && ended) {
887 const { resolve, reject } = readQueue.shift();
888 if (lastError) {
889 reject(lastError);
890 lastError = null;
891 }
892 else {
893 resolve({ done: true, value: undefined });
894 }
895 }
896 }
897 async function fillQueue() {
898 if (ended) {
899 fulfillReadQueue();
900 return;
901 }
902 if (reading) {
903 return;
904 }
905 if (inflightCount + resultQueue.length >= concurrency) {
906 return;
907 }
908 reading = true;
909 inflightCount++;
910 try {
911 const { done, value } = await iterator.next();
912 if (done) {
913 ended = true;
914 inflightCount--;
915 fulfillReadQueue();
916 }
917 else {
918 mapAndQueue(value);
919 }
920 }
921 catch (error) {
922 ended = true;
923 inflightCount--;
924 lastError = error;
925 fulfillReadQueue();
926 }
927 reading = false;
928 fillQueue();
929 }
930 async function mapAndQueue(itrValue) {
931 try {
932 const value = await func(itrValue);
933 resultQueue.push(value);
934 }
935 catch (error) {
936 ended = true;
937 lastError = error;
938 }
939 inflightCount--;
940 fulfillReadQueue();
941 fillQueue();
942 }
943 async function next() {
944 if (resultQueue.length === 0) {
945 const deferred = defer();
946 readQueue.push(deferred);
947 fillQueue();
948 return deferred.promise;
949 }
950 const value = resultQueue.shift();
951 fillQueue();
952 return { done: false, value };
953 }
954 const asyncIterableIterator = {
955 next,
956 [Symbol.asyncIterator]: () => asyncIterableIterator,
957 };
958 return asyncIterableIterator;
959 }
960 function transform(concurrency, func, iterable) {
961 if (func === undefined) {
962 return (curriedFunc, curriedIterable) => curriedIterable
963 ? transform(concurrency, curriedFunc, curriedIterable)
964 : transform(concurrency, curriedFunc);
965 }
966 if (iterable === undefined) {
967 return (curriedIterable) => transform(concurrency, func, curriedIterable);
968 }
969 return _transform(concurrency, func, iterable);
970 }
971
972 async function _writeToStream(stream, iterable) {
973 let lastError = null;
974 let errCb = null;
975 let drainCb = null;
976 const notifyError = err => {
977 lastError = err;
978 if (errCb) {
979 errCb(err);
980 }
981 };
982 const notifyDrain = () => {
983 if (drainCb) {
984 drainCb();
985 }
986 };
987 const cleanup = () => {
988 stream.removeListener('error', notifyError);
989 stream.removeListener('drain', notifyDrain);
990 };
991 stream.once('error', notifyError);
992 const waitForDrain = () => new Promise((resolve, reject) => {
993 if (lastError) {
994 return reject(lastError);
995 }
996 stream.once('drain', notifyDrain);
997 drainCb = resolve;
998 errCb = reject;
999 });
1000 for await (const value of iterable) {
1001 if (stream.write(value) === false) {
1002 await waitForDrain();
1003 }
1004 if (lastError) {
1005 break;
1006 }
1007 }
1008 cleanup();
1009 if (lastError) {
1010 throw lastError;
1011 }
1012 }
1013 function writeToStream(stream, iterable) {
1014 if (iterable === undefined) {
1015 return (curriedIterable) => _writeToStream(stream, curriedIterable);
1016 }
1017 return _writeToStream(stream, iterable);
1018 }
1019
1020 exports.batch = batch;
1021 exports.batchWithTimeout = batchWithTimeout;
1022 exports.buffer = buffer;
1023 exports.collect = collect;
1024 exports.concat = concat;
1025 exports.consume = consume;
1026 exports.drop = drop;
1027 exports.filter = filter;
1028 exports.flatMap = flatMap;
1029 exports.flatTransform = flatTransform;
1030 exports.flatten = flatten;
1031 exports.fromStream = fromStream;
1032 exports.getIterator = getIterator;
1033 exports.map = map;
1034 exports.merge = merge;
1035 exports.parallelFlatMap = parallelFlatMap;
1036 exports.parallelMap = parallelMap;
1037 exports.parallelMerge = parallelMerge;
1038 exports.pipeline = pipeline;
1039 exports.reduce = reduce;
1040 exports.take = take;
1041 exports.tap = tap;
1042 exports.throttle = throttle;
1043 exports.time = time;
1044 exports.transform = transform;
1045 exports.writeToStream = writeToStream;
1046
1047 Object.defineProperty(exports, '__esModule', { value: true });
1048
1049}));