UNPKG

12.7 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const EventEmitter = require("eventemitter3");
4const p_timeout_1 = require("p-timeout");
5const priority_queue_1 = require("./priority-queue");
6const empty = () => { };
7const timeoutError = new p_timeout_1.TimeoutError();
8/**
9Promise queue with concurrency control.
10*/
11class PQueue extends EventEmitter {
12 constructor(options) {
13 super();
14 Object.defineProperty(this, "_carryoverConcurrencyCount", {
15 enumerable: true,
16 configurable: true,
17 writable: true,
18 value: void 0
19 });
20 Object.defineProperty(this, "_isIntervalIgnored", {
21 enumerable: true,
22 configurable: true,
23 writable: true,
24 value: void 0
25 });
26 Object.defineProperty(this, "_intervalCount", {
27 enumerable: true,
28 configurable: true,
29 writable: true,
30 value: 0
31 });
32 Object.defineProperty(this, "_intervalCap", {
33 enumerable: true,
34 configurable: true,
35 writable: true,
36 value: void 0
37 });
38 Object.defineProperty(this, "_interval", {
39 enumerable: true,
40 configurable: true,
41 writable: true,
42 value: void 0
43 });
44 Object.defineProperty(this, "_intervalEnd", {
45 enumerable: true,
46 configurable: true,
47 writable: true,
48 value: 0
49 });
50 Object.defineProperty(this, "_intervalId", {
51 enumerable: true,
52 configurable: true,
53 writable: true,
54 value: void 0
55 });
56 Object.defineProperty(this, "_timeoutId", {
57 enumerable: true,
58 configurable: true,
59 writable: true,
60 value: void 0
61 });
62 Object.defineProperty(this, "_queue", {
63 enumerable: true,
64 configurable: true,
65 writable: true,
66 value: void 0
67 });
68 Object.defineProperty(this, "_queueClass", {
69 enumerable: true,
70 configurable: true,
71 writable: true,
72 value: void 0
73 });
74 Object.defineProperty(this, "_pendingCount", {
75 enumerable: true,
76 configurable: true,
77 writable: true,
78 value: 0
79 });
80 // The `!` is needed because of https://github.com/microsoft/TypeScript/issues/32194
81 Object.defineProperty(this, "_concurrency", {
82 enumerable: true,
83 configurable: true,
84 writable: true,
85 value: void 0
86 });
87 Object.defineProperty(this, "_isPaused", {
88 enumerable: true,
89 configurable: true,
90 writable: true,
91 value: void 0
92 });
93 Object.defineProperty(this, "_resolveEmpty", {
94 enumerable: true,
95 configurable: true,
96 writable: true,
97 value: empty
98 });
99 Object.defineProperty(this, "_resolveIdle", {
100 enumerable: true,
101 configurable: true,
102 writable: true,
103 value: empty
104 });
105 Object.defineProperty(this, "_timeout", {
106 enumerable: true,
107 configurable: true,
108 writable: true,
109 value: void 0
110 });
111 Object.defineProperty(this, "_throwOnTimeout", {
112 enumerable: true,
113 configurable: true,
114 writable: true,
115 value: void 0
116 });
117 // eslint-disable-next-line @typescript-eslint/no-object-literal-type-assertion
118 options = Object.assign({ carryoverConcurrencyCount: false, intervalCap: Infinity, interval: 0, concurrency: Infinity, autoStart: true, queueClass: priority_queue_1.default }, options
119 // TODO: Remove this `as`.
120 );
121 if (!(typeof options.intervalCap === 'number' && options.intervalCap >= 1)) {
122 throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${options.intervalCap}\` (${typeof options.intervalCap})`);
123 }
124 if (options.interval === undefined || !(Number.isFinite(options.interval) && options.interval >= 0)) {
125 throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${options.interval}\` (${typeof options.interval})`);
126 }
127 this._carryoverConcurrencyCount = options.carryoverConcurrencyCount;
128 this._isIntervalIgnored = options.intervalCap === Infinity || options.interval === 0;
129 this._intervalCap = options.intervalCap;
130 this._interval = options.interval;
131 this._queue = new options.queueClass();
132 this._queueClass = options.queueClass;
133 this.concurrency = options.concurrency;
134 this._timeout = options.timeout;
135 this._throwOnTimeout = options.throwOnTimeout === true;
136 this._isPaused = options.autoStart === false;
137 }
138 get _doesIntervalAllowAnother() {
139 return this._isIntervalIgnored || this._intervalCount < this._intervalCap;
140 }
141 get _doesConcurrentAllowAnother() {
142 return this._pendingCount < this._concurrency;
143 }
144 _next() {
145 this._pendingCount--;
146 this._tryToStartAnother();
147 }
148 _resolvePromises() {
149 this._resolveEmpty();
150 this._resolveEmpty = empty;
151 if (this._pendingCount === 0) {
152 this._resolveIdle();
153 this._resolveIdle = empty;
154 }
155 }
156 _onResumeInterval() {
157 this._onInterval();
158 this._initializeIntervalIfNeeded();
159 this._timeoutId = undefined;
160 }
161 _isIntervalPaused() {
162 const now = Date.now();
163 if (this._intervalId === undefined) {
164 const delay = this._intervalEnd - now;
165 if (delay < 0) {
166 // Act as the interval was done
167 // We don't need to resume it here because it will be resumed on line 160
168 this._intervalCount = (this._carryoverConcurrencyCount) ? this._pendingCount : 0;
169 }
170 else {
171 // Act as the interval is pending
172 if (this._timeoutId === undefined) {
173 this._timeoutId = setTimeout(() => {
174 this._onResumeInterval();
175 }, delay);
176 }
177 return true;
178 }
179 }
180 return false;
181 }
182 _tryToStartAnother() {
183 if (this._queue.size === 0) {
184 // We can clear the interval ("pause")
185 // Because we can redo it later ("resume")
186 if (this._intervalId) {
187 clearInterval(this._intervalId);
188 }
189 this._intervalId = undefined;
190 this._resolvePromises();
191 return false;
192 }
193 if (!this._isPaused) {
194 const canInitializeInterval = !this._isIntervalPaused();
195 if (this._doesIntervalAllowAnother && this._doesConcurrentAllowAnother) {
196 this.emit('active');
197 this._queue.dequeue()();
198 if (canInitializeInterval) {
199 this._initializeIntervalIfNeeded();
200 }
201 return true;
202 }
203 }
204 return false;
205 }
206 _initializeIntervalIfNeeded() {
207 if (this._isIntervalIgnored || this._intervalId !== undefined) {
208 return;
209 }
210 this._intervalId = setInterval(() => {
211 this._onInterval();
212 }, this._interval);
213 this._intervalEnd = Date.now() + this._interval;
214 }
215 _onInterval() {
216 if (this._intervalCount === 0 && this._pendingCount === 0 && this._intervalId) {
217 clearInterval(this._intervalId);
218 this._intervalId = undefined;
219 }
220 this._intervalCount = this._carryoverConcurrencyCount ? this._pendingCount : 0;
221 this._processQueue();
222 }
223 /**
224 Executes all queued functions until it reaches the limit.
225 */
226 _processQueue() {
227 // eslint-disable-next-line no-empty
228 while (this._tryToStartAnother()) { }
229 }
230 get concurrency() {
231 return this._concurrency;
232 }
233 set concurrency(newConcurrency) {
234 if (!(typeof newConcurrency === 'number' && newConcurrency >= 1)) {
235 throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})`);
236 }
237 this._concurrency = newConcurrency;
238 this._processQueue();
239 }
240 /**
241 Adds a sync or async task to the queue. Always returns a promise.
242 */
243 async add(fn, options = {}) {
244 return new Promise((resolve, reject) => {
245 const run = async () => {
246 this._pendingCount++;
247 this._intervalCount++;
248 try {
249 const operation = (this._timeout === undefined && options.timeout === undefined) ? fn() : p_timeout_1.default(Promise.resolve(fn()), (options.timeout === undefined ? this._timeout : options.timeout), () => {
250 if (options.throwOnTimeout === undefined ? this._throwOnTimeout : options.throwOnTimeout) {
251 reject(timeoutError);
252 }
253 return undefined;
254 });
255 resolve(await operation);
256 }
257 catch (error) {
258 reject(error);
259 }
260 this._next();
261 };
262 this._queue.enqueue(run, options);
263 this._tryToStartAnother();
264 });
265 }
266 /**
267 Same as `.add()`, but accepts an array of sync or async functions.
268
269 @returns A promise that resolves when all functions are resolved.
270 */
271 async addAll(functions, options) {
272 return Promise.all(functions.map(async (function_) => this.add(function_, options)));
273 }
274 /**
275 Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via `options.autoStart = false` or by `.pause()` method.)
276 */
277 start() {
278 if (!this._isPaused) {
279 return this;
280 }
281 this._isPaused = false;
282 this._processQueue();
283 return this;
284 }
285 /**
286 Put queue execution on hold.
287 */
288 pause() {
289 this._isPaused = true;
290 }
291 /**
292 Clear the queue.
293 */
294 clear() {
295 this._queue = new this._queueClass();
296 }
297 /**
298 Can be called multiple times. Useful if you for example add additional items at a later time.
299
300 @returns A promise that settles when the queue becomes empty.
301 */
302 async onEmpty() {
303 // Instantly resolve if the queue is empty
304 if (this._queue.size === 0) {
305 return;
306 }
307 return new Promise(resolve => {
308 const existingResolve = this._resolveEmpty;
309 this._resolveEmpty = () => {
310 existingResolve();
311 resolve();
312 };
313 });
314 }
315 /**
316 The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
317
318 @returns A promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
319 */
320 async onIdle() {
321 // Instantly resolve if none pending and if nothing else is queued
322 if (this._pendingCount === 0 && this._queue.size === 0) {
323 return;
324 }
325 return new Promise(resolve => {
326 const existingResolve = this._resolveIdle;
327 this._resolveIdle = () => {
328 existingResolve();
329 resolve();
330 };
331 });
332 }
333 /**
334 Size of the queue.
335 */
336 get size() {
337 return this._queue.size;
338 }
339 /**
340 Size of the queue, filtered by the given options.
341
342 For example, this can be used to find the number of items remaining in the queue with a specific priority level.
343 */
344 sizeBy(options) {
345 return this._queue.filter(options).length;
346 }
347 /**
348 Number of pending promises.
349 */
350 get pending() {
351 return this._pendingCount;
352 }
353 /**
354 Whether the queue is currently paused.
355 */
356 get isPaused() {
357 return this._isPaused;
358 }
359 /**
360 Set the timeout for future operations.
361 */
362 set timeout(milliseconds) {
363 this._timeout = milliseconds;
364 }
365 get timeout() {
366 return this._timeout;
367 }
368}
369exports.default = PQueue;