UNPKG

12.7 kBMarkdownView Raw
1# p-queue
2
3> Promise queue with concurrency control
4
5Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.
6
7For servers, you probably want a Redis-backed [job queue](https://github.com/sindresorhus/awesome-nodejs#job-queues) instead.
8
9Note that the project is feature complete. We are happy to review pull requests, but we don't plan any further development. We are also not answering email support questions.
10
11## Install
12
13```sh
14npm install p-queue
15```
16
17**Warning:** This package is native [ESM](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Modules) and no longer provides a CommonJS export. If your project uses CommonJS, you'll have to [convert to ESM](https://gist.github.com/sindresorhus/a39789f98801d908bbc7ff3ecc99d99c). Please don't open issues for questions regarding CommonJS / ESM.
18
19## Usage
20
21Here we run only one promise at the time. For example, set `concurrency` to 4 to run four promises at the same time.
22
23```js
24import PQueue from 'p-queue';
25import got from 'got';
26
27const queue = new PQueue({concurrency: 1});
28
29(async () => {
30 await queue.add(() => got('https://sindresorhus.com'));
31 console.log('Done: sindresorhus.com');
32})();
33
34(async () => {
35 await queue.add(() => got('https://avajs.dev'));
36 console.log('Done: avajs.dev');
37})();
38
39(async () => {
40 const task = await getUnicornTask();
41 await queue.add(task);
42 console.log('Done: Unicorn task');
43})();
44```
45
46## API
47
48### PQueue(options?)
49
50Returns a new `queue` instance, which is an [`EventEmitter3`](https://github.com/primus/eventemitter3) subclass.
51
52#### options
53
54Type: `object`
55
56##### concurrency
57
58Type: `number`\
59Default: `Infinity`\
60Minimum: `1`
61
62Concurrency limit.
63
64##### timeout
65
66Type: `number`
67
68Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.
69
70##### throwOnTimeout
71
72Type: `boolean`\
73Default: `false`
74
75Whether or not a timeout is considered an exception.
76
77##### autoStart
78
79Type: `boolean`\
80Default: `true`
81
82Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.
83
84##### queueClass
85
86Type: `Function`
87
88Class with a `enqueue` and `dequeue` method, and a `size` getter. See the [Custom QueueClass](#custom-queueclass) section.
89
90##### intervalCap
91
92Type: `number`\
93Default: `Infinity`\
94Minimum: `1`
95
96The max number of runs in the given interval of time.
97
98##### interval
99
100Type: `number`\
101Default: `0`\
102Minimum: `0`
103
104The length of time in milliseconds before the interval count resets. Must be finite.
105
106##### carryoverConcurrencyCount
107
108Type: `boolean`\
109Default: `false`
110
111If `true`, specifies that any [pending](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise) Promises, should be carried over into the next interval and counted against the `intervalCap`. If `false`, any of those pending Promises will not count towards the next `intervalCap`.
112
113### queue
114
115`PQueue` instance.
116
117#### .add(fn, options?)
118
119Adds a sync or async task to the queue. Always returns a promise.
120
121Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.
122
123##### fn
124
125Type: `Function`
126
127Promise-returning/async function. When executed, it will receive `{signal}` as the first argument.
128
129#### options
130
131Type: `object`
132
133##### priority
134
135Type: `number`\
136Default: `0`
137
138Priority of operation. Operations with greater priority will be scheduled first.
139
140##### signal
141
142[`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an [error](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/reason). If the operation is already running, the signal will need to be handled by the operation itself.
143
144```js
145import PQueue from 'p-queue';
146import got, {CancelError} from 'got';
147
148const queue = new PQueue();
149
150const controller = new AbortController();
151
152try {
153 await queue.add(({signal}) => {
154 const request = got('https://sindresorhus.com');
155
156 signal.addEventListener('abort', () => {
157 request.cancel();
158 });
159
160 try {
161 return await request;
162 } catch (error) {
163 if (!(error instanceof CancelError)) {
164 throw error;
165 }
166 }
167 }, {signal: controller.signal});
168} catch (error) {
169 if (!(error instanceof DOMException)) {
170 throw error;
171 }
172}
173```
174
175#### .addAll(fns, options?)
176
177Same as `.add()`, but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.
178
179#### .pause()
180
181Put queue execution on hold.
182
183#### .start()
184
185Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via `options.autoStart = false` or by `.pause()` method.)
186
187Returns `this` (the instance).
188
189#### .onEmpty()
190
191Returns a promise that settles when the queue becomes empty.
192
193Can be called multiple times. Useful if you for example add additional items at a later time.
194
195#### .onIdle()
196
197Returns a promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
198
199The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
200
201#### .onSizeLessThan(limit)
202
203Returns a promise that settles when the queue size is less than the given limit: `queue.size < limit`.
204
205If you want to avoid having the queue grow beyond a certain size you can `await queue.onSizeLessThan()` before adding a new item.
206
207Note that this only limits the number of items waiting to start. There could still be up to `concurrency` jobs already running that this call does not include in its calculation.
208
209#### .clear()
210
211Clear the queue.
212
213#### .size
214
215Size of the queue, the number of queued items waiting to run.
216
217#### .sizeBy(options)
218
219Size of the queue, filtered by the given options.
220
221For example, this can be used to find the number of items remaining in the queue with a specific priority level.
222
223```js
224import PQueue from 'p-queue';
225
226const queue = new PQueue();
227
228queue.add(async () => 'πŸ¦„', {priority: 1});
229queue.add(async () => 'πŸ¦„', {priority: 0});
230queue.add(async () => 'πŸ¦„', {priority: 1});
231
232console.log(queue.sizeBy({priority: 1}));
233//=> 2
234
235console.log(queue.sizeBy({priority: 0}));
236//=> 1
237```
238
239#### .pending
240
241Number of running items (no longer in the queue).
242
243#### [.timeout](#timeout)
244
245#### [.concurrency](#concurrency)
246
247#### .isPaused
248
249Whether the queue is currently paused.
250
251## Events
252
253#### active
254
255Emitted as each item is processed in the queue for the purpose of tracking progress.
256
257```js
258import delay from 'delay';
259import PQueue from 'p-queue';
260
261const queue = new PQueue({concurrency: 2});
262
263let count = 0;
264queue.on('active', () => {
265 console.log(`Working on item #${++count}. Size: ${queue.size} Pending: ${queue.pending}`);
266});
267
268queue.add(() => Promise.resolve());
269queue.add(() => delay(2000));
270queue.add(() => Promise.resolve());
271queue.add(() => Promise.resolve());
272queue.add(() => delay(500));
273```
274
275#### completed
276
277Emitted when an item completes without error.
278
279```js
280import delay from 'delay';
281import PQueue from 'p-queue';
282
283const queue = new PQueue({concurrency: 2});
284
285queue.on('completed', result => {
286 console.log(result);
287});
288
289queue.add(() => Promise.resolve('hello, world!'));
290```
291
292#### error
293
294Emitted if an item throws an error.
295
296```js
297import delay from 'delay';
298import PQueue from 'p-queue';
299
300const queue = new PQueue({concurrency: 2});
301
302queue.on('error', error => {
303 console.error(error);
304});
305
306queue.add(() => Promise.reject(new Error('error')));
307```
308
309#### empty
310
311Emitted every time the queue becomes empty.
312
313Useful if you for example add additional items at a later time.
314
315#### idle
316
317Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`.
318
319The difference with `empty` is that `idle` guarantees that all work from the queue has finished. `empty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
320
321```js
322import delay from 'delay';
323import PQueue from 'p-queue';
324
325const queue = new PQueue();
326
327queue.on('idle', () => {
328 console.log(`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`);
329});
330
331const job1 = queue.add(() => delay(2000));
332const job2 = queue.add(() => delay(500));
333
334await job1;
335await job2;
336// => 'Queue is idle. Size: 0 Pending: 0'
337
338await queue.add(() => delay(600));
339// => 'Queue is idle. Size: 0 Pending: 0'
340```
341
342The `idle` event is emitted every time the queue reaches an idle state. On the other hand, the promise the `onIdle()` function returns resolves once the queue becomes idle instead of every time the queue is idle.
343
344#### add
345
346Emitted every time the add method is called and the number of pending or queued tasks is increased.
347
348#### next
349
350Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.
351
352```js
353import delay from 'delay';
354import PQueue from 'p-queue';
355
356const queue = new PQueue();
357
358queue.on('add', () => {
359 console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`);
360});
361
362queue.on('next', () => {
363 console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`);
364});
365
366const job1 = queue.add(() => delay(2000));
367const job2 = queue.add(() => delay(500));
368
369await job1;
370await job2;
371//=> 'Task is added. Size: 0 Pending: 1'
372//=> 'Task is added. Size: 0 Pending: 2'
373
374await queue.add(() => delay(600));
375//=> 'Task is completed. Size: 0 Pending: 1'
376//=> 'Task is completed. Size: 0 Pending: 0'
377```
378
379## Advanced example
380
381A more advanced example to help you understand the flow.
382
383```js
384import delay from 'delay';
385import PQueue from 'p-queue';
386
387const queue = new PQueue({concurrency: 1});
388
389(async () => {
390 await delay(200);
391
392 console.log(`8. Pending promises: ${queue.pending}`);
393 //=> '8. Pending promises: 0'
394
395 (async () => {
396 await queue.add(async () => 'πŸ™');
397 console.log('11. Resolved')
398 })();
399
400 console.log('9. Added πŸ™');
401
402 console.log(`10. Pending promises: ${queue.pending}`);
403 //=> '10. Pending promises: 1'
404
405 await queue.onIdle();
406 console.log('12. All work is done');
407})();
408
409(async () => {
410 await queue.add(async () => 'πŸ¦„');
411 console.log('5. Resolved')
412})();
413console.log('1. Added πŸ¦„');
414
415(async () => {
416 await queue.add(async () => '🐴');
417 console.log('6. Resolved')
418})();
419console.log('2. Added 🐴');
420
421(async () => {
422 await queue.onEmpty();
423 console.log('7. Queue is empty');
424})();
425
426console.log(`3. Queue size: ${queue.size}`);
427//=> '3. Queue size: 1`
428
429console.log(`4. Pending promises: ${queue.pending}`);
430//=> '4. Pending promises: 1'
431```
432
433```
434$ node example.js
4351. Added πŸ¦„
4362. Added 🐴
4373. Queue size: 1
4384. Pending promises: 1
4395. Resolved πŸ¦„
4406. Resolved 🐴
4417. Queue is empty
4428. Pending promises: 0
4439. Added πŸ™
44410. Pending promises: 1
44511. Resolved πŸ™
44612. All work is done
447```
448
449## Custom QueueClass
450
451For implementing more complex scheduling policies, you can provide a QueueClass in the options:
452
453```js
454import PQueue from 'p-queue';
455
456class QueueClass {
457 constructor() {
458 this._queue = [];
459 }
460
461 enqueue(run, options) {
462 this._queue.push(run);
463 }
464
465 dequeue() {
466 return this._queue.shift();
467 }
468
469 get size() {
470 return this._queue.length;
471 }
472
473 filter(options) {
474 return this._queue;
475 }
476}
477
478const queue = new PQueue({queueClass: QueueClass});
479```
480
481`p-queue` will call corresponding methods to put and get operations from this queue.
482
483## FAQ
484
485#### How do the `concurrency` and `intervalCap` options affect each other?
486
487They are just different constraints. The `concurrency` option limits how many things run at the same time. The `intervalCap` option limits how many things run in total during the interval (over time).
488
489## Maintainers
490
491- [Sindre Sorhus](https://github.com/sindresorhus)
492- [Richie Bendall](https://github.com/Richienb)
493
494## Related
495
496- [p-limit](https://github.com/sindresorhus/p-limit) - Run multiple promise-returning & async functions with limited concurrency
497- [p-throttle](https://github.com/sindresorhus/p-throttle) - Throttle promise-returning & async functions
498- [p-debounce](https://github.com/sindresorhus/p-debounce) - Debounce promise-returning & async functions
499- [p-all](https://github.com/sindresorhus/p-all) - Run promise-returning & async functions concurrently with optional limited concurrency
500- [More…](https://github.com/sindresorhus/promise-fun)