UNPKG

1.11 kBJavaScriptView Raw
1module.exports = class Queue {
2 /**
3 * A really simple queue with concurrency.
4 *
5 * @param {Function(Object, Function)} worker
6 * @param {Object} options
7 */
8 constructor(worker, options) {
9 this._worker = worker;
10 options = options || {};
11 this._concurrency = options.concurrency || 1;
12 this.tasks = [];
13 this.total = 0;
14 this.active = 0;
15 }
16
17
18 /**
19 * Push a task to the queue.
20 *
21 * @param {Object} item
22 * @param {Function(Error)} callback
23 */
24 push(item, callback) {
25 this.tasks.push({ item, callback });
26 this.total++;
27 this._next();
28 }
29
30
31 /**
32 * Process next job in queue.
33 */
34 _next() {
35 if (this.active >= this._concurrency || !this.tasks.length) { return; }
36 const { item, callback } = this.tasks.shift();
37 let callbackCalled = false;
38 this.active++;
39 this._worker(item, (err) => {
40 if (callbackCalled) { return; }
41 this.active--;
42 callbackCalled = true;
43 if (callback) { callback(err); }
44 this._next();
45 });
46 }
47
48
49 /**
50 * Stops processing queued jobs.
51 */
52 die() {
53 this.tasks = [];
54 }
55};