1 | const debug = require('debug')('graphbrainz:rate-limit')
|
2 |
|
3 | export default class RateLimit {
|
4 | constructor({
|
5 | limit = 1,
|
6 | period = 1000,
|
7 | concurrency = limit || 1,
|
8 | defaultPriority = 1
|
9 | } = {}) {
|
10 | this.limit = limit
|
11 | this.period = period
|
12 | this.defaultPriority = defaultPriority
|
13 | this.concurrency = concurrency
|
14 | this.queues = []
|
15 | this.numPending = 0
|
16 | this.periodStart = null
|
17 | this.periodCapacity = this.limit
|
18 | this.timer = null
|
19 | this.pendingFlush = false
|
20 | this.prevTaskID = null
|
21 | }
|
22 |
|
23 | nextTaskID(prevTaskID = this.prevTaskID) {
|
24 | const id = (prevTaskID || 0) + 1
|
25 | this.prevTaskID = id
|
26 | return id
|
27 | }
|
28 |
|
29 | enqueue(fn, args, priority = this.defaultPriority) {
|
30 | priority = Math.max(0, priority)
|
31 | return new Promise((resolve, reject) => {
|
32 | const queue = (this.queues[priority] = this.queues[priority] || [])
|
33 | const id = this.nextTaskID()
|
34 | debug(`Enqueuing task. id=${id} priority=${priority}`)
|
35 | queue.push({ fn, args, resolve, reject, id })
|
36 | if (!this.pendingFlush) {
|
37 | this.pendingFlush = true
|
38 | process.nextTick(() => {
|
39 | this.pendingFlush = false
|
40 | this.flush()
|
41 | })
|
42 | }
|
43 | })
|
44 | }
|
45 |
|
46 | dequeue() {
|
47 | let task
|
48 | for (let i = this.queues.length - 1; i >= 0; i--) {
|
49 | const queue = this.queues[i]
|
50 | if (queue && queue.length) {
|
51 | task = queue.shift()
|
52 | }
|
53 | if (!queue || !queue.length) {
|
54 | this.queues.length = i
|
55 | }
|
56 | if (task) {
|
57 | break
|
58 | }
|
59 | }
|
60 | return task
|
61 | }
|
62 |
|
63 | flush() {
|
64 | if (this.numPending < this.concurrency && this.periodCapacity > 0) {
|
65 | const task = this.dequeue()
|
66 | if (task) {
|
67 | const { resolve, reject, fn, args, id } = task
|
68 | if (this.timer == null) {
|
69 | const now = Date.now()
|
70 | let timeout = this.period
|
71 | if (this.periodStart != null) {
|
72 | const delay = now - (this.periodStart + timeout)
|
73 | if (delay > 0 && delay <= timeout) {
|
74 | timeout -= delay
|
75 | }
|
76 | }
|
77 | this.periodStart = now
|
78 | this.timer = setTimeout(() => {
|
79 | this.timer = null
|
80 | this.periodCapacity = this.limit
|
81 | this.flush()
|
82 | }, timeout)
|
83 | }
|
84 | this.numPending += 1
|
85 | this.periodCapacity -= 1
|
86 | const onResolve = value => {
|
87 | this.numPending -= 1
|
88 | resolve(value)
|
89 | this.flush()
|
90 | }
|
91 | const onReject = err => {
|
92 | this.numPending -= 1
|
93 | reject(err)
|
94 | this.flush()
|
95 | }
|
96 | debug(`Running task. id=${id}`)
|
97 | fn(...args).then(onResolve, onReject)
|
98 | this.flush()
|
99 | }
|
100 | }
|
101 | }
|
102 | }
|