// Retrieved from the UNPKG raw file view (16.2 kB, JavaScript).

1const p = require('path')
2const { EventEmitter } = require('events')
3
4const mkdirp = require('mkdirp')
5const sub = require('subleveldown')
6const grpc = require('@grpc/grpc-js')
7const bjson = require('buffer-json-encoding')
8const processTop = require('process-top')
9const varint = require('varint')
10const Corestore = require('corestore')
11const HypercoreCache = require('hypercore-cache')
12const SwarmNetworker = require('corestore-swarm-networking')
13const HypercoreProtocol = require('hypercore-protocol')
14const Peersockets = require('peersockets')
15
16const { rpc, apiVersion } = require('hyperdrive-daemon-client')
17const { createMetadata } = require('./lib/metadata')
18const constants = require('hyperdrive-daemon-client/lib/constants')
19
20const DriveManager = require('./lib/drives')
21const PeersocketManager = require('./lib/peersockets')
22const PeersManager = require('./lib/peers')
23const DebugManager = require('./lib/debug')
24const FuseManager = require('./lib/fuse')
25const { serverError } = require('./lib/errors')
26const { getHandlers } = require('./lib/common')
27
28const log = require('./lib/log').child({ component: 'server' })
29
// Namespace passed to the corestore when deriving the daemon's secrets.
const NAMESPACE = 'hyperdrive-daemon'

// Process events that trigger the daemon's stop/cleanup routine.
const STOP_EVENTS = [
  'SIGINT',
  'SIGTERM',
  'unhandledRejection',
  'uncaughtException'
]

// Fallbacks used when the matching option (watchLimit / maxPeers / swarmPort) is not supplied.
const WATCH_LIMIT = 300
const MAX_PEERS = 128
const SWARM_PORT = 49737

// Byte budget shared by the tree cache and the data cache (512 MiB), split by CACHE_RATIO.
const TOTAL_CACHE_SIZE = 1024 * 1024 * 512
const CACHE_RATIO = 0.5
const TREE_CACHE_SIZE = TOTAL_CACHE_SIZE * CACHE_RATIO
const DATA_CACHE_SIZE = TOTAL_CACHE_SIZE * (1 - CACHE_RATIO)

// hyperdrive-fuse is optional: `hyperfuse` stays undefined when it cannot be loaded here.
// This is set dynamically in refreshFuse.
let hyperfuse
try {
  hyperfuse = require('hyperdrive-fuse')
} catch (err) {
  // Deliberately ignored -- FUSE support is simply reported as unavailable.
}
45
46class HyperdriveDaemon extends EventEmitter {
47 constructor (opts = {}) {
48 super()
49
50 this.opts = opts
51 this.root = opts.storage || constants.root
52 this.storage = p.join(this.root, 'storage')
53
54 this.port = opts.port || constants.port
55 this.memoryOnly = !!opts.memoryOnly
56 this.noAnnounce = !!opts.noAnnounce
57 this.noDebug = !!opts.noDebug
58
59 log.info('memory only?', this.memoryOnly, 'no announce?', this.noAnnounce)
60 this._storageProvider = this.memoryOnly ? require('random-access-memory') : require('hypercore-default-storage')
61 this._dbProvider = this.memoryOnly ? require('level-mem') : require('level')
62
63 const corestoreOpts = {
64 storage: path => this._storageProvider(`${this.storage}/cores/${path}`),
65 sparse: true,
66 // Collect networking statistics.
67 stats: true,
68 cache: {
69 data: new HypercoreCache({
70 maxByteSize: DATA_CACHE_SIZE,
71 estimateSize: val => val.length
72 }),
73 tree: new HypercoreCache({
74 maxByteSize: TREE_CACHE_SIZE,
75 estimateSize: val => 40
76 })
77 },
78 ifAvailable: true
79 }
80 this.corestore = new Corestore(corestoreOpts.storage, corestoreOpts)
81
82 this._networkOpts = {
83 announceLocalAddress: true,
84 preferredPort: opts.swarmPort || SWARM_PORT,
85 maxPeers: opts.maxPeers || MAX_PEERS
86 }
87 const bootstrapOpts = opts.bootstrap || constants.bootstrap
88 if (bootstrapOpts && bootstrapOpts.length && bootstrapOpts[0] !== '') {
89 if (bootstrapOpts === false || bootstrapOpts[0] === 'false') {
90 this._networkOpts.bootstrap = false
91 } else {
92 this._networkOpts.bootstrap = bootstrapOpts
93 }
94 }
95 if (opts.latency !== undefined) this._networkOpts.latency = +opts.latency
96
97 // Set in ready.
98 this.networking = null
99 this.db = null
100 this.drives = null
101 this.fuse = null
102 this.peersockets = null
103 this.debug = null
104 this.metadata = null
105 this._startTime = null
106
107 // Set in start.
108 this.server = null
109 this._topTimer = null
110 this._dbs = null
111 this._isMain = !!opts.main
112 this._cleanup = null
113
114 this._isClosed = false
115 this._readyPromise = false
116
117 this._versions = null
118
119 this.ready = () => {
120 if (this._isClosed) return Promise.resolve()
121 if (this._readyPromise) return this._readyPromise
122 this._readyPromise = this._ready()
123 return this._readyPromise.catch(err => {
124 log.error({ error: err, stack: err.stack }, 'error in daemon ready function -- cleaning up')
125 return this.stop(err)
126 })
127 }
128 }
129
130 async _ready () {
131 // Always rotate the auth token when the daemon's restarted to prevent session mismatches.
132 this.metadata = this.opts.metadata || await createMetadata(this.root, `localhost:${this.port}`)
133 await this._ensureStorage()
134
135 this._cleanup = this.stop.bind(this)
136 for (const event of STOP_EVENTS) {
137 process.on(event, this._cleanup)
138 }
139
140 this.db = this._dbProvider(`${this.storage}/db`, { valueEncoding: 'json' })
141 const dbs = {
142 fuse: sub(this.db, 'fuse', { valueEncoding: bjson }),
143 drives: sub(this.db, 'drives', { valueEncoding: bjson }),
144 network: sub(this.db, 'network', { valueEncoding: 'json'})
145 }
146 this._dbs = dbs
147
148 await this.corestore.ready()
149
150 const seed = this.corestore._deriveSecret(NAMESPACE, 'replication-keypair')
151 const swarmId = this.corestore._deriveSecret(NAMESPACE, 'swarm-id')
152 this._networkOpts.keyPair = HypercoreProtocol.keyPair(seed)
153 this._networkOpts.id = swarmId
154
155 this.networking = new SwarmNetworker(this.corestore, this._networkOpts)
156 this.networking.on('replication-error', err => {
157 log.trace({ error: err.message, stack: err.stack }, 'replication error')
158 if (err.message && err.message.indexOf('Remote signature could not be verified') !== -1) {
159 log.warn('Remote signature verification is failing -- one of your hypercores appears to be forked or corrupted.')
160 }
161 })
162 this.networking.on('stream-opened', stream => {
163 log.trace({ remoteType: stream.remoteType, remoteAddress: stream.remoteAddress }, 'replication stream opened')
164 })
165 this.networking.on('stream-closed', stream => {
166 log.trace({ remoteType: stream.remoteType, remoteAddress: stream.remoteAddress }, 'replication stream closed')
167 })
168 await this.networking.listen()
169
170 // Register the Hyperswarm timeout heuristics on all cores generated by our corestore.
171 this._registerCoreTimeouts()
172
173 const peersockets = new Peersockets(this.networking)
174 this.peers = new PeersManager(this.networking, peersockets)
175 this.peersockets = new PeersocketManager(this.networking, this.peers, peersockets)
176 if (!this.noDebug) this.debug = new DebugManager(this)
177
178 this.drives = new DriveManager(this.corestore, this.networking, dbs.drives, {
179 ...this.opts,
180 memoryOnly: this.memoryOnly,
181 watchLimit: this.opts.watchLimit || WATCH_LIMIT
182 })
183 this.drives.on('error', err => this.emit('error', err))
184 await this.drives.ready()
185
186 this.fuse = new FuseManager(this.drives, this._dbs.fuse, this.opts)
187 this.fuse.on('error', err => this.emit('error', err))
188 await this.fuse.ready()
189
190 this._isReady = true
191 this._startTime = Date.now()
192 this._versions = {
193 daemon: require('./package.json').version,
194 client: require('hyperdrive-daemon-client/package.json').version,
195 schema: require('hyperdrive-schemas/package.json').version,
196 hyperdrive: require('hyperdrive/package.json').version
197 }
198 if (this.fuse && this.fuse.fuseConfigured) {
199 this._versions.fuseNative = require('fuse-native/package.json').version
200 this._versions.hyperdriveFuse = require('hyperdrive-fuse/package.json').version
201 }
202 }
203
204 _ensureStorage () {
205 return new Promise((resolve, reject) => {
206 mkdirp(this.storage, err => {
207 if (err) return reject(err)
208 return resolve()
209 })
210 })
211 }
212
213 /**
214 * This is where we define our main heuristic for allowing hypercore gets/updates to proceed.
215 */
216 _registerCoreTimeouts () {
217 const flushSets = new Map()
218
219 this.networking.on('flushed', dkey => {
220 const keyString = dkey.toString('hex')
221 if (!flushSets.has(keyString)) return
222 const { flushSet, peerAddSet } = flushSets.get(keyString)
223 callAllInSet(flushSet)
224 callAllInSet(peerAddSet)
225 })
226
227 this.corestore.on('feed', core => {
228 const discoveryKey = core.discoveryKey
229 const peerAddSet = new Set()
230 const flushSet = new Set()
231 var globalFlushed = false
232
233 this.networking.swarm.flush(() => {
234 if (this.networking.joined(discoveryKey)) return
235 globalFlushed = true
236 callAllInSet(flushSet)
237 callAllInSet(peerAddSet)
238 })
239
240 flushSets.set(discoveryKey.toString('hex'), { flushSet, peerAddSet })
241 core.once('peer-add', () => callAllInSet(peerAddSet))
242
243 const timeouts = {
244 get: (cb) => {
245 if (this.networking.joined(discoveryKey)) {
246 if (this.networking.flushed(discoveryKey)) return cb()
247 return flushSet.add(cb)
248 }
249 if (globalFlushed) return cb()
250 return flushSet.add(cb)
251 },
252 update: (cb) => {
253 if (core.peers.length) return cb()
254 if (this.networking.joined(discoveryKey)) {
255 if (this.networking.flushed(discoveryKey) && !core.peers.length) return cb()
256 return peerAddSet.add(cb)
257 }
258 if (globalFlushed) return cb()
259 return peerAddSet.add(cb)
260 }
261 }
262 core.timeouts = timeouts
263 })
264 }
265
266 // RPC Methods
267
268 async _rpcStatus (call) {
269 const rsp = new rpc.main.messages.StatusResponse()
270 rsp.setApiversion(apiVersion)
271 rsp.setUptime(Date.now() - this._startTime)
272 if (this._versions) {
273 rsp.setDaemonversion(this._versions.daemon)
274 rsp.setClientversion(this._versions.client)
275 rsp.setSchemaversion(this._versions.schema)
276 rsp.setHyperdriveversion(this._versions.hyperdrive)
277 rsp.setNoisekey(this.noiseKeyPair.publicKey)
278
279 const swarm = this.networking && this.networking.swarm
280 if (swarm) {
281 const remoteAddress = swarm.remoteAddress()
282 rsp.setHolepunchable(swarm.holepunchable())
283 rsp.setRemoteaddress(remoteAddress ? remoteAddress.host + ':' + remoteAddress.port : '')
284 }
285
286 if (this._versions.fuseNative) rsp.setFusenativeversion(this._versions.fuseNative)
287 if (this._versions.hyperdriveFuse) rsp.setHyperdrivefuseversion(this._versions.hyperdriveFuse)
288
289 if (hyperfuse) {
290 rsp.setFuseavailable(true)
291 rsp.setFuseconfigured(this.fuse.fuseConfigured)
292 } else {
293 rsp.setFuseavailable(false)
294 rsp.setFuseconfigured(false)
295 }
296 }
297 return rsp
298 }
299
300 async _rpcRefreshFuse (call) {
301 await this.fuse.ready()
302 if (this.fuse && this.fuse.fuseConfigured) {
303 hyperfuse = require('hyperdrive-fuse')
304 this._versions.fuseNative = require('fuse-native/package.json').version
305 this._versions.hyperdriveFuse = require('hyperdrive-fuse/package.json').version
306 }
307 return new rpc.main.messages.FuseRefreshResponse()
308 }
309
310 // Public Methods
311
312 get uptime () {
313 if (!this._startTime) return 0
314 return Date.now() - this._startTime
315 }
316
317 get noiseKeyPair () {
318 if (!this.networking) return null
319 return this.networking.keyPair
320 }
321
322 async stop (err) {
323 // Couldn't tell you why these propagate as uncaughtExceptions (gRPC is a PITA), but we should ignore them.
324 if (err && ((err.code === 1) || (err.code === 'ERR_HTTP2_INVALID_STREAM'))) return
325 if (err) log.error({ error: true, err, message: err.message, stack: err.stack, errno: err.errno }, 'stopping daemon due to error')
326 if (this._isClosed) {
327 log.info('force killing the process because stop has been called twice')
328 if (this._isMain) return process.exit(0)
329 return null
330 }
331 this._isClosed = true
332
333 try {
334 if (this._topTimer) {
335 clearInterval(this._topTimer)
336 this._topTimer = null
337 }
338 if (this.server) this.server.forceShutdown()
339 log.info('waiting for fuse to unmount')
340 if (this.fuse && this.fuse.fuseConfigured) await this.fuse.unmount()
341 log.info('waiting for networking to close')
342 if (this.networking) await this.networking.close()
343 log.info('waiting for corestore to close')
344 if (this.corestore) {
345 await new Promise((resolve, reject) => {
346 this.corestore.close(err => {
347 if (err) return reject(err)
348 return resolve()
349 })
350 })
351 }
352 log.info('waiting for db to close')
353 if (this.db) await this.db.close()
354 if (this._isMain) return process.exit(0)
355 } catch (err) {
356 log.error({ error: err.message, stack: err.stack }, 'error in cleanup')
357 if (this._isMain) return process.exit(1)
358 throw err
359 }
360 log.info('finished cleanup -- shutting down')
361
362 for (const event of STOP_EVENTS) {
363 process.removeListener(event, this._cleanup)
364 }
365 }
366
367 async start () {
368 await this.ready()
369 this._topTimer = setInterval(() => {
370 log.info(processTop().toJSON(), 'process stats')
371 }, 1000 * 60)
372
373 this.server = new grpc.Server()
374
375 this.server.addService(rpc.fuse.services.FuseService, {
376 ...wrap(this.metadata, getHandlers(this.fuse), { authenticate: true })
377 })
378 this.server.addService(rpc.drive.services.DriveService, {
379 ...wrap(this.metadata, getHandlers(this.drives), { authenticate: true })
380 })
381 this.server.addService(rpc.peersockets.services.PeersocketsService, {
382 ...wrap(this.metadata, getHandlers(this.peersockets), { authenticate: true })
383 })
384 this.server.addService(rpc.peers.services.PeersService, {
385 ...wrap(this.metadata, getHandlers(this.peers), { authenticate: true })
386 })
387 if (this.debug) {
388 this.server.addService(rpc.debug.services.DebugService, {
389 ...wrap(this.metadata, getHandlers(this.debug), { authenticate: true })
390 })
391 }
392 this.server.addService(rpc.main.services.HyperdriveService, {
393 ...wrap(this.metadata, getHandlers(this), { authenticate: true })
394 })
395
396 await new Promise((resolve, reject) => {
397 this.server.bindAsync(`0.0.0.0:${this.port}`, grpc.ServerCredentials.createInsecure(), (err, port) => {
398 if (err) return reject(err)
399 log.info({ port: port }, 'server listening')
400 this.server.start()
401 return resolve()
402 })
403 })
404 }
405}
406
407function extractArguments () {
408 const argv = require('minimist')(process.argv.slice(2), {
409 string: ['storage', 'log-level', 'bootstrap'],
410 boolean: ['announce', 'memory-only', 'debug'],
411 default: {
412 bootstrap: '',
413 'memory-only': false,
414 announce: true,
415 debug: true
416 }
417 })
418 if (argv.bootstrap === 'false') argv.bootstrap = false
419 else if (argv.bootstrap) argv.bootstrap = argv.bootstrap.split(',')
420 return argv
421}
422
/**
 * Wraps a map of promise-returning RPC handlers into gRPC-style handlers.
 *
 * Each wrapped handler optionally checks the call's `token` metadata against
 * `metadata.token`, invokes the underlying method with the call, and reports the outcome
 * through the trailing callback when one was supplied, or by destroying the call otherwise.
 *
 * @param {object} metadata - Daemon metadata; `metadata.token` is the expected auth token.
 * @param {Object<string, function(object): Promise<*>>} methods - Handlers keyed by RPC name.
 * @param {object} [opts]
 * @param {boolean} [opts.authenticate] - Reject calls whose token is missing or wrong.
 * @returns {Object<string, function>} Handlers keyed by the same names.
 */
function wrap (metadata, methods, opts) {
  const wrapped = {}
  const authenticate = opts && opts.authenticate
  for (const methodName of Object.keys(methods)) {
    const method = methods[methodName]
    wrapped[methodName] = function (call, ...args) {
      const tag = { method: methodName, received: Date.now() }
      // The last extra argument, when present, is treated as the response callback.
      const cb = args.length ? args[args.length - 1] : null
      if (authenticate) {
        let token = call.metadata && call.metadata.get('token')
        if (token) token = token[0]
        log.trace({ ...tag, token }, 'received token')
        // NOTE(review): this is a plain string comparison, not a constant-time one.
        if (!token || token !== metadata.token) {
          log.warn(tag, 'request authentication failed')
          const err = {
            code: grpc.status.UNAUTHENTICATED,
            message: 'Invalid auth token.'
          }
          if (cb) return cb(err)
          return call.destroy(err)
        }
        log.trace(tag, 'request authentication succeeded')
      }

      // A handler that throws synchronously (or returns a plain value) must be reported
      // through the same path as an async failure rather than escaping as an uncaught
      // exception, which would stop the whole daemon.
      let pending
      try {
        pending = Promise.resolve(method(call))
      } catch (err) {
        pending = Promise.reject(err)
      }

      pending
        .then(rsp => {
          log.trace(tag, 'request was successful')
          if (cb) process.nextTick(cb, null, rsp)
        })
        .catch(err => {
          // String() tolerates null/undefined rejection reasons, unlike err.toString().
          log.trace({ ...tag, error: String(err) }, 'request failed')
          if (cb) return cb(serverError(err))
          return call.destroy(err)
        })
    }
  }
  return wrapped
}
460
/**
 * Invokes every callback held in `set` (with no arguments), then empties the set.
 * Callbacks added to the set while it is being drained are invoked as well.
 *
 * @param {Set<function(): *>} set - Pending callbacks.
 */
function callAllInSet (set) {
  set.forEach(pending => {
    pending()
  })
  set.clear()
}
467
// Loaded as a library: export the class. Run directly: build a daemon from the
// command-line arguments (flagged as the main process) and start it.
if (require.main !== module) {
  module.exports = HyperdriveDaemon
} else {
  const daemon = new HyperdriveDaemon({ ...extractArguments(), main: true })
  process.title = 'hyperdrive'
  daemon.start()
}