1 | import { create as createLogger } from '../common/log'
|
2 | const log = createLogger('throughput-middleware')
|
3 | import { Middleware, MiddlewareCallback, MiddlewareServices, Pipelines } from '../types/middleware'
|
4 | import { AccountInfo } from '../types/accounts'
|
5 | import TokenBucket from '../lib/token-bucket'
|
6 | import * as IlpPacket from 'ilp-packet'
|
7 | const { InsufficientLiquidityError } = IlpPacket.Errors
|
8 |
|
9 | const DEFAULT_REFILL_PERIOD = 1000
|
10 |
|
11 | export default class ThroughputMiddleware implements Middleware {
|
12 | private getInfo: (accountId: string) => AccountInfo
|
13 |
|
14 | constructor (opts: {}, { getInfo }: MiddlewareServices) {
|
15 | this.getInfo = getInfo
|
16 | }
|
17 |
|
18 | async applyToPipelines (pipelines: Pipelines, accountId: string) {
|
19 | const accountInfo = this.getInfo(accountId)
|
20 | if (!accountInfo) {
|
21 | throw new Error('could not load info for account. accountId=' + accountId)
|
22 | }
|
23 |
|
24 | if (accountInfo.throughput) {
|
25 | const {
|
26 | refillPeriod = DEFAULT_REFILL_PERIOD,
|
27 | incomingAmount = false,
|
28 | outgoingAmount = false
|
29 | } = accountInfo.throughput || {}
|
30 |
|
31 | if (incomingAmount) {
|
32 |
|
33 |
|
34 | const incomingBucket = new TokenBucket({ refillPeriod, refillCount: Number(incomingAmount) })
|
35 | log.trace('created incoming amount limit token bucket for account. accountId=%s refillPeriod=%s incomingAmount=%s', accountId, refillPeriod, incomingAmount)
|
36 |
|
37 | pipelines.incomingData.insertLast({
|
38 | name: 'throughput',
|
39 | method: async (data: Buffer, next: MiddlewareCallback<Buffer, Buffer>) => {
|
40 | if (data[0] === IlpPacket.Type.TYPE_ILP_PREPARE) {
|
41 | const parsedPacket = IlpPacket.deserializeIlpPrepare(data)
|
42 |
|
43 |
|
44 | if (!incomingBucket.take(Number(parsedPacket.amount))) {
|
45 | throw new InsufficientLiquidityError('exceeded money bandwidth, throttling.')
|
46 | }
|
47 |
|
48 | return next(data)
|
49 | } else {
|
50 | return next(data)
|
51 | }
|
52 | }
|
53 | })
|
54 | }
|
55 |
|
56 | if (outgoingAmount) {
|
57 |
|
58 |
|
59 | const incomingBucket = new TokenBucket({ refillPeriod, refillCount: Number(outgoingAmount) })
|
60 | log.trace('created outgoing amount limit token bucket for account. accountId=%s refillPeriod=%s outgoingAmount=%s', accountId, refillPeriod, outgoingAmount)
|
61 |
|
62 | pipelines.outgoingData.insertLast({
|
63 | name: 'throughput',
|
64 | method: async (data: Buffer, next: MiddlewareCallback<Buffer, Buffer>) => {
|
65 | if (data[0] === IlpPacket.Type.TYPE_ILP_PREPARE) {
|
66 | const parsedPacket = IlpPacket.deserializeIlpPrepare(data)
|
67 |
|
68 |
|
69 | if (!incomingBucket.take(Number(parsedPacket.amount))) {
|
70 | throw new InsufficientLiquidityError('exceeded money bandwidth, throttling.')
|
71 | }
|
72 |
|
73 | return next(data)
|
74 | } else {
|
75 | return next(data)
|
76 | }
|
77 | }
|
78 | })
|
79 | }
|
80 | }
|
81 | }
|
82 | }
|