UNPKG

3.72 kBJavaScriptView Raw
1/**
2 * Array.prototype.findIndex ponyfill for old browsers.
3 */
4function findIndex (array, predicate) {
5 for (let i = 0; i < array.length; i++) {
6 if (predicate(array[i])) return i
7 }
8 return -1
9}
10
11function createCancelError () {
12 return new Error('Cancelled')
13}
14
15module.exports = class RateLimitedQueue {
16 constructor (limit) {
17 if (typeof limit !== 'number' || limit === 0) {
18 this.limit = Infinity
19 } else {
20 this.limit = limit
21 }
22
23 this.activeRequests = 0
24 this.queuedHandlers = []
25 }
26
27 _call (fn) {
28 this.activeRequests += 1
29
30 let done = false
31
32 let cancelActive
33 try {
34 cancelActive = fn()
35 } catch (err) {
36 this.activeRequests -= 1
37 throw err
38 }
39
40 return {
41 abort: () => {
42 if (done) return
43 done = true
44 this.activeRequests -= 1
45 cancelActive()
46 this._queueNext()
47 },
48
49 done: () => {
50 if (done) return
51 done = true
52 this.activeRequests -= 1
53 this._queueNext()
54 }
55 }
56 }
57
58 _queueNext () {
59 // Do it soon but not immediately, this allows clearing out the entire queue synchronously
60 // one by one without continuously _advancing_ it (and starting new tasks before immediately
61 // aborting them)
62 Promise.resolve().then(() => {
63 this._next()
64 })
65 }
66
67 _next () {
68 if (this.activeRequests >= this.limit) {
69 return
70 }
71 if (this.queuedHandlers.length === 0) {
72 return
73 }
74
75 // Dispatch the next request, and update the abort/done handlers
76 // so that cancelling it does the Right Thing (and doesn't just try
77 // to dequeue an already-running request).
78 const next = this.queuedHandlers.shift()
79 const handler = this._call(next.fn)
80 next.abort = handler.abort
81 next.done = handler.done
82 }
83
84 _queue (fn, options = {}) {
85 const handler = {
86 fn,
87 priority: options.priority || 0,
88 abort: () => {
89 this._dequeue(handler)
90 },
91 done: () => {
92 throw new Error('Cannot mark a queued request as done: this indicates a bug')
93 }
94 }
95
96 const index = findIndex(this.queuedHandlers, (other) => {
97 return handler.priority > other.priority
98 })
99 if (index === -1) {
100 this.queuedHandlers.push(handler)
101 } else {
102 this.queuedHandlers.splice(index, 0, handler)
103 }
104 return handler
105 }
106
107 _dequeue (handler) {
108 const index = this.queuedHandlers.indexOf(handler)
109 if (index !== -1) {
110 this.queuedHandlers.splice(index, 1)
111 }
112 }
113
114 run (fn, queueOptions) {
115 if (this.activeRequests < this.limit) {
116 return this._call(fn)
117 }
118 return this._queue(fn, queueOptions)
119 }
120
121 wrapPromiseFunction (fn, queueOptions) {
122 return (...args) => {
123 let queuedRequest
124 const outerPromise = new Promise((resolve, reject) => {
125 queuedRequest = this.run(() => {
126 let cancelError
127 let innerPromise
128 try {
129 innerPromise = Promise.resolve(fn(...args))
130 } catch (err) {
131 innerPromise = Promise.reject(err)
132 }
133
134 innerPromise.then((result) => {
135 if (cancelError) {
136 reject(cancelError)
137 } else {
138 queuedRequest.done()
139 resolve(result)
140 }
141 }, (err) => {
142 if (cancelError) {
143 reject(cancelError)
144 } else {
145 queuedRequest.done()
146 reject(err)
147 }
148 })
149
150 return () => {
151 cancelError = createCancelError()
152 }
153 }, queueOptions)
154 })
155
156 outerPromise.abort = () => {
157 queuedRequest.abort()
158 }
159
160 return outerPromise
161 }
162 }
163}