1 | const p = require('path')
|
2 | const { EventEmitter } = require('events')
|
3 |
|
4 | const mkdirp = require('mkdirp')
|
5 | const sub = require('subleveldown')
|
6 | const grpc = require('@grpc/grpc-js')
|
7 | const bjson = require('buffer-json-encoding')
|
8 | const processTop = require('process-top')
|
9 | const varint = require('varint')
|
10 | const Corestore = require('corestore')
|
11 | const HypercoreCache = require('hypercore-cache')
|
12 | const SwarmNetworker = require('corestore-swarm-networking')
|
13 | const HypercoreProtocol = require('hypercore-protocol')
|
14 | const Peersockets = require('peersockets')
|
15 |
|
16 | const { rpc, apiVersion } = require('hyperdrive-daemon-client')
|
17 | const { createMetadata } = require('./lib/metadata')
|
18 | const constants = require('hyperdrive-daemon-client/lib/constants')
|
19 |
|
20 | const DriveManager = require('./lib/drives')
|
21 | const PeersocketManager = require('./lib/peersockets')
|
22 | const PeersManager = require('./lib/peers')
|
23 | const DebugManager = require('./lib/debug')
|
24 | const FuseManager = require('./lib/fuse')
|
25 | const { serverError } = require('./lib/errors')
|
26 | const { getHandlers } = require('./lib/common')
|
27 |
|
28 | const log = require('./lib/log').child({ component: 'server' })
|
29 |
|
// Namespace passed to the corestore when deriving daemon-specific secrets.
const NAMESPACE = 'hyperdrive-daemon'

// Process events that trigger a daemon shutdown.
const STOP_EVENTS = [
  'SIGINT',
  'SIGTERM',
  'unhandledRejection',
  'uncaughtException'
]

// Defaults used when the corresponding option is not supplied.
const WATCH_LIMIT = 300
const MAX_PEERS = 128
const SWARM_PORT = 49737

// 512 MiB of cache in total, split between the tree cache and the data cache.
const TOTAL_CACHE_SIZE = 512 * 1024 * 1024
const CACHE_RATIO = 0.5
const TREE_CACHE_SIZE = CACHE_RATIO * TOTAL_CACHE_SIZE
const DATA_CACHE_SIZE = (1 - CACHE_RATIO) * TOTAL_CACHE_SIZE
|
40 |
|
41 |
|
// hyperdrive-fuse is an optional dependency: when it cannot be loaded the binding
// stays undefined and the status RPC reports FUSE as unavailable.
let hyperfuse
try {
  hyperfuse = require('hyperdrive-fuse')
} catch (err) {
  // Deliberately ignored -- the daemon works without FUSE support.
}
|
45 |
|
/**
 * The hyperdrive daemon: owns a Corestore, the swarm networker, the drive / FUSE /
 * peer / peersocket / debug managers and the gRPC server that exposes them.
 *
 * Lifecycle: `start()` awaits `ready()` and then binds the gRPC server; `stop()`
 * tears everything down (and exits the process when constructed with `main: true`).
 * Errors emitted by the drive and FUSE managers are re-emitted as 'error' events.
 */
class HyperdriveDaemon extends EventEmitter {
  /**
   * @param {object} [opts]
   * @param {string} [opts.storage] - Root directory (defaults to `constants.root`).
   * @param {number} [opts.port] - gRPC port (defaults to `constants.port`).
   * @param {boolean} [opts.memoryOnly] - Use in-memory storage and database providers.
   * @param {boolean} [opts.noAnnounce]
   * @param {boolean} [opts.noDebug] - Skip creating the DebugManager.
   * @param {string[]|false} [opts.bootstrap] - Bootstrap servers, or `false` to disable bootstrapping.
   * @param {number} [opts.swarmPort] - Preferred swarm port (defaults to SWARM_PORT).
   * @param {number} [opts.maxPeers] - Defaults to MAX_PEERS.
   * @param {number} [opts.latency] - Forwarded (as a number) to the networker options.
   * @param {number} [opts.watchLimit] - Defaults to WATCH_LIMIT.
   * @param {object} [opts.metadata] - Pre-built metadata; created on ready when absent.
   * @param {boolean} [opts.main] - When true, `stop()` terminates the process.
   */
  constructor (opts = {}) {
    super()

    this.opts = opts
    this.root = opts.storage || constants.root
    this.storage = p.join(this.root, 'storage')

    this.port = opts.port || constants.port
    this.memoryOnly = !!opts.memoryOnly
    this.noAnnounce = !!opts.noAnnounce
    this.noDebug = !!opts.noDebug

    log.info('memory only?', this.memoryOnly, 'no announce?', this.noAnnounce)
    this._storageProvider = this.memoryOnly ? require('random-access-memory') : require('hypercore-default-storage')
    this._dbProvider = this.memoryOnly ? require('level-mem') : require('level')

    const corestoreOpts = {
      storage: path => this._storageProvider(`${this.storage}/cores/${path}`),
      sparse: true,

      stats: true,
      cache: {
        data: new HypercoreCache({
          maxByteSize: DATA_CACHE_SIZE,
          estimateSize: val => val.length
        }),
        tree: new HypercoreCache({
          maxByteSize: TREE_CACHE_SIZE,
          estimateSize: val => 40
        })
      },
      ifAvailable: true
    }
    this.corestore = new Corestore(corestoreOpts.storage, corestoreOpts)

    this._networkOpts = {
      announceLocalAddress: true,
      preferredPort: opts.swarmPort || SWARM_PORT,
      maxPeers: opts.maxPeers || MAX_PEERS
    }

    // An explicit `bootstrap: false` must win over the configured default. The previous
    // `opts.bootstrap || constants.bootstrap` discarded it, so bootstrapping could never
    // be disabled through the options object.
    const bootstrapOpts = opts.bootstrap === false ? false : (opts.bootstrap || constants.bootstrap)
    if (bootstrapOpts === false) {
      this._networkOpts.bootstrap = false
    } else if (bootstrapOpts && bootstrapOpts.length && bootstrapOpts[0] !== '') {
      // A leading 'false' entry is also treated as "disable bootstrapping".
      this._networkOpts.bootstrap = bootstrapOpts[0] === 'false' ? false : bootstrapOpts
    }
    if (opts.latency !== undefined) this._networkOpts.latency = +opts.latency

    // Set in ready.
    this.networking = null
    this.db = null
    this.drives = null
    this.fuse = null
    this.peers = null
    this.peersockets = null
    this.debug = null
    this.metadata = null
    this._startTime = null

    // Set in start.
    this.server = null
    this._topTimer = null
    this._dbs = null
    this._isMain = !!opts.main
    this._cleanup = null

    this._isClosed = false
    this._isReady = false
    this._readyPromise = false

    this._versions = null

    // Memoized readiness gate. On failure the error is logged and the daemon is stopped.
    // Only the promise returned to the first caller carries that handler; later callers
    // receive the underlying `_ready()` promise.
    this.ready = () => {
      if (this._isClosed) return Promise.resolve()
      if (this._readyPromise) return this._readyPromise
      this._readyPromise = this._ready()
      return this._readyPromise.catch(err => {
        log.error({ error: err, stack: err.stack }, 'error in daemon ready function -- cleaning up')
        return this.stop(err)
      })
    }
  }

  /**
   * Performs the actual initialization: metadata, storage directory, process stop
   * handlers, databases, corestore, networking and finally the managers.
   */
  async _ready () {
    this.metadata = this.opts.metadata || await createMetadata(this.root, `localhost:${this.port}`)
    await this._ensureStorage()

    // Keep a reference to the bound handler so stop() can remove it again.
    this._cleanup = this.stop.bind(this)
    for (const event of STOP_EVENTS) {
      process.on(event, this._cleanup)
    }

    this.db = this._dbProvider(`${this.storage}/db`, { valueEncoding: 'json' })
    const dbs = {
      fuse: sub(this.db, 'fuse', { valueEncoding: bjson }),
      drives: sub(this.db, 'drives', { valueEncoding: bjson }),
      network: sub(this.db, 'network', { valueEncoding: 'json' })
    }
    this._dbs = dbs

    await this.corestore.ready()

    // The replication key pair and the swarm id are both derived from the corestore.
    const seed = this.corestore._deriveSecret(NAMESPACE, 'replication-keypair')
    const swarmId = this.corestore._deriveSecret(NAMESPACE, 'swarm-id')
    this._networkOpts.keyPair = HypercoreProtocol.keyPair(seed)
    this._networkOpts.id = swarmId

    this.networking = new SwarmNetworker(this.corestore, this._networkOpts)
    this.networking.on('replication-error', err => {
      log.trace({ error: err.message, stack: err.stack }, 'replication error')
      if (err.message && err.message.indexOf('Remote signature could not be verified') !== -1) {
        log.warn('Remote signature verification is failing -- one of your hypercores appears to be forked or corrupted.')
      }
    })
    this.networking.on('stream-opened', stream => {
      log.trace({ remoteType: stream.remoteType, remoteAddress: stream.remoteAddress }, 'replication stream opened')
    })
    this.networking.on('stream-closed', stream => {
      log.trace({ remoteType: stream.remoteType, remoteAddress: stream.remoteAddress }, 'replication stream closed')
    })
    await this.networking.listen()

    this._registerCoreTimeouts()

    const peersockets = new Peersockets(this.networking)
    this.peers = new PeersManager(this.networking, peersockets)
    this.peersockets = new PeersocketManager(this.networking, this.peers, peersockets)
    if (!this.noDebug) this.debug = new DebugManager(this)

    this.drives = new DriveManager(this.corestore, this.networking, dbs.drives, {
      ...this.opts,
      memoryOnly: this.memoryOnly,
      watchLimit: this.opts.watchLimit || WATCH_LIMIT
    })
    this.drives.on('error', err => this.emit('error', err))
    await this.drives.ready()

    this.fuse = new FuseManager(this.drives, this._dbs.fuse, this.opts)
    this.fuse.on('error', err => this.emit('error', err))
    await this.fuse.ready()

    this._isReady = true
    this._startTime = Date.now()
    this._versions = {
      daemon: require('./package.json').version,
      client: require('hyperdrive-daemon-client/package.json').version,
      schema: require('hyperdrive-schemas/package.json').version,
      hyperdrive: require('hyperdrive/package.json').version
    }
    if (this.fuse && this.fuse.fuseConfigured) {
      this._versions.fuseNative = require('fuse-native/package.json').version
      this._versions.hyperdriveFuse = require('hyperdrive-fuse/package.json').version
    }
  }

  /**
   * Creates the storage directory (and any missing parents) via mkdirp.
   * @returns {Promise<void>}
   */
  _ensureStorage () {
    return new Promise((resolve, reject) => {
      mkdirp(this.storage, err => {
        if (err) return reject(err)
        return resolve()
      })
    })
  }

  /**
   * Attaches a `timeouts` object to every core emitted by the corestore's 'feed' event.
   * Its `get` and `update` hooks take a callback and invoke it either immediately or
   * later, once the relevant swarm flush has happened (and, for `update`, once a peer
   * has been added). How the core consumes `timeouts` is not visible in this file.
   */
  _registerCoreTimeouts () {
    // Hex discovery key -> pending callback sets for that core.
    const flushSets = new Map()

    this.networking.on('flushed', dkey => {
      const keyString = dkey.toString('hex')
      if (!flushSets.has(keyString)) return
      const { flushSet, peerAddSet } = flushSets.get(keyString)
      callAllInSet(flushSet)
      callAllInSet(peerAddSet)
    })

    this.corestore.on('feed', core => {
      const discoveryKey = core.discoveryKey
      const peerAddSet = new Set()
      const flushSet = new Set()
      let globalFlushed = false

      this.networking.swarm.flush(() => {
        // Cores that joined the network are released by the per-key 'flushed' event instead.
        if (this.networking.joined(discoveryKey)) return
        globalFlushed = true
        callAllInSet(flushSet)
        callAllInSet(peerAddSet)
      })

      flushSets.set(discoveryKey.toString('hex'), { flushSet, peerAddSet })
      core.once('peer-add', () => callAllInSet(peerAddSet))

      const timeouts = {
        get: (cb) => {
          if (this.networking.joined(discoveryKey)) {
            if (this.networking.flushed(discoveryKey)) return cb()
            return flushSet.add(cb)
          }
          if (globalFlushed) return cb()
          return flushSet.add(cb)
        },
        update: (cb) => {
          if (core.peers.length) return cb()
          if (this.networking.joined(discoveryKey)) {
            if (this.networking.flushed(discoveryKey) && !core.peers.length) return cb()
            return peerAddSet.add(cb)
          }
          if (globalFlushed) return cb()
          return peerAddSet.add(cb)
        }
      }
      core.timeouts = timeouts
    })
  }

  // RPC Methods

  /**
   * Builds the StatusResponse: API version, uptime and -- once the daemon is ready --
   * component versions, the noise public key, swarm reachability and FUSE state.
   */
  async _rpcStatus (call) {
    const rsp = new rpc.main.messages.StatusResponse()
    rsp.setApiversion(apiVersion)
    // Use the getter so a daemon that never finished starting reports 0 rather than
    // the milliseconds elapsed since the epoch.
    rsp.setUptime(this.uptime)
    if (this._versions) {
      rsp.setDaemonversion(this._versions.daemon)
      rsp.setClientversion(this._versions.client)
      rsp.setSchemaversion(this._versions.schema)
      rsp.setHyperdriveversion(this._versions.hyperdrive)
      rsp.setNoisekey(this.noiseKeyPair.publicKey)

      const swarm = this.networking && this.networking.swarm
      if (swarm) {
        const remoteAddress = swarm.remoteAddress()
        rsp.setHolepunchable(swarm.holepunchable())
        rsp.setRemoteaddress(remoteAddress ? remoteAddress.host + ':' + remoteAddress.port : '')
      }

      if (this._versions.fuseNative) rsp.setFusenativeversion(this._versions.fuseNative)
      if (this._versions.hyperdriveFuse) rsp.setHyperdrivefuseversion(this._versions.hyperdriveFuse)

      if (hyperfuse) {
        rsp.setFuseavailable(true)
        rsp.setFuseconfigured(this.fuse.fuseConfigured)
      } else {
        rsp.setFuseavailable(false)
        rsp.setFuseconfigured(false)
      }
    }
    return rsp
  }

  /**
   * Re-runs the FUSE manager's ready step and, if FUSE is now configured, (re)loads
   * hyperdrive-fuse and records the FUSE package versions.
   */
  async _rpcRefreshFuse (call) {
    await this.fuse.ready()
    if (this.fuse && this.fuse.fuseConfigured) {
      hyperfuse = require('hyperdrive-fuse')
      this._versions.fuseNative = require('fuse-native/package.json').version
      this._versions.hyperdriveFuse = require('hyperdrive-fuse/package.json').version
    }
    return new rpc.main.messages.FuseRefreshResponse()
  }

  // Public Methods

  /** Milliseconds since the daemon became ready, or 0 if it has not. */
  get uptime () {
    if (!this._startTime) return 0
    return Date.now() - this._startTime
  }

  /** The networker's key pair, or null before networking has been created. */
  get noiseKeyPair () {
    if (!this.networking) return null
    return this.networking.keyPair
  }

  /**
   * Shuts the daemon down: gRPC server, FUSE mount, networking, corestore, database.
   * Also installed as the handler for every event in STOP_EVENTS.
   *
   * @param {Error} [err] - The error that triggered the stop, if any. Errors with
   *   `code === 1` or `code === 'ERR_HTTP2_INVALID_STREAM'` are ignored entirely.
   * @throws Rethrows cleanup failures unless running as the main process, in which
   *   case the process exits with status 1 instead.
   */
  async stop (err) {
    if (err && ((err.code === 1) || (err.code === 'ERR_HTTP2_INVALID_STREAM'))) return
    if (err) log.error({ error: true, err, message: err.message, stack: err.stack, errno: err.errno }, 'stopping daemon due to error')
    if (this._isClosed) {
      log.info('force killing the process because stop has been called twice')
      if (this._isMain) return process.exit(0)
      return null
    }
    this._isClosed = true

    try {
      if (this._topTimer) {
        clearInterval(this._topTimer)
        this._topTimer = null
      }
      if (this.server) this.server.forceShutdown()
      log.info('waiting for fuse to unmount')
      if (this.fuse && this.fuse.fuseConfigured) await this.fuse.unmount()
      log.info('waiting for networking to close')
      if (this.networking) await this.networking.close()
      log.info('waiting for corestore to close')
      if (this.corestore) {
        await new Promise((resolve, reject) => {
          this.corestore.close(err => {
            if (err) return reject(err)
            return resolve()
          })
        })
      }
      log.info('waiting for db to close')
      if (this.db) await this.db.close()
      if (this._isMain) return process.exit(0)
    } catch (err) {
      log.error({ error: err.message, stack: err.stack }, 'error in cleanup')
      if (this._isMain) return process.exit(1)
      throw err
    }
    log.info('finished cleanup -- shutting down')

    for (const event of STOP_EVENTS) {
      process.removeListener(event, this._cleanup)
    }
  }

  /**
   * Waits for the daemon to be ready, starts the once-a-minute process stats logger,
   * registers every RPC service (all token-authenticated) and binds the gRPC server.
   */
  async start () {
    await this.ready()
    this._topTimer = setInterval(() => {
      log.info(processTop().toJSON(), 'process stats')
    }, 1000 * 60)

    this.server = new grpc.Server()

    this.server.addService(rpc.fuse.services.FuseService, {
      ...wrap(this.metadata, getHandlers(this.fuse), { authenticate: true })
    })
    this.server.addService(rpc.drive.services.DriveService, {
      ...wrap(this.metadata, getHandlers(this.drives), { authenticate: true })
    })
    this.server.addService(rpc.peersockets.services.PeersocketsService, {
      ...wrap(this.metadata, getHandlers(this.peersockets), { authenticate: true })
    })
    this.server.addService(rpc.peers.services.PeersService, {
      ...wrap(this.metadata, getHandlers(this.peers), { authenticate: true })
    })
    if (this.debug) {
      this.server.addService(rpc.debug.services.DebugService, {
        ...wrap(this.metadata, getHandlers(this.debug), { authenticate: true })
      })
    }
    this.server.addService(rpc.main.services.HyperdriveService, {
      ...wrap(this.metadata, getHandlers(this), { authenticate: true })
    })

    // NOTE(review): this binds on every interface with insecure (plaintext) credentials,
    // so the metadata token is the only access control -- confirm that is intended.
    await new Promise((resolve, reject) => {
      this.server.bindAsync(`0.0.0.0:${this.port}`, grpc.ServerCredentials.createInsecure(), (err, port) => {
        if (err) return reject(err)
        log.info({ port: port }, 'server listening')
        this.server.start()
        return resolve()
      })
    })
  }
}
|
406 |
|
/**
 * Parses the daemon's command-line flags with minimist.
 *
 * `--bootstrap` becomes `false` for the literal value "false", otherwise a
 * comma-separated value is split into an array.
 *
 * The HyperdriveDaemon constructor reads `memoryOnly`, `noAnnounce` and `noDebug`,
 * whereas minimist produces `memory-only`, `announce` and `debug`. The camelCase
 * keys are therefore derived here so that `--memory-only`, `--no-announce` and
 * `--no-debug` actually take effect. The original keys are left in place.
 *
 * @returns {object} The parsed arguments.
 */
function extractArguments () {
  const argv = require('minimist')(process.argv.slice(2), {
    string: ['storage', 'log-level', 'bootstrap'],
    boolean: ['announce', 'memory-only', 'debug'],
    default: {
      bootstrap: '',
      'memory-only': false,
      announce: true,
      debug: true
    }
  })
  if (argv.bootstrap === 'false') argv.bootstrap = false
  else if (argv.bootstrap) argv.bootstrap = argv.bootstrap.split(',')

  // Never clobber a camelCase flag that was passed explicitly (e.g. `--memoryOnly`).
  argv.memoryOnly = Boolean(argv.memoryOnly || argv['memory-only'])
  argv.noAnnounce = Boolean(argv.noAnnounce) || argv.announce === false
  argv.noDebug = Boolean(argv.noDebug) || argv.debug === false
  return argv
}
|
422 |
|
/**
 * Wraps a map of promise-returning RPC handlers into gRPC-style handlers.
 *
 * Each wrapped handler optionally checks the `token` entry of the call metadata
 * against `metadata.token`, invokes the underlying handler with the call, and then
 * reports the outcome through the trailing callback when one was supplied, or by
 * destroying the call otherwise.
 *
 * @param {object} metadata - Daemon metadata; only `metadata.token` is read.
 * @param {Object<string, Function>} methods - Handlers keyed by RPC method name.
 * @param {object} [opts]
 * @param {boolean} [opts.authenticate] - Require a matching token on every call.
 * @returns {Object<string, Function>} Wrapped handlers under the same keys.
 */
function wrap (metadata, methods, opts) {
  const wrapped = {}
  const authenticate = opts && opts.authenticate
  for (const methodName of Object.keys(methods)) {
    const method = methods[methodName]
    wrapped[methodName] = function (call, ...args) {
      const tag = { method: methodName, received: Date.now() }
      // When extra arguments are present the last one is treated as the response callback.
      const cb = args.length ? args[args.length - 1] : null
      if (authenticate) {
        let token = call.metadata && call.metadata.get('token')
        if (token) token = token[0]
        log.trace({ ...tag, token }, 'received token')
        if (!token || token !== metadata.token) {
          log.warn(tag, 'request authentication failed')
          const err = {
            code: grpc.status.UNAUTHENTICATED,
            message: 'Invalid auth token.'
          }
          if (cb) return cb(err)
          return call.destroy(err)
        }
        log.trace(tag, 'request authentication succeeded')
      }

      // A handler that throws synchronously or returns a plain value must still end up
      // as an RPC result or error instead of escaping into the gRPC server.
      let pending
      try {
        pending = Promise.resolve(method(call))
      } catch (err) {
        pending = Promise.reject(err)
      }

      pending
        .then(rsp => {
          log.trace(tag, 'request was successful')
          if (cb) process.nextTick(cb, null, rsp)
        })
        .catch(err => {
          // String() also copes with null/undefined rejection values.
          log.trace({ ...tag, error: String(err) }, 'request failed')
          if (cb) return cb(serverError(err))
          return call.destroy(err)
        })
    }
  }
  return wrapped
}
|
460 |
|
/**
 * Invokes every callback held in `set` in insertion order -- including callbacks
 * that get added while the set is being drained -- and then empties the set.
 *
 * @param {Set<Function>} set - Pending zero-argument callbacks.
 */
function callAllInSet (set) {
  set.forEach(pending => {
    pending()
  })
  set.clear()
}
|
467 |
|
// When loaded as a dependency, expose the class; when executed directly, parse the
// command line and run a daemon that owns the process (`main: true`).
if (require.main !== module) {
  module.exports = HyperdriveDaemon
} else {
  const daemon = new HyperdriveDaemon({ ...extractArguments(), main: true })
  process.title = 'hyperdrive'
  daemon.start()
}
|