1 | import PrefixMap from './prefix-map'
|
2 | import { IncomingRoute } from '../types/routing'
|
3 | import { create as createLogger, ConnectorLogger } from '../common/log'
|
4 | import { Type, deserializeIlpReject } from 'ilp-packet'
|
5 | import {
|
6 | CcpRouteControlRequest,
|
7 | CcpRouteUpdateRequest,
|
8 | Mode,
|
9 | serializeCcpRouteControlRequest
|
10 | } from 'ilp-protocol-ccp'
|
11 | import { PluginInstance } from '../types/plugin'
|
12 |
|
13 | export interface CcpReceiverOpts {
|
14 | plugin: PluginInstance
|
15 | accountId: string
|
16 | }
|
17 |
|
18 | const ROUTE_CONTROL_RETRY_INTERVAL = 30000
|
19 |
|
20 | export default class CcpReceiver {
|
21 | private plugin: PluginInstance
|
22 | private log: ConnectorLogger
|
23 | private accountId: string
|
24 | private routes: PrefixMap<IncomingRoute>
|
25 | private expiry: number = 0
|
26 |
|
27 | |
28 |
|
29 |
|
30 |
|
31 |
|
32 | private routingTableId: string = '00000000-0000-0000-0000-000000000000'
|
33 | |
34 |
|
35 |
|
36 | private epoch: number = 0
|
37 |
|
38 | constructor ({ plugin, accountId }: CcpReceiverOpts) {
|
39 | this.plugin = plugin
|
40 | this.log = createLogger(`ccp-receiver[${accountId}]`)
|
41 | this.accountId = accountId
|
42 | this.routes = new PrefixMap()
|
43 | }
|
44 |
|
45 | bump (holdDownTime: number) {
|
46 | this.expiry = Math.max(Date.now() + holdDownTime, this.expiry)
|
47 | }
|
48 |
|
49 | getAccountId () {
|
50 | return this.accountId
|
51 | }
|
52 |
|
53 | getExpiry () {
|
54 | return this.expiry
|
55 | }
|
56 |
|
57 | getPrefixes () {
|
58 | return this.routes.keys()
|
59 | }
|
60 |
|
61 | getRoutingTableId () {
|
62 | return this.routingTableId
|
63 | }
|
64 |
|
65 | getEpoch () {
|
66 | return this.epoch
|
67 | }
|
68 |
|
69 | getStatus () {
|
70 | return {
|
71 | routingTableId: this.routingTableId,
|
72 | epoch: this.epoch
|
73 | }
|
74 | }
|
75 |
|
76 | handleRouteUpdate ({
|
77 | speaker,
|
78 | routingTableId,
|
79 | fromEpochIndex,
|
80 | toEpochIndex,
|
81 | holdDownTime,
|
82 | newRoutes,
|
83 | withdrawnRoutes
|
84 | }: CcpRouteUpdateRequest): string[] {
|
85 | this.bump(holdDownTime)
|
86 |
|
87 | if (this.routingTableId !== routingTableId) {
|
88 | this.log.trace('saw new routing table. oldId=%s newId=%s', this.routingTableId, routingTableId)
|
89 | this.routingTableId = routingTableId
|
90 | this.epoch = 0
|
91 | }
|
92 |
|
93 | if (fromEpochIndex > this.epoch) {
|
94 |
|
95 | this.log.trace('gap in routing updates. expectedEpoch=%s actualFromEpoch=%s', this.epoch, fromEpochIndex)
|
96 | return []
|
97 | }
|
98 | if (this.epoch > toEpochIndex) {
|
99 |
|
100 | this.log.trace('old routing update, ignoring. expectedEpoch=%s actualToEpoch=%s', this.epoch, toEpochIndex)
|
101 | return []
|
102 | }
|
103 |
|
104 |
|
105 | if (newRoutes.length === 0 && withdrawnRoutes.length === 0) {
|
106 | this.log.trace('pure heartbeat. fromEpoch=%s toEpoch=%s', fromEpochIndex, toEpochIndex)
|
107 | this.epoch = toEpochIndex
|
108 | return []
|
109 | }
|
110 |
|
111 | const changedPrefixes: string[] = []
|
112 | if (withdrawnRoutes.length > 0) {
|
113 | this.log.trace('informed of no longer reachable routes. count=%s routes=%s', withdrawnRoutes.length, withdrawnRoutes)
|
114 | for (const prefix of withdrawnRoutes) {
|
115 | if (this.deleteRoute(prefix)) {
|
116 | changedPrefixes.push(prefix)
|
117 | }
|
118 | }
|
119 | }
|
120 |
|
121 | for (const route of newRoutes) {
|
122 | if (this.addRoute({
|
123 | peer: this.accountId,
|
124 | prefix: route.prefix,
|
125 | path: route.path,
|
126 | auth: route.auth
|
127 | })) {
|
128 | changedPrefixes.push(route.prefix)
|
129 | }
|
130 | }
|
131 |
|
132 | this.epoch = toEpochIndex
|
133 |
|
134 | this.log.trace('applied route update. changedPrefixesCount=%s fromEpoch=%s toEpoch=%s', changedPrefixes.length, fromEpochIndex, toEpochIndex)
|
135 |
|
136 | return changedPrefixes
|
137 | }
|
138 |
|
139 | getPrefix (prefix: string) {
|
140 | return this.routes.get(prefix)
|
141 | }
|
142 |
|
143 | sendRouteControl = () => {
|
144 | if (!this.plugin.isConnected()) {
|
145 | this.log.debug('cannot send route control message, plugin not connected (yet).')
|
146 | return
|
147 | }
|
148 |
|
149 | const routeControl: CcpRouteControlRequest = {
|
150 | mode: Mode.MODE_SYNC,
|
151 | lastKnownRoutingTableId: this.routingTableId,
|
152 | lastKnownEpoch: this.epoch,
|
153 | features: []
|
154 | }
|
155 |
|
156 | this.plugin.sendData(serializeCcpRouteControlRequest(routeControl))
|
157 | .then(data => {
|
158 | if (data[0] === Type.TYPE_ILP_FULFILL) {
|
159 | this.log.trace('successfully sent route control message.')
|
160 | } else if (data[0] === Type.TYPE_ILP_REJECT) {
|
161 | this.log.debug('route control message was rejected. rejection=%j', deserializeIlpReject(data))
|
162 | throw new Error('route control message rejected.')
|
163 | } else {
|
164 | this.log.debug('unknown response packet type. type=' + data[0])
|
165 | throw new Error('route control message returned unknown response.')
|
166 | }
|
167 | })
|
168 | .catch((err: any) => {
|
169 | const errInfo = (err instanceof Object && err.stack) ? err.stack : err
|
170 | this.log.debug('failed to set route control information on peer. error=%s', errInfo)
|
171 |
|
172 | const retryTimeout = setTimeout(this.sendRouteControl, ROUTE_CONTROL_RETRY_INTERVAL)
|
173 |
|
174 | retryTimeout.unref()
|
175 | })
|
176 | }
|
177 |
|
178 | private addRoute (route: IncomingRoute) {
|
179 | this.routes.insert(route.prefix, route)
|
180 |
|
181 |
|
182 | return true
|
183 | }
|
184 |
|
185 | private deleteRoute (prefix: string) {
|
186 | this.routes.delete(prefix)
|
187 |
|
188 |
|
189 | return true
|
190 | }
|
191 | }
|