UNPKG

2.75 kBJavaScriptView Raw
1"use strict";
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6exports.default = void 0;
7
8var _Deferred = require("./Deferred");
9
/**
 * Babel-style helper that sets `obj[key]` to `value`.
 *
 * When `key` is already visible on `obj` (own or inherited), the property is
 * (re)defined directly on `obj` as a writable, enumerable, configurable data
 * property, which bypasses any inherited setter. Otherwise a plain assignment
 * is used.
 *
 * @param {object} obj - Object that receives the property.
 * @param {string|symbol} key - Property name.
 * @param {*} value - Value to store.
 * @returns {object} The same `obj`, for chaining.
 */
function _defineProperty(obj, key, value) {
  // Fast path: nothing with this name exists yet, so plain assignment suffices.
  if (!(key in obj)) {
    obj[key] = value;
    return obj;
  }

  Object.defineProperty(obj, key, {
    configurable: true,
    enumerable: true,
    writable: true,
    value,
  });
  return obj;
}
11
/**
 * Runs queued tasks with a cap on how many execute at once and collects their
 * results in the order the tasks were added.
 *
 * Usage: call `add(fn)` for each task, then `run()` to start processing. The
 * promise returned by `run()` fulfills with the array of results, or rejects
 * with the first task error. After a run settles the queue resets itself and
 * can be reused.
 *
 * Note: when a run rejects, tasks that were still waiting in the queue are
 * discarded without being called, so the promises `add()` returned for them
 * never settle.
 */
class PromiseQueue {
  /**
   * @param {{maxConcurrent?: number}} [opts] - `maxConcurrent` is the maximum
   *   number of tasks executing at the same time; defaults to `Infinity` when
   *   omitted.
   * @throws {TypeError} If `maxConcurrent` is not greater than zero (this
   *   includes `NaN`, which would otherwise stall the queue forever).
   */
  constructor(opts = {
    maxConcurrent: Infinity
  }) {
    this._deferred = void 0;
    this._maxConcurrent = void 0;
    this._numRunning = 0;
    this._queue = [];
    this._runPromise = null;
    this._count = 0;
    this._results = [];
    // Incremented on every state reset so that tasks started before the reset
    // can tell that the run they belonged to is over.
    this._generation = 0;

    const maxConcurrent = opts.maxConcurrent === undefined ? Infinity : opts.maxConcurrent;

    // `!(x > 0)` rather than `x <= 0` so that NaN is rejected as well.
    if (!(maxConcurrent > 0)) {
      throw new TypeError('maxConcurrent must be a positive, non-zero value');
    }

    this._maxConcurrent = maxConcurrent;
  }

  /**
   * Queues a task. The task is not started until `run()` is called, unless a
   * run is already in progress and has spare capacity.
   *
   * @param {function(): *} fn - Task to execute. It may return a promise, a
   *   plain value, or throw synchronously.
   * @returns {Promise<*>} Settles with the outcome of this particular task.
   */
  add(fn) {
    return new Promise((resolve, reject) => {
      const index = this._count++;
      // Capture the results array of the current run: if the queue is reset
      // while this task is in flight, its result must not leak into the next
      // run's results.
      const results = this._results;

      const wrapped = () => {
        let pending;

        try {
          // Promise.resolve accepts plain return values and passes native
          // promises through untouched.
          pending = Promise.resolve(fn());
        } catch (err) {
          pending = Promise.reject(err);
        }

        return pending.then(result => {
          results[index] = result;
          resolve(result);
        }, err => {
          reject(err);
          // Rethrow so the queue itself also observes the failure.
          throw err;
        });
      };

      this._queue.push(wrapped);

      // Only kick off immediately when a run is already active and below the
      // concurrency limit; otherwise the task waits for run() or for a worker
      // to pick it up.
      if (this._numRunning > 0 && this._numRunning < this._maxConcurrent) {
        this._next();
      }
    });
  }

  /**
   * Starts processing the queue.
   *
   * @returns {Promise<Array<*>>} Fulfills with the task results indexed by
   *   insertion order, or rejects with the first task error. Returns the
   *   in-progress promise if a run is already active, and an immediately
   *   resolved empty array if nothing is queued.
   */
  run() {
    if (this._runPromise != null) {
      return this._runPromise;
    }

    if (this._queue.length === 0) {
      return Promise.resolve([]);
    }

    // Called as a plain function (no `this`), as in the compiled original.
    const {
      deferred,
      promise
    } = (0, _Deferred.makeDeferredWithPromise)();
    this._deferred = deferred;
    this._runPromise = promise;

    while (this._queue.length && this._numRunning < this._maxConcurrent) {
      this._next();
    }

    return promise;
  }

  /**
   * Worker loop: runs the task at the head of the queue, then either picks up
   * the next one or, if it was the last task running, resolves the run.
   */
  async _next() {
    const generation = this._generation;
    const fn = this._queue.shift();

    await this._runFn(fn);

    // The run this worker belonged to has been settled and reset in the
    // meantime; it must not touch the state of a later run.
    if (generation !== this._generation) {
      return;
    }

    if (this._queue.length) {
      this._next();
    } else if (this._numRunning === 0) {
      this._resolve();
    }
  }

  /**
   * Executes one task while maintaining the running counter. A failure rejects
   * the whole run.
   *
   * @param {function(): Promise<*>} fn - Wrapped task created by `add()`.
   */
  async _runFn(fn) {
    const generation = this._generation;
    this._numRunning++;

    try {
      await fn();

      // After a reset the counter is already back at 0; decrementing it here
      // would drive it negative and a later run could never complete.
      if (generation === this._generation) {
        this._numRunning--;
      }
    } catch (e) {
      // Only the first failure of a run rejects it. A stale failure must not
      // reset state that may already belong to a new run.
      if (generation === this._generation) {
        this._reject(e); // resetting also puts _numRunning back to 0
      }
    }
  }

  /**
   * Restores the queue to its initial, reusable state and invalidates every
   * task that is still in flight.
   */
  _resetState() {
    this._queue = [];
    this._count = 0;
    this._results = [];
    this._runPromise = null;
    this._numRunning = 0;
    this._deferred = null;
    this._generation++;
  }

  /**
   * Rejects the active run (if any) with `err` and resets the queue.
   *
   * @param {*} err - Reason passed to the run promise.
   */
  _reject(err) {
    if (this._deferred != null) {
      this._deferred.reject(err);
    }

    this._resetState();
  }

  /**
   * Resolves the active run (if any) with the collected results and resets
   * the queue.
   */
  _resolve() {
    if (this._deferred != null) {
      this._deferred.resolve(this._results);
    }

    this._resetState();
  }

}
130
131exports.default = PromiseQueue;
\No newline at end of file