UNPKG

1.48 kBJavaScriptView Raw
1/*
2 MIT License http://www.opensource.org/licenses/mit-license.php
3 Author Tobias Koppers @sokra
4*/
5
6"use strict";
7
8/**
9 * @template T
10 * @template {Error} E
11 * @param {Iterable<T>} items initial items
12 * @param {number} concurrency number of items running in parallel
13 * @param {function(T, function(T): void, function(E=): void): void} processor worker which pushes more items
14 * @param {function(E=): void} callback all items processed
15 * @returns {void}
16 */
17const processAsyncTree = (items, concurrency, processor, callback) => {
18 const queue = Array.from(items);
19 if (queue.length === 0) return callback();
20 let processing = 0;
21 let finished = false;
22 let processScheduled = true;
23
24 const push = item => {
25 queue.push(item);
26 if (!processScheduled && processing < concurrency) {
27 processScheduled = true;
28 process.nextTick(processQueue);
29 }
30 };
31
32 const processorCallback = err => {
33 processing--;
34 if (err && !finished) {
35 finished = true;
36 callback(err);
37 return;
38 }
39 if (!processScheduled) {
40 processScheduled = true;
41 process.nextTick(processQueue);
42 }
43 };
44
45 const processQueue = () => {
46 if (finished) return;
47 while (processing < concurrency && queue.length > 0) {
48 processing++;
49 const item = queue.pop();
50 processor(item, push, processorCallback);
51 }
52 processScheduled = false;
53 if (queue.length === 0 && processing === 0 && !finished) {
54 finished = true;
55 callback();
56 }
57 };
58
59 processQueue();
60};
61
62module.exports = processAsyncTree;