UNPKG

4.07 kBJavaScriptView Raw
1import AggregateError from 'aggregate-error';
2
/**
 * Map over `iterable` with `mapper`, running at most `concurrency` mapper calls at once.
 *
 * @param {Iterable|AsyncIterable} iterable - Source of elements. Elements may be promises; each is awaited before being handed to `mapper`.
 * @param {Function} mapper - Called as `mapper(element, index)`. May return a promise. Returning `pMapSkip` omits that element from the result.
 * @param {object} [options]
 * @param {number} [options.concurrency=Infinity] - Safe integer >= 1, or `Infinity`.
 * @param {boolean} [options.stopOnError=true] - When `true`, the first element/mapper error rejects immediately. When `false`, element/mapper errors are collected and, once the source is exhausted and all mappers have settled, the promise rejects with an `AggregateError` of them.
 * @returns {Promise<Array>} Mapped values in source order, without the entries whose mapper returned `pMapSkip`.
 *
 * Rejects with a `TypeError` for invalid arguments. An error thrown by the source iterator itself
 * always rejects immediately with that error, regardless of `stopOnError`.
 */
export default async function pMap(
	iterable,
	mapper,
	{
		concurrency = Number.POSITIVE_INFINITY,
		stopOnError = true
	} = {}
) {
	return new Promise((resolve, reject_) => { // eslint-disable-line promise/param-names
		// Throwing inside the executor turns into a rejection of the returned promise.
		// The explicit null/undefined guard makes such inputs produce the documented message
		// instead of an engine-specific "cannot read properties" TypeError.
		const isMissing = iterable === undefined || iterable === null;
		if (isMissing || (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined)) {
			throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
		}

		if (typeof mapper !== 'function') {
			throw new TypeError('Mapper function is required');
		}

		if (!((Number.isSafeInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency >= 1)) {
			throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
		}

		const result = [];
		const errors = [];
		const skippedIndexesMap = new Map();
		let isRejected = false;
		let isResolved = false;
		let isIterableDone = false;
		let resolvingCount = 0; // Number of elements pulled from the source whose mapper has not settled yet.
		let currentIndex = 0;
		const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();

		const reject = reason => {
			isRejected = true;
			isResolved = true;
			reject_(reason);
		};

		// Pull one element from the source and start mapping it (detached).
		// Rejects only when the source iterator itself throws.
		const next = async () => {
			if (isResolved) {
				return;
			}

			const nextItem = await iterator.next();

			const index = currentIndex;
			currentIndex++;

			// Note: `iterator.next()` can be called many times in parallel.
			// This can cause multiple calls to this `next()` function to
			// receive a `nextItem` with `done === true`.
			// The shutdown logic that rejects/resolves must be protected
			// so it runs only one time as the `skippedIndex` logic is
			// non-idempotent.
			if (nextItem.done) {
				isIterableDone = true;

				if (resolvingCount === 0 && !isResolved) {
					if (!stopOnError && errors.length > 0) {
						reject(new AggregateError(errors));
						return;
					}

					isResolved = true;

					if (skippedIndexesMap.size === 0) {
						resolve(result);
						return;
					}

					const pureResult = [];

					// Support multiple `pMapSkip`'s.
					for (const [index, value] of result.entries()) {
						if (skippedIndexesMap.get(index) === pMapSkip) {
							continue;
						}

						pureResult.push(value);
					}

					resolve(pureResult);
				}

				return;
			}

			resolvingCount++;

			// Intentionally detached
			(async () => {
				// Only the element and the mapper are guarded here. Advancing the iterator is
				// handled separately below so that an iterator failure is never mistaken for a
				// mapper failure (which previously decremented `resolvingCount` twice and could
				// leave the returned promise pending forever).
				try {
					const element = await nextItem.value;

					if (isResolved) {
						return;
					}

					const value = await mapper(element, index);

					// Use Map to stage the index of the element.
					if (value === pMapSkip) {
						skippedIndexesMap.set(index, value);
					}

					result[index] = value;
				} catch (error) {
					if (stopOnError) {
						reject(error);
						return;
					}

					errors.push(error);
				}

				// Exactly one decrement per element, whether its mapper succeeded or failed.
				resolvingCount--;

				// If the source iterator throws we can't really continue regardless of the
				// `stopOnError` state, since an iterable is likely to continue throwing after
				// it throws once. Retrying `next()` would risk an endless loop of failed
				// iteration, so reject with the iterator's error.
				try {
					await next();
				} catch (error) {
					reject(error);
				}
			})();
		};

		// Create the concurrent runners in a detached (non-awaited)
		// promise. We need this so we can await the `next()` calls
		// to stop creating runners before hitting the concurrency limit
		// if the iterable has already been marked as done.
		// NOTE: We *must* do this for async iterators otherwise we'll spin up
		// infinite `next()` calls by default and never start the event loop.
		(async () => {
			for (let index = 0; index < concurrency; index++) {
				try {
					// eslint-disable-next-line no-await-in-loop
					await next();
				} catch (error) {
					reject(error);
					break;
				}

				if (isIterableDone || isRejected) {
					break;
				}
			}
		})();
	});
}

/**
 * Sentinel a mapper can return to have its element left out of the result array.
 */
export const pMapSkip = Symbol('skip');