UNPKG

3.32 kBPlain TextView Raw
1import { create as createLogger } from '../common/log'
2const log = createLogger('throughput-middleware')
3import { Middleware, MiddlewareCallback, MiddlewareServices, Pipelines } from '../types/middleware'
4import { AccountInfo } from '../types/accounts'
5import TokenBucket from '../lib/token-bucket'
6import * as IlpPacket from 'ilp-packet'
7const { InsufficientLiquidityError } = IlpPacket.Errors
8
9const DEFAULT_REFILL_PERIOD = 1000 // 1 second
10
11export default class ThroughputMiddleware implements Middleware {
12 private getInfo: (accountId: string) => AccountInfo
13
14 constructor (opts: {}, { getInfo }: MiddlewareServices) {
15 this.getInfo = getInfo
16 }
17
18 async applyToPipelines (pipelines: Pipelines, accountId: string) {
19 const accountInfo = this.getInfo(accountId)
20 if (!accountInfo) {
21 throw new Error('could not load info for account. accountId=' + accountId)
22 }
23
24 if (accountInfo.throughput) {
25 const {
26 refillPeriod = DEFAULT_REFILL_PERIOD,
27 incomingAmount = false,
28 outgoingAmount = false
29 } = accountInfo.throughput || {}
30
31 if (incomingAmount) {
32 // TODO: When we add the ability to update middleware, our state will get
33 // reset every update, which may not be desired.
34 const incomingBucket = new TokenBucket({ refillPeriod, refillCount: Number(incomingAmount) })
35 log.trace('created incoming amount limit token bucket for account. accountId=%s refillPeriod=%s incomingAmount=%s', accountId, refillPeriod, incomingAmount)
36
37 pipelines.incomingData.insertLast({
38 name: 'throughput',
39 method: async (data: Buffer, next: MiddlewareCallback<Buffer, Buffer>) => {
40 if (data[0] === IlpPacket.Type.TYPE_ILP_PREPARE) {
41 const parsedPacket = IlpPacket.deserializeIlpPrepare(data)
42
43 // TODO: Do we need a BigNumber-based token bucket?
44 if (!incomingBucket.take(Number(parsedPacket.amount))) {
45 throw new InsufficientLiquidityError('exceeded money bandwidth, throttling.')
46 }
47
48 return next(data)
49 } else {
50 return next(data)
51 }
52 }
53 })
54 }
55
56 if (outgoingAmount) {
57 // TODO: When we add the ability to update middleware, our state will get
58 // reset every update, which may not be desired.
59 const incomingBucket = new TokenBucket({ refillPeriod, refillCount: Number(outgoingAmount) })
60 log.trace('created outgoing amount limit token bucket for account. accountId=%s refillPeriod=%s outgoingAmount=%s', accountId, refillPeriod, outgoingAmount)
61
62 pipelines.outgoingData.insertLast({
63 name: 'throughput',
64 method: async (data: Buffer, next: MiddlewareCallback<Buffer, Buffer>) => {
65 if (data[0] === IlpPacket.Type.TYPE_ILP_PREPARE) {
66 const parsedPacket = IlpPacket.deserializeIlpPrepare(data)
67
68 // TODO: Do we need a BigNumber-based token bucket?
69 if (!incomingBucket.take(Number(parsedPacket.amount))) {
70 throw new InsufficientLiquidityError('exceeded money bandwidth, throttling.')
71 }
72
73 return next(data)
74 } else {
75 return next(data)
76 }
77 }
78 })
79 }
80 }
81 }
82}