UNPKG

2 kBJavaScriptView Raw
1import AggregateError from 'aggregate-error';
2
/**
 * Map over an iterable of values (or promises of values) with an optionally
 * asynchronous `mapper`, running at most `concurrency` mappers at a time.
 *
 * @param {Iterable<unknown>} iterable - Items to map. Each item is awaited before being passed to `mapper`.
 * @param {(element: unknown, index: number) => unknown} mapper - Called with the awaited item and its
 *   zero-based position. May return a promise. Returning `pMapSkip` omits the item from the output.
 * @param {object} [options]
 * @param {number} [options.concurrency=Infinity] - Maximum number of items in flight; an integer >= 1 or `Infinity`.
 * @param {boolean} [options.stopOnError=true] - When `true`, reject with the first error. When `false`,
 *   keep going and reject at the end with an `AggregateError` holding every collected error.
 * @returns {Promise<unknown[]>} Mapped values in input order, without skipped items.
 * @throws {TypeError} (as a rejection) If `mapper` is not a function or `concurrency` is invalid.
 */
export default async function pMap(
	iterable,
	mapper,
	{
		concurrency = Number.POSITIVE_INFINITY,
		stopOnError = true
	} = {}
) {
	return new Promise((resolve, reject) => {
		// Throwing inside the executor rejects the returned promise.
		if (typeof mapper !== 'function') {
			throw new TypeError('Mapper function is required');
		}

		if (!((Number.isSafeInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency >= 1)) {
			throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
		}

		const result = [];
		const errors = [];
		const iterator = iterable[Symbol.iterator]();
		let isRejected = false;
		let isIterableDone = false;
		let resolvingCount = 0;
		let currentIndex = 0;

		// Settle the outer promise once the iterable is exhausted and nothing is in flight.
		const settle = () => {
			if (!stopOnError && errors.length > 0) {
				reject(new AggregateError(errors));
				return;
			}

			// Skipped slots hold the `pMapSkip` sentinel. Dropping them in a single
			// pass avoids the index shifting that repeated `splice` calls cause
			// when more than one item is skipped.
			resolve(result.filter(value => value !== pMapSkip));
		};

		const next = () => {
			if (isRejected) {
				return;
			}

			if (!isIterableDone) {
				let nextItem;

				try {
					nextItem = iterator.next();
				} catch (error) {
					// A throwing iterator cannot be advanced any further, so fail
					// the whole operation regardless of `stopOnError`.
					isRejected = true;
					reject(error);
					return;
				}

				if (!nextItem.done) {
					const index = currentIndex;
					currentIndex++;
					resolvingCount++;

					(async () => {
						try {
							const element = await nextItem.value;

							// Another task may have rejected while this item was pending.
							if (isRejected) {
								return;
							}

							// May store `pMapSkip`; `settle` filters it out.
							result[index] = await mapper(element, index);

							resolvingCount--;
							next();
						} catch (error) {
							if (stopOnError) {
								isRejected = true;
								reject(error);
							} else {
								errors.push(error);
								resolvingCount--;
								next();
							}
						}
					})();

					return;
				}

				isIterableDone = true;
			}

			if (resolvingCount === 0) {
				settle();
			}
		};

		// Prime up to `concurrency` tasks; each finished task pulls the next item.
		for (let index = 0; index < concurrency; index++) {
			next();

			// `isRejected` can be set synchronously by a throwing iterator; without
			// this check an infinite `concurrency` would loop forever.
			if (isIterableDone || isRejected) {
				break;
			}
		}
	});
}
97
/**
 * Unique sentinel a `pMap` mapper can return to leave the current item out of
 * the resolved array. `pMap` compares each mapper result to this symbol by identity.
 */
export const pMapSkip = Symbol('skip');