UNPKG

11 kBMarkdownView Raw
1# p-queue
2
3> Promise queue with concurrency control
4
5Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.
6
7## Install
8
9```
10$ npm install p-queue
11```
12
13## Usage
14
15Here we run only one promise at the time. For example, set `concurrency` to 4 to run four promises at the same time.
16
17```js
18import PQueue from 'p-queue';
19import got from 'got';
20
21const queue = new PQueue({concurrency: 1});
22
23(async () => {
24 await queue.add(() => got('https://sindresorhus.com'));
25 console.log('Done: sindresorhus.com');
26})();
27
28(async () => {
29 await queue.add(() => got('https://avajs.dev'));
30 console.log('Done: avajs.dev');
31})();
32
33(async () => {
34 const task = await getUnicornTask();
35 await queue.add(task);
36 console.log('Done: Unicorn task');
37})();
38```
39
40## API
41
42### PQueue(options?)
43
44Returns a new `queue` instance, which is an [`EventEmitter3`](https://github.com/primus/eventemitter3) subclass.
45
46#### options
47
48Type: `object`
49
50##### concurrency
51
52Type: `number`\
53Default: `Infinity`\
54Minimum: `1`
55
56Concurrency limit.
57
58##### timeout
59
60Type: `number`
61
62Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.
63
64##### throwOnTimeout
65
66Type: `boolean`\
67Default: `false`
68
69Whether or not a timeout is considered an exception.
70
71##### autoStart
72
73Type: `boolean`\
74Default: `true`
75
76Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.
77
78##### queueClass
79
80Type: `Function`
81
82Class with a `enqueue` and `dequeue` method, and a `size` getter. See the [Custom QueueClass](#custom-queueclass) section.
83
84##### intervalCap
85
86Type: `number`\
87Default: `Infinity`\
88Minimum: `1`
89
90The max number of runs in the given interval of time.
91
92##### interval
93
94Type: `number`\
95Default: `0`\
96Minimum: `0`
97
98The length of time in milliseconds before the interval count resets. Must be finite.
99
100##### carryoverConcurrencyCount
101
102Type: `boolean`\
103Default: `false`
104
105If `true`, specifies that any [pending](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise) Promises, should be carried over into the next interval and counted against the `intervalCap`. If `false`, any of those pending Promises will not count towards the next `intervalCap`.
106
107### queue
108
109`PQueue` instance.
110
111#### .add(fn, options?)
112
113Adds a sync or async task to the queue. Always returns a promise.
114
115Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.
116
117##### fn
118
119Type: `Function`
120
121Promise-returning/async function.
122
123#### options
124
125Type: `object`
126
127##### priority
128
129Type: `number`\
130Default: `0`
131
132Priority of operation. Operations with greater priority will be scheduled first.
133
134#### .addAll(fns, options?)
135
136Same as `.add()`, but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.
137
138#### .pause()
139
140Put queue execution on hold.
141
142#### .start()
143
144Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via `options.autoStart = false` or by `.pause()` method.)
145
146Returns `this` (the instance).
147
148#### .onEmpty()
149
150Returns a promise that settles when the queue becomes empty.
151
152Can be called multiple times. Useful if you for example add additional items at a later time.
153
154#### .onIdle()
155
156Returns a promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
157
158The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
159
160#### .onSizeLessThan(limit)
161
162Returns a promise that settles when the queue size is less than the given limit: `queue.size < limit`.
163
164If you want to avoid having the queue grow beyond a certain size you can `await queue.onSizeLessThan()` before adding a new item.
165
166Note that this only limits the number of items waiting to start. There could still be up to `concurrency` jobs already running that this call does not include in its calculation.
167
168#### .clear()
169
170Clear the queue.
171
172#### .size
173
174Size of the queue, the number of queued items waiting to run.
175
176#### .sizeBy(options)
177
178Size of the queue, filtered by the given options.
179
180For example, this can be used to find the number of items remaining in the queue with a specific priority level.
181
182```js
183import PQueue from 'p-queue';
184
185const queue = new PQueue();
186
187queue.add(async () => 'πŸ¦„', {priority: 1});
188queue.add(async () => 'πŸ¦„', {priority: 0});
189queue.add(async () => 'πŸ¦„', {priority: 1});
190
191console.log(queue.sizeBy({priority: 1}));
192//=> 2
193
194console.log(queue.sizeBy({priority: 0}));
195//=> 1
196```
197
198#### .pending
199
200Number of running items (no longer in the queue).
201
202#### [.timeout](#timeout)
203
204#### [.concurrency](#concurrency)
205
206#### .isPaused
207
208Whether the queue is currently paused.
209
210## Events
211
212#### active
213
214Emitted as each item is processed in the queue for the purpose of tracking progress.
215
216```js
217import delay from 'delay';
218import PQueue from 'p-queue';
219
220const queue = new PQueue({concurrency: 2});
221
222let count = 0;
223queue.on('active', () => {
224 console.log(`Working on item #${++count}. Size: ${queue.size} Pending: ${queue.pending}`);
225});
226
227queue.add(() => Promise.resolve());
228queue.add(() => delay(2000));
229queue.add(() => Promise.resolve());
230queue.add(() => Promise.resolve());
231queue.add(() => delay(500));
232```
233
234#### completed
235
236Emitted when an item completes without error.
237
238```js
239import delay from 'delay';
240import PQueue from 'p-queue';
241
242const queue = new PQueue({concurrency: 2});
243
244queue.on('completed', result => {
245 console.log(result);
246});
247
248queue.add(() => Promise.resolve('hello, world!'));
249```
250
251#### error
252
253Emitted if an item throws an error.
254
255```js
256import delay from 'delay';
257import PQueue from 'p-queue';
258
259const queue = new PQueue({concurrency: 2});
260
261queue.on('error', error => {
262 console.error(error);
263});
264
265queue.add(() => Promise.reject(new Error('error')));
266```
267
268#### idle
269
270Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`.
271
272```js
273import delay from 'delay';
274import PQueue from 'p-queue';
275
276const queue = new PQueue();
277
278queue.on('idle', () => {
279 console.log(`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`);
280});
281
282const job1 = queue.add(() => delay(2000));
283const job2 = queue.add(() => delay(500));
284
285await job1;
286await job2;
287// => 'Queue is idle. Size: 0 Pending: 0'
288
289await queue.add(() => delay(600));
290// => 'Queue is idle. Size: 0 Pending: 0'
291```
292
293The `idle` event is emitted every time the queue reaches an idle state. On the other hand, the promise the `onIdle()` function returns resolves once the queue becomes idle instead of every time the queue is idle.
294
295#### add
296
297Emitted every time the add method is called and the number of pending or queued tasks is increased.
298
299#### next
300
301Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.
302
303```js
304import delay from 'delay';
305import PQueue from 'p-queue';
306
307const queue = new PQueue();
308
309queue.on('add', () => {
310 console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`);
311});
312
313queue.on('next', () => {
314 console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`);
315});
316
317const job1 = queue.add(() => delay(2000));
318const job2 = queue.add(() => delay(500));
319
320await job1;
321await job2;
322//=> 'Task is added. Size: 0 Pending: 1'
323//=> 'Task is added. Size: 0 Pending: 2'
324
325await queue.add(() => delay(600));
326//=> 'Task is completed. Size: 0 Pending: 1'
327//=> 'Task is completed. Size: 0 Pending: 0'
328```
329
330## Advanced example
331
332A more advanced example to help you understand the flow.
333
334```js
335import delay from 'delay';
336import PQueue from 'p-queue';
337
338const queue = new PQueue({concurrency: 1});
339
340(async () => {
341 await delay(200);
342
343 console.log(`8. Pending promises: ${queue.pending}`);
344 //=> '8. Pending promises: 0'
345
346 (async () => {
347 await queue.add(async () => 'πŸ™');
348 console.log('11. Resolved')
349 })();
350
351 console.log('9. Added πŸ™');
352
353 console.log(`10. Pending promises: ${queue.pending}`);
354 //=> '10. Pending promises: 1'
355
356 await queue.onIdle();
357 console.log('12. All work is done');
358})();
359
360(async () => {
361 await queue.add(async () => 'πŸ¦„');
362 console.log('5. Resolved')
363})();
364console.log('1. Added πŸ¦„');
365
366(async () => {
367 await queue.add(async () => '🐴');
368 console.log('6. Resolved')
369})();
370console.log('2. Added 🐴');
371
372(async () => {
373 await queue.onEmpty();
374 console.log('7. Queue is empty');
375})();
376
377console.log(`3. Queue size: ${queue.size}`);
378//=> '3. Queue size: 1`
379
380console.log(`4. Pending promises: ${queue.pending}`);
381//=> '4. Pending promises: 1'
382```
383
384```
385$ node example.js
3861. Added πŸ¦„
3872. Added 🐴
3883. Queue size: 1
3894. Pending promises: 1
3905. Resolved πŸ¦„
3916. Resolved 🐴
3927. Queue is empty
3938. Pending promises: 0
3949. Added πŸ™
39510. Pending promises: 1
39611. Resolved πŸ™
39712. All work is done
398```
399
400## Custom QueueClass
401
402For implementing more complex scheduling policies, you can provide a QueueClass in the options:
403
404```js
405import PQueue from 'p-queue';
406
407class QueueClass {
408 constructor() {
409 this._queue = [];
410 }
411
412 enqueue(run, options) {
413 this._queue.push(run);
414 }
415
416 dequeue() {
417 return this._queue.shift();
418 }
419
420 get size() {
421 return this._queue.length;
422 }
423
424 filter(options) {
425 return this._queue;
426 }
427}
428
429const queue = new PQueue({queueClass: QueueClass});
430```
431
432`p-queue` will call corresponding methods to put and get operations from this queue.
433
434## FAQ
435
436#### How do the `concurrency` and `intervalCap` options affect each other?
437
438They are just different constraints. The `concurrency` option limits how many things run at the same time. The `intervalCap` option limits how many things run in total during the interval (over time).
439
440## Related
441
442- [p-limit](https://github.com/sindresorhus/p-limit) - Run multiple promise-returning & async functions with limited concurrency
443- [p-throttle](https://github.com/sindresorhus/p-throttle) - Throttle promise-returning & async functions
444- [p-debounce](https://github.com/sindresorhus/p-debounce) - Debounce promise-returning & async functions
445- [p-all](https://github.com/sindresorhus/p-all) - Run promise-returning & async functions concurrently with optional limited concurrency
446- [More…](https://github.com/sindresorhus/promise-fun)
447
448---
449
450<div align="center">
451 <b>
452 <a href="https://tidelift.com/subscription/pkg/npm-p-queue?utm_source=npm-p-queue&utm_medium=referral&utm_campaign=readme">Get professional support for this package with a Tidelift subscription</a>
453 </b>
454 <br>
455 <sub>
456 Tidelift helps make open source sustainable for maintainers while giving companies<br>assurances about security, maintenance, and licensing for their dependencies.
457 </sub>
458</div>