UNPKG

2.58 kBJavaScriptView Raw
1// @flow strict-local
2
3import {makeDeferredWithPromise, type Deferred} from './Deferred';
4
5type PromiseQueueOpts = {
6 maxConcurrent: number
7};
8
9export default class PromiseQueue<T> {
10 _deferred: ?Deferred<Array<T>>;
11 _maxConcurrent: number;
12 _numRunning: number = 0;
13 _queue: Array<() => Promise<void>> = [];
14 _runPromise: ?Promise<Array<T>> = null;
15 _count: number = 0;
16 _results: Array<T> = [];
17
18 constructor(opts: PromiseQueueOpts = {maxConcurrent: Infinity}) {
19 if (opts.maxConcurrent <= 0) {
20 throw new TypeError('maxConcurrent must be a positive, non-zero value');
21 }
22
23 this._maxConcurrent = opts.maxConcurrent;
24 }
25
26 add(fn: () => Promise<T>): Promise<T> {
27 return new Promise((resolve, reject) => {
28 let i = this._count++;
29 let wrapped = () =>
30 fn().then(
31 result => {
32 this._results[i] = result;
33 resolve(result);
34 },
35 err => {
36 reject(err);
37 throw err;
38 }
39 );
40
41 this._queue.push(wrapped);
42
43 if (this._numRunning > 0 && this._numRunning < this._maxConcurrent) {
44 this._next();
45 }
46 });
47 }
48
49 run(): Promise<Array<T>> {
50 if (this._runPromise != null) {
51 return this._runPromise;
52 }
53
54 if (this._queue.length === 0) {
55 return Promise.resolve([]);
56 }
57
58 let {deferred, promise} = makeDeferredWithPromise();
59 this._deferred = deferred;
60 this._runPromise = promise;
61
62 while (this._queue.length && this._numRunning < this._maxConcurrent) {
63 this._next();
64 }
65
66 return promise;
67 }
68
69 async _next(): Promise<void> {
70 let fn = this._queue.shift();
71 await this._runFn(fn);
72 if (this._queue.length) {
73 this._next();
74 } else if (this._numRunning === 0) {
75 this._resolve();
76 }
77 }
78
79 async _runFn(fn: () => mixed): Promise<void> {
80 this._numRunning++;
81 // console.log('RUNNING', this._numRunning, this._maxConcurrent, this._queue.length)
82 try {
83 await fn();
84 this._numRunning--;
85 } catch (e) {
86 this._reject(e);
87 // rejecting resets state so numRunning is reset to 0 here
88 }
89 }
90
91 _resetState(): void {
92 this._queue = [];
93 this._count = 0;
94 this._results = [];
95 this._runPromise = null;
96 this._numRunning = 0;
97 this._deferred = null;
98 }
99
100 _reject(err: mixed): void {
101 if (this._deferred != null) {
102 this._deferred.reject(err);
103 }
104 this._resetState();
105 }
106
107 _resolve(): void {
108 if (this._deferred != null) {
109 this._deferred.resolve(this._results);
110 }
111 this._resetState();
112 }
113}