UNPKG

31.2 kBJavaScriptView Raw
1const pathRoot = require('path')
2const path = pathRoot.posix || pathRoot
3const { EventEmitter } = require('events')
4
5const collect = require('stream-collector')
6const thunky = require('thunky')
7const unixify = require('unixify')
8const duplexify = require('duplexify')
9const through = require('through2')
10const pumpify = require('pumpify')
11const pump = require('pump')
12
13const coreByteStream = require('hypercore-byte-stream')
14const Nanoresource = require('nanoresource/emitter')
15const HypercoreProtocol = require('hypercore-protocol')
16const MountableHypertrie = require('mountable-hypertrie')
17const Corestore = require('corestore')
18const { Stat } = require('hyperdrive-schemas')
19
20const createFileDescriptor = require('./lib/fd')
21const errors = require('./lib/errors')
22const defaultCorestore = require('./lib/storage')
23const { contentKeyPair, contentOptions, ContentState } = require('./lib/content')
24const { statIterator, createStatStream, createMountStream, createReaddirStream, readdirIterator } = require('./lib/iterator')
25
// Base value for the file descriptors handed out by open(), chosen so they
// sit above the stdio descriptors. The exact number (20) is arbitrary.
const STDIO_CAP = 20

// Factory export: forwards every argument to the Hyperdrive constructor.
module.exports = (...ctorArgs) => new Hyperdrive(...ctorArgs)
// Re-export the `linux` constants of the filesystem-constants package.
module.exports.constants = require('filesystem-constants').linux
31
32class Hyperdrive extends Nanoresource {
33 constructor (storage, key, opts) {
34 super()
35
36 if (isObject(key)) {
37 opts = key
38 key = null
39 }
40 if (!opts) opts = {}
41
42 this.opts = opts
43 this.key = key
44 this.discoveryKey = null
45 this.live = true
46 this.sparse = opts.sparse !== false
47 this.sparseMetadata = opts.sparseMetadata !== false
48
49 this._namespace = opts.namespace
50 this.corestore = defaultCorestore(storage, {
51 ...opts,
52 valueEncoding: 'binary',
53 // TODO: Support mixed sparsity.
54 sparse: this.sparse || this.sparseMetadata,
55 extensions: opts.extensions
56 })
57
58 if (this.corestore !== storage) this.corestore.on('error', err => this.emit('error', err))
59 if (opts.namespace) {
60 this.corestore = this.corestore.namespace(opts.namespace)
61 }
62
63 // Set in ready.
64 this.metadata = null
65 this.db = opts._db
66
67 this._contentStates = opts.contentStates || new Map()
68 this._fds = []
69 this._writingFds = new Map()
70
71 this._metadataOpts = {
72 key,
73 sparse: this.sparseMetadata,
74 keyPair: opts.keyPair,
75 extensions: opts.extensions
76 }
77 this._checkoutContent = opts._content
78
79 this.ready(onReady)
80
81 const self = this
82
83 function onReady (err) {
84 if (err) return self.emit('error', err)
85 self.emit('ready')
86 }
87 }
88
89 get version () {
90 // TODO: The trie version starts at 1, so the empty hyperdrive version is also 1. This should be 0.
91 return this.db.version
92 }
93
94 get writable () {
95 return this.metadata.writable
96 }
97
98 get contentWritable () {
99 const contentState = this._contentStates.get(this.db)
100 if (!contentState) return false
101 return contentState.feed.writable
102 }
103
104 ready (cb) {
105 return this.open(cb)
106 }
107
108 _open (cb) {
109 const self = this
110 return this.corestore.ready(err => {
111 if (err) return cb(err)
112 this.metadata = this.corestore.default(this._metadataOpts)
113 this.metadata.ifAvailable.wait()
114 this.db = this.db || new MountableHypertrie(this.corestore, this.key, {
115 feed: this.metadata,
116 sparse: this.sparseMetadata
117 })
118 this.db.on('hypertrie', trie => {
119 this.emit('metadata-feed', trie.feed)
120 this.emit('mount', trie)
121 })
122 this.db.on('error', err => this.emit('error', err))
123
124 if (this._checkoutContent) this._contentStates.set(this.db, new ContentState(this._checkoutContent))
125
126 self.metadata.on('error', onerror)
127 self.metadata.on('append', update)
128 self.metadata.on('extension', extension)
129 self.metadata.on('peer-add', peeradd)
130 self.metadata.on('peer-remove', peerremove)
131
132 return self.metadata.ready(err => {
133 if (err) return done(err)
134
135 /**
136 * TODO: Update comment to reflect mounts.
137 *
138 * If a db is provided as input, ensure that a contentFeed is also provided, then return (this is a checkout).
139 * If the metadata feed is writable:
140 * If the metadata feed has length 0, then the db should be initialized with the content feed key as metadata.
141 * Else, initialize the db without metadata and load the content feed key from the header.
142 * If the metadata feed is readable:
143 * Initialize the db without metadata and load the content feed key from the header.
144 */
145 if (self.opts._db) {
146 checkout()
147 } else if (self.metadata.writable && !self.metadata.length) {
148 initialize()
149 } else {
150 restore()
151 }
152 })
153 })
154
155 /**
156 * If a content feed was not passed in with the checkout, create it. Otherwise, the checkout is complete.
157 */
158 function checkout () {
159 if (!self._contentStates.get(self.db)) {
160 self._getContent(self.db, err => {
161 if (err) return done(err)
162 return done(null)
163 })
164 } else {
165 return done(null)
166 }
167 }
168
169 /**
170 * The first time the hyperdrive is created, we initialize both the db (metadata feed) and the content feed here.
171 */
172 function initialize () {
173 self._getContent(self.db, { initialize: true }, (err, contentState) => {
174 if (err) return done(err)
175 self.db.setMetadata(contentState.feed.key)
176 self.db.ready(err => {
177 if (err) return done(err)
178 return done(null)
179 })
180 })
181 }
182
183 /**
184 * If the hyperdrive has already been created, wait for the db (metadata feed) to load.
185 * If the metadata feed is writable, we can immediately load the content feed from its private key.
186 * (Otherwise, we need to read the feed's metadata block first)
187 */
188 function restore (keyPair) {
189 if (self.metadata.writable) {
190 self.db.ready(err => {
191 if (err) return done(err)
192 self._getContent(self.db, done)
193 })
194 } else {
195 self.db.ready(done)
196 }
197 }
198
199 function done (err) {
200 self.metadata.ifAvailable.continue()
201 if (err) return cb(err)
202 self.key = self.metadata.key
203 self.discoveryKey = self.metadata.discoveryKey
204 return cb(null)
205 }
206
207 function onerror (err) {
208 if (err) self.emit('error', err)
209 return cb(err)
210 }
211
212 function update () {
213 self.emit('update')
214 }
215
216 function extension(name, message, peer) {
217 self.emit('extension', name, message, peer)
218 }
219
220 function peeradd(peer) {
221 self.emit('peer-add', peer)
222 }
223
224 function peerremove(peer) {
225 self.emit('peer-remove', peer)
226 }
227 }
228
229 _getContent (db, opts, cb) {
230 if (typeof opts === 'function') return this._getContent(db, null, opts)
231 const self = this
232
233 const existingContent = self._contentStates.get(db)
234 if (existingContent) return process.nextTick(cb, null, existingContent)
235
236 if (opts && opts.initialize) return onkey(null)
237
238 db.getMetadata((err, publicKey) => {
239 if (err) return cb(err)
240 return onkey(publicKey)
241 })
242
243 function onkey (publicKey) {
244 const contentOpts = { key: publicKey, ...contentOptions(self), cache: { data: false } }
245 try {
246 var feed = self.corestore.get(contentOpts)
247 } catch (err) {
248 return cb(err)
249 }
250 feed.ready(err => {
251 if (err) return cb(err)
252 self.emit('content-feed', feed)
253 const state = new ContentState(feed)
254 self._contentStates.set(db, state)
255 feed.on('error', err => self.emit('error', err))
256 return cb(null, state)
257 })
258 }
259 }
260
261 _putStat (name, stat, opts, cb) {
262 if (typeof opts === 'function') return this._putStat(name, stat, null, opts)
263 try {
264 var encoded = stat.encode()
265 } catch (err) {
266 return cb(err)
267 }
268 this.db.put(name, encoded, opts, err => {
269 if (err) return cb(err)
270 return cb(null, stat)
271 })
272 }
273
274 _update (name, stat, cb) {
275 name = fixName(name)
276
277 this.db.get(name, (err, st) => {
278 if (err) return cb(err)
279 if (!st) return cb(new errors.FileNotFound(name))
280 try {
281 var decoded = Stat.decode(st.value)
282 } catch (err) {
283 return cb(err)
284 }
285 const oldMetadata = decoded.metadata
286 const newStat = Object.assign(decoded, stat)
287 if (stat.metadata) {
288 newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
289 }
290 return this._putStat(name, newStat, { flags: st.flags }, cb)
291 })
292 }
293
294 _upsert (name, stat, cb) {
295 name = fixName(name)
296
297 this.db.get(name, (err, st) => {
298 if (err) return cb(err)
299 if (!st) {
300 var decoded = Stat.file()
301 }
302 else {
303 try {
304 var decoded = Stat.decode(st.value)
305 } catch (err) {
306 return cb(err)
307 }
308 }
309 const oldMetadata = decoded.metadata
310 const newStat = Object.assign(decoded, stat)
311 if (stat.metadata) {
312 newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
313 }
314 return this._putStat(name, newStat, { flags: st ? st.flags : 0 }, cb)
315 })
316 }
317
318 open (name, flags, cb) {
319 if (!name || typeof name === 'function') return super.open(name)
320 name = fixName(name)
321
322 this.ready(err => {
323 if (err) return cb(err)
324 createFileDescriptor(this, name, flags, (err, fd) => {
325 if (err) return cb(err)
326 cb(null, STDIO_CAP + (this._fds.push(fd) - 1) * 2)
327 })
328 })
329 }
330
331 read (fd, buf, offset, len, pos, cb) {
332 if (typeof pos === 'function') {
333 cb = pos
334 pos = null
335 }
336
337 const desc = this._fds[(fd - STDIO_CAP) / 2]
338 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
339 if (pos == null) pos = desc.position
340 desc.read(buf, offset, len, pos, cb)
341 }
342
343 write (fd, buf, offset, len, pos, cb) {
344 if (typeof pos === 'function') {
345 cb = pos
346 pos = null
347 }
348
349 const desc = this._fds[(fd - STDIO_CAP) / 2]
350 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
351 if (pos == null) pos = desc.position
352 desc.write(buf, offset, len, pos, cb)
353 }
354
355 createReadStream (name, opts) {
356 if (!opts) opts = {}
357 const self = this
358
359 name = fixName(name)
360
361 const length = typeof opts.end === 'number' ? 1 + opts.end - (opts.start || 0) : typeof opts.length === 'number' ? opts.length : -1
362 const stream = coreByteStream({
363 ...opts,
364 highWaterMark: opts.highWaterMark || 64 * 1024
365 })
366
367 this.ready(err => {
368 if (err) return stream.destroy(err)
369 return this.stat(name, { file: true }, (err, st, trie) => {
370 if (err) return stream.destroy(err)
371 return this._getContent(trie, (err, contentState) => {
372 if (err) return stream.destroy(err)
373 return oncontent(st, contentState)
374 })
375 })
376 })
377
378 function oncontent (st, contentState) {
379 if (st.mount && st.mount.hypercore) {
380 var byteOffset = 0
381 var blockOffset = 0
382 var blockLength = st.blocks
383 var feed = self.corestore.get({
384 key: st.mount.key,
385 sparse: self.sparse
386 })
387 feed.once('ready', () => self.emit('content-feed', feed))
388 } else {
389 blockOffset = st.offset
390 blockLength = st.blocks
391 byteOffset = opts.start ? st.byteOffset + opts.start : (length === -1 ? -1 : st.byteOffset)
392 feed = contentState.feed
393 }
394
395 const byteLength = length
396
397 stream.start({
398 feed,
399 blockOffset,
400 blockLength,
401 byteOffset,
402 byteLength
403 })
404 }
405
406 return stream
407 }
408
409 createDiffStream (other, prefix, opts) {
410 if (other instanceof Hyperdrive) other = other.version
411 if (typeof prefix === 'object') return this.createDiffStream(other, '/', prefix)
412 prefix = prefix || '/'
413
414 const diffStream = this.db.createDiffStream(other, prefix, opts)
415 return pumpify.obj(
416 diffStream,
417 through.obj((chunk, enc, cb) => {
418 const entry = { type: chunk.type, name: chunk.key }
419 if (chunk.left) entry.seq = chunk.left.seq
420 if (chunk.right) entry.previous = { seq: chunk.right.seq }
421 if (chunk.left && entry.type !== 'mount' && entry.type !== 'unmount') {
422 try {
423 entry.value = Stat.decode(chunk.left.value)
424 } catch (err) {
425 return cb(err)
426 }
427 } else if (chunk.left) {
428 entry.value = chunk.left.info
429 }
430 return cb(null, entry)
431 })
432 )
433 }
434
435 createDirectoryStream (name, opts) {
436 if (!opts) opts = {}
437 name = fixName(name)
438 return createStatStream(this, name, opts)
439 }
440
441 createWriteStream (name, opts) {
442 if (!opts) opts = {}
443 name = fixName(name)
444
445 const self = this
446 var release
447
448 const proxy = duplexify()
449 proxy.setReadable(false)
450
451 // TODO: support piping through a "split" stream like rabin
452
453 this.ready(err => {
454 if (err) return proxy.destroy(err)
455 this.stat(name, { trie: true }, (err, stat, trie) => {
456 if (err && (err.errno !== 2)) return proxy.destroy(err)
457 this._getContent(trie, (err, contentState) => {
458 if (err) return proxy.destroy(err)
459 if (opts.wait === false && contentState.isLocked()) return cb(new Error('Content is locked.'))
460 contentState.lock(_release => {
461 release = _release
462 append(contentState)
463 })
464 })
465 })
466 })
467
468 return proxy
469
470 function append (contentState) {
471 if (proxy.destroyed) return release()
472
473 const byteOffset = contentState.feed.byteLength
474 const offset = contentState.feed.length
475
476 self.emit('appending', name, opts)
477
478 // TODO: revert the content feed if this fails!!!! (add an option to the write stream for this (atomic: true))
479 const stream = contentState.feed.createWriteStream()
480
481 proxy.on('close', ondone)
482 proxy.on('finish', ondone)
483
484 proxy.setWritable(stream)
485 proxy.on('prefinish', function () {
486 const stat = Stat.file({
487 ...opts,
488 offset,
489 byteOffset,
490 size: contentState.feed.byteLength - byteOffset,
491 blocks: contentState.feed.length - offset
492 })
493 proxy.cork()
494 self._putStat(name, stat, function (err) {
495 if (err) return proxy.destroy(err)
496 self.emit('append', name, opts)
497 proxy.uncork()
498 })
499 })
500 }
501
502 function ondone () {
503 proxy.removeListener('close', ondone)
504 proxy.removeListener('finish', ondone)
505 release()
506 }
507 }
508
509 create (name, opts, cb) {
510 if (typeof opts === 'function') return this.create(name, null, opts)
511
512 name = fixName(name)
513
514 this.ready(err => {
515 if (err) return cb(err)
516 this.lstat(name, { file: true, trie: true }, (err, stat) => {
517 if (err && err.errno !== 2) return cb(err)
518 if (stat) return cb(null, stat)
519 const st = Stat.file(opts)
520 return this._putStat(name, st, cb)
521 })
522 })
523 }
524
525 readFile (name, opts, cb) {
526 if (typeof opts === 'function') return this.readFile(name, null, opts)
527 if (typeof opts === 'string') opts = { encoding: opts }
528 if (!opts) opts = {}
529
530 name = fixName(name)
531
532 collect(this.createReadStream(name, opts), function (err, bufs) {
533 if (err) return cb(err)
534 let buf = bufs.length === 1 ? bufs[0] : Buffer.concat(bufs)
535 cb(null, opts.encoding && opts.encoding !== 'binary' ? buf.toString(opts.encoding) : buf)
536 })
537 }
538
539 writeFile (name, buf, opts, cb) {
540 if (typeof opts === 'function') return this.writeFile(name, buf, null, opts)
541 if (typeof opts === 'string') opts = { encoding: opts }
542 if (!opts) opts = {}
543 if (typeof buf === 'string') buf = Buffer.from(buf, opts.encoding || 'utf-8')
544 if (!cb) cb = noop
545
546 name = fixName(name)
547
548 let stream = this.createWriteStream(name, opts)
549
550 // TODO: Do we need to maintain the error state? What's triggering 'finish' after 'error'?
551 var errored = false
552
553 stream.on('error', err => {
554 errored = true
555 return cb(err)
556 })
557 stream.on('finish', () => {
558 if (!errored) return cb(null)
559 })
560 stream.end(buf)
561 }
562
563 truncate (name, size, cb) {
564 name = fixName(name)
565
566 this.lstat(name, { file: true, trie: true }, (err, st) => {
567 if (err && err.errno !== 2) return cb(err)
568 if (!st || !size) return this.create(name, cb)
569 if (size === st.size) return cb(null)
570 if (size < st.size) {
571 const readStream = this.createReadStream(name, { length: size })
572 const writeStream = this.createWriteStream(name)
573 return pump(readStream, writeStream, cb)
574 } else {
575 this.open(name, 'a', (err, fd) => {
576 if (err) return cb(err)
577 const length = size - st.size
578 this.write(fd, Buffer.alloc(length), 0, length, st.size, err => {
579 if (err) return cb(err)
580 this.close(fd, cb)
581 })
582 })
583 }
584 })
585 }
586
587 ftruncate (fd, size, cb) {
588 const desc = this._fds[(fd - STDIO_CAP) / 2]
589 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
590 return desc.truncate(size, cb)
591 }
592
593 _createStat (name, opts, cb) {
594 const self = this
595
596 const statConstructor = (opts && opts.directory) ? Stat.directory : Stat.file
597 const shouldForce = !!opts.force
598
599 this.ready(err => {
600 if (err) return cb(err)
601 this.db.get(name, (err, node, trie) => {
602 if (err) return cb(err)
603 if (node && !shouldForce) return cb(new errors.PathAlreadyExists(name))
604 onexisting(node, trie)
605 })
606 })
607
608 function onexisting (node, trie) {
609 self.ready(err => {
610 if (err) return cb(err)
611 self._getContent(trie, (err, contentState) => {
612 if (err) return cb(err)
613 const st = statConstructor({
614 ...opts,
615 offset: contentState.feed.length,
616 byteOffset: contentState.feed.byteLength
617 })
618 return cb(null, st)
619 })
620 })
621 }
622 }
623
624 mkdir (name, opts, cb) {
625 if (typeof opts === 'function') return this.mkdir(name, null, opts)
626 if (typeof opts === 'number') opts = { mode: opts }
627 if (!opts) opts = {}
628 if (!cb) cb = noop
629
630 name = fixName(name)
631 opts.directory = true
632
633 this._createStat(name, opts, (err, st) => {
634 if (err) return cb(err)
635 this._putStat(name, st, {
636 condition: ifNotExists
637 }, cb)
638 })
639 }
640
641 _statDirectory (name, opts, cb) {
642 const ite = this.db.iterator(name)
643 ite.next((err, st) => {
644 if (err) return cb(err)
645 if (name !== '/' && !st) return cb(new errors.FileNotFound(name))
646 if (name === '/') return cb(null, Stat.directory(), this.db)
647 const trie = st[MountableHypertrie.Symbols.TRIE]
648 const mount = st[MountableHypertrie.Symbols.MOUNT]
649 try {
650 st = Stat.decode(st.value)
651 } catch (err) {
652 return cb(err)
653 }
654 const noMode = Object.assign({}, st, { mode: 0 })
655 return cb(null, Stat.directory(noMode), trie, mount)
656 })
657 }
658
659 lstat (name, opts, cb) {
660 if (typeof opts === 'function') return this.lstat(name, null, opts)
661 if (!opts) opts = {}
662 const self = this
663 name = fixName(name)
664
665 this.ready(err => {
666 if (err) return cb(err)
667 this.db.get(name, opts, onstat)
668 })
669
670 function onstat (err, node, trie, mount) {
671 if (err) return cb(err)
672 if (!node && opts.trie) return cb(null, null, trie, mount)
673 if (!node && opts.file) return cb(new errors.FileNotFound(name))
674 if (!node) return self._statDirectory(name, opts, cb)
675 try {
676 var st = Stat.decode(node.value)
677 } catch (err) {
678 return cb(err)
679 }
680 const writingFd = self._writingFds.get(name)
681 if (writingFd) {
682 st.size = writingFd.stat.size
683 }
684 cb(null, st, trie, mount)
685 }
686 }
687
688 stat (name, opts, cb) {
689 if (typeof opts === 'function') return this.stat(name, null, opts)
690 if (!opts) opts = {}
691
692 this.lstat(name, opts, (err, stat, trie, mount) => {
693 if (err) return cb(err)
694 if (!stat) return cb(null, null, trie, name, mount)
695 if (stat.linkname) {
696 if (path.isAbsolute(stat.linkname)) return this.stat(stat.linkname, opts, cb)
697 const relativeStat = path.resolve('/', path.dirname(name), stat.linkname)
698 return this.stat(relativeStat, opts, cb)
699 }
700 return cb(null, stat, trie, name, mount)
701 })
702 }
703
704 access (name, opts, cb) {
705 if (typeof opts === 'function') return this.access(name, null, opts)
706 if (!opts) opts = {}
707 name = fixName(name)
708
709 this.stat(name, opts, err => {
710 cb(err)
711 })
712 }
713
714 exists (name, opts, cb) {
715 if (typeof opts === 'function') return this.exists(name, null, opts)
716 if (!opts) opts = {}
717
718 this.access(name, opts, err => {
719 cb(!err)
720 })
721 }
722
723 readdir (name, opts, cb) {
724 if (typeof opts === 'function') return this.readdir(name, null, opts)
725 name = fixName(name)
726
727 const readdirStream = createReaddirStream(this, name, opts)
728 return collect(readdirStream, (err, entries) => {
729 if (err) return cb(err)
730 return cb(null, entries)
731 })
732 }
733
734 _del (name, cb) {
735 this.ready(err => {
736 if (err) return cb(err)
737 this.db.del(name, (err, node) => {
738 if (err) return cb(err)
739 if (!node) return cb(new errors.FileNotFound(name))
740 return cb(null)
741 })
742 })
743 }
744
745 unlink (name, cb) {
746 name = fixName(name)
747 this._del(name, cb || noop)
748 }
749
750 rmdir (name, cb) {
751 if (!cb) cb = noop
752 name = fixName(name)
753 const self = this
754
755 const ite = readdirIterator(this, name)
756 ite.next((err, val) => {
757 if (err) return cb(err)
758 if (val) return cb(new errors.DirectoryNotEmpty(name))
759 self._del(name, cb)
760 })
761 }
762
763 replicate (isInitiator, opts) {
764 // support replicate({ initiator: bool }) also
765 if (typeof isInitiator === 'object' && isInitiator && !opts) {
766 opts = isInitiator
767 isInitiator = !!opts.initiator
768 }
769 const stream = (opts && opts.stream) || new HypercoreProtocol(isInitiator, { ...opts })
770 this.ready(err => {
771 if (err) return stream.destroy(err)
772 this.corestore.replicate(isInitiator, { ...opts, stream })
773 })
774 return stream
775 }
776
777 checkout (version, opts) {
778 const db = this.db.checkout(version)
779 opts = {
780 ...opts,
781 _db: db,
782 _content: this._contentStates.get(this.db),
783 contentStates: this._contentStates,
784 }
785 return new Hyperdrive(this.corestore, this.key, opts)
786 }
787
788 _closeFile (fd, cb) {
789 const idx = (fd - STDIO_CAP) / 2
790 const desc = this._fds[idx]
791 if (!desc) return process.nextTick(cb, new Error('Invalid file descriptor'))
792 this._fds[idx] = null
793 while (this._fds.length && !this._fds[this._fds.length - 1]) this._fds.pop()
794 desc.close(cb)
795 }
796
797 _close (cb) {
798 this.corestore.close((err) => {
799 this.emit('close')
800 cb(err)
801 })
802 }
803
804 close (fd, cb) {
805 if (typeof fd === 'number') return this._closeFile(fd, cb || noop)
806 super.close(false, fd)
807 }
808
809 stats (path, opts, cb) {
810 if (typeof opts === 'function') return this.stats(path, null, opts)
811 const self = this
812 const stats = new Map()
813
814 this.stat(path, (err, stat, trie) => {
815 if (err) return cb(err)
816 if (stat.isFile()) {
817 return fileStats(path, cb)
818 } else {
819 const recursive = opts && (opts.recursive !== false)
820 const ite = statIterator(self, path, { recursive })
821 return ite.next(function loop (err, info) {
822 if (err) return cb(err)
823 if (!info) return cb(null, stats)
824 fileStats(info.path, (err, fileStats) => {
825 if (err && err.errno !== 2) return cb(err)
826 if (!fileStats) return ite.next(loop)
827 stats.set(info.path, fileStats)
828 return ite.next(loop)
829 })
830 })
831 }
832 })
833
834 function onstats (err, path, fileStats) {
835 if (err) return cb(err)
836 stats.set(path, fileStats)
837 }
838
839 function fileStats (path, cb) {
840 const total = emptyStats()
841 return self.stat(path, (err, stat, trie) => {
842 if (err) return cb(err)
843 return self._getContent(trie, (err, contentState) => {
844 if (err) return cb(err)
845 const downloadedBlocks = contentState.feed.downloaded(stat.offset, stat.offset + stat.blocks)
846 total.blocks = stat.blocks
847 total.size = stat.size
848 total.downloadedBlocks = downloadedBlocks
849 // TODO: This is not possible to implement now. Need a better byte length index in hypercore.
850 // total.downloadedBytes = 0
851 return cb(null, total)
852 })
853 })
854 }
855
856 function emptyStats () {
857 return {
858 blocks: 0,
859 size: 0,
860 downloadedBlocks: 0,
861 }
862 }
863 }
864
865 watchStats (path, opts) {
866 const self = this
867 var timer = setInterval(collectStats, (opts && opts.statsInveral) || 2000)
868 var collecting = false
869 var destroyed = false
870
871 const handle = new EventEmitter()
872 Object.assign(handle, {
873 destroy
874 })
875 return handle
876
877 function collectStats () {
878 if (collecting) return
879 collecting = true
880 self.stats(path, opts, (err, stats) => {
881 if (err) return destroy(err)
882 collecting = false
883 handle.stats = stats
884 handle.emit('update')
885 })
886 }
887
888 function destroy (err) {
889 handle.emit('destroy', err)
890 clearInterval(timer)
891 destroyed = true
892 }
893 }
894
895 download (path, opts = {}) {
896 const self = this
897 const ranges = new Map()
898 var pending = 0
899 var destroyed = false
900
901 const handle = new EventEmitter()
902 Object.assign(handle, {
903 destroy
904 })
905
906 self.stat(path, (err, stat, trie) => {
907 if (err) return destroy(err)
908 if (stat.isFile()) {
909 downloadFile(path, stat, trie, destroy)
910 } else {
911 const recursive = opts.recursive !== false
912 const noMounts = opts.noMounts !== false
913 const ite = statIterator(self, path, { recursive, noMounts, random: true })
914 downloadNext(ite)
915 }
916 })
917
918 return handle
919
920 function downloadNext (ite) {
921 if (destroyed) return
922 ite.next((err, info) => {
923 if (err) return destroy(err)
924 if (!info) {
925 if (!ranges.size) return destroy(null)
926 else return
927 }
928 const { path, stat, trie } = info
929 if (stat.mount) return downloadNext(ite)
930 downloadFile(path, stat, trie, err => {
931 if (err) return destroy(err)
932 return downloadNext(ite)
933 })
934 if (pending < ((opts && opts.maxConcurrent) || 50)) {
935 return downloadNext(ite)
936 }
937 })
938 }
939
940 function downloadFile (path, stat, trie, cb) {
941 pending++
942 self._getContent(trie, (err, contentState) => {
943 if (err) return destroy(err)
944 const feed = contentState.feed
945 const range = feed.download({
946 start: stat.offset,
947 end: stat.offset + stat.blocks
948 }, err => {
949 pending--
950 if (err) return cb(err)
951 ranges.delete(path)
952 return cb(null)
953 })
954 ranges.set(path, { range, feed })
955 })
956 }
957
958 function destroy (err) {
959 if (destroyed) return
960 destroyed = true
961 for (const [path, { feed, range }] of ranges) {
962 feed.undownload(range)
963 }
964 if (err) handle.emit('error', err)
965 else handle.emit('finish')
966 }
967 }
968
969 watch (name, onchange) {
970 name = fixName(name)
971 return this.db.watch(name, onchange)
972 }
973
974 mount (path, key, opts, cb) {
975 if (typeof opts === 'function') return this.mount(path, key, null, opts)
976 const self = this
977
978 path = fixName(path)
979 opts = opts || {}
980
981 const statOpts = {
982 uid: opts.uid,
983 gid: opts.gid
984 }
985 statOpts.mount = {
986 key,
987 version: opts.version,
988 hash: opts.hash,
989 hypercore: !!opts.hypercore
990 }
991 statOpts.directory = !opts.hypercore
992
993 if (opts.hypercore) {
994 const core = this.corestore.get({
995 key,
996 ...opts,
997 parents: [this.key],
998 sparse: this.sparse
999 })
1000 core.ready(err => {
1001 if (err) return cb(err)
1002 this.emit('content-feed', core)
1003 statOpts.size = core.byteLength
1004 statOpts.blocks = core.length
1005 return mountCore()
1006 })
1007 } else {
1008 return process.nextTick(mountTrie, null)
1009 }
1010
1011 function mountCore () {
1012 self._createStat(path, statOpts, (err, st) => {
1013 if (err) return cb(err)
1014 return self.db.put(path, st.encode(), cb)
1015 })
1016 }
1017
1018 function mountTrie () {
1019 self._createStat(path, statOpts, (err, st) => {
1020 if (err) return cb(err)
1021 self.db.mount(path, key, { ...opts, value: st.encode() }, err => {
1022 return self.db.loadMount(path, cb)
1023 })
1024 })
1025 }
1026 }
1027
1028 unmount (path, cb) {
1029 this.stat(path, (err, st) => {
1030 if (err) return cb(err)
1031 if (!st.mount) return cb(new Error('Can only unmount mounts.'))
1032 if (st.mount.hypercore) {
1033 return this.unlink(path, cb)
1034 } else {
1035 return this.db.unmount(path, cb)
1036 }
1037 })
1038 }
1039
1040 symlink (target, linkName, cb) {
1041 target = unixify(target)
1042 linkName = fixName(linkName)
1043
1044 this.lstat(linkName, (err, stat) => {
1045 if (err && (err.errno !== 2)) return cb(err)
1046 if (!err) return cb(new errors.PathAlreadyExists(linkName))
1047 const st = Stat.symlink({
1048 linkname: target
1049 })
1050 return this._putStat(linkName, st, cb)
1051 })
1052 }
1053
1054 createMountStream (opts) {
1055 return createMountStream(this, opts)
1056 }
1057
1058 getAllMounts (opts, cb) {
1059 if (typeof opts === 'function') return this.getAllMounts(null, opts)
1060 const mounts = new Map()
1061
1062 this.ready(err => {
1063 if (err) return cb(err)
1064 collect(this.createMountStream(opts), (err, mountList) => {
1065 if (err) return cb(err)
1066 for (const { path, metadata, content } of mountList) {
1067 mounts.set(path, { metadata, content })
1068 }
1069 return cb(null, mounts)
1070 })
1071 })
1072 }
1073
1074 extension (name, message) {
1075 this.metadata.extension(name, message)
1076 }
1077
1078 get peers () {
1079 return this.metadata.peers
1080 }
1081
1082 setMetadata (path, key, value, cb) {
1083 const metadata = {}
1084 metadata[key] = value
1085 this._update(path, { metadata }, cb)
1086 }
1087
1088 removeMetadata (path, key, cb) {
1089 const metadata = {}
1090 metadata[key] = null
1091 this._update(path, { metadata }, cb)
1092 }
1093
1094 copy (from, to, cb) {
1095 this.stat(from, (err, stat) => {
1096 if (err) return cb(err)
1097 this.create(to, stat, cb)
1098 })
1099 }
1100}
1101
/**
 * True for any truthy value that is neither a string nor a Buffer.
 * Used to tell an options object apart from a key.
 */
function isObject (val) {
  if (!val) return false
  if (typeof val === 'string') return false
  return !Buffer.isBuffer(val)
}
1105
/**
 * Put condition: allows the write only when there is no existing node.
 * Calls back with PathAlreadyExists (for the old node's key) otherwise.
 */
function ifNotExists (oldNode, newNode, cb) {
  return oldNode
    ? cb(new errors.PathAlreadyExists(oldNode.key))
    : cb(null, true)
}
1110
/**
 * Normalizes a path with unixify and guarantees a leading '/'.
 */
function fixName (name) {
  const normalized = unixify(name)
  return normalized.startsWith('/') ? normalized : '/' + normalized
}
1116
// Default callback used when the caller does not supply one.
function noop () {
  // intentionally empty
}