UNPKG

2.69 kBJavaScriptView Raw
1"use strict";
2
// Define `__esModule: true` on the CommonJS exports object. Only `value` is
// supplied, so for a property that does not exist yet the remaining descriptor
// flags (writable / enumerable / configurable) default to false.
// NOTE(review): presumably the marker that transpiler interop helpers look
// for; nothing in this file reads it, so confirm against the build setup.
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
// Start the default export as undefined; it is reassigned to the PromiseQueue
// class by the last statement of this file.
6exports.default = void 0;
7
8var _Deferred = require("./Deferred");
9
/**
 * Sets `obj[key]` to `value` and returns `obj`.
 *
 * When `key` is not yet reachable on `obj` (own or inherited) a plain
 * assignment is used. When it is already reachable, the property is
 * (re)defined as an own, enumerable, configurable, writable data property so
 * that inherited accessors or read-only definitions are shadowed instead of
 * being invoked.
 *
 * @param {object} obj - Target object; mutated in place.
 * @param {string|symbol} key - Property name to set.
 * @param {*} value - Value to store.
 * @returns {object} The same `obj` that was passed in.
 */
function _defineProperty(obj, key, value) {
  if (!(key in obj)) {
    obj[key] = value;
    return obj;
  }

  Object.defineProperty(obj, key, {
    value,
    enumerable: true,
    configurable: true,
    writable: true,
  });
  return obj;
}
11
/**
 * A queue of promise-returning tasks that are executed with a bounded level
 * of concurrency.
 *
 * Tasks are registered with `add()` and started by `run()`. `run()` resolves
 * with the task results (in the order the tasks were added) once every task
 * has finished, or rejects with the first task error. After either outcome
 * the queue resets itself and can be reused for another batch.
 *
 * Tasks that are still in flight when a run is rejected cannot be cancelled.
 * They keep running, and the promise returned by their `add()` call still
 * settles, but they no longer affect the queue's bookkeeping or the results
 * of a later run. Such leftover tasks are not counted against `maxConcurrent`
 * of a later run.
 */
class PromiseQueue {
  /**
   * @param {{maxConcurrent?: number}} [opts] - Queue options.
   *   `maxConcurrent` is the maximum number of tasks running at once and
   *   defaults to `Infinity` when omitted.
   * @throws {TypeError} If `maxConcurrent` is not a positive value.
   */
  constructor(opts = {
    maxConcurrent: Infinity
  }) {
    _defineProperty(this, "_deferred", void 0);

    _defineProperty(this, "_maxConcurrent", void 0);

    _defineProperty(this, "_numRunning", 0);

    _defineProperty(this, "_queue", []);

    _defineProperty(this, "_runPromise", null);

    _defineProperty(this, "_count", 0);

    _defineProperty(this, "_results", []);

    // Bumped on every reset. Tasks remember the value they started under so
    // that leftovers from a finished/failed run can be told apart from tasks
    // of the current run.
    _defineProperty(this, "_generation", 0);

    // An options object without `maxConcurrent` (e.g. `{}`) must behave like
    // no options at all; storing `undefined` would make every
    // `_numRunning < _maxConcurrent` check false and stall the queue forever.
    const maxConcurrent = opts.maxConcurrent == null ? Infinity : opts.maxConcurrent;

    // Written as a negated `>` so that NaN is rejected as well.
    if (!(maxConcurrent > 0)) {
      throw new TypeError('maxConcurrent must be a positive, non-zero value');
    }

    this._maxConcurrent = maxConcurrent;
  }

  /**
   * @returns {number} Number of tasks that have been added but not started.
   */
  getNumWaiting() {
    return this._queue.length;
  }

  /**
   * Registers a task. The task does not start until `run()` is called, unless
   * a run is already in progress and has spare capacity.
   *
   * @param {function(): (Promise<*>|*)} fn - Task to execute. A synchronous
   *   throw is treated like a rejected promise and a non-promise return value
   *   like a resolved one.
   * @returns {Promise<*>} Settles with the outcome of `fn`.
   */
  add(fn) {
    return new Promise((resolve, reject) => {
      const index = this._count++;
      const generation = this._generation;

      this._queue.push(async () => {
        let result;

        try {
          result = await fn();
        } catch (err) {
          reject(err);
          // Re-throw so the runner also sees the failure and rejects the run.
          throw err;
        }

        // Only record the result if the run this task belongs to is still the
        // active one; otherwise it would leak into a later run's results.
        if (generation === this._generation) {
          this._results[index] = result;
        }

        resolve(result);
      });

      // A run is in progress and below its limit: start this task right away.
      if (this._numRunning > 0 && this._numRunning < this._maxConcurrent) {
        this._next();
      }
    });
  }

  /**
   * Starts executing the queued tasks.
   *
   * @returns {Promise<Array<*>>} Resolves with the results of all tasks in
   *   the order they were added, or rejects with the first task error.
   *   Calling `run()` again while a run is in progress returns the same
   *   promise; calling it with nothing queued resolves with an empty array.
   */
  run() {
    if (this._runPromise != null) {
      return this._runPromise;
    }

    if (this._queue.length === 0) {
      return Promise.resolve([]);
    }

    const {
      deferred,
      promise
    } = (0, _Deferred.makeDeferredWithPromise)();
    this._deferred = deferred;
    this._runPromise = promise;

    while (this._queue.length && this._numRunning < this._maxConcurrent) {
      this._next();
    }

    return promise;
  }

  /**
   * Runs the next queued task and, once it finishes, either chains to the
   * following task or resolves the run when nothing is left.
   */
  async _next() {
    const generation = this._generation;

    const fn = this._queue.shift();

    await this._runFn(fn);

    // The run this task belonged to has already been settled and reset (for
    // example because a task failed); the current state is not ours to touch.
    if (generation !== this._generation) {
      return;
    }

    if (this._queue.length) {
      this._next();
    } else if (this._numRunning === 0) {
      this._resolve();
    }
  }

  /**
   * Executes one task while keeping the running counter up to date. A failure
   * rejects the whole run. Never rejects itself.
   *
   * @param {function(): Promise<*>} fn - Queued task wrapper.
   */
  async _runFn(fn) {
    const generation = this._generation;
    this._numRunning++;

    try {
      await fn();

      // After a reset the counter is already back at 0; decrementing it for a
      // leftover task would drive it negative and stall every later run.
      if (generation === this._generation) {
        this._numRunning--;
      }
    } catch (e) {
      // Rejecting resets the state, so numRunning is reset to 0 here. A
      // failure of a leftover task must not reject an unrelated later run.
      if (generation === this._generation) {
        this._reject(e);
      }
    }
  }

  /**
   * Returns the queue to its pristine state and invalidates all tasks that
   * are still in flight.
   */
  _resetState() {
    this._queue = [];
    this._count = 0;
    this._results = [];
    this._runPromise = null;
    this._numRunning = 0;
    this._deferred = null;
    this._generation++;
  }

  /**
   * Rejects the current run (if any) and resets the queue.
   *
   * @param {*} err - Rejection reason.
   */
  _reject(err) {
    if (this._deferred != null) {
      this._deferred.reject(err);
    }

    this._resetState();
  }

  /**
   * Resolves the current run (if any) with the collected results and resets
   * the queue.
   */
  _resolve() {
    if (this._deferred != null) {
      this._deferred.resolve(this._results);
    }

    this._resetState();
  }

}
132
// Publish the PromiseQueue class as this module's default export.
133exports.default = PromiseQueue;
\No newline at end of file