1 | import reduct = require('reduct')
|
2 |
|
3 | import { loadModuleOfType, composeMiddleware } from '../lib/utils'
|
4 | import { create as createLogger } from '../common/log'
|
5 | const log = createLogger('middleware-manager')
|
6 |
|
7 | import Config from './config'
|
8 | import Accounts from './accounts'
|
9 | import Core from './core'
|
10 | import Stats from './stats'
|
11 | import {
|
12 | Middleware,
|
13 | MiddlewareDefinition,
|
14 | MiddlewareMethod,
|
15 | MiddlewareConstructor,
|
16 | Pipeline,
|
17 | Pipelines
|
18 | } from '../types/middleware'
|
19 | import { PluginInstance, DataHandler, MoneyHandler } from '../types/plugin'
|
20 | import MiddlewarePipeline from '../lib/middleware-pipeline'
|
21 | import { Errors } from 'ilp-packet'
|
22 | const { codes, UnreachableError } = Errors
|
23 |
|
/**
 * Signature of the startup and teardown handlers kept per account.
 *
 * The single `void` argument is a placeholder: callers pass `undefined`, and
 * the handler resolves with no value once it has finished.
 */
type VoidHandler = (dummy: void) => Promise<void>
|
27 |
|
/**
 * Definitions of the built-in middlewares, keyed by middleware name.
 *
 * Key order is significant: MiddlewareManager walks `Object.keys()` of this
 * table when constructing middlewares, so the order written here is the order
 * in which the built-ins are constructed and later applied to pipelines.
 */
const BUILTIN_MIDDLEWARES: { [key: string]: MiddlewareDefinition } = {
  errorHandler: { type: 'error-handler' },
  rateLimit: { type: 'rate-limit' },
  maxPacketAmount: { type: 'max-packet-amount' },
  throughput: { type: 'throughput' },
  balance: { type: 'balance' },
  deduplicate: { type: 'deduplicate' },
  expire: { type: 'expire' },
  validateFulfillment: { type: 'validate-fulfillment' },
  stats: { type: 'stats' },
  alert: { type: 'alert' }
}
|
60 |
|
/**
 * Constructs the enabled middlewares and wires them into per-account
 * pipelines that sit between each plugin and the connector core.
 *
 * For every account it keeps four handlers (startup, teardown, outgoing data,
 * outgoing money) and registers two more (incoming data, incoming money)
 * directly on the plugin.
 */
export default class MiddlewareManager {
  protected config: Config
  protected accounts: Accounts
  protected core: Core
  protected middlewares: { [key: string]: Middleware }
  protected stats: Stats
  private startupHandlers: Map<string, VoidHandler> = new Map()
  private teardownHandlers: Map<string, VoidHandler> = new Map()
  private outgoingDataHandlers: Map<string, DataHandler> = new Map()
  private outgoingMoneyHandlers: Map<string, MoneyHandler> = new Map()
  // Set by startup(); plugins added afterwards run their startup pipeline immediately.
  private started: boolean = false

  /**
   * Resolves dependencies and instantiates every enabled middleware.
   *
   * Built-in middlewares are constructed unless their name appears in
   * `config.disableMiddleware`; custom ones come from `config.middlewares`.
   *
   * @param deps Injector used to obtain Config, Accounts, Core and Stats.
   * @throws Error if a custom middleware uses the name of a built-in
   *   middleware that has not been disabled.
   */
  constructor (deps: reduct.Injector) {
    this.config = deps(Config)
    this.accounts = deps(Accounts)
    this.core = deps(Core)
    this.stats = deps(Stats)

    const disabledMiddlewareConfig: string[] = this.config.disableMiddleware || []
    const customMiddlewareConfig: { [key: string]: MiddlewareDefinition } = this.config.middlewares || {}

    this.middlewares = {}

    for (const name of Object.keys(BUILTIN_MIDDLEWARES)) {
      if (disabledMiddlewareConfig.includes(name)) {
        continue
      }

      this.middlewares[name] = this.construct(name, BUILTIN_MIDDLEWARES[name])
    }

    for (const name of Object.keys(customMiddlewareConfig)) {
      // Only built-ins that were actually constructed occupy a name here.
      if (this.middlewares[name]) {
        throw new Error('custom middleware has same name as built-in middleware. name=' + name)
      }

      this.middlewares[name] = this.construct(name, customMiddlewareConfig[name])
    }
  }

  /**
   * Loads the middleware implementation named by `definition.type` and
   * instantiates it.
   *
   * @param name Name the middleware will be stored under (not used by the
   *   construction itself).
   * @param definition Middleware type plus optional options; missing options
   *   default to an empty object.
   * @returns The new middleware instance.
   */
  construct (name: string, definition: MiddlewareDefinition): Middleware {
    const Middleware: MiddlewareConstructor =
      loadModuleOfType('middleware', definition.type)

    // Second argument: the services this manager exposes to the middleware.
    return new Middleware(definition.options || {}, {
      getInfo: accountId => this.accounts.getInfo(accountId),
      getOwnAddress: () => this.accounts.getOwnAddress(),
      sendData: this.sendData.bind(this),
      sendMoney: this.sendMoney.bind(this),
      stats: this.stats
    })
  }

  /**
   * Builds the pipelines for every account currently known to Accounts,
   * one account at a time.
   */
  async setup () {
    for (const accountId of this.accounts.getAccountIds()) {
      const plugin = this.accounts.getPlugin(accountId)

      await this.addPlugin(accountId, plugin)
    }
  }

  /**
   * Marks the manager as started and runs each account's startup handler
   * sequentially.
   */
  async startup () {
    this.started = true
    for (const handler of this.startupHandlers.values()) {
      await handler(undefined)
    }
  }

  /**
   * Creates the middleware pipelines for one account and attaches them to its
   * plugin.
   *
   * @param accountId Account the plugin belongs to.
   * @param plugin Plugin whose send methods terminate the outgoing pipelines
   *   and on which the incoming handlers are registered.
   * @throws Error if any middleware fails to apply itself to the pipelines;
   *   the underlying failure is logged first.
   */
  async addPlugin (accountId: string, plugin: PluginInstance) {
    const pipelines: Pipelines = {
      startup: new MiddlewarePipeline<void, void>(),
      teardown: new MiddlewarePipeline<void, void>(),
      incomingData: new MiddlewarePipeline<Buffer, Buffer>(),
      incomingMoney: new MiddlewarePipeline<string, void>(),
      outgoingData: new MiddlewarePipeline<Buffer, Buffer>(),
      outgoingMoney: new MiddlewarePipeline<string, void>()
    }
    for (const middlewareName of Object.keys(this.middlewares)) {
      const middleware = this.middlewares[middlewareName]
      try {
        await middleware.applyToPipelines(pipelines, accountId)
      } catch (err) {
        // Prefer the stack when one is available; otherwise stringify whatever was thrown.
        const errInfo = (err && typeof err === 'object' && err.stack) ? err.stack : String(err)

        log.error('failed to apply middleware to account. middlewareName=%s accountId=%s error=%s', middlewareName, accountId, errInfo)
        throw new Error('failed to apply middleware. middlewareName=' + middlewareName)
      }
    }

    // Outgoing side: the pipelines end in the plugin's own send methods.
    const submitData = async (data: Buffer) => {
      try {
        return await plugin.sendData(data)
      } catch (e) {
        let err = e
        // Anything that is not an object cannot carry an error code or message.
        if (!err || typeof err !== 'object') {
          err = new Error('non-object thrown. value=' + e)
        }

        // Errors without an ILP error code are reported as unreachable.
        if (!err.ilpErrorCode) {
          err.ilpErrorCode = codes.F02_UNREACHABLE
        }

        err.message = 'failed to send packet: ' + err.message

        throw err
      }
    }
    const submitMoney = plugin.sendMoney.bind(plugin)
    const startupHandler = this.createHandler(pipelines.startup, accountId, async () => { return })
    const teardownHandler = this.createHandler(pipelines.teardown, accountId, async () => { return })
    const outgoingDataHandler: DataHandler =
      this.createHandler(pipelines.outgoingData, accountId, submitData)
    const outgoingMoneyHandler: MoneyHandler =
      this.createHandler(pipelines.outgoingMoney, accountId, submitMoney)

    this.startupHandlers.set(accountId, startupHandler)
    this.teardownHandlers.set(accountId, teardownHandler)
    this.outgoingDataHandlers.set(accountId, outgoingDataHandler)
    this.outgoingMoneyHandlers.set(accountId, outgoingMoneyHandler)

    // Incoming side: data ends in the core, money ends in a no-op.
    const handleData: DataHandler = (data: Buffer) => this.core.processData(data, accountId, this.sendData.bind(this))
    const handleMoney: MoneyHandler = async () => void 0
    const incomingDataHandler: DataHandler =
      this.createHandler(pipelines.incomingData, accountId, handleData)
    const incomingMoneyHandler: MoneyHandler =
      this.createHandler(pipelines.incomingMoney, accountId, handleMoney)

    plugin.registerDataHandler(incomingDataHandler)
    plugin.registerMoneyHandler(incomingMoneyHandler)

    // startup() has already run, so it will not pick this account up; run
    // the account's startup pipeline here instead.
    if (this.started) {
      await startupHandler(undefined)
    }
  }

  /**
   * Detaches an account's pipelines from its plugin and forgets its handlers.
   *
   * The plugin's incoming handlers are deregistered first, then the account's
   * teardown handler (if any) runs. The outgoing handlers stay registered
   * while teardown runs and are removed afterwards.
   *
   * @param accountId Account being removed.
   * @param plugin Plugin whose data and money handlers are deregistered.
   * @throws Whatever the teardown handler rejects with. The account's
   *   handlers are removed even in that case, so a failed teardown does not
   *   leave sendData/sendMoney routing to a half-removed account.
   */
  async removePlugin (accountId: string, plugin: PluginInstance) {
    plugin.deregisterDataHandler()
    plugin.deregisterMoneyHandler()

    this.startupHandlers.delete(accountId)
    const teardownHandler = this.teardownHandlers.get(accountId)
    try {
      if (teardownHandler) await teardownHandler(undefined)
    } finally {
      // Always drop the remaining handlers, whether or not teardown succeeded.
      this.teardownHandlers.delete(accountId)
      this.outgoingDataHandlers.delete(accountId)
      this.outgoingMoneyHandlers.delete(accountId)
    }
  }

  /**
   * Sends data to an account through its outgoing data pipeline.
   *
   * @param data Data passed to the account's outgoing data handler.
   * @param accountId Destination account.
   * @returns The result of the account's outgoing data handler.
   * @throws UnreachableError if no outgoing data handler exists for the account.
   */
  async sendData (data: Buffer, accountId: string) {
    const handler = this.outgoingDataHandlers.get(accountId)

    if (!handler) {
      throw new UnreachableError('tried to send data to non-existent account. accountId=' + accountId)
    }

    return handler(data)
  }

  /**
   * Sends money to an account through its outgoing money pipeline.
   *
   * @param amount Amount passed to the account's outgoing money handler.
   * @param accountId Destination account.
   * @throws UnreachableError if no outgoing money handler exists for the account.
   */
  async sendMoney (amount: string, accountId: string) {
    const handler = this.outgoingMoneyHandlers.get(accountId)

    if (!handler) {
      throw new UnreachableError('tried to send money to non-existent account. accountId=' + accountId)
    }

    return handler(amount)
  }

  /**
   * Looks up a constructed middleware by name.
   *
   * @param name Name the middleware was registered under.
   * @returns The stored middleware for that name, if any.
   */
  getMiddleware (name: string): Middleware | undefined {
    return this.middlewares[name]
  }

  /**
   * Turns a pipeline into a single handler function.
   *
   * @param pipeline Pipeline whose methods are composed.
   * @param accountId Account the handler is for (currently unused here).
   * @param next Function passed to the composed middleware as its `next`.
   * @returns A function that calls the composed middleware with its argument
   *   and `next`.
   */
  private createHandler<T,U> (pipeline: Pipeline<T,U>, accountId: string, next: (param: T) => Promise<U>): (param: T) => Promise<U> {
    const middleware: MiddlewareMethod<T,U> = composeMiddleware(pipeline.getMethods())

    return (param: T) => middleware(param, next)
  }
}
247 | }
|