# Better Queue - Powerful flow control

[![npm package](https://nodei.co/npm/better-queue.png?downloads=true&downloadRank=true&stars=true)](https://nodei.co/npm/better-queue/)

[![Build status](https://img.shields.io/travis/diamondio/better-queue.svg?style=flat-square)](https://travis-ci.org/diamondio/better-queue)
[![Dependency Status](https://img.shields.io/david/diamondio/better-queue.svg?style=flat-square)](https://david-dm.org/diamondio/better-queue)
[![Known Vulnerabilities](https://snyk.io/test/npm/better-queue/badge.svg?style=flat-square)](https://snyk.io/test/npm/better-queue)
[![Gitter](https://img.shields.io/badge/gitter-join_chat-blue.svg?style=flat-square)](https://gitter.im/diamondio/better-queue?utm_source=badge)


## Super simple to use

Better Queue is designed to be simple to set up but still let you do complex things.

- Persistent (and extendable) storage
- Batched processing
- Prioritize tasks
- Merge/filter tasks
- Progress events (with ETA!)
- Fine-tuned timing controls
- Retry on fail
- Concurrent batch processing
- Task statistics (average completion time, failure rate and peak queue size)
- ... and more!

---

#### Install (via npm)

```bash
npm install --save better-queue
```

---

#### Quick Example

```js
var Queue = require('better-queue');

var q = new Queue(function (input, cb) {

  // Some processing here ...

  cb(null, result);
})

q.push(1)
q.push({ x: 1 })
```

## Table of contents

- [Queuing](#queuing)
- [Task Management](#task-management)
- [Queue Management](#queue-management)
- [Advanced](#advanced)
- [Storage](#storage)
- [Full Documentation](#full-documentation)

---

You can combine any (and all) of these options
for your queue!


## Queuing

It's very easy to push tasks into the queue.

```js
var q = new Queue(fn);
q.push(1);
q.push({ x: 1, y: 2 });
q.push("hello");
```

You can also pass a callback as the second parameter to the `push`
function; it is called when that task is done. For example:

```js
var q = new Queue(fn);
q.push(1, function (err, result) {
  // Results from the task!
});
```

You can also listen to events on the ticket returned by the `push` call.

```js
var q = new Queue(fn);
q.push(1)
  .on('finish', function (result) {
    // Task succeeded with {result}!
  })
  .on('failed', function (err) {
    // Task failed!
  })
```

Alternatively, you can subscribe to the queue's events.

```js
var q = new Queue(fn);
q.on('task_finish', function (taskId, result, stats) {
  // taskId = 1, result: 3, stats = { elapsed: <time taken> }
  // taskId = 2, result: 5, stats = { elapsed: <time taken> }
})
q.on('task_failed', function (taskId, err, stats) {
  // Handle error, stats = { elapsed: <time taken> }
})
q.on('empty', function (){})
q.on('drain', function (){})
q.push({ id: 1, a: 1, b: 2 });
q.push({ id: 2, a: 2, b: 3 });
```

The `empty` event fires when all of the tasks have been pulled off
the queue (there may still be tasks running!)

The `drain` event fires when there are no more tasks on the queue _and_
no more tasks are running.

You can control how many tasks process at the same time.

```js
var q = new Queue(fn, { concurrent: 3 })
```

Now the queue will run up to 3 tasks at the same time. (By
default, we handle tasks one at a time.)

You can also turn the queue into a stack by turning on `filo`.

```js
var q = new Queue(fn, { filo: true })
```

Now the most recently pushed items will be handled first.



[back to top](#table-of-contents)

---

## Task Management

#### Task ID

Tasks can be given an ID to help identify and track them as they go through
the queue.

By default, we use `task.id` if it is a string property;
otherwise we generate a random ID for the task.

You can pass in an `id` property to options to change this behaviour.
Here are some examples of how:

```js
var q = new Queue(fn, {
  // Use one of the following forms:
  id: 'id', // Default: task's `id` property
  id: 'name', // task's `name` property
  id: function (task, cb) {
    // Compute the ID
    cb(null, 'computed_id');
  }
})
```
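
As a sketch of the function form, the ID below is derived from two task fields. The `uploadId` name and the `user` and `file` properties are hypothetical, chosen only for illustration; the `(task, cb)` signature is the one described above. A function like this could be passed as `{ id: uploadId }`.

```js
// Hypothetical ID function: combines two task fields into one ID.
// The `user` and `file` properties are assumptions for this example.
function uploadId(task, cb) {
  if (!task || !task.user || !task.file) {
    return cb('missing_user_or_file');
  }
  cb(null, task.user + ':' + task.file);
}

// Calling it directly shows the ID that would be produced.
uploadId({ user: 'jim', file: 'a.pdf' }, function (err, id) {
  console.log(id); // jim:a.pdf
});
```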

One thing you can do with Task ID is merge tasks:

```js
var counter = new Queue(function (task, cb) {
  console.log("I have %d %ss.", task.count, task.id);
  cb();
}, {
  merge: function (oldTask, newTask, cb) {
    oldTask.count += newTask.count;
    cb(null, oldTask);
  }
})
counter.push({ id: 'apple', count: 2 });
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 1 });
// Prints out:
// I have 3 apples.
// I have 2 oranges.
```

By default, if tasks have the same ID they replace the previous task.

```js
var counter = new Queue(function (task, cb) {
  console.log("I have %d %ss.", task.count, task.id);
  cb();
})
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'apple', count: 3 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 2 });
// Prints out:
// I have 3 apples.
// I have 2 oranges.
```

You can also use the task ID when subscribing to events from Queue.

```js
var counter = new Queue(fn)
counter.on('task_finish', function (taskId, result) {
  // taskId will be 'jim' or 'bob'
})
counter.push({ id: 'jim', count: 2 });
counter.push({ id: 'bob', count: 1 });
```


#### Batch Processing

Your processing function can also be modified to handle multiple
tasks at the same time. For example:

```js
var ages = new Queue(function (batch, cb) {
  // Batch 1:
  //   [ { id: 'steve', age: 21 },
  //     { id: 'john', age: 34 },
  //     { id: 'joe', age: 18 } ]
  // Batch 2:
  //   [ { id: 'mary', age: 23 } ]
  cb();
}, { batchSize: 3 })
ages.push({ id: 'steve', age: 21 });
ages.push({ id: 'john', age: 34 });
ages.push({ id: 'joe', age: 18 });
ages.push({ id: 'mary', age: 23 });
```

Note how the queue will only handle at most 3 items at a time.

Below is another example of a batched call with numbers.

```js
var ages = new Queue(function (batch, cb) {
  // batch = [1,2,3]
  cb();
}, { batchSize: 3 })
ages.push(1);
ages.push(2);
ages.push(3);
```


#### Filtering, Validation and Priority

You can also format (and filter) the input that arrives from a push
before it gets processed by the queue by passing in a `filter`
function.

```js
var greeter = new Queue(function (name, cb) {
  console.log("Hello, %s!", name)
  cb();
}, {
  filter: function (input, cb) {
    if (input === 'Bob') {
      return cb('not_allowed');
    }
    return cb(null, input.toUpperCase())
  }
});
greeter.push('anna'); // Prints 'Hello, ANNA!'
```

This can be particularly useful if your queue needs to do some pre-processing,
input validation, database lookup, etc. before a task is loaded onto the queue.
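
As a rough illustration of input validation, the filter below rejects malformed input and normalizes the rest. The `validateUpload` name and the `path` and `retries` fields are hypothetical; only the `(input, cb)` signature and the "error rejects the input" behaviour come from this document. It could be passed as `{ filter: validateUpload }`.

```js
// Hypothetical filter: validates and normalizes an upload request.
// The `path` and `retries` fields are assumptions for this example.
function validateUpload(input, cb) {
  if (!input || typeof input.path !== 'string') {
    return cb('invalid_path'); // rejected: never reaches the queue
  }
  var path = input.path.trim();
  if (path === '') {
    return cb('invalid_path');
  }
  cb(null, {
    id: path,                           // reuse the path as the task ID
    path: path,
    retries: Number(input.retries) || 0 // missing or invalid values become 0
  });
}

validateUpload({ path: ' /tmp/a.pdf ' }, function (err, task) {
  console.log(task.path); // /tmp/a.pdf
});
```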

You can also define a priority function to control which tasks get
processed first.

```js
var greeter = new Queue(function (name, cb) {
  console.log("Greetings, %s.", name);
  cb();
}, {
  priority: function (name, cb) {
    if (name === "Steve") return cb(null, 10);
    if (name === "Mary") return cb(null, 5);
    if (name === "Joe") return cb(null, 5);
    cb(null, 1);
  }
})
greeter.push("Steve");
greeter.push("John");
greeter.push("Joe");
greeter.push("Mary");

// Prints out:
// Greetings, Steve.
// Greetings, Joe.
// Greetings, Mary.
// Greetings, John.
```

If `filo` is set to `true` in the example above, then Joe and Mary
would swap order.


[back to top](#table-of-contents)

---

## Queue Management

#### Retry

You can set tasks to retry up to `maxRetries` times if they fail. By default,
a failed task is not retried. Optionally, you can set a `retryDelay`
(in milliseconds) to wait a little while before retrying.

```js
var q = new Queue(fn, { maxRetries: 10, retryDelay: 1000 })
```


#### Timing

You can configure the queue to have a `maxTimeout`.

```js
var q = new Queue(function (name, cb) {
  someLongTask(function () {
    cb();
  })
}, { maxTimeout: 2000 })
```

After 2 seconds, the task is considered timed out and fails instead of
waiting for the callback to be called.

You can also delay the queue before it starts its processing. This is the
behaviour of a timed cargo.

```js
var q = new Queue(function (batch, cb) {
  // Batch [1,2] will process after 2s.
  cb();
}, { batchSize: 5, batchDelay: 2000 })
q.push(1);
setTimeout(function () {
  q.push(2);
}, 1000)
```

You can also set `afterProcessDelay`, which will delay processing between tasks.

```js
var q = new Queue(function (task, cb) {
  cb(); // Will wait 1 second before taking the next task
}, { afterProcessDelay: 1000 })
q.push(1);
q.push(2);
```

In addition to `batchDelay`, you can set a `batchDelayTimeout`, which fires off a batch if no new tasks have been pushed to the queue within `batchDelayTimeout` milliseconds.

```js
var q = new Queue(fn, {
  batchSize: 50,
  batchDelay: 5000,
  batchDelayTimeout: 1000
})
q.push(1);
q.push(2);
```

In the example above, the queue will wait up to 5s for 50 items to fill the batch, or process the batch early if no new tasks are added for 1s.
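
To make the interplay of the three batch settings concrete, here is a simplified model of when the first batch would fire for a given sequence of push times. It is only a reading of the description above, not the library's scheduler: the `firstBatchTime` helper is hypothetical, and it assumes the `batchDelay` clock starts at the first push.

```js
// Simplified model (an assumption, not the library's code): given the
// times (ms) at which tasks are pushed, estimate when the first batch fires.
function firstBatchTime(pushTimes, opts) {
  var deadline = pushTimes[0] + opts.batchDelay; // hard limit from the first push
  for (var i = 0; i < pushTimes.length; i++) {
    var t = pushTimes[i];
    if (t >= deadline) break;              // batch already fired at the deadline
    if (i + 1 >= opts.batchSize) return t; // batch is full
    var next = pushTimes[i + 1];
    var idleLimit = t + opts.batchDelayTimeout;
    // No further push arrives before the idle timeout expires.
    if (next === undefined || next > idleLimit) {
      return Math.min(idleLimit, deadline);
    }
  }
  return deadline;
}

var opts = { batchSize: 50, batchDelay: 5000, batchDelayTimeout: 1000 };
// Two quick pushes, then silence: the idle timeout wins.
console.log(firstBatchTime([0, 10], opts)); // 1010
// A steady trickle of pushes: the 5s batchDelay wins.
console.log(firstBatchTime([0, 900, 1800, 2700, 3600, 4500], opts)); // 5000
```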

#### Precondition

You can define a function called `precondition` that checks that it's ok to process
the next batch. If the precondition fails, the queue keeps calling this function
(every `preconditionRetryTimeout` milliseconds) until it passes.

```js
var q = new Queue(function (batch, cb) {

  // Do something that requires internet

  cb();
}, {
  precondition: function (cb) {
    isOnline(function (err, ok) {
      if (ok) {
        cb(null, true);
      } else {
        cb(null, false);
      }
    })
  },
  preconditionRetryTimeout: 10*1000 // If we go offline, retry every 10s
})
```
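
If the readiness check is synchronous, a small adapter can turn it into the `precondition(cb)` shape shown above. This is a minimal sketch: `toPrecondition` and the `maintenanceMode` flag are hypothetical names, and treating a thrown error as "not ready" is a design choice made here, not something the library prescribes.

```js
// Hypothetical helper: wraps a synchronous boolean check in the
// `precondition(cb)` shape. A thrown error is reported as "not ready".
function toPrecondition(check) {
  return function (cb) {
    var ok;
    try {
      ok = Boolean(check());
    } catch (e) {
      ok = false;
    }
    cb(null, ok);
  };
}

var maintenanceMode = true;
var notInMaintenance = toPrecondition(function () {
  return !maintenanceMode;
});

notInMaintenance(function (err, ok) {
  console.log(ok); // false
});
```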


#### Pause/Resume

There are options to control processes while they are running.

You can return an object from your processing function with the functions
`cancel`, `pause` and `resume`. This allows a running operation to be paused,
resumed or cancelled.

```js
var uploader = new Queue(function (file, cb) {

  var worker = someLongProcess(file);

  return {
    cancel: function () {
      // Cancel the file upload
    },
    pause: function () {
      // Pause the file upload
    },
    resume: function () {
      // Resume the file upload
    }
  }
})
uploader.push('/path/to/file.pdf');
uploader.pause();
uploader.resume();
```

#### Cancel/Abort

You can also set `cancelIfRunning` to `true`. This will cancel a running task if
a task with the same ID is pushed onto the queue.

```js
var uploader = new Queue(function (file, cb) {
  var request = someLongProcess(file);
  return {
    cancel: function () {
      request.cancel();
    }
  }
}, {
  id: 'path',
  cancelIfRunning: true
})
uploader.push({ path: '/path/to/file.pdf' });
// ... Some time later
uploader.push({ path: '/path/to/file.pdf' });
```

In the example above, the first upload process is cancelled and the task is requeued.

You can also call `.cancel(taskId)` to cancel and unqueue the task.

```js
uploader.cancel('/path/to/file.pdf');
```

Note that if you enable this option in batch mode, it will cancel the entire batch!


[back to top](#table-of-contents)

---

## Advanced

#### Updating Task Status

The process function will be run in a context with `progressBatch`,
`finishBatch` and `failedBatch` functions.

The example below illustrates how you can use these:

```js
var uploader = new Queue(function (file, cb) {
  this.failedBatch('some_error')
  this.finishBatch(result)
  this.progressBatch(bytesUploaded, totalBytes, "uploading")
});
uploader.on('task_finish', function (taskId, result) {
  // Handle finished result
})
uploader.on('task_failed', function (taskId, errorMessage) {
  // Handle error
})
uploader.on('task_progress', function (taskId, completed, total) {
  // Handle task progress
})

uploader.push('/some/file.jpg')
  .on('finish', function (result) {
    // Handle upload result
  })
  .on('failed', function (err) {
    // Handle error
  })
  .on('progress', function (progress) {
    // progress.eta - human readable string estimating time remaining
    // progress.pct - % complete (out of 100)
    // progress.complete - # completed so far
    // progress.total - # for completion
    // progress.message - status message
  })
```
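
The fields of the `progress` object listed above can be folded into a single status line. The `formatProgress` helper below is a hypothetical sketch, and the sample values (including the wording of the `eta` string) are made up for illustration. It could be called from a ticket's `progress` handler.

```js
// Hypothetical formatter for the `progress` object described above.
function formatProgress(progress) {
  var line = progress.complete + '/' + progress.total +
    ' (' + Math.round(progress.pct) + '%)';
  if (progress.message) line += ' ' + progress.message;
  if (progress.eta) line += ', ETA ' + progress.eta;
  return line;
}

// Sample values are invented; a real `eta` string may be worded differently.
console.log(formatProgress({
  complete: 30, total: 100, pct: 30, message: 'uploading', eta: '2 minutes'
}));
// 30/100 (30%) uploading, ETA 2 minutes
```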

#### Update Status in Batch mode (batchSize > 1)

You can also complete individual tasks in a batch by using the `failedTask` and
`finishTask` functions, and report progress on a single task with `progressTask`.

```js
var uploader = new Queue(function (files, cb) {
  this.failedTask(0, 'some_error') // files[0] has failed with 'some_error'
  this.finishTask(1, result) // files[1] has finished with {result}
  this.progressTask(2, 30, 100, "copying") // files[2] is 30% done, currently copying
}, { batchSize: 3 });
uploader.push('/some/file1.jpg')
uploader.push('/some/file2.jpg')
uploader.push('/some/file3.jpg')
```

Note that if you use *-Task and *-Batch functions together, the batch functions will only
apply to the tasks that have not yet finished/failed.


#### Queue Statistics

You can inspect the queue at any given time to see the total number of items processed,
the average processing time, the success rate and the peak number of queued items.

```js
var q = new Queue(fn);
var stats = q.getStats();

// stats.total = Total tasks processed
// stats.average = Average process time
// stats.successRate = % success (between 0 and 1)
// stats.peak = Most tasks queued at any given point in time
```
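
As a small illustration of working with these numbers, the helper below derives a failure count and a percentage string from a stats object. `summarizeStats` is a hypothetical name and the sample values are made up; only the four property names come from the list above. In practice it would be called as `summarizeStats(q.getStats())`.

```js
// Hypothetical summary helper for the object returned by `getStats()`.
function summarizeStats(stats) {
  // successRate is a fraction between 0 and 1, so the rest are failures.
  var failed = Math.round(stats.total * (1 - stats.successRate));
  return {
    processed: stats.total,
    failed: failed,
    successPct: (stats.successRate * 100).toFixed(1) + '%',
    average: stats.average,
    peak: stats.peak
  };
}

// Sample values below are made up for illustration.
var summary = summarizeStats({ total: 200, average: 120, successRate: 0.95, peak: 12 });
console.log(summary.failed);     // 10
console.log(summary.successPct); // 95.0%
```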


[back to top](#table-of-contents)

---


## Storage


#### Using a store

For your convenience, we have added compatibility for a few storage options.

By default, we use an in-memory store that doesn't persist. You can change
to one of our other built-in stores by passing in the `store` option.

#### Built-in store

Currently, we support the following stores:

 - memory
 - sql (SQLite, PostgreSQL)

#### SQLite store (`npm install sqlite3`)
```js
var q = new Queue(fn, {
  store: {
    type: 'sql',
    dialect: 'sqlite',
    path: '/path/to/sqlite/file'
  }
});
```

#### PostgreSQL store (`npm install pg`)
```js
var q = new Queue(fn, {
  store: {
    type: 'sql',
    dialect: 'postgres',
    host: 'localhost',
    port: 5432,
    username: 'username',
    password: 'password',
    dbname: 'template1',
    tableName: 'tasks'
  }
});
```

Please help us add support for more stores; contributions are welcome!

#### Custom Store

Writing your own store is very easy; you just need to implement a few functions
and then pass your store to the queue, either through the `store` option or
with `queue.use(store)`.

```js
var q = new Queue(fn, { store: myStore });
```

or

```js
q.use(myStore);
```

Your store needs the following functions:
```js
q.use({
  connect: function (cb) {
    // Connect to your db
  },
  getTask: function (taskId, cb) {
    // Retrieves a task
  },
  putTask: function (taskId, task, priority, cb) {
    // Save task with given priority
  },
  takeFirstN: function (n, cb) {
    // Removes the first N items (sorted by priority and age)
  },
  takeLastN: function (n, cb) {
    // Removes the last N items (sorted by priority and recency)
  }
})
```
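
The two orderings mentioned in the comments above ("priority and age" versus "priority and recency") can be sketched as plain sort functions. This is an illustration only, not a working store: the record shape `{ id, priority, added }` and the helper names are assumptions, and "higher priority first" is inferred from the priority example earlier in this document.

```js
// Illustration only: how "first N" and "last N" might be ordered.
// The record shape ({ id, priority, added }) is an assumption.
function orderFirst(records) {
  return records.slice().sort(function (a, b) {
    return (b.priority - a.priority) || (a.added - b.added); // oldest first
  });
}
function orderLast(records) {
  return records.slice().sort(function (a, b) {
    return (b.priority - a.priority) || (b.added - a.added); // newest first
  });
}

var records = [
  { id: 'john', priority: 1, added: 2 },
  { id: 'joe', priority: 5, added: 3 },
  { id: 'mary', priority: 5, added: 4 },
  { id: 'steve', priority: 10, added: 1 }
];
function ids(list) {
  return list.map(function (r) { return r.id; });
}
console.log(ids(orderFirst(records)).join(', ')); // steve, joe, mary, john
console.log(ids(orderLast(records)).join(', '));  // steve, mary, joe, john
```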

[back to top](#table-of-contents)

---

## Full Documentation

#### new Queue(process, options)

The first argument can be either the process function or the `options` object.

A process function is required; all other options are optional.

- `process` - function to process tasks. Will be run with either one single task (if `batchSize` is 1) or as an array of at most `batchSize` items. The second argument will be a callback `cb(error, result)` that must be called regardless of success or failure.

---

- `filter` - function to filter input. Will be run with `input`, which is whatever was passed to `q.push()`. If you define this function, then you will be expected to call the callback `cb(error, task)`. If an error is sent in the callback then the input is rejected.
- `merge` - function to merge tasks with the same task ID. Will be run with `oldTask`, `newTask` and a callback `cb(error, mergedTask)`. If you define this function then the callback is expected to be called.
- `priority` - function to determine the priority of a task. Takes a task and a callback `cb(error, priority)`.
- `precondition` - function that runs a check before processing to ensure it can process the next batch. Takes a callback `cb(error, passOrFail)`.

---

- `id` - The property to use as the task ID. This can be a string or a function (for more complicated IDs). The function takes `(task, cb)` and must call the callback with `cb(error, taskId)`.
- `cancelIfRunning` - If true, when a task with the same ID is running, its worker will be cancelled. Defaults to `false`.
- `autoResume` - If true, tasks in the store will automatically start processing once the queue connects to the store. Defaults to `true`.
- `failTaskOnProcessException` - If true, when the process function throws an error the batch fails. Defaults to `true`.
- `filo` - If true, tasks will be completed in a first in, last out order. Defaults to `false`.
- `batchSize` - The number of tasks (at most) that can be processed at once. Defaults to `1`.
- `batchDelay` - Number of milliseconds to delay before starting to pop items off the queue. Defaults to `0`.
- `batchDelayTimeout` - Number of milliseconds to wait for a new task to arrive before firing off the batch. Defaults to `Infinity`.
- `concurrent` - Number of workers that can be running at any given time. Defaults to `1`.
- `maxTimeout` - Number of milliseconds before a task is considered timed out. Defaults to `Infinity`.
- `afterProcessDelay` - Number of milliseconds to delay before processing the next batch of items. Defaults to `1`.
- `maxRetries` - Maximum number of attempts to retry on a failed task. Defaults to `0`.
- `retryDelay` - Number of milliseconds before retrying. Defaults to `0`.
- `storeMaxRetries` - Maximum number of attempts before giving up on the store. Defaults to `Infinity`.
- `storeRetryTimeout` - Number of milliseconds to delay before trying to connect to the store again. Defaults to `1000`.
- `preconditionRetryTimeout` - Number of milliseconds to delay before checking the precondition function again. Defaults to `1000`.
- `store` - Represents the options for the initial store. Can be an object containing `{ type: storeType, ... options ... }`, or the store instance itself.

#### Methods on Queue

- `push(task, cb)` - Push a task onto the queue, with an optional callback when it completes. Returns a `Ticket` object.
- `pause()` - Pauses the queue: tries to pause running tasks and prevents tasks from getting processed until resumed.
- `resume()` - Resumes the queue and its running tasks.
- `destroy(cb)` - Destroys the queue: closes the store, tries to clean up.
- `use(store)` - Sets the queue to read from and write to the given store.
- `getStats()` - Gets the aggregate stats for the queue. Returns an object with properties `successRate`, `peak`, `total` and `average`, representing the success rate on tasks, peak number of items queued, total number of items processed and average processing time, respectively.
- `resetStats()` - Resets all of the aggregate stats.

#### Events on Queue

- `task_queued` - When a task is queued
- `task_accepted` - When a task is accepted
- `task_started` - When a task begins processing
- `task_finish` - When a task is completed
- `task_failed` - When a task fails
- `task_progress` - When a task's progress changes
- `batch_finish` - When a batch of tasks (or worker) completes
- `batch_failed` - When a batch of tasks (or worker) fails
- `batch_progress` - When a batch of tasks (or worker) updates its progress

#### Events on Ticket

- `accept` - When the corresponding task is accepted (has passed filter)
- `queued` - When the corresponding task is queued (and saved into the store)
- `started` - When the corresponding task is started
- `progress` - When the corresponding task's progress changes
- `finish` - When the corresponding task completes
- `failed` - When the corresponding task fails
