UNPKG

3.04 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6
7var _promise = require('babel-runtime/core-js/promise');
8
9var _promise2 = _interopRequireDefault(_promise);
10
11function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
12
13const debug = require('debug')('graphbrainz:rate-limit');
14
15class RateLimit {
16 constructor({
17 limit = 1,
18 period = 1000,
19 concurrency = limit || 1,
20 defaultPriority = 1
21 } = {}) {
22 this.limit = limit;
23 this.period = period;
24 this.defaultPriority = defaultPriority;
25 this.concurrency = concurrency;
26 this.queues = [];
27 this.numPending = 0;
28 this.periodStart = null;
29 this.periodCapacity = this.limit;
30 this.timer = null;
31 this.pendingFlush = false;
32 this.prevTaskID = null;
33 }
34
35 nextTaskID(prevTaskID = this.prevTaskID) {
36 const id = (prevTaskID || 0) + 1;
37 this.prevTaskID = id;
38 return id;
39 }
40
41 enqueue(fn, args, priority = this.defaultPriority) {
42 priority = Math.max(0, priority);
43 return new _promise2.default((resolve, reject) => {
44 const queue = this.queues[priority] = this.queues[priority] || [];
45 const id = this.nextTaskID();
46 debug(`Enqueuing task. id=${id} priority=${priority}`);
47 queue.push({ fn, args, resolve, reject, id });
48 if (!this.pendingFlush) {
49 this.pendingFlush = true;
50 process.nextTick(() => {
51 this.pendingFlush = false;
52 this.flush();
53 });
54 }
55 });
56 }
57
58 dequeue() {
59 let task;
60 for (let i = this.queues.length - 1; i >= 0; i--) {
61 const queue = this.queues[i];
62 if (queue && queue.length) {
63 task = queue.shift();
64 }
65 if (!queue || !queue.length) {
66 this.queues.length = i;
67 }
68 if (task) {
69 break;
70 }
71 }
72 return task;
73 }
74
75 flush() {
76 if (this.numPending < this.concurrency && this.periodCapacity > 0) {
77 const task = this.dequeue();
78 if (task) {
79 const { resolve, reject, fn, args, id } = task;
80 if (this.timer == null) {
81 const now = Date.now();
82 let timeout = this.period;
83 if (this.periodStart != null) {
84 const delay = now - (this.periodStart + timeout);
85 if (delay > 0 && delay <= timeout) {
86 timeout -= delay;
87 }
88 }
89 this.periodStart = now;
90 this.timer = setTimeout(() => {
91 this.timer = null;
92 this.periodCapacity = this.limit;
93 this.flush();
94 }, timeout);
95 }
96 this.numPending += 1;
97 this.periodCapacity -= 1;
98 const onResolve = value => {
99 this.numPending -= 1;
100 resolve(value);
101 this.flush();
102 };
103 const onReject = err => {
104 this.numPending -= 1;
105 reject(err);
106 this.flush();
107 };
108 debug(`Running task. id=${id}`);
109 fn(...args).then(onResolve, onReject);
110 this.flush();
111 }
112 }
113 }
114}
115exports.default = RateLimit;
\No newline at end of file