1 | import { isThrottlingError } from "@aws-sdk/service-error-classification";
|
/**
 * Client-side rate limiter built from a token bucket whose fill rate is
 * re-computed after every response with a cubic function of the time elapsed
 * since the last throttling response.
 *
 * The limiter starts disabled: `getSendToken()` resolves immediately until
 * `updateClientSendingRate()` has seen a response that `isThrottlingError`
 * classifies as throttling.
 */
export class DefaultRateLimiter {
  /**
   * @param {object} [options] - Optional tuning constants.
   * @param {number} [options.beta=0.7] - Factor the rate is multiplied by when throttled.
   * @param {number} [options.minCapacity=1] - Lower bound for the bucket's maximum capacity.
   * @param {number} [options.minFillRate=0.5] - Lower bound for the fill rate (tokens per second).
   * @param {number} [options.scaleConstant=0.4] - Coefficient of the cubic term.
   * @param {number} [options.smooth=0.8] - Weight given to the newest sample of the measured rate.
   */
  constructor(options) {
    // Token bucket state.
    this.currentCapacity = 0;
    this.enabled = false;
    this.lastMaxRate = 0;
    // Measured sending-rate state.
    this.measuredTxRate = 0;
    this.requestCount = 0;
    // 0 doubles as "never refilled yet"; see refillTokenBucket().
    this.lastTimestamp = 0;
    this.timeWindow = 0;

    this.beta = options?.beta ?? 0.7;
    this.minCapacity = options?.minCapacity ?? 1;
    this.minFillRate = options?.minFillRate ?? 0.5;
    this.scaleConstant = options?.scaleConstant ?? 0.4;
    this.smooth = options?.smooth ?? 0.8;

    const currentTimeInSeconds = this.getCurrentTimeInSeconds();
    this.lastThrottleTime = currentTimeInSeconds;
    this.lastTxRateBucket = Math.floor(this.getCurrentTimeInSeconds());

    this.fillRate = this.minFillRate;
    this.maxCapacity = this.minCapacity;
  }

  /**
   * @returns {number} The wall-clock time (`Date.now()`) in fractional seconds.
   */
  getCurrentTimeInSeconds() {
    return Date.now() / 1000;
  }

  /**
   * Acquires one token, waiting if the bucket cannot cover it.
   *
   * @returns {Promise<void>} Resolves once the caller may send.
   */
  async getSendToken() {
    return this.acquireTokenBucket(1);
  }

  /**
   * Takes `amount` tokens from the bucket, sleeping for as long as the bucket
   * needs to refill the missing part. Does nothing while the limiter is
   * disabled.
   *
   * The tokens are deducted *before* the sleep, so the capacity may become
   * negative. This function yields at the `await`; reserving first lets any
   * caller that arrives during the sleep see the reduced capacity and queue
   * behind this one instead of computing the same short delay.
   *
   * @param {number} amount - Number of tokens to take.
   * @returns {Promise<void>} Resolves when the tokens have been paid for.
   */
  async acquireTokenBucket(amount) {
    if (!this.enabled) {
      return;
    }

    this.refillTokenBucket();

    // Measure the deficit against the capacity as it is right now ...
    const shortfall = amount - this.currentCapacity;
    // ... and reserve the tokens synchronously, before yielding.
    this.currentCapacity -= amount;

    if (shortfall > 0) {
      const delayMs = (shortfall / this.fillRate) * 1000;
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }

  /**
   * Adds the tokens accrued since the previous refill, capped at
   * `maxCapacity`. The very first call only records the timestamp.
   */
  refillTokenBucket() {
    const timestamp = this.getCurrentTimeInSeconds();
    if (!this.lastTimestamp) {
      this.lastTimestamp = timestamp;
      return;
    }

    const fillAmount = (timestamp - this.lastTimestamp) * this.fillRate;
    this.currentCapacity = Math.min(this.maxCapacity, this.currentCapacity + fillAmount);
    this.lastTimestamp = timestamp;
  }

  /**
   * Feeds one response (or error) into the limiter and re-computes the fill
   * rate. A throttling response enables the token bucket and cuts the rate;
   * anything else lets the rate follow the cubic curve.
   *
   * @param {*} response - Value handed to `isThrottlingError` for classification.
   */
  updateClientSendingRate(response) {
    let calculatedRate;
    this.updateMeasuredRate();

    if (isThrottlingError(response)) {
      // Before the bucket is enabled the fill rate is meaningless, so only
      // the measured rate is considered.
      const rateToUse = !this.enabled ? this.measuredTxRate : Math.min(this.measuredTxRate, this.fillRate);
      this.lastMaxRate = rateToUse;
      this.calculateTimeWindow();
      this.lastThrottleTime = this.getCurrentTimeInSeconds();
      calculatedRate = this.cubicThrottle(rateToUse);
      this.enableTokenBucket();
    } else {
      this.calculateTimeWindow();
      calculatedRate = this.cubicSuccess(this.getCurrentTimeInSeconds());
    }

    // Never allow more than twice the rate actually being observed.
    const newRate = Math.min(calculatedRate, 2 * this.measuredTxRate);
    this.updateTokenBucketRate(newRate);
  }

  /**
   * Stores in `timeWindow` the cube root of
   * `lastMaxRate * (1 - beta) / scaleConstant`, rounded by `getPrecise`.
   */
  calculateTimeWindow() {
    this.timeWindow = this.getPrecise(Math.pow((this.lastMaxRate * (1 - this.beta)) / this.scaleConstant, 1 / 3));
  }

  /**
   * @param {number} rateToUse - Rate in effect when the throttle was observed.
   * @returns {number} `rateToUse * beta`, rounded by `getPrecise`.
   */
  cubicThrottle(rateToUse) {
    return this.getPrecise(rateToUse * this.beta);
  }

  /**
   * @param {number} timestamp - Current time in seconds.
   * @returns {number} `scaleConstant * (timestamp - lastThrottleTime - timeWindow)^3 + lastMaxRate`,
   *   rounded by `getPrecise`.
   */
  cubicSuccess(timestamp) {
    return this.getPrecise(this.scaleConstant * Math.pow(timestamp - this.lastThrottleTime - this.timeWindow, 3) + this.lastMaxRate);
  }

  /**
   * Turns the token bucket on; from now on `acquireTokenBucket` can delay callers.
   */
  enableTokenBucket() {
    this.enabled = true;
  }

  /**
   * Applies a new rate to the bucket: refills at the old rate first, then
   * sets the fill rate and maximum capacity (each bounded below by its
   * configured minimum) and clamps the current capacity to the new maximum.
   *
   * @param {number} newRate - Desired rate in tokens per second.
   */
  updateTokenBucketRate(newRate) {
    // Credit tokens earned under the previous rate before switching.
    this.refillTokenBucket();

    this.fillRate = Math.max(newRate, this.minFillRate);
    this.maxCapacity = Math.max(newRate, this.minCapacity);
    this.currentCapacity = Math.min(this.currentCapacity, this.maxCapacity);
  }

  /**
   * Counts one request and, once the clock has moved into a later
   * half-second bucket, folds the observed requests-per-second into
   * `measuredTxRate` as an exponentially weighted average.
   */
  updateMeasuredRate() {
    const t = this.getCurrentTimeInSeconds();
    // Round down to the nearest half second.
    const timeBucket = Math.floor(t * 2) / 2;
    this.requestCount++;

    if (timeBucket > this.lastTxRateBucket) {
      const currentRate = this.requestCount / (timeBucket - this.lastTxRateBucket);
      this.measuredTxRate = this.getPrecise(currentRate * this.smooth + this.measuredTxRate * (1 - this.smooth));
      this.requestCount = 0;
      this.lastTxRateBucket = timeBucket;
    }
  }

  /**
   * @param {number} num - Value to round.
   * @returns {number} `num` rounded to 8 decimal places.
   */
  getPrecise(num) {
    return parseFloat(num.toFixed(8));
  }
}
|