// Listing captured from the UNPKG file viewer (14.3 kB, JavaScript, raw view).
1import EventEmitter from 'eventemitter3';
2import pTimeout, { TimeoutError } from 'p-timeout';
3import PriorityQueue from './priority-queue.js';
// Shared no-op. Used as the initial value of the `_resolveEmpty` and `_resolveIdle`
// callbacks, and put back in place after those callbacks have fired.
const empty = () => undefined;
// Single `TimeoutError` instance, created once at module load, that `add()` rejects
// with when a task times out and throwing on timeout is enabled.
const timeoutError = new TimeoutError();
/**
Promise queue with concurrency control.

Tasks passed to `.add()` are stored in an instance of `options.queueClass` and are started only while both limits allow it: fewer than `concurrency` tasks running, and (unless the interval is ignored) fewer than `intervalCap` tasks started in the current `interval`.

Emits the events `add`, `active`, `next`, `completed`, `error` and `idle`.
*/
export default class PQueue extends EventEmitter {
    /**
    @param options - Optional settings. Defaults: `carryoverConcurrencyCount: false`, `intervalCap: Infinity`, `interval: 0`, `concurrency: Infinity`, `autoStart: true`, `queueClass: PriorityQueue`. `timeout` and `throwOnTimeout` are read as well and have no default.
    @throws {TypeError} When `intervalCap` is not a number >= 1, when `interval` is not a finite number >= 0, or when `concurrency` is not a number >= 1.
    */
    constructor(options) {
        super();
        // Internal state, defined up front in a fixed order as plain
        // enumerable/configurable/writable data properties.
        const initialState = [
            ['_carryoverConcurrencyCount', undefined],
            ['_isIntervalIgnored', undefined],
            ['_intervalCount', 0],
            ['_intervalCap', undefined],
            ['_interval', undefined],
            ['_intervalEnd', 0],
            ['_intervalId', undefined],
            ['_timeoutId', undefined],
            ['_queue', undefined],
            ['_queueClass', undefined],
            ['_pendingCount', 0],
            ['_concurrency', undefined],
            ['_isPaused', undefined],
            ['_resolveEmpty', empty],
            ['_resolveIdle', empty],
            ['_timeout', undefined],
            ['_throwOnTimeout', undefined]
        ];
        for (const [key, value] of initialState) {
            Object.defineProperty(this, key, {
                enumerable: true,
                configurable: true,
                writable: true,
                value
            });
        }
        // Caller-supplied options win over the defaults.
        const settings = {
            carryoverConcurrencyCount: false,
            intervalCap: Number.POSITIVE_INFINITY,
            interval: 0,
            concurrency: Number.POSITIVE_INFINITY,
            autoStart: true,
            queueClass: PriorityQueue,
            ...options
        };
        // Text used for a rejected value inside the error messages below:
        // `value.toString()`, or an empty string for `null`/`undefined`.
        const printable = value => {
            const text = value === null || value === undefined ? undefined : value.toString();
            return text === null || text === undefined ? '' : text;
        };
        if (!(typeof settings.intervalCap === 'number' && settings.intervalCap >= 1)) {
            throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${printable(settings.intervalCap)}\` (${typeof settings.intervalCap})`);
        }
        if (settings.interval === undefined || !(Number.isFinite(settings.interval) && settings.interval >= 0)) {
            throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${printable(settings.interval)}\` (${typeof settings.interval})`);
        }
        this._carryoverConcurrencyCount = settings.carryoverConcurrencyCount;
        // Interval limiting is switched off entirely when it could never restrict anything.
        this._isIntervalIgnored = settings.intervalCap === Number.POSITIVE_INFINITY || settings.interval === 0;
        this._intervalCap = settings.intervalCap;
        this._interval = settings.interval;
        this._queue = new settings.queueClass();
        this._queueClass = settings.queueClass;
        // Goes through the `concurrency` setter, which validates the value and
        // processes the queue, so `_queue` must already exist at this point.
        this.concurrency = settings.concurrency;
        this._timeout = settings.timeout;
        this._throwOnTimeout = settings.throwOnTimeout === true;
        this._isPaused = settings.autoStart === false;
    }
    /**
    Whether the interval cap leaves room for one more task to start.
    */
    get _doesIntervalAllowAnother() {
        return this._isIntervalIgnored || this._intervalCount < this._intervalCap;
    }
    /**
    Whether the concurrency limit leaves room for one more task to start.
    */
    get _doesConcurrentAllowAnother() {
        return this._pendingCount < this._concurrency;
    }
    /**
    Bookkeeping after a task has settled: frees its slot, tries to start the next task, then emits `next`.
    */
    _next() {
        this._pendingCount--;
        this._tryToStartAnother();
        this.emit('next');
    }
    /**
    Fires the pending `onEmpty()` callbacks and, when nothing is running any more, the pending `onIdle()` callbacks followed by the `idle` event.
    */
    _resolvePromises() {
        this._resolveEmpty();
        this._resolveEmpty = empty;
        if (this._pendingCount === 0) {
            this._resolveIdle();
            this._resolveIdle = empty;
            this.emit('idle');
        }
    }
    /**
    Timer callback scheduled by `_isIntervalPaused()`: finishes the interrupted interval and restarts the repeating interval timer if needed.
    */
    _onResumeInterval() {
        this._onInterval();
        this._initializeIntervalIfNeeded();
        this._timeoutId = undefined;
    }
    /**
    Checks whether a previously cleared ("paused") interval is still running out.

    @returns `true` when the interval timer is not active and the last interval has not ended yet (a one-shot timer is then scheduled to resume processing), otherwise `false`.
    */
    _isIntervalPaused() {
        const now = Date.now();
        if (this._intervalId !== undefined) {
            return false;
        }
        const delay = this._intervalEnd - now;
        if (delay < 0) {
            // Act as the interval was done.
            // No need to resume it here: `_tryToStartAnother()` calls
            // `_initializeIntervalIfNeeded()` after it has started a job.
            this._intervalCount = this._carryoverConcurrencyCount ? this._pendingCount : 0;
            return false;
        }
        // Act as the interval is pending; schedule at most one resume timer.
        if (this._timeoutId === undefined) {
            this._timeoutId = setTimeout(() => {
                this._onResumeInterval();
            }, delay);
        }
        return true;
    }
    /**
    Starts the next queued job if the queue is not paused and both limits allow it.

    @returns `true` when a job was started, otherwise `false`.
    */
    _tryToStartAnother() {
        if (this._queue.size === 0) {
            // We can clear the interval ("pause")
            // because we can redo it later ("resume").
            if (this._intervalId) {
                clearInterval(this._intervalId);
            }
            this._intervalId = undefined;
            this._resolvePromises();
            return false;
        }
        if (this._isPaused) {
            return false;
        }
        // Evaluated before the limit check because it may reset `_intervalCount`.
        const canInitializeInterval = !this._isIntervalPaused();
        if (!(this._doesIntervalAllowAnother && this._doesConcurrentAllowAnother)) {
            return false;
        }
        const job = this._queue.dequeue();
        if (!job) {
            return false;
        }
        this.emit('active');
        job();
        if (canInitializeInterval) {
            this._initializeIntervalIfNeeded();
        }
        return true;
    }
    /**
    Starts the repeating interval timer unless interval limiting is ignored or the timer is already running, and records when the current interval ends.
    */
    _initializeIntervalIfNeeded() {
        if (this._isIntervalIgnored || this._intervalId !== undefined) {
            return;
        }
        this._intervalId = setInterval(() => {
            this._onInterval();
        }, this._interval);
        this._intervalEnd = Date.now() + this._interval;
    }
    /**
    Runs at every interval boundary: stops the timer when nothing happened and nothing is running, resets the per-interval counter, and processes the queue.
    */
    _onInterval() {
        if (this._intervalCount === 0 && this._pendingCount === 0 && this._intervalId) {
            clearInterval(this._intervalId);
            this._intervalId = undefined;
        }
        // With `carryoverConcurrencyCount`, tasks still running count towards the new interval.
        this._intervalCount = this._carryoverConcurrencyCount ? this._pendingCount : 0;
        this._processQueue();
    }
    /**
    Executes all queued functions until it reaches the limit.
    */
    _processQueue() {
        // eslint-disable-next-line no-empty
        while (this._tryToStartAnother()) { }
    }
    /**
    Maximum number of tasks allowed to run at the same time.
    */
    get concurrency() {
        return this._concurrency;
    }
    /**
    Changes the concurrency limit and immediately starts as many queued tasks as the new limit allows.

    @throws {TypeError} When the value is not a number >= 1.
    */
    set concurrency(newConcurrency) {
        if (!(typeof newConcurrency === 'number' && newConcurrency >= 1)) {
            throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})`);
        }
        this._concurrency = newConcurrency;
        this._processQueue();
    }
    /**
    Adds a sync or async task to the queue. Always returns a promise.

    @param fn - Function to run; its (awaited) result becomes the result of the returned promise.
    @param options - Passed on to the queue's `enqueue()`. `timeout` and `throwOnTimeout` override the queue-wide values for this task.
    @returns A promise that resolves with the task's result, or rejects with the error the task threw or with the shared timeout error.
    */
    async add(fn, options = {}) {
        return new Promise((resolve, reject) => {
            const run = async () => {
                this._pendingCount++;
                this._intervalCount++;
                try {
                    // A per-task timeout takes precedence over the queue-wide one.
                    const timeout = options.timeout === undefined ? this._timeout : options.timeout;
                    let operation;
                    if (timeout === undefined) {
                        operation = fn();
                    }
                    else {
                        operation = pTimeout(Promise.resolve(fn()), timeout, () => {
                            if (options.throwOnTimeout === undefined ? this._throwOnTimeout : options.throwOnTimeout) {
                                reject(timeoutError);
                            }
                            return undefined;
                        });
                    }
                    const result = await operation;
                    // NOTE(review): after a throwing timeout the promise is already rejected, so this
                    // `resolve` is a no-op, yet `completed` presumably still fires with `undefined`
                    // (assuming pTimeout resolves with the fallback's return value) - confirm.
                    resolve(result);
                    this.emit('completed', result);
                }
                catch (error) {
                    reject(error);
                    this.emit('error', error);
                }
                finally {
                    // Always release the slot, even when an event listener above throws;
                    // otherwise `_pendingCount` stays inflated and the queue can stall.
                    this._next();
                }
            };
            this._queue.enqueue(run, options);
            this._tryToStartAnother();
            this.emit('add');
        });
    }
    /**
    Same as `.add()`, but accepts an array of sync or async functions.

    @returns A promise that resolves when all functions are resolved.
    */
    async addAll(functions, options) {
        return Promise.all(functions.map(async (function_) => this.add(function_, options)));
    }
    /**
    Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via `options.autoStart = false` or by `.pause()` method.)

    @returns The queue itself.
    */
    start() {
        if (!this._isPaused) {
            return this;
        }
        this._isPaused = false;
        this._processQueue();
        return this;
    }
    /**
    Put queue execution on hold. Tasks that are already running are not affected.
    */
    pause() {
        this._isPaused = true;
    }
    /**
    Clear the queue by replacing it with a fresh instance of the configured queue class. Tasks that are already running are not affected.
    */
    clear() {
        this._queue = new this._queueClass();
    }
    /**
    Can be called multiple times. Useful if you for example add additional items at a later time.

    @returns A promise that settles when the queue becomes empty.
    */
    async onEmpty() {
        // Instantly resolve if the queue is empty
        if (this._queue.size === 0) {
            return;
        }
        return new Promise(resolve => {
            // Chain onto any callback registered by an earlier, still pending call.
            const existingResolve = this._resolveEmpty;
            this._resolveEmpty = () => {
                existingResolve();
                resolve();
            };
        });
    }
    /**
    @returns A promise that settles when the queue size is less than the given limit: `queue.size < limit`.

    If you want to avoid having the queue grow beyond a certain size you can `await queue.onSizeLessThan()` before adding a new item.

    Note that this only limits the number of items waiting to start. There could still be up to `concurrency` jobs already running that this call does not include in its calculation.
    */
    async onSizeLessThan(limit) {
        // Instantly resolve if the queue size is already below the limit.
        if (this._queue.size < limit) {
            return;
        }
        return new Promise(resolve => {
            // Re-check after every finished task until the condition holds.
            const listener = () => {
                if (this._queue.size < limit) {
                    this.removeListener('next', listener);
                    resolve();
                }
            };
            this.on('next', listener);
        });
    }
    /**
    The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

    @returns A promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
    */
    async onIdle() {
        // Instantly resolve if none pending and if nothing else is queued
        if (this._pendingCount === 0 && this._queue.size === 0) {
            return;
        }
        return new Promise(resolve => {
            // Chain onto any callback registered by an earlier, still pending call.
            const existingResolve = this._resolveIdle;
            this._resolveIdle = () => {
                existingResolve();
                resolve();
            };
        });
    }
    /**
    Size of the queue, the number of queued items waiting to run.
    */
    get size() {
        return this._queue.size;
    }
    /**
    Size of the queue, filtered by the given options.

    For example, this can be used to find the number of items remaining in the queue with a specific priority level.
    */
    sizeBy(options) {
        // eslint-disable-next-line unicorn/no-array-callback-reference
        return this._queue.filter(options).length;
    }
    /**
    Number of running items (no longer in the queue).
    */
    get pending() {
        return this._pendingCount;
    }
    /**
    Whether the queue is currently paused.
    */
    get isPaused() {
        return this._isPaused;
    }
    /**
    Queue-wide timeout applied to tasks that do not specify their own.
    */
    get timeout() {
        return this._timeout;
    }
    /**
    Set the timeout for future operations.
    */
    set timeout(milliseconds) {
        this._timeout = milliseconds;
    }
}