// Retrieved from UNPKG (file view: 10.2 kB, JavaScript).

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const EventEmitter = require("eventemitter3");
const p_timeout_1 = require("p-timeout");
const priority_queue_1 = require("./priority-queue");
// No-op used as the initial value of the queue's `_resolveEmpty` / `_resolveIdle` callbacks.
// eslint-disable-next-line @typescript-eslint/no-empty-function
const empty = () => { };
// Single TimeoutError instance, created once at module load and reused for every
// timeout rejection issued by `add()`.
const timeoutError = new p_timeout_1.TimeoutError();
/**
Promise queue with concurrency control.

Tasks are stored in an instance of `queueClass` and started while two limits allow it:
- concurrency: at most `concurrency` tasks may be pending at once;
- interval: at most `intervalCap` tasks may be started per `interval` milliseconds
  (ignored when `intervalCap` is `Infinity` or `interval` is `0`).

Emits `'add'`, `'active'`, `'next'` and `'idle'` events.
*/
class PQueue extends EventEmitter {
    /**
    @param options - Optional settings. Defaults: `carryoverConcurrencyCount: false`,
    `intervalCap: Infinity`, `interval: 0`, `concurrency: Infinity`, `autoStart: true`,
    `queueClass: priority_queue_1.default`. `timeout` and `throwOnTimeout` are also read.
    @throws {TypeError} When `intervalCap` is not a number >= 1, when `interval` is not a
    finite number >= 0, or when `concurrency` is not a number >= 1.
    */
    constructor(options) {
        var _a, _b, _c, _d;
        super();
        // Number of tasks started in the current interval window.
        this._intervalCount = 0;
        // Timestamp (ms) at which the current interval window ends.
        this._intervalEnd = 0;
        // Number of tasks that have started and not yet settled.
        this._pendingCount = 0;
        // Callbacks that settle the promises handed out by `onEmpty()` / `onIdle()`.
        this._resolveEmpty = empty;
        this._resolveIdle = empty;
        // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
        options = Object.assign({ carryoverConcurrencyCount: false, intervalCap: Infinity, interval: 0, concurrency: Infinity, autoStart: true, queueClass: priority_queue_1.default }, options);
        if (!(typeof options.intervalCap === 'number' && options.intervalCap >= 1)) {
            throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${(_b = (_a = options.intervalCap) === null || _a === void 0 ? void 0 : _a.toString()) !== null && _b !== void 0 ? _b : ''}\` (${typeof options.intervalCap})`);
        }
        if (options.interval === undefined || !(Number.isFinite(options.interval) && options.interval >= 0)) {
            throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${(_d = (_c = options.interval) === null || _c === void 0 ? void 0 : _c.toString()) !== null && _d !== void 0 ? _d : ''}\` (${typeof options.interval})`);
        }
        this._carryoverConcurrencyCount = options.carryoverConcurrencyCount;
        this._isIntervalIgnored = options.intervalCap === Infinity || options.interval === 0;
        this._intervalCap = options.intervalCap;
        this._interval = options.interval;
        this._queue = new options.queueClass();
        this._queueClass = options.queueClass;
        // Goes through the setter: validates the value and runs `_processQueue()` once.
        this.concurrency = options.concurrency;
        this._timeout = options.timeout;
        this._throwOnTimeout = options.throwOnTimeout === true;
        this._isPaused = options.autoStart === false;
    }
    /**
    Whether the interval limit permits starting one more task right now.
    */
    get _doesIntervalAllowAnother() {
        return this._isIntervalIgnored || this._intervalCount < this._intervalCap;
    }
    /**
    Whether the concurrency limit permits starting one more task right now.
    */
    get _doesConcurrentAllowAnother() {
        return this._pendingCount < this._concurrency;
    }
    /**
    Bookkeeping after a task settles: releases its concurrency slot, tries to start the next task, then emits `'next'`.
    */
    _next() {
        this._pendingCount--;
        this._tryToStartAnother();
        this.emit('next');
    }
    /**
    Settles waiting `onEmpty()` promises and, when nothing is pending, waiting `onIdle()` promises (also emitting `'idle'`).
    Called when the queue is found to be empty.
    */
    _resolvePromises() {
        this._resolveEmpty();
        this._resolveEmpty = empty;
        if (this._pendingCount === 0) {
            this._resolveIdle();
            this._resolveIdle = empty;
            this.emit('idle');
        }
    }
    /**
    Timer callback scheduled by `_isIntervalPaused()`: runs one interval tick, re-arms the repeating interval if needed and clears the one-shot timer handle.
    */
    _onResumeInterval() {
        this._onInterval();
        this._initializeIntervalIfNeeded();
        this._timeoutId = undefined;
    }
    /**
    Checks whether the repeating interval timer was cleared while its window is still open.

    When the window has already elapsed, the interval counter is reset instead. When it has not, a one-shot timer is scheduled (at most one at a time) to resume at the end of the window.

    @returns `true` when the current window is still open and no interval timer is running; `false` otherwise.
    */
    _isIntervalPaused() {
        const now = Date.now();
        if (this._intervalId === undefined) {
            const delay = this._intervalEnd - now;
            if (delay < 0) {
                // Act as the interval was done
                // We don't need to resume it here because `_tryToStartAnother()` calls
                // `_initializeIntervalIfNeeded()` after it starts a job
                this._intervalCount = (this._carryoverConcurrencyCount) ? this._pendingCount : 0;
            }
            else {
                // Act as the interval is pending
                if (this._timeoutId === undefined) {
                    this._timeoutId = setTimeout(() => {
                        this._onResumeInterval();
                    }, delay);
                }
                return true;
            }
        }
        return false;
    }
    /**
    Starts the next queued task if the queue is not paused and both limits allow it.

    When the queue is empty, the interval timer is cleared and waiting `onEmpty()` / `onIdle()` promises are settled via `_resolvePromises()`.

    @returns `true` if a task was started, otherwise `false`.
    */
    _tryToStartAnother() {
        if (this._queue.size === 0) {
            // We can clear the interval ("pause")
            // Because we can redo it later ("resume")
            if (this._intervalId) {
                clearInterval(this._intervalId);
            }
            this._intervalId = undefined;
            this._resolvePromises();
            return false;
        }
        if (!this._isPaused) {
            // Must be evaluated before the limit checks: it may reset `_intervalCount`.
            const canInitializeInterval = !this._isIntervalPaused();
            if (this._doesIntervalAllowAnother && this._doesConcurrentAllowAnother) {
                const job = this._queue.dequeue();
                if (!job) {
                    return false;
                }
                this.emit('active');
                job();
                if (canInitializeInterval) {
                    this._initializeIntervalIfNeeded();
                }
                return true;
            }
        }
        return false;
    }
    /**
    Starts the repeating interval timer and records when the current window ends.
    Does nothing when the interval limit is ignored or a timer is already running.
    */
    _initializeIntervalIfNeeded() {
        if (this._isIntervalIgnored || this._intervalId !== undefined) {
            return;
        }
        this._intervalId = setInterval(() => {
            this._onInterval();
        }, this._interval);
        this._intervalEnd = Date.now() + this._interval;
    }
    /**
    Interval tick: stops the timer if the last window saw no activity, resets the per-interval counter (carrying over pending tasks when configured) and processes the queue.
    */
    _onInterval() {
        if (this._intervalCount === 0 && this._pendingCount === 0 && this._intervalId) {
            clearInterval(this._intervalId);
            this._intervalId = undefined;
        }
        this._intervalCount = this._carryoverConcurrencyCount ? this._pendingCount : 0;
        this._processQueue();
    }
    /**
    Executes all queued functions until it reaches the limit.
    */
    _processQueue() {
        // eslint-disable-next-line no-empty
        while (this._tryToStartAnother()) { }
    }
    /**
    Maximum number of tasks allowed to be pending at once.
    */
    get concurrency() {
        return this._concurrency;
    }
    /**
    Sets the concurrency limit and immediately starts as many queued tasks as the new limit allows.

    @throws {TypeError} When `newConcurrency` is not a number >= 1.
    */
    set concurrency(newConcurrency) {
        if (!(typeof newConcurrency === 'number' && newConcurrency >= 1)) {
            throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})`);
        }
        this._concurrency = newConcurrency;
        this._processQueue();
    }
    /**
    Adds a sync or async task to the queue. Always returns a promise.

    @param fn - Task function; it is called with no arguments when the task starts.
    @param options - Passed to the queue's `enqueue()`. `options.timeout` and `options.throwOnTimeout` override the queue-level settings for this task.
    @returns A promise that resolves with the task's result, or rejects with the error it threw. On timeout it rejects with the shared `timeoutError` when throw-on-timeout is enabled; otherwise it settles with whatever the `p-timeout` fallback (which returns `undefined`) produces.
    */
    async add(fn, options = {}) {
        return new Promise((resolve, reject) => {
            const run = async () => {
                this._pendingCount++;
                this._intervalCount++;
                try {
                    // Only wrap the task with `p-timeout` when a timeout is configured somewhere.
                    const operation = (this._timeout === undefined && options.timeout === undefined) ? fn() : p_timeout_1.default(Promise.resolve(fn()), (options.timeout === undefined ? this._timeout : options.timeout), () => {
                        if (options.throwOnTimeout === undefined ? this._throwOnTimeout : options.throwOnTimeout) {
                            reject(timeoutError);
                        }
                        return undefined;
                    });
                    resolve(await operation);
                }
                catch (error) {
                    reject(error);
                }
                // Runs whether the task succeeded or failed, so the slot is always released.
                this._next();
            };
            this._queue.enqueue(run, options);
            this._tryToStartAnother();
            this.emit('add');
        });
    }
    /**
    Same as `.add()`, but accepts an array of sync or async functions.

    @returns A promise that resolves when all functions are resolved.
    */
    async addAll(functions, options) {
        return Promise.all(functions.map(async (function_) => this.add(function_, options)));
    }
    /**
    Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via `options.autoStart = false` or by `.pause()` method.)

    @returns The queue instance.
    */
    start() {
        if (!this._isPaused) {
            return this;
        }
        this._isPaused = false;
        this._processQueue();
        return this;
    }
    /**
    Put queue execution on hold.
    */
    pause() {
        this._isPaused = true;
    }
    /**
    Clear the queue.

    Replaces the underlying queue with a fresh `queueClass` instance; tasks that have already started are not affected.
    */
    clear() {
        this._queue = new this._queueClass();
    }
    /**
    Can be called multiple times. Useful if you for example add additional items at a later time.

    @returns A promise that settles when the queue becomes empty.
    */
    async onEmpty() {
        // Instantly resolve if the queue is empty
        if (this._queue.size === 0) {
            return;
        }
        return new Promise(resolve => {
            // Chain onto any resolver registered by an earlier caller so all waiters settle together.
            const existingResolve = this._resolveEmpty;
            this._resolveEmpty = () => {
                existingResolve();
                resolve();
            };
        });
    }
    /**
    The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

    @returns A promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
    */
    async onIdle() {
        // Instantly resolve if none pending and if nothing else is queued
        if (this._pendingCount === 0 && this._queue.size === 0) {
            return;
        }
        return new Promise(resolve => {
            // Chain onto any resolver registered by an earlier caller so all waiters settle together.
            const existingResolve = this._resolveIdle;
            this._resolveIdle = () => {
                existingResolve();
                resolve();
            };
        });
    }
    /**
    Size of the queue.
    */
    get size() {
        return this._queue.size;
    }
    /**
    Size of the queue, filtered by the given options.

    For example, this can be used to find the number of items remaining in the queue with a specific priority level.
    */
    sizeBy(options) {
        // eslint-disable-next-line unicorn/no-fn-reference-in-iterator
        return this._queue.filter(options).length;
    }
    /**
    Number of pending promises.
    */
    get pending() {
        return this._pendingCount;
    }
    /**
    Whether the queue is currently paused.
    */
    get isPaused() {
        return this._isPaused;
    }
    /**
    Queue-level timeout applied to tasks that do not pass their own `options.timeout`.
    */
    get timeout() {
        return this._timeout;
    }
    /**
    Set the timeout for future operations.
    */
    set timeout(milliseconds) {
        this._timeout = milliseconds;
    }
}
exports.default = PQueue;