# p-queue

> Promise queue with concurrency control

Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.

For servers, you probably want a Redis-backed [job queue](https://github.com/sindresorhus/awesome-nodejs#job-queues) instead.

Note that the project is feature complete. We are happy to review pull requests, but we don't plan any further development. We are also not answering email support questions.

## Install

```sh
npm install p-queue
```

**Warning:** This package is native [ESM](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Modules) and no longer provides a CommonJS export. If your project uses CommonJS, you'll have to [convert to ESM](https://gist.github.com/sindresorhus/a39789f98801d908bbc7ff3ecc99d99c) or use the [dynamic `import()`](https://v8.dev/features/dynamic-import) function. Please don't open issues for questions regarding CommonJS / ESM.

## Usage

Here we run only one promise at a time. For example, set `concurrency` to 4 to run four promises at the same time.

```js
import PQueue from 'p-queue';
import got from 'got';

const queue = new PQueue({concurrency: 1});

(async () => {
	await queue.add(() => got('https://sindresorhus.com'));
	console.log('Done: sindresorhus.com');
})();

(async () => {
	await queue.add(() => got('https://avajs.dev'));
	console.log('Done: avajs.dev');
})();

(async () => {
	// `getUnicornTask` stands in for your own function that resolves to a task function.
	const task = await getUnicornTask();
	await queue.add(task);
	console.log('Done: Unicorn task');
})();
```

## API

### PQueue(options?)

Returns a new `queue` instance, which is an [`EventEmitter3`](https://github.com/primus/eventemitter3) subclass.

#### options

Type: `object`

##### concurrency

Type: `number`\
Default: `Infinity`\
Minimum: `1`

Concurrency limit.

##### timeout

Type: `number`

Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.

##### throwOnTimeout

Type: `boolean`\
Default: `false`

Whether or not a timeout is considered an exception.

##### autoStart

Type: `boolean`\
Default: `true`

Whether queued tasks within the concurrency limit are auto-executed as soon as they're added.

##### queueClass

Type: `Function`

Class with `enqueue` and `dequeue` methods, and a `size` getter. See the [Custom QueueClass](#custom-queueclass) section.

##### intervalCap

Type: `number`\
Default: `Infinity`\
Minimum: `1`

The max number of runs in the given interval of time.

##### interval

Type: `number`\
Default: `0`\
Minimum: `0`

The length of time in milliseconds before the interval count resets. Must be finite.

##### carryoverConcurrencyCount

Type: `boolean`\
Default: `false`

If `true`, specifies that any [pending](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise) Promises should be carried over into the next interval and counted against the `intervalCap`. If `false`, any of those pending Promises will not count towards the next `intervalCap`.

### queue

`PQueue` instance.

#### .add(fn, options?)

Adds a sync or async task to the queue. Always returns a promise.

Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.

##### fn

Type: `Function`

Promise-returning/async function. When executed, it will receive `{signal}` as the first argument.

#### options

Type: `object`

##### priority

Type: `number`\
Default: `0`

Priority of operation. Operations with greater priority will be scheduled first.

##### signal

*Requires Node.js 16 or later.*

[`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an `AbortError`. If the operation is already running, the signal will need to be handled by the operation itself.

```js
import PQueue, {AbortError} from 'p-queue';
import got, {CancelError} from 'got';

const queue = new PQueue();

const controller = new AbortController();

try {
	// The task must be `async` because it uses `await` internally.
	await queue.add(async ({signal}) => {
		const request = got('https://sindresorhus.com');

		// Cancel the in-flight request if the signal is aborted while running.
		signal.addEventListener('abort', () => {
			request.cancel();
		});

		try {
			return await request;
		} catch (error) {
			if (!(error instanceof CancelError)) {
				throw error;
			}
		}
	}, {signal: controller.signal});
} catch (error) {
	if (!(error instanceof AbortError)) {
		throw error;
	}
}
```

#### .addAll(fns, options?)

Same as `.add()`, but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.

#### .pause()

Put queue execution on hold.

#### .start()

Start (or resume) executing enqueued tasks within the concurrency limit. No need to call this if the queue is not paused (via `options.autoStart = false` or the `.pause()` method).

Returns `this` (the instance).

#### .onEmpty()

Returns a promise that settles when the queue becomes empty.

Can be called multiple times. Useful if you, for example, add additional items at a later time.

#### .onIdle()

Returns a promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.

The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

#### .onSizeLessThan(limit)

Returns a promise that settles when the queue size is less than the given limit: `queue.size < limit`.

If you want to avoid having the queue grow beyond a certain size you can `await queue.onSizeLessThan()` before adding a new item.

Note that this only limits the number of items waiting to start. There could still be up to `concurrency` jobs already running that this call does not include in its calculation.

#### .clear()

Clear the queue.

#### .size

Size of the queue, the number of queued items waiting to run.

#### .sizeBy(options)

Size of the queue, filtered by the given options.

For example, this can be used to find the number of items remaining in the queue with a specific priority level.

```js
import PQueue from 'p-queue';

const queue = new PQueue();

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 0});
queue.add(async () => '🦄', {priority: 1});

console.log(queue.sizeBy({priority: 1}));
//=> 2

console.log(queue.sizeBy({priority: 0}));
//=> 1
```

#### .pending

Number of running items (no longer in the queue).

#### [.timeout](#timeout)

#### [.concurrency](#concurrency)

#### .isPaused

Whether the queue is currently paused.

## Events

#### active

Emitted as each item is processed in the queue for the purpose of tracking progress.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

let count = 0;
queue.on('active', () => {
	console.log(`Working on item #${++count}. Size: ${queue.size} Pending: ${queue.pending}`);
});

queue.add(() => Promise.resolve());
queue.add(() => delay(2000));
queue.add(() => Promise.resolve());
queue.add(() => Promise.resolve());
queue.add(() => delay(500));
```

#### completed

Emitted when an item completes without error.

```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('completed', result => {
	console.log(result);
});

queue.add(() => Promise.resolve('hello, world!'));
```

#### error

Emitted if an item throws an error.

```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('error', error => {
	console.error(error);
});

// The promise returned by `.add()` rejects as well, so handle it to avoid an unhandled rejection.
queue.add(() => Promise.reject(new Error('error'))).catch(() => {});
```

#### empty

Emitted every time the queue becomes empty.

Useful if you, for example, add additional items at a later time.

#### idle

Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`.

The difference with `empty` is that `idle` guarantees that all work from the queue has finished. `empty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue();

queue.on('idle', () => {
	console.log(`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`);
});

const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));

await job1;
await job2;
// => 'Queue is idle. Size: 0 Pending: 0'

await queue.add(() => delay(600));
// => 'Queue is idle. Size: 0 Pending: 0'
```

The `idle` event is emitted every time the queue reaches an idle state. In contrast, the promise returned by `onIdle()` resolves a single time, once the queue becomes idle, rather than every time the queue is idle.

#### add

Emitted every time the add method is called and the number of pending or queued tasks is increased.

#### next

Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue();

queue.on('add', () => {
	console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`);
});

queue.on('next', () => {
	console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`);
});

const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));

await job1;
await job2;
//=> 'Task is added. Size: 0 Pending: 1'
//=> 'Task is added. Size: 0 Pending: 2'

await queue.add(() => delay(600));
//=> 'Task is completed. Size: 0 Pending: 1'
//=> 'Task is completed. Size: 0 Pending: 0'
```

### AbortError

The error thrown by `queue.add()` when a job is aborted before it is run. See [`signal`](#signal).

## Advanced example

A more advanced example to help you understand the flow.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

(async () => {
	await delay(200);

	console.log(`8. Pending promises: ${queue.pending}`);
	//=> '8. Pending promises: 0'

	(async () => {
		await queue.add(async () => '🐙');
		console.log('11. Resolved 🐙');
	})();

	console.log('9. Added 🐙');

	console.log(`10. Pending promises: ${queue.pending}`);
	//=> '10. Pending promises: 1'

	await queue.onIdle();
	console.log('12. All work is done');
})();

(async () => {
	await queue.add(async () => '🦄');
	console.log('5. Resolved 🦄');
})();
console.log('1. Added 🦄');

(async () => {
	await queue.add(async () => '🐴');
	console.log('6. Resolved 🐴');
})();
console.log('2. Added 🐴');

(async () => {
	await queue.onEmpty();
	console.log('7. Queue is empty');
})();

console.log(`3. Queue size: ${queue.size}`);
//=> '3. Queue size: 1'

console.log(`4. Pending promises: ${queue.pending}`);
//=> '4. Pending promises: 1'
```

```
$ node example.js
1. Added 🦄
2. Added 🐴
3. Queue size: 1
4. Pending promises: 1
5. Resolved 🦄
6. Resolved 🐴
7. Queue is empty
8. Pending promises: 0
9. Added 🐙
10. Pending promises: 1
11. Resolved 🐙
12. All work is done
```

## Custom QueueClass

For implementing more complex scheduling policies, you can provide a QueueClass in the options:

```js
import PQueue from 'p-queue';

class QueueClass {
	constructor() {
		this._queue = [];
	}

	enqueue(run, options) {
		this._queue.push(run);
	}

	dequeue() {
		return this._queue.shift();
	}

	get size() {
		return this._queue.length;
	}

	filter(options) {
		return this._queue;
	}
}

const queue = new PQueue({queueClass: QueueClass});
```

`p-queue` will call the corresponding methods to put and get operations from this queue.

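As one possible scheduling policy, the sketch below implements a last-in, first-out queue class with the same four members as `QueueClass` (`enqueue`, `dequeue`, `size`, `filter`). The `LifoQueue` name and the choice to store each task's priority only for `filter` are assumptions made for this illustration, not part of `p-queue`.

```js
// Hypothetical queue class: the most recently added task runs first.
class LifoQueue {
	constructor() {
		this._items = [];
	}

	enqueue(run, options) {
		// Keep the priority only so that `filter` can match on it.
		this._items.push({run, priority: options?.priority ?? 0});
	}

	dequeue() {
		// Take from the end of the array, so the newest task comes out first.
		return this._items.pop()?.run;
	}

	get size() {
		return this._items.length;
	}

	filter(options) {
		return this._items
			.filter(item => item.priority === options.priority)
			.map(item => item.run);
	}
}

// Exercising the class directly, without `p-queue`:
const lifo = new LifoQueue();
const first = () => 'first';
const second = () => 'second';

lifo.enqueue(first, {priority: 0});
lifo.enqueue(second, {priority: 0});

console.log(lifo.size);
//=> 2

console.log(lifo.dequeue() === second);
//=> true
```

Such a class could then be passed to the constructor as `new PQueue({queueClass: LifoQueue})`, in the same way as `QueueClass`.
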
## FAQ

#### How do the `concurrency` and `intervalCap` options affect each other?

They are just different constraints. The `concurrency` option limits how many things run at the same time. The `intervalCap` option limits how many things run in total during the interval (over time).

## Maintainers

- [Sindre Sorhus](https://github.com/sindresorhus)
- [Richie Bendall](https://github.com/Richienb)

## Related

- [p-limit](https://github.com/sindresorhus/p-limit) - Run multiple promise-returning & async functions with limited concurrency
- [p-throttle](https://github.com/sindresorhus/p-throttle) - Throttle promise-returning & async functions
- [p-debounce](https://github.com/sindresorhus/p-debounce) - Debounce promise-returning & async functions
- [p-all](https://github.com/sindresorhus/p-all) - Run promise-returning & async functions concurrently with optional limited concurrency
- [More…](https://github.com/sindresorhus/promise-fun)