UNPKG

30.7 kBJavaScriptView Raw
1const pathRoot = require('path')
2const path = pathRoot.posix || pathRoot
3const { EventEmitter } = require('events')
4
5const collect = require('stream-collector')
6const thunky = require('thunky')
7const ThunkyMap = require('thunky-map')
8const unixify = require('unixify')
9const duplexify = require('duplexify')
10const through = require('through2')
11const pumpify = require('pumpify')
12const pump = require('pump')
13
14const coreByteStream = require('hypercore-byte-stream')
15const Nanoresource = require('nanoresource/emitter')
16const HypercoreProtocol = require('hypercore-protocol')
17const MountableHypertrie = require('mountable-hypertrie')
18const Corestore = require('corestore')
19const { Stat } = require('hyperdrive-schemas')
20
21const createFileDescriptor = require('./lib/fd')
22const errors = require('./lib/errors')
23const defaultCorestore = require('./lib/storage')
24const { contentKeyPair, contentOptions, ContentState } = require('./lib/content')
25const { statIterator, createStatStream, createMountStream, createReaddirStream, readdirIterator } = require('./lib/iterator')
26
27// 20 is arbitrary, just to make the fds > stdio etc
28const STDIO_CAP = 20
29
30module.exports = (...args) => new Hyperdrive(...args)
31module.exports.constants = require('filesystem-constants').linux
32
33class Hyperdrive extends Nanoresource {
34 constructor (storage, key, opts) {
35 super()
36
37 if (isObject(key)) {
38 opts = key
39 key = null
40 }
41 if (!opts) opts = {}
42
43 this.opts = opts
44 this.key = key
45 this.discoveryKey = null
46 this.live = true
47 this.sparse = opts.sparse !== false
48 this.sparseMetadata = opts.sparseMetadata !== false
49
50 this._namespace = opts.namespace
51 this.corestore = defaultCorestore(storage, {
52 ...opts,
53 valueEncoding: 'binary',
54 // TODO: Support mixed sparsity.
55 sparse: this.sparse || this.sparseMetadata,
56 extensions: opts.extensions
57 })
58
59 if (this.corestore !== storage) this.corestore.on('error', err => this.emit('error', err))
60 if (opts.namespace) {
61 this.corestore = this.corestore.namespace(opts.namespace)
62 }
63
64 // Set in ready.
65 this.metadata = null
66 this.db = opts._db
67 this.isCheckout = !!this.db
68
69 this._contentStates = opts._contentStates || new ThunkyMap(this._contentStateFromMetadata.bind(this))
70 this._fds = []
71 this._writingFds = new Map()
72
73 this._metadataOpts = {
74 key,
75 sparse: this.sparseMetadata,
76 keyPair: opts.keyPair,
77 extensions: opts.extensions
78 }
79
80 this.ready(onReady)
81
82 const self = this
83
84 function onReady (err) {
85 if (err) return self.emit('error', err)
86 self.emit('ready')
87 }
88 }
89
90 get version () {
91 // TODO: The trie version starts at 1, so the empty hyperdrive version is also 1. This should be 0.
92 return this.db.version
93 }
94
95 get writable () {
96 return this.metadata.writable
97 }
98
99 get contentWritable () {
100 const contentState = this._contentStates.cache.get(this.db.feed)
101 if (!contentState) return false
102 return contentState.feed.writable
103 }
104
105 ready (cb) {
106 return this.open(cb)
107 }
108
109 _open (cb) {
110 const self = this
111 return this.corestore.ready(err => {
112 if (err) return cb(err)
113 this.metadata = this.corestore.default(this._metadataOpts)
114 this.metadata.ifAvailable.wait()
115 this.db = this.db || new MountableHypertrie(this.corestore, this.key, {
116 feed: this.metadata,
117 sparse: this.sparseMetadata
118 })
119 this.db.on('hypertrie', trie => {
120 this.emit('metadata-feed', trie.feed)
121 this.emit('mount', trie)
122 })
123 this.db.on('error', err => this.emit('error', err))
124
125 self.metadata.on('error', onerror)
126 self.metadata.on('append', update)
127 self.metadata.on('extension', extension)
128 self.metadata.on('peer-add', peeradd)
129 self.metadata.on('peer-remove', peerremove)
130
131 return self.metadata.ready(err => {
132 if (err) return done(err)
133
134 /**
135 * TODO: Update comment to reflect mounts.
136 *
137 * If a db is provided as input, ensure that a contentFeed is also provided, then return (this is a checkout).
138 * If the metadata feed is writable:
139 * If the metadata feed has length 0, then the db should be initialized with the content feed key as metadata.
140 * Else, initialize the db without metadata and load the content feed key from the header.
141 * If the metadata feed is readable:
142 * Initialize the db without metadata and load the content feed key from the header.
143 */
144 if (self.metadata.writable && !self.metadata.length && !self.isCheckout) {
145 initialize()
146 } else {
147 restore()
148 }
149 })
150 })
151
152 /**
153 * The first time the hyperdrive is created, we initialize both the db (metadata feed) and the content feed here.
154 */
155 function initialize () {
156 self._contentStateFromKey(null, (err, contentState) => {
157 if (err) return done(err)
158 // warm up the thunky map
159 self._contentStates.cache.set(self.db.feed, contentState)
160 self.db.setMetadata(contentState.feed.key)
161 self.db.ready(err => {
162 if (err) return done(err)
163 return done(null)
164 })
165 })
166 }
167
168 /**
169 * If the hyperdrive has already been created, wait for the db (metadata feed) to load.
170 * If the metadata feed is writable, we can immediately load the content feed from its private key.
171 * (Otherwise, we need to read the feed's metadata block first)
172 */
173 function restore (keyPair) {
174 self.db.ready(err => {
175 if (err) return done(err)
176 if (self.metadata.has(0)) self._getContent(self.db.feed, done)
177 else done(null)
178 })
179 }
180
181 function done (err) {
182 self.metadata.ifAvailable.continue()
183 if (err) return cb(err)
184 self.key = self.metadata.key
185 self.discoveryKey = self.metadata.discoveryKey
186 return cb(null)
187 }
188
189 function onerror (err) {
190 if (err) self.emit('error', err)
191 return cb(err)
192 }
193
194 function update () {
195 self.emit('update')
196 }
197
198 function extension(name, message, peer) {
199 self.emit('extension', name, message, peer)
200 }
201
202 function peeradd(peer) {
203 self.emit('peer-add', peer)
204 }
205
206 function peerremove(peer) {
207 self.emit('peer-remove', peer)
208 }
209 }
210
211 _getContent (metadata, cb) {
212 this._contentStates.get(metadata, cb)
213 }
214
215 _contentStateFromMetadata (metadata, cb) {
216 MountableHypertrie.getMetadata(metadata, (err, publicKey) => {
217 if (err) return cb(err)
218 this._contentStateFromKey(publicKey, cb)
219 })
220 }
221
222 _contentStateFromKey (publicKey, cb) {
223 const contentOpts = { key: publicKey, ...contentOptions(this), cache: { data: false } }
224 try {
225 var feed = this.corestore.get(contentOpts)
226 } catch (err) {
227 return cb(err)
228 }
229
230 feed.on('error', err => this.emit('error', err))
231 feed.ready(err => {
232 if (err) return cb(err)
233 this.emit('content-feed', feed)
234 return cb(null, new ContentState(feed))
235 })
236 }
237
238 _putStat (name, stat, opts, cb) {
239 if (typeof opts === 'function') return this._putStat(name, stat, null, opts)
240 try {
241 var encoded = stat.encode()
242 } catch (err) {
243 return cb(err)
244 }
245 this.db.put(name, encoded, opts, err => {
246 if (err) return cb(err)
247 return cb(null, stat)
248 })
249 }
250
251 _update (name, stat, cb) {
252 name = fixName(name)
253
254 this.db.get(name, (err, st) => {
255 if (err) return cb(err)
256 if (!st) return cb(new errors.FileNotFound(name))
257 try {
258 var decoded = Stat.decode(st.value)
259 } catch (err) {
260 return cb(err)
261 }
262 const oldMetadata = decoded.metadata
263 const newStat = Object.assign(decoded, stat)
264 if (stat.metadata) {
265 newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
266 }
267 return this._putStat(name, newStat, { flags: st.flags }, cb)
268 })
269 }
270
271 _upsert (name, stat, cb) {
272 name = fixName(name)
273
274 this.db.get(name, (err, st) => {
275 if (err) return cb(err)
276 if (!st) {
277 var decoded = Stat.file()
278 }
279 else {
280 try {
281 var decoded = Stat.decode(st.value)
282 } catch (err) {
283 return cb(err)
284 }
285 }
286 const oldMetadata = decoded.metadata
287 const newStat = Object.assign(decoded, stat)
288 if (stat.metadata) {
289 newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
290 }
291 return this._putStat(name, newStat, { flags: st ? st.flags : 0 }, cb)
292 })
293 }
294
295 open (name, flags, cb) {
296 if (!name || typeof name === 'function') return super.open(name)
297 name = fixName(name)
298
299 this.ready(err => {
300 if (err) return cb(err)
301 createFileDescriptor(this, name, flags, (err, fd) => {
302 if (err) return cb(err)
303 cb(null, STDIO_CAP + (this._fds.push(fd) - 1) * 2)
304 })
305 })
306 }
307
308 read (fd, buf, offset, len, pos, cb) {
309 if (typeof pos === 'function') {
310 cb = pos
311 pos = null
312 }
313
314 const desc = this._fds[(fd - STDIO_CAP) / 2]
315 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
316 if (pos == null) pos = desc.position
317 desc.read(buf, offset, len, pos, cb)
318 }
319
320 write (fd, buf, offset, len, pos, cb) {
321 if (typeof pos === 'function') {
322 cb = pos
323 pos = null
324 }
325
326 const desc = this._fds[(fd - STDIO_CAP) / 2]
327 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
328 if (pos == null) pos = desc.position
329 desc.write(buf, offset, len, pos, cb)
330 }
331
332 createReadStream (name, opts) {
333 if (!opts) opts = {}
334 const self = this
335
336 name = fixName(name)
337
338 const length = typeof opts.end === 'number' ? 1 + opts.end - (opts.start || 0) : typeof opts.length === 'number' ? opts.length : -1
339 const stream = coreByteStream({
340 ...opts,
341 highWaterMark: opts.highWaterMark || 64 * 1024
342 })
343
344 this.ready(err => {
345 if (err) return stream.destroy(err)
346 return this.stat(name, { file: true }, (err, st, trie) => {
347 if (err) return stream.destroy(err)
348 return this._getContent(trie.feed, (err, contentState) => {
349 if (err) return stream.destroy(err)
350 return oncontent(st, contentState)
351 })
352 })
353 })
354
355 function oncontent (st, contentState) {
356 if (st.mount && st.mount.hypercore) {
357 var byteOffset = 0
358 var blockOffset = 0
359 var blockLength = st.blocks
360 var feed = self.corestore.get({
361 key: st.mount.key,
362 sparse: self.sparse
363 })
364 feed.once('ready', () => self.emit('content-feed', feed))
365 } else {
366 blockOffset = st.offset
367 blockLength = st.blocks
368 byteOffset = opts.start ? st.byteOffset + opts.start : (length === -1 ? -1 : st.byteOffset)
369 feed = contentState.feed
370 }
371
372 const byteLength = length
373
374 stream.start({
375 feed,
376 blockOffset,
377 blockLength,
378 byteOffset,
379 byteLength
380 })
381 }
382
383 return stream
384 }
385
386 createDiffStream (other, prefix, opts) {
387 if (other instanceof Hyperdrive) other = other.version
388 if (typeof prefix === 'object') return this.createDiffStream(other, '/', prefix)
389 prefix = prefix || '/'
390
391 const diffStream = this.db.createDiffStream(other, prefix, opts)
392 return pumpify.obj(
393 diffStream,
394 through.obj((chunk, enc, cb) => {
395 const entry = { type: chunk.type, name: chunk.key }
396 if (chunk.left) entry.seq = chunk.left.seq
397 if (chunk.right) entry.previous = { seq: chunk.right.seq }
398 if (chunk.left && entry.type !== 'mount' && entry.type !== 'unmount') {
399 try {
400 entry.value = Stat.decode(chunk.left.value)
401 } catch (err) {
402 return cb(err)
403 }
404 } else if (chunk.left) {
405 entry.value = chunk.left.info
406 }
407 return cb(null, entry)
408 })
409 )
410 }
411
412 createDirectoryStream (name, opts) {
413 if (!opts) opts = {}
414 name = fixName(name)
415 return createStatStream(this, name, opts)
416 }
417
418 createWriteStream (name, opts) {
419 if (!opts) opts = {}
420 name = fixName(name)
421
422 const self = this
423 var release
424
425 const proxy = duplexify()
426 proxy.setReadable(false)
427
428 // TODO: support piping through a "split" stream like rabin
429
430 this.ready(err => {
431 if (err) return proxy.destroy(err)
432 this.stat(name, { trie: true }, (err, stat, trie) => {
433 if (err && (err.errno !== 2)) return proxy.destroy(err)
434
435 this._getContent(trie.feed, (err, contentState) => {
436 if (err) return proxy.destroy(err)
437 if (opts.wait === false && contentState.isLocked()) return cb(new Error('Content is locked.'))
438 contentState.lock(_release => {
439 release = _release
440 append(contentState)
441 })
442 })
443 })
444 })
445
446 return proxy
447
448 function append (contentState) {
449 if (proxy.destroyed) return release()
450
451 const byteOffset = contentState.feed.byteLength
452 const offset = contentState.feed.length
453
454 self.emit('appending', name, opts)
455
456 // TODO: revert the content feed if this fails!!!! (add an option to the write stream for this (atomic: true))
457 const stream = contentState.feed.createWriteStream()
458
459 proxy.on('close', ondone)
460 proxy.on('finish', ondone)
461
462 proxy.setWritable(stream)
463 proxy.on('prefinish', function () {
464 const stat = Stat.file({
465 ...opts,
466 offset,
467 byteOffset,
468 size: contentState.feed.byteLength - byteOffset,
469 blocks: contentState.feed.length - offset
470 })
471 proxy.cork()
472 self._putStat(name, stat, function (err) {
473 if (err) return proxy.destroy(err)
474 self.emit('append', name, opts)
475 proxy.uncork()
476 })
477 })
478 }
479
480 function ondone () {
481 proxy.removeListener('close', ondone)
482 proxy.removeListener('finish', ondone)
483 release()
484 }
485 }
486
487 create (name, opts, cb) {
488 if (typeof opts === 'function') return this.create(name, null, opts)
489
490 name = fixName(name)
491
492 this.ready(err => {
493 if (err) return cb(err)
494 this.lstat(name, { file: true, trie: true }, (err, stat) => {
495 if (err && err.errno !== 2) return cb(err)
496 if (stat) return cb(null, stat)
497 const st = Stat.file(opts)
498 return this._putStat(name, st, cb)
499 })
500 })
501 }
502
503 readFile (name, opts, cb) {
504 if (typeof opts === 'function') return this.readFile(name, null, opts)
505 if (typeof opts === 'string') opts = { encoding: opts }
506 if (!opts) opts = {}
507
508 name = fixName(name)
509
510 collect(this.createReadStream(name, opts), function (err, bufs) {
511 if (err) return cb(err)
512 let buf = bufs.length === 1 ? bufs[0] : Buffer.concat(bufs)
513 cb(null, opts.encoding && opts.encoding !== 'binary' ? buf.toString(opts.encoding) : buf)
514 })
515 }
516
517 writeFile (name, buf, opts, cb) {
518 if (typeof opts === 'function') return this.writeFile(name, buf, null, opts)
519 if (typeof opts === 'string') opts = { encoding: opts }
520 if (!opts) opts = {}
521 if (typeof buf === 'string') buf = Buffer.from(buf, opts.encoding || 'utf-8')
522 if (!cb) cb = noop
523
524 name = fixName(name)
525
526 let stream = this.createWriteStream(name, opts)
527
528 // TODO: Do we need to maintain the error state? What's triggering 'finish' after 'error'?
529 var errored = false
530
531 stream.on('error', err => {
532 errored = true
533 return cb(err)
534 })
535 stream.on('finish', () => {
536 if (!errored) return cb(null)
537 })
538 stream.end(buf)
539 }
540
541 truncate (name, size, cb) {
542 name = fixName(name)
543
544 this.lstat(name, { file: true, trie: true }, (err, st) => {
545 if (err && err.errno !== 2) return cb(err)
546 if (!st || !size) return this.create(name, cb)
547 if (size === st.size) return cb(null)
548 if (size < st.size) {
549 const readStream = this.createReadStream(name, { length: size })
550 const writeStream = this.createWriteStream(name)
551 return pump(readStream, writeStream, cb)
552 } else {
553 this.open(name, 'a', (err, fd) => {
554 if (err) return cb(err)
555 const length = size - st.size
556 this.write(fd, Buffer.alloc(length), 0, length, st.size, err => {
557 if (err) return cb(err)
558 this.close(fd, cb)
559 })
560 })
561 }
562 })
563 }
564
565 ftruncate (fd, size, cb) {
566 const desc = this._fds[(fd - STDIO_CAP) / 2]
567 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
568 return desc.truncate(size, cb)
569 }
570
571 _createStat (name, opts, cb) {
572 const self = this
573
574 const statConstructor = (opts && opts.directory) ? Stat.directory : Stat.file
575 const shouldForce = !!opts.force
576
577 this.ready(err => {
578 if (err) return cb(err)
579 this.db.get(name, (err, node, trie) => {
580 if (err) return cb(err)
581 if (node && !shouldForce) return cb(new errors.PathAlreadyExists(name))
582 onexisting(node, trie)
583 })
584 })
585
586 function onexisting (node, trie) {
587 self.ready(err => {
588 if (err) return cb(err)
589 self._getContent(trie.feed, (err, contentState) => {
590 if (err) return cb(err)
591 const st = statConstructor({
592 ...opts,
593 offset: contentState.feed.length,
594 byteOffset: contentState.feed.byteLength
595 })
596 return cb(null, st)
597 })
598 })
599 }
600 }
601
602 mkdir (name, opts, cb) {
603 if (typeof opts === 'function') return this.mkdir(name, null, opts)
604 if (typeof opts === 'number') opts = { mode: opts }
605 if (!opts) opts = {}
606 if (!cb) cb = noop
607
608 name = fixName(name)
609 opts.directory = true
610
611 this._createStat(name, opts, (err, st) => {
612 if (err) return cb(err)
613 this._putStat(name, st, {
614 condition: ifNotExists
615 }, cb)
616 })
617 }
618
619 _statDirectory (name, opts, cb) {
620 const ite = this.db.iterator(name)
621 ite.next((err, st) => {
622 if (err) return cb(err)
623 if (name !== '/' && !st) return cb(new errors.FileNotFound(name))
624 if (name === '/') return cb(null, Stat.directory(), this.db)
625 const trie = st[MountableHypertrie.Symbols.TRIE]
626 const mount = st[MountableHypertrie.Symbols.MOUNT]
627 const innerPath = st[MountableHypertrie.Symbols.INNER_PATH]
628 try {
629 st = Stat.decode(st.value)
630 } catch (err) {
631 return cb(err)
632 }
633 const noMode = Object.assign({}, st, { mode: 0 })
634 return cb(null, Stat.directory(noMode), trie, mount, innerPath)
635 })
636 }
637
638 lstat (name, opts, cb) {
639 if (typeof opts === 'function') return this.lstat(name, null, opts)
640 if (!opts) opts = {}
641 const self = this
642 name = fixName(name)
643
644 this.ready(err => {
645 if (err) return cb(err)
646 this.db.get(name, opts, onstat)
647 })
648
649 function onstat (err, node, trie, mount, mountPath) {
650 if (err) return cb(err)
651 if (!node && opts.trie) return cb(null, null, trie, mount, mountPath)
652 if (!node && opts.file) return cb(new errors.FileNotFound(name))
653 if (!node) return self._statDirectory(name, opts, cb)
654 try {
655 var st = Stat.decode(node.value)
656 } catch (err) {
657 return cb(err)
658 }
659 const writingFd = self._writingFds.get(name)
660 if (writingFd) {
661 st.size = writingFd.stat.size
662 }
663 cb(null, st, trie, mount, mountPath)
664 }
665 }
666
667 stat (name, opts, cb) {
668 if (typeof opts === 'function') return this.stat(name, null, opts)
669 if (!opts) opts = {}
670
671 this.lstat(name, opts, (err, stat, trie, mount, mountPath) => {
672 if (err) return cb(err)
673 if (!stat) return cb(null, null, trie, name, mount, mountPath)
674 if (stat.linkname) {
675 if (path.isAbsolute(stat.linkname)) return this.stat(stat.linkname, opts, cb)
676 const relativeStat = path.resolve('/', path.dirname(name), stat.linkname)
677 return this.stat(relativeStat, opts, cb)
678 }
679 return cb(null, stat, trie, name, mount, mountPath)
680 })
681 }
682
683 access (name, opts, cb) {
684 if (typeof opts === 'function') return this.access(name, null, opts)
685 if (!opts) opts = {}
686 name = fixName(name)
687
688 this.stat(name, opts, err => {
689 cb(err)
690 })
691 }
692
693 exists (name, opts, cb) {
694 if (typeof opts === 'function') return this.exists(name, null, opts)
695 if (!opts) opts = {}
696
697 this.access(name, opts, err => {
698 cb(!err)
699 })
700 }
701
702 readdir (name, opts, cb) {
703 if (typeof opts === 'function') return this.readdir(name, null, opts)
704 name = fixName(name)
705
706 const readdirStream = createReaddirStream(this, name, opts)
707 return collect(readdirStream, (err, entries) => {
708 if (err) return cb(err)
709 return cb(null, entries)
710 })
711 }
712
713 _del (name, cb) {
714 this.ready(err => {
715 if (err) return cb(err)
716 this.db.del(name, (err, node) => {
717 if (err) return cb(err)
718 if (!node) return cb(new errors.FileNotFound(name))
719 return cb(null)
720 })
721 })
722 }
723
724 unlink (name, cb) {
725 name = fixName(name)
726 this._del(name, cb || noop)
727 }
728
729 rmdir (name, cb) {
730 if (!cb) cb = noop
731 name = fixName(name)
732 const self = this
733
734 const ite = readdirIterator(this, name)
735 ite.next((err, val) => {
736 if (err) return cb(err)
737 if (val) return cb(new errors.DirectoryNotEmpty(name))
738 self._del(name, cb)
739 })
740 }
741
742 replicate (isInitiator, opts) {
743 // support replicate({ initiator: bool }) also
744 if (typeof isInitiator === 'object' && isInitiator && !opts) {
745 opts = isInitiator
746 isInitiator = !!opts.initiator
747 }
748 const stream = (opts && opts.stream) || new HypercoreProtocol(isInitiator, { ...opts })
749 this.ready(err => {
750 if (err) return stream.destroy(err)
751 this.corestore.replicate(isInitiator, { ...opts, stream })
752 })
753 return stream
754 }
755
756 checkout (version, opts) {
757 opts = {
758 ...opts,
759 _db: this.db.checkout(version),
760 _contentStates: this._contentStates,
761 }
762 return new Hyperdrive(this.corestore, this.key, opts)
763 }
764
765 _closeFile (fd, cb) {
766 const idx = (fd - STDIO_CAP) / 2
767 const desc = this._fds[idx]
768 if (!desc) return process.nextTick(cb, new Error('Invalid file descriptor'))
769 this._fds[idx] = null
770 while (this._fds.length && !this._fds[this._fds.length - 1]) this._fds.pop()
771 desc.close(cb)
772 }
773
774 _close (cb) {
775 this.corestore.close((err) => {
776 this.emit('close')
777 cb(err)
778 })
779 }
780
781 close (fd, cb) {
782 if (typeof fd === 'number') return this._closeFile(fd, cb || noop)
783 super.close(false, fd)
784 }
785
786 stats (path, opts, cb) {
787 if (typeof opts === 'function') return this.stats(path, null, opts)
788 const self = this
789 const stats = new Map()
790
791 this.stat(path, (err, stat, trie) => {
792 if (err) return cb(err)
793 if (stat.isFile()) {
794 return fileStats(path, cb)
795 } else {
796 const recursive = opts && (opts.recursive !== false)
797 const ite = statIterator(self, path, { recursive })
798 return ite.next(function loop (err, info) {
799 if (err) return cb(err)
800 if (!info) return cb(null, stats)
801 fileStats(info.path, (err, fileStats) => {
802 if (err && err.errno !== 2) return cb(err)
803 if (!fileStats) return ite.next(loop)
804 stats.set(info.path, fileStats)
805 return ite.next(loop)
806 })
807 })
808 }
809 })
810
811 function onstats (err, path, fileStats) {
812 if (err) return cb(err)
813 stats.set(path, fileStats)
814 }
815
816 function fileStats (path, cb) {
817 const total = emptyStats()
818 return self.stat(path, (err, stat, trie) => {
819 if (err) return cb(err)
820 return self._getContent(trie.feed, (err, contentState) => {
821 if (err) return cb(err)
822 const downloadedBlocks = contentState.feed.downloaded(stat.offset, stat.offset + stat.blocks)
823 total.blocks = stat.blocks
824 total.size = stat.size
825 total.downloadedBlocks = downloadedBlocks
826 // TODO: This is not possible to implement now. Need a better byte length index in hypercore.
827 // total.downloadedBytes = 0
828 return cb(null, total)
829 })
830 })
831 }
832
833 function emptyStats () {
834 return {
835 blocks: 0,
836 size: 0,
837 downloadedBlocks: 0,
838 }
839 }
840 }
841
842 watchStats (path, opts) {
843 const self = this
844 var timer = setInterval(collectStats, (opts && opts.statsInveral) || 2000)
845 var collecting = false
846 var destroyed = false
847
848 const handle = new EventEmitter()
849 Object.assign(handle, {
850 destroy
851 })
852 return handle
853
854 function collectStats () {
855 if (collecting) return
856 collecting = true
857 self.stats(path, opts, (err, stats) => {
858 if (err) return destroy(err)
859 collecting = false
860 handle.stats = stats
861 handle.emit('update')
862 })
863 }
864
865 function destroy (err) {
866 handle.emit('destroy', err)
867 clearInterval(timer)
868 destroyed = true
869 }
870 }
871
872 download (path, opts = {}) {
873 const self = this
874 const ranges = new Map()
875 var pending = 0
876 var destroyed = false
877
878 const handle = new EventEmitter()
879 Object.assign(handle, {
880 destroy
881 })
882
883 self.stat(path, (err, stat, trie) => {
884 if (err) return destroy(err)
885 if (stat.isFile()) {
886 downloadFile(path, stat, trie, destroy)
887 } else {
888 const recursive = opts.recursive !== false
889 const noMounts = opts.noMounts !== false
890 const ite = statIterator(self, path, { recursive, noMounts, random: true })
891 downloadNext(ite)
892 }
893 })
894
895 return handle
896
897 function downloadNext (ite) {
898 if (destroyed) return
899 ite.next((err, info) => {
900 if (err) return destroy(err)
901 if (!info) {
902 if (!ranges.size) return destroy(null)
903 else return
904 }
905 const { path, stat, trie } = info
906 if (stat.mount) return downloadNext(ite)
907 downloadFile(path, stat, trie, err => {
908 if (err) return destroy(err)
909 return downloadNext(ite)
910 })
911 if (pending < ((opts && opts.maxConcurrent) || 50)) {
912 return downloadNext(ite)
913 }
914 })
915 }
916
917 function downloadFile (path, stat, trie, cb) {
918 pending++
919 self._getContent(trie.feed, (err, contentState) => {
920 if (err) return destroy(err)
921 const feed = contentState.feed
922 const range = feed.download({
923 start: stat.offset,
924 end: stat.offset + stat.blocks
925 }, err => {
926 pending--
927 if (err) return cb(err)
928 ranges.delete(path)
929 return cb(null)
930 })
931 ranges.set(path, { range, feed })
932 })
933 }
934
935 function destroy (err) {
936 if (destroyed) return
937 destroyed = true
938 for (const [path, { feed, range }] of ranges) {
939 feed.undownload(range)
940 }
941 if (err) handle.emit('error', err)
942 else handle.emit('finish')
943 }
944 }
945
946 watch (name, onchange) {
947 name = fixName(name)
948 return this.db.watch(name, onchange)
949 }
950
951 mount (path, key, opts, cb) {
952 if (typeof opts === 'function') return this.mount(path, key, null, opts)
953 const self = this
954
955 path = fixName(path)
956 opts = opts || {}
957
958 const statOpts = {
959 uid: opts.uid,
960 gid: opts.gid
961 }
962 statOpts.mount = {
963 key,
964 version: opts.version,
965 hash: opts.hash,
966 hypercore: !!opts.hypercore
967 }
968 statOpts.directory = !opts.hypercore
969
970 if (opts.hypercore) {
971 const core = this.corestore.get({
972 key,
973 ...opts,
974 parents: [this.key],
975 sparse: this.sparse
976 })
977 core.ready(err => {
978 if (err) return cb(err)
979 this.emit('content-feed', core)
980 statOpts.size = core.byteLength
981 statOpts.blocks = core.length
982 return mountCore()
983 })
984 } else {
985 return process.nextTick(mountTrie, null)
986 }
987
988 function mountCore () {
989 self._createStat(path, statOpts, (err, st) => {
990 if (err) return cb(err)
991 return self.db.put(path, st.encode(), cb)
992 })
993 }
994
995 function mountTrie () {
996 self._createStat(path, statOpts, (err, st) => {
997 if (err) return cb(err)
998 self.db.mount(path, key, { ...opts, value: st.encode() }, err => {
999 return self.db.loadMount(path, cb)
1000 })
1001 })
1002 }
1003 }
1004
1005 unmount (path, cb) {
1006 this.stat(path, (err, st) => {
1007 if (err) return cb(err)
1008 if (!st.mount) return cb(new Error('Can only unmount mounts.'))
1009 if (st.mount.hypercore) {
1010 return this.unlink(path, cb)
1011 } else {
1012 return this.db.unmount(path, cb)
1013 }
1014 })
1015 }
1016
1017 symlink (target, linkName, cb) {
1018 target = unixify(target)
1019 linkName = fixName(linkName)
1020
1021 this.lstat(linkName, (err, stat) => {
1022 if (err && (err.errno !== 2)) return cb(err)
1023 if (!err) return cb(new errors.PathAlreadyExists(linkName))
1024 const st = Stat.symlink({
1025 linkname: target
1026 })
1027 return this._putStat(linkName, st, cb)
1028 })
1029 }
1030
1031 createMountStream (opts) {
1032 return createMountStream(this, opts)
1033 }
1034
1035 getAllMounts (opts, cb) {
1036 if (typeof opts === 'function') return this.getAllMounts(null, opts)
1037 const mounts = new Map()
1038
1039 this.ready(err => {
1040 if (err) return cb(err)
1041 collect(this.createMountStream(opts), (err, mountList) => {
1042 if (err) return cb(err)
1043 for (const { path, metadata, content } of mountList) {
1044 mounts.set(path, { metadata, content })
1045 }
1046 return cb(null, mounts)
1047 })
1048 })
1049 }
1050
1051 extension (name, message) {
1052 this.metadata.extension(name, message)
1053 }
1054
1055 get peers () {
1056 return this.metadata.peers
1057 }
1058
1059 setMetadata (path, key, value, cb) {
1060 const metadata = {}
1061 metadata[key] = value
1062 this._update(path, { metadata }, cb)
1063 }
1064
1065 removeMetadata (path, key, cb) {
1066 const metadata = {}
1067 metadata[key] = null
1068 this._update(path, { metadata }, cb)
1069 }
1070
1071 copy (from, to, cb) {
1072 this.stat(from, (err, stat) => {
1073 if (err) return cb(err)
1074 this.create(to, stat, cb)
1075 })
1076 }
1077}
1078
1079function isObject (val) {
1080 return !!val && typeof val !== 'string' && !Buffer.isBuffer(val)
1081}
1082
1083function ifNotExists (oldNode, newNode, cb) {
1084 if (oldNode) return cb(new errors.PathAlreadyExists(oldNode.key))
1085 return cb(null, true)
1086}
1087
1088function fixName (name) {
1089 name = unixify(name)
1090 if (!name.startsWith('/')) name = '/' + name
1091 return name
1092}
1093
1094function noop () {}