1 |
|
2 |
|
3 |
|
4 | function findIndex (array, predicate) {
|
5 | for (let i = 0; i < array.length; i++) {
|
6 | if (predicate(array[i])) return i
|
7 | }
|
8 | return -1
|
9 | }
|
10 |
|
11 | function createCancelError () {
|
12 | return new Error('Cancelled')
|
13 | }
|
14 |
|
15 | module.exports = class RateLimitedQueue {
|
16 | constructor (limit) {
|
17 | if (typeof limit !== 'number' || limit === 0) {
|
18 | this.limit = Infinity
|
19 | } else {
|
20 | this.limit = limit
|
21 | }
|
22 |
|
23 | this.activeRequests = 0
|
24 | this.queuedHandlers = []
|
25 | }
|
26 |
|
27 | _call (fn) {
|
28 | this.activeRequests += 1
|
29 |
|
30 | let done = false
|
31 |
|
32 | let cancelActive
|
33 | try {
|
34 | cancelActive = fn()
|
35 | } catch (err) {
|
36 | this.activeRequests -= 1
|
37 | throw err
|
38 | }
|
39 |
|
40 | return {
|
41 | abort: () => {
|
42 | if (done) return
|
43 | done = true
|
44 | this.activeRequests -= 1
|
45 | cancelActive()
|
46 | this._queueNext()
|
47 | },
|
48 |
|
49 | done: () => {
|
50 | if (done) return
|
51 | done = true
|
52 | this.activeRequests -= 1
|
53 | this._queueNext()
|
54 | }
|
55 | }
|
56 | }
|
57 |
|
58 | _queueNext () {
|
59 |
|
60 |
|
61 |
|
62 | Promise.resolve().then(() => {
|
63 | this._next()
|
64 | })
|
65 | }
|
66 |
|
67 | _next () {
|
68 | if (this.activeRequests >= this.limit) {
|
69 | return
|
70 | }
|
71 | if (this.queuedHandlers.length === 0) {
|
72 | return
|
73 | }
|
74 |
|
75 |
|
76 |
|
77 |
|
78 | const next = this.queuedHandlers.shift()
|
79 | const handler = this._call(next.fn)
|
80 | next.abort = handler.abort
|
81 | next.done = handler.done
|
82 | }
|
83 |
|
84 | _queue (fn, options = {}) {
|
85 | const handler = {
|
86 | fn,
|
87 | priority: options.priority || 0,
|
88 | abort: () => {
|
89 | this._dequeue(handler)
|
90 | },
|
91 | done: () => {
|
92 | throw new Error('Cannot mark a queued request as done: this indicates a bug')
|
93 | }
|
94 | }
|
95 |
|
96 | const index = findIndex(this.queuedHandlers, (other) => {
|
97 | return handler.priority > other.priority
|
98 | })
|
99 | if (index === -1) {
|
100 | this.queuedHandlers.push(handler)
|
101 | } else {
|
102 | this.queuedHandlers.splice(index, 0, handler)
|
103 | }
|
104 | return handler
|
105 | }
|
106 |
|
107 | _dequeue (handler) {
|
108 | const index = this.queuedHandlers.indexOf(handler)
|
109 | if (index !== -1) {
|
110 | this.queuedHandlers.splice(index, 1)
|
111 | }
|
112 | }
|
113 |
|
114 | run (fn, queueOptions) {
|
115 | if (this.activeRequests < this.limit) {
|
116 | return this._call(fn)
|
117 | }
|
118 | return this._queue(fn, queueOptions)
|
119 | }
|
120 |
|
121 | wrapPromiseFunction (fn, queueOptions) {
|
122 | return (...args) => {
|
123 | let queuedRequest
|
124 | const outerPromise = new Promise((resolve, reject) => {
|
125 | queuedRequest = this.run(() => {
|
126 | let cancelError
|
127 | let innerPromise
|
128 | try {
|
129 | innerPromise = Promise.resolve(fn(...args))
|
130 | } catch (err) {
|
131 | innerPromise = Promise.reject(err)
|
132 | }
|
133 |
|
134 | innerPromise.then((result) => {
|
135 | if (cancelError) {
|
136 | reject(cancelError)
|
137 | } else {
|
138 | queuedRequest.done()
|
139 | resolve(result)
|
140 | }
|
141 | }, (err) => {
|
142 | if (cancelError) {
|
143 | reject(cancelError)
|
144 | } else {
|
145 | queuedRequest.done()
|
146 | reject(err)
|
147 | }
|
148 | })
|
149 |
|
150 | return () => {
|
151 | cancelError = createCancelError()
|
152 | }
|
153 | }, queueOptions)
|
154 | })
|
155 |
|
156 | outerPromise.abort = () => {
|
157 | queuedRequest.abort()
|
158 | }
|
159 |
|
160 | return outerPromise
|
161 | }
|
162 | }
|
163 | }
|