UNPKG

3.66 kBPlain TextView Raw
1import { create as createLogger } from '../common/log'
2import { createHash } from 'crypto'
3import * as IlpPacket from 'ilp-packet'
4import { Middleware, MiddlewareCallback, MiddlewareServices, Pipelines } from '../types/middleware'
5import { AccountInfo } from '../types/accounts'
6import BigNumber from 'bignumber.js'
7
// Byte offset inside the ILP Prepare contents where the static data starts,
// i.e. the part of the packet that is not rewritten from hop to hop.
// The skipped prefix is the 8 byte amount followed by the 17 byte expiry date.
const STATIC_DATA_OFFSET = 8 + 17

// Both defaults are expressed in milliseconds (30 seconds each).
const DEFAULT_CLEANUP_INTERVAL = 30 * 1000
const DEFAULT_PACKET_LIFETIME = 30 * 1000
13
/**
 * One remembered outgoing Prepare packet, stored under the hash of its static data.
 */
interface CachedPacket {
  /** Amount of the packet that was forwarded, as returned by the ILP Prepare parser. */
  amount: string

  /** Expiry of the packet that was forwarded. */
  expiresAt: Date

  /** Pending or settled result of forwarding the packet; handed back to duplicates. */
  promise: Promise<Buffer>
}
19
/**
 * Middleware that avoids forwarding the same ILP Prepare packet twice.
 *
 * Outgoing Prepare packets are keyed by a truncated SHA-256 hash of their
 * static data (everything after the amount and expiry fields). When a packet
 * with the same key has already been forwarded with an amount and expiry that
 * are both at least as large, the promise of the earlier packet is returned
 * instead of sending the new one.
 */
export default class DeduplicateMiddleware implements Middleware {
  private packetCache: Map<string, CachedPacket> = new Map()
  private getInfo: (accountId: string) => AccountInfo

  /**
   * @param opts Middleware options; not read by this middleware.
   * @param services Only `getInfo` is used, to look up per-account settings.
   */
  constructor (opts: {}, { getInfo }: MiddlewareServices) {
    this.getInfo = getInfo
  }

  /**
   * Registers the cache cleanup timer (startup/teardown pipelines) and the
   * deduplication step (outgoingData pipeline) for one account.
   *
   * @param pipelines Pipelines of the account to hook into.
   * @param accountId Account whose `deduplicate` settings are applied.
   * @throws Error when no account info is available for `accountId`.
   */
  async applyToPipelines (pipelines: Pipelines, accountId: string) {
    const log = createLogger(`deduplicate-middleware[${accountId}]`)
    const accountInfo = this.getInfo(accountId)
    if (!accountInfo) {
      throw new Error('account info unavailable. accountId=' + accountId)
    }

    // Default each setting on its own. Falling back to the defaults only when
    // the whole `deduplicate` object is absent would leave a missing field
    // undefined: an undefined packetLifetime makes the expiry computation NaN,
    // so entries would never be purged, and an undefined cleanupInterval makes
    // the timer fire with the minimum delay.
    const dedupeOptions: { cleanupInterval?: number, packetLifetime?: number } =
      accountInfo.deduplicate || {}
    const {
      cleanupInterval = DEFAULT_CLEANUP_INTERVAL,
      packetLifetime = DEFAULT_PACKET_LIFETIME
    } = dedupeOptions

    let interval: NodeJS.Timer
    pipelines.startup.insertLast({
      name: 'deduplicate',
      method: async (dummy: void, next: MiddlewareCallback<void, void>) => {
        interval = setInterval(() => this.cleanupCache(packetLifetime), cleanupInterval)
        return next(dummy)
      }
    })

    pipelines.teardown.insertLast({
      name: 'deduplicate',
      method: async (dummy: void, next: MiddlewareCallback<void, void>) => {
        clearInterval(interval)
        return next(dummy)
      }
    })

    pipelines.outgoingData.insertLast({
      name: 'deduplicate',
      method: async (data: Buffer, next: MiddlewareCallback<Buffer, Buffer>) => {
        // Only Prepare packets are deduplicated; everything else passes through.
        if (data[0] !== IlpPacket.Type.TYPE_ILP_PREPARE) {
          return next(data)
        }

        const { contents } = IlpPacket.deserializeEnvelope(data)

        // Key on the static part of the packet only, so that two packets which
        // differ just in amount/expiry map to the same cache entry.
        const index = createHash('sha256')
          .update(contents.slice(STATIC_DATA_OFFSET))
          .digest()
          .slice(0, 16) // 128 bits is enough and saves some memory
          .toString('base64')

        const { amount, expiresAt } = IlpPacket.deserializeIlpPrepare(data)

        const cachedPacket = this.packetCache.get(index)
        // Reuse the earlier result only if the earlier packet had an amount and
        // an expiry that are both at least as large as the new packet's.
        if (
          cachedPacket &&
          new BigNumber(cachedPacket.amount).gte(amount) &&
          cachedPacket.expiresAt >= expiresAt
        ) {
          // NOTE: the value logged as "elapsed" is the time left until the
          // cached packet expires (expiry minus now).
          log.warn('deduplicate packet cache hit. accountId=%s elapsed=%s amount=%s', accountId, cachedPacket.expiresAt.getTime() - Date.now(), amount)
          return cachedPacket.promise
        }

        // First sighting, or the new packet is larger / expires later:
        // forward it and remember (or overwrite) the entry.
        const promise = next(data)

        this.packetCache.set(index, {
          amount,
          expiresAt,
          promise
        })

        return promise
      }
    })
  }

  /**
   * Removes every cached packet whose expiry plus `packetLifetime` lies in the past.
   *
   * @param packetLifetime Milliseconds to keep an entry after the packet's own expiry.
   */
  private cleanupCache (packetLifetime: number) {
    const now = Date.now()
    // Deleting the current entry while iterating a Map is safe.
    for (const [index, cachedPacket] of this.packetCache) {
      const packetExpiry = cachedPacket.expiresAt.getTime() + packetLifetime
      if (packetExpiry < now) {
        this.packetCache.delete(index)
      }
    }
  }
}