1 | import { create as createLogger } from '../common/log'
|
2 | import { createHash } from 'crypto'
|
3 | import * as IlpPacket from 'ilp-packet'
|
4 | import { Middleware, MiddlewareCallback, MiddlewareServices, Pipelines } from '../types/middleware'
|
5 | import { AccountInfo } from '../types/accounts'
|
6 | import BigNumber from 'bignumber.js'
|
7 |
|
8 |
|
9 | const STATIC_DATA_OFFSET = 25
|
10 |
|
11 | const DEFAULT_CLEANUP_INTERVAL = 30000
|
12 | const DEFAULT_PACKET_LIFETIME = 30000
|
13 |
|
14 | interface CachedPacket {
|
15 | amount: string,
|
16 | expiresAt: Date,
|
17 | promise: Promise<Buffer>
|
18 | }
|
19 |
|
20 | export default class DeduplicateMiddleware implements Middleware {
|
21 | private packetCache: Map<string, CachedPacket> = new Map()
|
22 | private getInfo: (accountId: string) => AccountInfo
|
23 |
|
24 | constructor (opts: {}, { getInfo }: MiddlewareServices) {
|
25 | this.getInfo = getInfo
|
26 | }
|
27 |
|
28 | async applyToPipelines (pipelines: Pipelines, accountId: string) {
|
29 | const log = createLogger(`deduplicate-middleware[${accountId}]`)
|
30 | const accountInfo = this.getInfo(accountId)
|
31 | if (!accountInfo) {
|
32 | throw new Error('account info unavailable. accountId=' + accountId)
|
33 | }
|
34 |
|
35 | const {
|
36 | cleanupInterval,
|
37 | packetLifetime
|
38 | } = accountInfo.deduplicate || {
|
39 | cleanupInterval: DEFAULT_CLEANUP_INTERVAL,
|
40 | packetLifetime: DEFAULT_PACKET_LIFETIME
|
41 | }
|
42 |
|
43 | let interval: NodeJS.Timer
|
44 | pipelines.startup.insertLast({
|
45 | name: 'deduplicate',
|
46 | method: async (dummy: void, next: MiddlewareCallback<void, void>) => {
|
47 | interval = setInterval(() => this.cleanupCache(packetLifetime), cleanupInterval)
|
48 | return next(dummy)
|
49 | }
|
50 | })
|
51 |
|
52 | pipelines.teardown.insertLast({
|
53 | name: 'deduplicate',
|
54 | method: async (dummy: void, next: MiddlewareCallback<void, void>) => {
|
55 | clearInterval(interval)
|
56 | return next(dummy)
|
57 | }
|
58 | })
|
59 |
|
60 | pipelines.outgoingData.insertLast({
|
61 | name: 'deduplicate',
|
62 | method: async (data: Buffer, next: MiddlewareCallback<Buffer, Buffer>) => {
|
63 | if (data[0] === IlpPacket.Type.TYPE_ILP_PREPARE) {
|
64 | const { contents } = IlpPacket.deserializeEnvelope(data)
|
65 |
|
66 | const index = createHash('sha256')
|
67 | .update(contents.slice(STATIC_DATA_OFFSET))
|
68 | .digest()
|
69 | .slice(0, 16)
|
70 | .toString('base64')
|
71 |
|
72 | const { amount, expiresAt } = IlpPacket.deserializeIlpPrepare(data)
|
73 |
|
74 | const cachedPacket = this.packetCache.get(index)
|
75 | if (cachedPacket) {
|
76 |
|
77 | if (new BigNumber(cachedPacket.amount).gte(amount) && cachedPacket.expiresAt >= expiresAt) {
|
78 | log.warn('deduplicate packet cache hit. accountId=%s elapsed=%s amount=%s', accountId, cachedPacket.expiresAt.getTime() - Date.now(), amount)
|
79 | return cachedPacket.promise
|
80 | }
|
81 | }
|
82 |
|
83 | const promise = next(data)
|
84 |
|
85 | this.packetCache.set(index, {
|
86 | amount,
|
87 | expiresAt,
|
88 | promise
|
89 | })
|
90 |
|
91 | return promise
|
92 | }
|
93 |
|
94 | return next(data)
|
95 | }
|
96 | })
|
97 | }
|
98 |
|
99 | private cleanupCache (packetLifetime: number) {
|
100 | const now = Date.now()
|
101 | for (const index of this.packetCache.keys()) {
|
102 | const cachedPacket = this.packetCache.get(index)
|
103 | if (!cachedPacket) continue
|
104 | const packetExpiry = cachedPacket.expiresAt.getTime() + packetLifetime
|
105 | if (packetExpiry < now) {
|
106 | this.packetCache.delete(index)
|
107 | }
|
108 | }
|
109 | }
|
110 | }
|