1 |
|
2 |
|
3 | import {makeDeferredWithPromise, type Deferred} from './Deferred';
|
4 |
|
5 | type PromiseQueueOpts = {
|
6 | maxConcurrent: number
|
7 | };
|
8 |
|
9 | export default class PromiseQueue<T> {
|
10 | _deferred: ?Deferred<Array<T>>;
|
11 | _maxConcurrent: number;
|
12 | _numRunning: number = 0;
|
13 | _queue: Array<() => Promise<void>> = [];
|
14 | _runPromise: ?Promise<Array<T>> = null;
|
15 | _count: number = 0;
|
16 | _results: Array<T> = [];
|
17 |
|
18 | constructor(opts: PromiseQueueOpts = {maxConcurrent: Infinity}) {
|
19 | if (opts.maxConcurrent <= 0) {
|
20 | throw new TypeError('maxConcurrent must be a positive, non-zero value');
|
21 | }
|
22 |
|
23 | this._maxConcurrent = opts.maxConcurrent;
|
24 | }
|
25 |
|
26 | add(fn: () => Promise<T>): Promise<T> {
|
27 | return new Promise((resolve, reject) => {
|
28 | let i = this._count++;
|
29 | let wrapped = () =>
|
30 | fn().then(
|
31 | result => {
|
32 | this._results[i] = result;
|
33 | resolve(result);
|
34 | },
|
35 | err => {
|
36 | reject(err);
|
37 | throw err;
|
38 | }
|
39 | );
|
40 |
|
41 | this._queue.push(wrapped);
|
42 |
|
43 | if (this._numRunning > 0 && this._numRunning < this._maxConcurrent) {
|
44 | this._next();
|
45 | }
|
46 | });
|
47 | }
|
48 |
|
49 | run(): Promise<Array<T>> {
|
50 | if (this._runPromise != null) {
|
51 | return this._runPromise;
|
52 | }
|
53 |
|
54 | if (this._queue.length === 0) {
|
55 | return Promise.resolve([]);
|
56 | }
|
57 |
|
58 | let {deferred, promise} = makeDeferredWithPromise();
|
59 | this._deferred = deferred;
|
60 | this._runPromise = promise;
|
61 |
|
62 | while (this._queue.length && this._numRunning < this._maxConcurrent) {
|
63 | this._next();
|
64 | }
|
65 |
|
66 | return promise;
|
67 | }
|
68 |
|
69 | async _next(): Promise<void> {
|
70 | let fn = this._queue.shift();
|
71 | await this._runFn(fn);
|
72 | if (this._queue.length) {
|
73 | this._next();
|
74 | } else if (this._numRunning === 0) {
|
75 | this._resolve();
|
76 | }
|
77 | }
|
78 |
|
79 | async _runFn(fn: () => mixed): Promise<void> {
|
80 | this._numRunning++;
|
81 |
|
82 | try {
|
83 | await fn();
|
84 | this._numRunning--;
|
85 | } catch (e) {
|
86 | this._reject(e);
|
87 |
|
88 | }
|
89 | }
|
90 |
|
91 | _resetState(): void {
|
92 | this._queue = [];
|
93 | this._count = 0;
|
94 | this._results = [];
|
95 | this._runPromise = null;
|
96 | this._numRunning = 0;
|
97 | this._deferred = null;
|
98 | }
|
99 |
|
100 | _reject(err: mixed): void {
|
101 | if (this._deferred != null) {
|
102 | this._deferred.reject(err);
|
103 | }
|
104 | this._resetState();
|
105 | }
|
106 |
|
107 | _resolve(): void {
|
108 | if (this._deferred != null) {
|
109 | this._deferred.resolve(this._results);
|
110 | }
|
111 | this._resetState();
|
112 | }
|
113 | }
|