1 | 'use strict'
|
2 | module.exports = RunQueue
|
3 |
|
4 | var validate = require('aproba')
|
5 |
|
6 | function RunQueue (opts) {
|
7 | validate('Z|O', [opts])
|
8 | if (!opts) opts = {}
|
9 | this.finished = false
|
10 | this.inflight = 0
|
11 | this.maxConcurrency = opts.maxConcurrency || 1
|
12 | this.queued = 0
|
13 | this.queue = []
|
14 | this.currentPrio = null
|
15 | this.currentQueue = null
|
16 | this.Promise = opts.Promise || global.Promise
|
17 | this.deferred = {}
|
18 | }
|
19 |
|
20 | RunQueue.prototype = {}
|
21 |
|
22 | RunQueue.prototype.run = function () {
|
23 | if (arguments.length !== 0) throw new Error('RunQueue.run takes no arguments')
|
24 | var self = this
|
25 | var deferred = this.deferred
|
26 | if (!deferred.promise) {
|
27 | deferred.promise = new this.Promise(function (resolve, reject) {
|
28 | deferred.resolve = resolve
|
29 | deferred.reject = reject
|
30 | self._runQueue()
|
31 | })
|
32 | }
|
33 | return deferred.promise
|
34 | }
|
35 |
|
36 | RunQueue.prototype._runQueue = function () {
|
37 | var self = this
|
38 |
|
39 | while ((this.inflight < this.maxConcurrency) && this.queued) {
|
40 | if (!this.currentQueue || this.currentQueue.length === 0) {
|
41 |
|
42 |
|
43 | if (this.inflight) return
|
44 | var prios = Object.keys(this.queue)
|
45 | for (var ii = 0; ii < prios.length; ++ii) {
|
46 | var prioQueue = this.queue[prios[ii]]
|
47 | if (prioQueue.length) {
|
48 | this.currentQueue = prioQueue
|
49 | this.currentPrio = prios[ii]
|
50 | break
|
51 | }
|
52 | }
|
53 | }
|
54 |
|
55 | --this.queued
|
56 | ++this.inflight
|
57 | var next = this.currentQueue.shift()
|
58 | var args = next.args || []
|
59 |
|
60 |
|
61 |
|
62 | var queueEntry = new this.Promise(function (resolve) {
|
63 | resolve(next.cmd.apply(null, args))
|
64 | })
|
65 |
|
66 | queueEntry.then(function () {
|
67 | --self.inflight
|
68 | if (self.finished) return
|
69 | if (self.queued <= 0 && self.inflight <= 0) {
|
70 | self.finished = true
|
71 | self.deferred.resolve()
|
72 | }
|
73 | self._runQueue()
|
74 | }, function (err) {
|
75 | self.finished = true
|
76 | self.deferred.reject(err)
|
77 | })
|
78 | }
|
79 | }
|
80 |
|
81 | RunQueue.prototype.add = function (prio, cmd, args) {
|
82 | if (this.finished) throw new Error("Can't add to a finished queue. Create a new queue.")
|
83 | if (Math.abs(Math.floor(prio)) !== prio) throw new Error('Priorities must be a positive integer value.')
|
84 | validate('NFA|NFZ', [prio, cmd, args])
|
85 | prio = Number(prio)
|
86 | if (!this.queue[prio]) this.queue[prio] = []
|
87 | ++this.queued
|
88 | this.queue[prio].push({cmd: cmd, args: args})
|
89 |
|
90 |
|
91 | if (this.currentPrio > prio) {
|
92 | this.currentQueue = this.queue[prio]
|
93 | this.currentPrio = prio
|
94 | }
|
95 | }
|