// UNPKG file viewer residue (not part of the module):
//
// 18.7 kB · JavaScript · View Raw
/**
 * Down-level helper that writes a private class member.
 *
 * The `(this && this.__classPrivateFieldSet) ||` guard reuses a helper that is
 * already present on `this`, and otherwise falls back to the local definition.
 *
 * @param receiver Object whose private member is being written.
 * @param state Brand/storage for the member: an object exposing `has`/`set`
 *     (e.g. a WeakMap), or a function, in which case `receiver` must be that
 *     same function and `f` must be supplied.
 * @param value Value to store.
 * @param kind `"m"` for a method (never writable), `"a"` for an accessor
 *     (written through `f`), anything else for a plain field.
 * @param f Setter function when `kind` is `"a"`; otherwise an optional
 *     `{ value }` holder that receives the value instead of `state`.
 * @returns The written `value`.
 * @throws {TypeError} When the member is a method, the accessor has no setter,
 *     or `receiver` is not branded by `state`.
 */
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
    if (kind === "m") {
        throw new TypeError("Private method is not writable");
    }
    if (kind === "a" && !f) {
        throw new TypeError("Private accessor was defined without a setter");
    }
    const isDeclared = typeof state === "function"
        ? receiver === state && Boolean(f)
        : state.has(receiver);
    if (!isDeclared) {
        throw new TypeError("Cannot write private member to an object whose class did not declare it");
    }
    if (kind === "a") {
        f.call(receiver, value);
    }
    else if (f) {
        f.value = value;
    }
    else {
        state.set(receiver, value);
    }
    return value;
};
/**
 * Down-level helper that reads a private class member.
 *
 * The `(this && this.__classPrivateFieldGet) ||` guard reuses a helper that is
 * already present on `this`, and otherwise falls back to the local definition.
 *
 * @param receiver Object whose private member is being read.
 * @param state Brand/storage for the member: an object exposing `has`/`get`
 *     (e.g. a WeakMap or, for brand checks only, a WeakSet), or a function, in
 *     which case `receiver` must be that same function and `f` must be supplied.
 * @param kind `"m"` returns `f` itself, `"a"` returns `f.call(receiver)`,
 *     anything else reads a plain field.
 * @param f Method/getter function, or an optional `{ value }` holder for fields.
 * @returns The member's value (or the method function for `"m"`).
 * @throws {TypeError} When the accessor has no getter or `receiver` is not
 *     branded by `state`.
 */
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
    if (kind === "a" && !f) {
        throw new TypeError("Private accessor was defined without a getter");
    }
    const isDeclared = typeof state === "function"
        ? receiver === state && Boolean(f)
        : state.has(receiver);
    if (!isDeclared) {
        throw new TypeError("Cannot read private member from an object whose class did not declare it");
    }
    if (kind === "m") {
        return f;
    }
    if (kind === "a") {
        return f.call(receiver);
    }
    return f ? f.value : state.get(receiver);
};
// Module-level slots backing PQueue's down-leveled private members. They are
// only declared here; the values (WeakMaps, a WeakSet and helper functions) are
// assigned further down in this file, before any PQueue instance can be built.
var _PQueue_instances,
    _PQueue_carryoverConcurrencyCount,
    _PQueue_isIntervalIgnored,
    _PQueue_intervalCount,
    _PQueue_intervalCap,
    _PQueue_interval,
    _PQueue_intervalEnd,
    _PQueue_intervalId,
    _PQueue_timeoutId,
    _PQueue_queue,
    _PQueue_queueClass,
    _PQueue_pending,
    _PQueue_concurrency,
    _PQueue_isPaused,
    _PQueue_throwOnTimeout,
    _PQueue_doesIntervalAllowAnother_get,
    _PQueue_doesConcurrentAllowAnother_get,
    _PQueue_next,
    _PQueue_onResumeInterval,
    _PQueue_isIntervalPaused_get,
    _PQueue_tryToStartAnother,
    _PQueue_initializeIntervalIfNeeded,
    _PQueue_onInterval,
    _PQueue_processQueue,
    _PQueue_throwOnAbort,
    _PQueue_onEvent;
import { EventEmitter } from 'eventemitter3';
import pTimeout, { TimeoutError } from 'p-timeout';
import PriorityQueue from './priority-queue.js';
/**
The error thrown by `queue.add()` when a job is aborted before it is run. See `signal`.

Its `name` is `'AbortError'`, so it can be recognised via `error.name` as well as `instanceof`.
*/
export class AbortError extends Error {
    constructor(...arguments_) {
        // Forward everything unchanged so `new AbortError(message, options)` keeps working.
        super(...arguments_);
        this.name = 'AbortError';
    }
}
/**
Promise queue with concurrency control.
*/
class PQueue extends EventEmitter {
    // TODO: The `throwOnTimeout` option should affect the return types of `add()` and `addAll()`
    /**
    Create a queue.

    @param options - Optional overrides for the defaults `carryoverConcurrencyCount: false`, `intervalCap: Infinity`, `interval: 0`, `concurrency: Infinity`, `autoStart: true` and `queueClass: PriorityQueue`. `timeout` and `throwOnTimeout` are read as well.
    @throws {TypeError} When `intervalCap` is not a number >= 1, when `interval` is not a finite number >= 0, or when `concurrency` is not a number >= 1.
    */
    constructor(options) {
        super();
        // Register the instance and give every private slot its initial value.
        _PQueue_instances.add(this);
        _PQueue_carryoverConcurrencyCount.set(this, void 0);
        _PQueue_isIntervalIgnored.set(this, void 0);
        _PQueue_intervalCount.set(this, 0);
        _PQueue_intervalCap.set(this, void 0);
        _PQueue_interval.set(this, void 0);
        _PQueue_intervalEnd.set(this, 0);
        _PQueue_intervalId.set(this, void 0);
        _PQueue_timeoutId.set(this, void 0);
        _PQueue_queue.set(this, void 0);
        _PQueue_queueClass.set(this, void 0);
        _PQueue_pending.set(this, 0);
        _PQueue_concurrency.set(this, void 0);
        _PQueue_isPaused.set(this, void 0);
        _PQueue_throwOnTimeout.set(this, void 0);
        /**
        Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.

        Applies to each future operation.
        */
        Object.defineProperty(this, "timeout", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        const settings = {
            carryoverConcurrencyCount: false,
            intervalCap: Number.POSITIVE_INFINITY,
            interval: 0,
            concurrency: Number.POSITIVE_INFINITY,
            autoStart: true,
            queueClass: PriorityQueue,
            ...options,
        };
        // Renders a value for the error messages below; `null`/`undefined` become an empty string.
        const describe = value => {
            const text = value === null || value === undefined ? undefined : value.toString();
            return text === null || text === undefined ? '' : text;
        };
        if (!(typeof settings.intervalCap === 'number' && settings.intervalCap >= 1)) {
            throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${describe(settings.intervalCap)}\` (${typeof settings.intervalCap})`);
        }
        if (settings.interval === undefined || !(Number.isFinite(settings.interval) && settings.interval >= 0)) {
            throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${describe(settings.interval)}\` (${typeof settings.interval})`);
        }
        __classPrivateFieldSet(this, _PQueue_carryoverConcurrencyCount, settings.carryoverConcurrencyCount, "f");
        // Interval limiting is skipped when there is no cap or no interval length.
        __classPrivateFieldSet(this, _PQueue_isIntervalIgnored, settings.intervalCap === Number.POSITIVE_INFINITY || settings.interval === 0, "f");
        __classPrivateFieldSet(this, _PQueue_intervalCap, settings.intervalCap, "f");
        __classPrivateFieldSet(this, _PQueue_interval, settings.interval, "f");
        __classPrivateFieldSet(this, _PQueue_queue, new settings.queueClass(), "f");
        __classPrivateFieldSet(this, _PQueue_queueClass, settings.queueClass, "f");
        // Goes through the setter, which validates the value and processes the queue.
        this.concurrency = settings.concurrency;
        this.timeout = settings.timeout;
        __classPrivateFieldSet(this, _PQueue_throwOnTimeout, settings.throwOnTimeout === true, "f");
        __classPrivateFieldSet(this, _PQueue_isPaused, settings.autoStart === false, "f");
    }
    /**
    Concurrency limit: the maximum number of tasks allowed to run at the same time.
    */
    get concurrency() {
        return __classPrivateFieldGet(this, _PQueue_concurrency, "f");
    }
    /**
    Change the concurrency limit and immediately try to start more queued tasks.

    @throws {TypeError} When the new value is not a number >= 1.
    */
    set concurrency(newConcurrency) {
        if (!(typeof newConcurrency === 'number' && newConcurrency >= 1)) {
            throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})`);
        }
        __classPrivateFieldSet(this, _PQueue_concurrency, newConcurrency, "f");
        __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_processQueue).call(this);
    }
    /**
    Add a task to the queue.

    @param function_ - Called with `{signal}` when the task starts; the value it returns (or that its promise settles with) is the task's result.
    @param options - Per-task overrides for `timeout` and `throwOnTimeout`, plus an optional `signal`. The merged object is also handed to the queue's `enqueue()`.
    @returns A promise for the task's result. It fulfills with `undefined` when a `TimeoutError` is caught and `throwOnTimeout` is falsy, and rejects with `AbortError` when `signal` is already aborted at start or fires while the task runs.
    */
    async add(function_, options = {}) {
        const taskOptions = {
            timeout: this.timeout,
            throwOnTimeout: __classPrivateFieldGet(this, _PQueue_throwOnTimeout, "f"),
            ...options,
        };
        return new Promise((resolve, reject) => {
            const job = async () => {
                __classPrivateFieldSet(this, _PQueue_pending, __classPrivateFieldGet(this, _PQueue_pending, "f") + 1, "f");
                __classPrivateFieldSet(this, _PQueue_intervalCount, __classPrivateFieldGet(this, _PQueue_intervalCount, "f") + 1, "f");
                const { signal } = taskOptions;
                // Remembered so the listener can be detached once the task settles.
                let abortListener;
                try {
                    // TODO: Use options.signal?.throwIfAborted() when targeting Node.js 18
                    if (signal && signal.aborted) {
                        // TODO: Use ABORT_ERR code when targeting Node.js 16 (https://nodejs.org/docs/latest-v16.x/api/errors.html#abort_err)
                        throw new AbortError('The task was aborted.');
                    }
                    let operation = function_({ signal });
                    if (taskOptions.timeout) {
                        operation = pTimeout(Promise.resolve(operation), taskOptions.timeout);
                    }
                    if (signal) {
                        const aborted = new Promise((_resolve, rejectOnAbort) => {
                            abortListener = () => {
                                rejectOnAbort(new AbortError('The task was aborted.'));
                            };
                            signal.addEventListener('abort', abortListener, { once: true });
                        });
                        operation = Promise.race([operation, aborted]);
                    }
                    const result = await operation;
                    resolve(result);
                    this.emit('completed', result);
                }
                catch (error) {
                    if (error instanceof TimeoutError && !taskOptions.throwOnTimeout) {
                        resolve();
                        return;
                    }
                    reject(error);
                    this.emit('error', error);
                }
                finally {
                    // Without this, a long-lived signal would keep one listener (and its closure) per finished task.
                    if (abortListener) {
                        signal.removeEventListener('abort', abortListener);
                    }
                    __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_next).call(this);
                }
            };
            __classPrivateFieldGet(this, _PQueue_queue, "f").enqueue(job, taskOptions);
            this.emit('add');
            __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_tryToStartAnother).call(this);
        });
    }
    /**
    Add several tasks at once; every task receives the same `options`.

    @returns A promise for the array of results, in the order of `functions`.
    */
    async addAll(functions, options) {
        return Promise.all(functions.map(async (function_) => this.add(function_, options)));
    }
    /**
    Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via `options.autoStart = false` or by `.pause()` method.)

    @returns The queue itself.
    */
    start() {
        if (!__classPrivateFieldGet(this, _PQueue_isPaused, "f")) {
            return this;
        }
        __classPrivateFieldSet(this, _PQueue_isPaused, false, "f");
        __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_processQueue).call(this);
        return this;
    }
    /**
    Put queue execution on hold.
    */
    pause() {
        __classPrivateFieldSet(this, _PQueue_isPaused, true, "f");
    }
    /**
    Clear the queue by replacing it with a fresh instance of the configured queue class.
    */
    clear() {
        __classPrivateFieldSet(this, _PQueue_queue, new (__classPrivateFieldGet(this, _PQueue_queueClass, "f"))(), "f");
    }
    /**
    Can be called multiple times. Useful if you for example add additional items at a later time.

    @returns A promise that settles when the queue becomes empty.
    */
    async onEmpty() {
        // Instantly resolve if the queue is empty
        if (__classPrivateFieldGet(this, _PQueue_queue, "f").size === 0) {
            return;
        }
        await __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_onEvent).call(this, 'empty');
    }
    /**
    @returns A promise that settles when the queue size is less than the given limit: `queue.size < limit`.

    If you want to avoid having the queue grow beyond a certain size you can `await queue.onSizeLessThan()` before adding a new item.

    Note that this only limits the number of items waiting to start. There could still be up to `concurrency` jobs already running that this call does not include in its calculation.
    */
    async onSizeLessThan(limit) {
        // Instantly resolve if the queue size is already below the limit.
        if (__classPrivateFieldGet(this, _PQueue_queue, "f").size < limit) {
            return;
        }
        await __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_onEvent).call(this, 'next', () => __classPrivateFieldGet(this, _PQueue_queue, "f").size < limit);
    }
    /**
    The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

    @returns A promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
    */
    async onIdle() {
        // Instantly resolve if none pending and if nothing else is queued
        if (__classPrivateFieldGet(this, _PQueue_pending, "f") === 0 && __classPrivateFieldGet(this, _PQueue_queue, "f").size === 0) {
            return;
        }
        await __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_onEvent).call(this, 'idle');
    }
    /**
    Size of the queue, the number of queued items waiting to run.
    */
    get size() {
        return __classPrivateFieldGet(this, _PQueue_queue, "f").size;
    }
    /**
    Size of the queue, filtered by the given options.

    For example, this can be used to find the number of items remaining in the queue with a specific priority level.
    */
    sizeBy(options) {
        // eslint-disable-next-line unicorn/no-array-callback-reference
        return __classPrivateFieldGet(this, _PQueue_queue, "f").filter(options).length;
    }
    /**
    Number of running items (no longer in the queue).
    */
    get pending() {
        return __classPrivateFieldGet(this, _PQueue_pending, "f");
    }
    /**
    Whether the queue is currently paused.
    */
    get isPaused() {
        return __classPrivateFieldGet(this, _PQueue_isPaused, "f");
    }
}
// Storage for PQueue's private fields: one WeakMap per field, keyed by instance.
_PQueue_carryoverConcurrencyCount = new WeakMap();
_PQueue_isIntervalIgnored = new WeakMap();
_PQueue_intervalCount = new WeakMap();
_PQueue_intervalCap = new WeakMap();
_PQueue_interval = new WeakMap();
_PQueue_intervalEnd = new WeakMap();
_PQueue_intervalId = new WeakMap();
_PQueue_timeoutId = new WeakMap();
_PQueue_queue = new WeakMap();
_PQueue_queueClass = new WeakMap();
_PQueue_pending = new WeakMap();
_PQueue_concurrency = new WeakMap();
_PQueue_isPaused = new WeakMap();
_PQueue_throwOnTimeout = new WeakMap();
// Brand set used to check that private methods/accessors are called on real instances.
_PQueue_instances = new WeakSet();

/**
Private getter: true when interval limiting is disabled or the current interval's count is still below the cap.
*/
_PQueue_doesIntervalAllowAnother_get = function _PQueue_doesIntervalAllowAnother_get() {
    return __classPrivateFieldGet(this, _PQueue_isIntervalIgnored, "f") || __classPrivateFieldGet(this, _PQueue_intervalCount, "f") < __classPrivateFieldGet(this, _PQueue_intervalCap, "f");
};

/**
Private getter: true when fewer tasks are running than the concurrency limit allows.
*/
_PQueue_doesConcurrentAllowAnother_get = function _PQueue_doesConcurrentAllowAnother_get() {
    return __classPrivateFieldGet(this, _PQueue_pending, "f") < __classPrivateFieldGet(this, _PQueue_concurrency, "f");
};

/**
Private method run after a task settles: decrements the running count, tries to start another task, then emits `next`.
*/
_PQueue_next = function _PQueue_next() {
    __classPrivateFieldSet(this, _PQueue_pending, __classPrivateFieldGet(this, _PQueue_pending, "f") - 1, "f");
    __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_tryToStartAnother).call(this);
    this.emit('next');
};

/**
Private method fired by the resume timeout: runs one interval tick, re-arms the interval timer if needed and forgets the timeout id.
*/
_PQueue_onResumeInterval = function _PQueue_onResumeInterval() {
    __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_onInterval).call(this);
    __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_initializeIntervalIfNeeded).call(this);
    __classPrivateFieldSet(this, _PQueue_timeoutId, undefined, "f");
};

/**
Private getter: whether the interval timer is stopped while the previous interval has not yet elapsed.

Side effects: when the previous interval is already over, the interval count is reset; when it is still running, a one-shot timeout is scheduled (once) to resume at its end.
*/
_PQueue_isIntervalPaused_get = function _PQueue_isIntervalPaused_get() {
    const now = Date.now();
    if (__classPrivateFieldGet(this, _PQueue_intervalId, "f") !== undefined) {
        return false;
    }
    const delay = __classPrivateFieldGet(this, _PQueue_intervalEnd, "f") - now;
    if (delay < 0) {
        // Act as the interval was done
        // We don't need to restart the timer here: `tryToStartAnother` calls
        // `initializeIntervalIfNeeded` itself after it has started a job.
        __classPrivateFieldSet(this, _PQueue_intervalCount, (__classPrivateFieldGet(this, _PQueue_carryoverConcurrencyCount, "f")) ? __classPrivateFieldGet(this, _PQueue_pending, "f") : 0, "f");
        return false;
    }
    // Act as the interval is pending
    if (__classPrivateFieldGet(this, _PQueue_timeoutId, "f") === undefined) {
        __classPrivateFieldSet(this, _PQueue_timeoutId, setTimeout(() => {
            __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_onResumeInterval).call(this);
        }, delay), "f");
    }
    return true;
};

/**
Private method: start the next queued task if the queue is not paused and both the interval cap and the concurrency limit allow it.

Emits `empty` (and `idle` when nothing is running) if the queue has no items, and `active` right before a task is started.

@returns `true` when a task was started, otherwise `false`.
*/
_PQueue_tryToStartAnother = function _PQueue_tryToStartAnother() {
    if (__classPrivateFieldGet(this, _PQueue_queue, "f").size === 0) {
        // We can clear the interval ("pause")
        // Because we can redo it later ("resume")
        if (__classPrivateFieldGet(this, _PQueue_intervalId, "f")) {
            clearInterval(__classPrivateFieldGet(this, _PQueue_intervalId, "f"));
        }
        __classPrivateFieldSet(this, _PQueue_intervalId, undefined, "f");
        this.emit('empty');
        if (__classPrivateFieldGet(this, _PQueue_pending, "f") === 0) {
            this.emit('idle');
        }
        return false;
    }
    if (__classPrivateFieldGet(this, _PQueue_isPaused, "f")) {
        return false;
    }
    // Evaluated first on purpose: the getter may reset the interval count that the next check reads.
    const canInitializeInterval = !__classPrivateFieldGet(this, _PQueue_instances, "a", _PQueue_isIntervalPaused_get);
    if (!(__classPrivateFieldGet(this, _PQueue_instances, "a", _PQueue_doesIntervalAllowAnother_get) && __classPrivateFieldGet(this, _PQueue_instances, "a", _PQueue_doesConcurrentAllowAnother_get))) {
        return false;
    }
    const job = __classPrivateFieldGet(this, _PQueue_queue, "f").dequeue();
    if (!job) {
        return false;
    }
    this.emit('active');
    job();
    if (canInitializeInterval) {
        __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_initializeIntervalIfNeeded).call(this);
    }
    return true;
};

/**
Private method: arm the repeating interval timer and record when the current interval ends, unless interval limiting is disabled or a timer is already running.
*/
_PQueue_initializeIntervalIfNeeded = function _PQueue_initializeIntervalIfNeeded() {
    if (__classPrivateFieldGet(this, _PQueue_isIntervalIgnored, "f") || __classPrivateFieldGet(this, _PQueue_intervalId, "f") !== undefined) {
        return;
    }
    __classPrivateFieldSet(this, _PQueue_intervalId, setInterval(() => {
        __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_onInterval).call(this);
    }, __classPrivateFieldGet(this, _PQueue_interval, "f")), "f");
    __classPrivateFieldSet(this, _PQueue_intervalEnd, Date.now() + __classPrivateFieldGet(this, _PQueue_interval, "f"), "f");
};

/**
Private method run on every interval tick: stops the timer when nothing ran during the interval and nothing is running, resets the interval count (to the running count when `carryoverConcurrencyCount` is set, else to 0) and processes the queue.
*/
_PQueue_onInterval = function _PQueue_onInterval() {
    if (__classPrivateFieldGet(this, _PQueue_intervalCount, "f") === 0 && __classPrivateFieldGet(this, _PQueue_pending, "f") === 0 && __classPrivateFieldGet(this, _PQueue_intervalId, "f")) {
        clearInterval(__classPrivateFieldGet(this, _PQueue_intervalId, "f"));
        __classPrivateFieldSet(this, _PQueue_intervalId, undefined, "f");
    }
    __classPrivateFieldSet(this, _PQueue_intervalCount, __classPrivateFieldGet(this, _PQueue_carryoverConcurrencyCount, "f") ? __classPrivateFieldGet(this, _PQueue_pending, "f") : 0, "f");
    __classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_processQueue).call(this);
};

/**
Private method: keep starting tasks until `tryToStartAnother` reports that no further task could be started.
*/
_PQueue_processQueue = function _PQueue_processQueue() {
    // eslint-disable-next-line no-empty
    while (__classPrivateFieldGet(this, _PQueue_instances, "m", _PQueue_tryToStartAnother).call(this)) { }
};

/**
Private method: build a promise that never fulfills and rejects with `AbortError` when `signal` fires its `abort` event.

@param signal - Object exposing `addEventListener`; the listener is registered with `{once: true}`.
*/
_PQueue_throwOnAbort = async function _PQueue_throwOnAbort(signal) {
    return new Promise((_resolve, reject) => {
        signal.addEventListener('abort', () => {
            // TODO: Reject with signal.throwIfAborted() when targeting Node.js 18
            // TODO: Use ABORT_ERR code when targeting Node.js 16 (https://nodejs.org/docs/latest-v16.x/api/errors.html#abort_err)
            reject(new AbortError('The task was aborted.'));
        }, { once: true });
    });
};

/**
Private method: wait for the next emission of `event` for which `filter` (when given) returns a truthy value; the listener is removed once that happens.

@param event - Event name to listen for on this queue.
@param filter - Optional predicate evaluated on each emission.
@returns A promise that fulfills with `undefined`.
*/
_PQueue_onEvent = async function _PQueue_onEvent(event, filter) {
    return new Promise(resolve => {
        const listener = () => {
            if (filter && !filter()) {
                return;
            }
            this.off(event, listener);
            resolve();
        };
        this.on(event, listener);
    });
};
// The queue class is the module's default export.
export default PQueue;