UNPKG

2.69 kBJavaScriptView Raw
1const debug = require('debug')('graphbrainz:rate-limit')
2
/**
 * Priority-aware rate limiter for promise-returning tasks.
 *
 * At most `limit` tasks are started per `period` milliseconds and at most
 * `concurrency` tasks are in flight at once. Tasks are kept in one FIFO queue
 * per priority level; higher levels are always drained first.
 */
export default class RateLimit {
  /**
   * @param {object} [options]
   * @param {number} [options.limit=1] Maximum number of tasks started per period.
   * @param {number} [options.period=1000] Length of a period in milliseconds.
   * @param {number} [options.concurrency] Maximum number of tasks in flight;
   *   defaults to `limit`, or 1 when `limit` is falsy.
   * @param {number} [options.defaultPriority=1] Priority used by `enqueue`
   *   when none is given.
   */
  constructor ({
    limit = 1,
    period = 1000,
    concurrency = limit || 1,
    defaultPriority = 1
  } = {}) {
    this.limit = limit
    this.period = period
    this.defaultPriority = defaultPriority
    this.concurrency = concurrency
    // Array of task queues indexed by priority level (may contain holes).
    this.queues = []
    // Number of tasks started but not yet settled.
    this.numPending = 0
    // Timestamp (ms) at which the current period timer was started.
    this.periodStart = null
    // Number of tasks that may still be started in the current period.
    this.periodCapacity = this.limit
    // Handle of the timer that refills `periodCapacity`, or null when idle.
    this.timer = null
    // True while a flush is already scheduled for the next tick.
    this.pendingFlush = false
    this.prevTaskID = null
  }

  /**
   * Produce the next sequential task id and remember it.
   *
   * @param {?number} [prevTaskID=this.prevTaskID] Id to increment from.
   * @returns {number} The new id (`prevTaskID + 1`, starting at 1).
   */
  nextTaskID (prevTaskID = this.prevTaskID) {
    const id = (prevTaskID || 0) + 1
    this.prevTaskID = id
    return id
  }

  /**
   * Queue a task and schedule a flush on the next tick.
   *
   * @param {Function} fn Task to run; may return a promise or a plain value.
   * @param {Array} [args] Arguments spread into `fn`; treated as empty when omitted.
   * @param {number} [priority=this.defaultPriority] Priority level. Coerced to
   *   a non-negative integer (floored); non-finite values fall back to 0.
   * @returns {Promise<*>} Settles with the outcome of `fn(...args)`.
   */
  enqueue (fn, args, priority = this.defaultPriority) {
    // Queues are looked up by integer array index in `dequeue`, so a
    // fractional, NaN or infinite level would create a property that is never
    // visited and the task would wait forever.
    const numeric = Number(priority)
    const level = Number.isFinite(numeric)
      ? Math.max(0, Math.floor(numeric))
      : 0
    return new Promise((resolve, reject) => {
      const queue = this.queues[level] = this.queues[level] || []
      const id = this.nextTaskID()
      debug(`Enqueuing task. id=${id} priority=${level}`)
      queue.push({ fn, args, resolve, reject, id })
      // Coalesce all enqueues made in the same tick into a single flush so
      // that priorities are compared across the whole batch.
      if (!this.pendingFlush) {
        this.pendingFlush = true
        process.nextTick(() => {
          this.pendingFlush = false
          this.flush()
        })
      }
    })
  }

  /**
   * Remove and return the oldest task from the highest non-empty priority
   * queue, trimming empty queues off the top of `this.queues` along the way.
   *
   * @returns {object|undefined} The task record, or undefined when idle.
   */
  dequeue () {
    let task
    for (let i = this.queues.length - 1; i >= 0; i--) {
      const queue = this.queues[i]
      if (queue && queue.length) {
        task = queue.shift()
      }
      // Drop this level when it is missing or has just been emptied.
      if (!queue || !queue.length) {
        this.queues.length = i
      }
      if (task) {
        break
      }
    }
    return task
  }

  /**
   * Start as many queued tasks as the concurrency limit and the remaining
   * capacity of the current period allow.
   */
  flush () {
    if (!(this.numPending < this.concurrency && this.periodCapacity > 0)) {
      return
    }
    const task = this.dequeue()
    if (!task) {
      return
    }
    const { resolve, reject, fn, args, id } = task

    if (this.timer == null) {
      const now = Date.now()
      let timeout = this.period
      if (this.periodStart != null) {
        // `delay` is how long ago the previous period nominally ended. When
        // that is within one period, the new period is shortened by it.
        const delay = now - (this.periodStart + timeout)
        if (delay > 0 && delay <= timeout) {
          timeout -= delay
        }
      }
      this.periodStart = now
      this.timer = setTimeout(() => {
        this.timer = null
        this.periodCapacity = this.limit
        this.flush()
      }, timeout)
    }

    this.numPending += 1
    this.periodCapacity -= 1

    const onResolve = (value) => {
      this.numPending -= 1
      resolve(value)
      this.flush()
    }
    const onReject = (err) => {
      this.numPending -= 1
      reject(err)
      this.flush()
    }

    debug(`Running task. id=${id}`)
    // A task that throws synchronously, returns a non-promise, or was queued
    // without an argument list must still settle its promise and release its
    // concurrency slot instead of escaping as an uncaught exception.
    let outcome
    try {
      outcome = Promise.resolve(fn(...(args || [])))
    } catch (err) {
      outcome = Promise.reject(err)
    }
    outcome.then(onResolve, onReject)

    // Keep draining until a limit is hit or the queues are empty.
    this.flush()
  }
}