UNPKG

8.01 kBPlain TextView Raw
1import reduct = require('reduct')
2
3import { loadModuleOfType, composeMiddleware } from '../lib/utils'
4import { create as createLogger } from '../common/log'
5const log = createLogger('middleware-manager')
6
7import Config from './config'
8import Accounts from './accounts'
9import Core from './core'
10import Stats from './stats'
11import {
12 Middleware,
13 MiddlewareDefinition,
14 MiddlewareMethod,
15 MiddlewareConstructor,
16 Pipeline,
17 Pipelines
18} from '../types/middleware'
19import { PluginInstance, DataHandler, MoneyHandler } from '../types/plugin'
20import MiddlewarePipeline from '../lib/middleware-pipeline'
21import { Errors } from 'ilp-packet'
22const { codes, UnreachableError } = Errors
23
24interface VoidHandler {
25 (dummy: void): Promise<void>
26}
27
28const BUILTIN_MIDDLEWARES: { [key: string]: MiddlewareDefinition } = {
29 errorHandler: {
30 type: 'error-handler'
31 },
32 rateLimit: {
33 type: 'rate-limit'
34 },
35 maxPacketAmount: {
36 type: 'max-packet-amount'
37 },
38 throughput: {
39 type: 'throughput'
40 },
41 balance: {
42 type: 'balance'
43 },
44 deduplicate: {
45 type: 'deduplicate'
46 },
47 expire: {
48 type: 'expire'
49 },
50 validateFulfillment: {
51 type: 'validate-fulfillment'
52 },
53 stats: {
54 type: 'stats'
55 },
56 alert: {
57 type: 'alert'
58 }
59}
60
61export default class MiddlewareManager {
62 protected config: Config
63 protected accounts: Accounts
64 protected core: Core
65 protected middlewares: { [key: string]: Middleware }
66 protected stats: Stats
67 private startupHandlers: Map<string, VoidHandler> = new Map()
68 private teardownHandlers: Map<string, VoidHandler> = new Map()
69 private outgoingDataHandlers: Map<string, DataHandler> = new Map()
70 private outgoingMoneyHandlers: Map<string, MoneyHandler> = new Map()
71 private started: boolean = false
72
73 constructor (deps: reduct.Injector) {
74 this.config = deps(Config)
75 this.accounts = deps(Accounts)
76 this.core = deps(Core)
77 this.stats = deps(Stats)
78
79 const disabledMiddlewareConfig: string[] = this.config.disableMiddleware || []
80 const customMiddlewareConfig: { [key: string]: MiddlewareDefinition } = this.config.middlewares || {}
81
82 this.middlewares = {}
83
84 for (const name of Object.keys(BUILTIN_MIDDLEWARES)) {
85 if (disabledMiddlewareConfig.includes(name)) {
86 continue
87 }
88
89 this.middlewares[name] = this.construct(name, BUILTIN_MIDDLEWARES[name])
90 }
91
92 for (const name of Object.keys(customMiddlewareConfig)) {
93 if (this.middlewares[name]) {
94 throw new Error('custom middleware has same name as built-in middleware. name=' + name)
95 }
96
97 this.middlewares[name] = this.construct(name, customMiddlewareConfig[name])
98 }
99 }
100
101 construct (name: string, definition: MiddlewareDefinition): Middleware {
102 // Custom middleware
103 const Middleware: MiddlewareConstructor =
104 loadModuleOfType('middleware', definition.type)
105
106 return new Middleware(definition.options || {}, {
107 getInfo: accountId => this.accounts.getInfo(accountId),
108 getOwnAddress: () => this.accounts.getOwnAddress(),
109 sendData: this.sendData.bind(this),
110 sendMoney: this.sendMoney.bind(this),
111 stats: this.stats
112 })
113 }
114
115 async setup () {
116 for (const accountId of this.accounts.getAccountIds()) {
117 const plugin = this.accounts.getPlugin(accountId)
118
119 await this.addPlugin(accountId, plugin)
120 }
121 }
122
123 /**
124 * Executes middleware hooks for connector startup.
125 *
126 * This should be called after the plugins are connected
127 */
128 async startup () {
129 this.started = true
130 for (const handler of this.startupHandlers.values()) {
131 await handler(undefined)
132 }
133 }
134
135 async addPlugin (accountId: string, plugin: PluginInstance) {
136 const pipelines: Pipelines = {
137 startup: new MiddlewarePipeline<void, void>(),
138 teardown: new MiddlewarePipeline<void, void>(),
139 incomingData: new MiddlewarePipeline<Buffer, Buffer>(),
140 incomingMoney: new MiddlewarePipeline<string, void>(),
141 outgoingData: new MiddlewarePipeline<Buffer, Buffer>(),
142 outgoingMoney: new MiddlewarePipeline<string, void>()
143 }
144 for (const middlewareName of Object.keys(this.middlewares)) {
145 const middleware = this.middlewares[middlewareName]
146 try {
147 await middleware.applyToPipelines(pipelines, accountId)
148 } catch (err) {
149 const errInfo = (err && typeof err === 'object' && err.stack) ? err.stack : String(err)
150
151 log.error('failed to apply middleware to account. middlewareName=%s accountId=%s error=%s', middlewareName, accountId, errInfo)
152 throw new Error('failed to apply middleware. middlewareName=' + middlewareName)
153 }
154 }
155
156 // Generate outgoing middleware
157 const submitData = async (data: Buffer) => {
158 try {
159 return await plugin.sendData(data)
160 } catch (e) {
161 let err = e
162 if (!err || typeof err !== 'object') {
163 err = new Error('non-object thrown. value=' + e)
164 }
165
166 if (!err.ilpErrorCode) {
167 err.ilpErrorCode = codes.F02_UNREACHABLE
168 }
169
170 err.message = 'failed to send packet: ' + err.message
171
172 throw err
173 }
174 }
175 const submitMoney = plugin.sendMoney.bind(plugin)
176 const startupHandler = this.createHandler(pipelines.startup, accountId, async () => { return })
177 const teardownHandler = this.createHandler(pipelines.teardown, accountId, async () => { return })
178 const outgoingDataHandler: DataHandler =
179 this.createHandler(pipelines.outgoingData, accountId, submitData)
180 const outgoingMoneyHandler: MoneyHandler =
181 this.createHandler(pipelines.outgoingMoney, accountId, submitMoney)
182
183 this.startupHandlers.set(accountId, startupHandler)
184 this.teardownHandlers.set(accountId, teardownHandler)
185 this.outgoingDataHandlers.set(accountId, outgoingDataHandler)
186 this.outgoingMoneyHandlers.set(accountId, outgoingMoneyHandler)
187
188 // Generate incoming middleware
189 const handleData: DataHandler = (data: Buffer) => this.core.processData(data, accountId, this.sendData.bind(this))
190 const handleMoney: MoneyHandler = async () => void 0
191 const incomingDataHandler: DataHandler =
192 this.createHandler(pipelines.incomingData, accountId, handleData)
193 const incomingMoneyHandler: MoneyHandler =
194 this.createHandler(pipelines.incomingMoney, accountId, handleMoney)
195
196 plugin.registerDataHandler(incomingDataHandler)
197 plugin.registerMoneyHandler(incomingMoneyHandler)
198
199 if (this.started) {
200 // If the plugin is being added dynamically (after connector init),
201 // make sure it's startup pipeline is run.
202 await startupHandler(undefined)
203 }
204 }
205
206 async removePlugin (accountId: string, plugin: PluginInstance) {
207 plugin.deregisterDataHandler()
208 plugin.deregisterMoneyHandler()
209
210 this.startupHandlers.delete(accountId)
211 const teardownHandler = this.teardownHandlers.get(accountId)
212 if (teardownHandler) await teardownHandler(undefined)
213 this.teardownHandlers.delete(accountId)
214 this.outgoingDataHandlers.delete(accountId)
215 this.outgoingMoneyHandlers.delete(accountId)
216 }
217
218 async sendData (data: Buffer, accountId: string) {
219 const handler = this.outgoingDataHandlers.get(accountId)
220
221 if (!handler) {
222 throw new UnreachableError('tried to send data to non-existent account. accountId=' + accountId)
223 }
224
225 return handler(data)
226 }
227
228 async sendMoney (amount: string, accountId: string) {
229 const handler = this.outgoingMoneyHandlers.get(accountId)
230
231 if (!handler) {
232 throw new UnreachableError('tried to send money to non-existent account. accountId=' + accountId)
233 }
234
235 return handler(amount)
236 }
237
238 getMiddleware (name: string): Middleware | undefined {
239 return this.middlewares[name]
240 }
241
242 private createHandler<T,U> (pipeline: Pipeline<T,U>, accountId: string, next: (param: T) => Promise<U>): (param: T) => Promise<U> {
243 const middleware: MiddlewareMethod<T,U> = composeMiddleware(pipeline.getMethods())
244
245 return (param: T) => middleware(param, next)
246 }
247}