UNPKG

2.25 kBJavaScriptView Raw
1"use strict";
2
3var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault");
4
5var _asyncToGenerator2 = _interopRequireDefault(require("@babel/runtime/helpers/asyncToGenerator"));
6
7class PromiseQueue {
8 constructor(callback, options = {}) {
9 this.process = callback;
10 this.maxConcurrent = options.maxConcurrent || Infinity;
11 this.retry = options.retry !== false;
12 this.queue = [];
13 this.processing = new Set();
14 this.processed = new Set();
15 this.numRunning = 0;
16 this.runPromise = null;
17 this.resolve = null;
18 this.reject = null;
19 }
20
21 add(job, ...args) {
22 if (this.processing.has(job)) {
23 return;
24 }
25
26 if (this.runPromise && this.numRunning < this.maxConcurrent) {
27 this._runJob(job, args);
28 } else {
29 this.queue.push([job, args]);
30 }
31
32 this.processing.add(job);
33 }
34
35 run() {
36 if (this.runPromise) {
37 return this.runPromise;
38 }
39
40 const runPromise = new Promise((resolve, reject) => {
41 this.resolve = resolve;
42 this.reject = reject;
43 });
44 this.runPromise = runPromise;
45
46 this._next();
47
48 return runPromise;
49 }
50
51 _runJob(job, args) {
52 var _this = this;
53
54 return (0, _asyncToGenerator2.default)(function* () {
55 try {
56 _this.numRunning++;
57 yield _this.process(job, ...args);
58
59 _this.processing.delete(job);
60
61 _this.processed.add(job);
62
63 _this.numRunning--;
64
65 _this._next();
66 } catch (err) {
67 _this.numRunning--;
68
69 if (_this.retry) {
70 _this.queue.push([job, args]);
71 } else {
72 _this.processing.delete(job);
73 }
74
75 if (_this.reject) {
76 _this.reject(err);
77 }
78
79 _this._reset();
80 }
81 })();
82 }
83
84 _next() {
85 if (!this.runPromise) {
86 return;
87 }
88
89 if (this.queue.length > 0) {
90 while (this.queue.length > 0 && this.numRunning < this.maxConcurrent) {
91 this._runJob(...this.queue.shift());
92 }
93 } else if (this.processing.size === 0) {
94 this.resolve(this.processed);
95
96 this._reset();
97 }
98 }
99
100 _reset() {
101 this.processed = new Set();
102 this.runPromise = null;
103 this.resolve = null;
104 this.reject = null;
105 }
106
107}
108
109module.exports = PromiseQueue;
\No newline at end of file