UNPKG

31.7 kBJavaScriptView Raw
1const pathRoot = require('path')
2const path = pathRoot.posix || pathRoot
3const { EventEmitter } = require('events')
4
5const collect = require('stream-collector')
6const thunky = require('thunky')
7const ThunkyMap = require('thunky-map')
8const unixify = require('unixify')
9const duplexify = require('duplexify')
10const through = require('through2')
11const pumpify = require('pumpify')
12const pump = require('pump')
13
14const coreByteStream = require('hypercore-byte-stream')
15const Nanoresource = require('nanoresource/emitter')
16const HypercoreProtocol = require('hypercore-protocol')
17const MountableHypertrie = require('mountable-hypertrie')
18const Corestore = require('corestore')
19const { Stat } = require('hyperdrive-schemas')
20
21const createFileDescriptor = require('./lib/fd')
22const errors = require('./lib/errors')
23const defaultCorestore = require('./lib/storage')
24const TagManager = require('./lib/tagging')
25const { contentKeyPair, contentOptions, ContentState } = require('./lib/content')
26const { statIterator, createStatStream, createMountStream, createReaddirStream, readdirIterator } = require('./lib/iterator')
27
// Base value for the file descriptor numbers handed out by `open`.
// 20 is arbitrary, just to make the fds > stdio etc
const STDIO_CAP = 20

// The module itself is a factory: calling it forwards every argument to the Hyperdrive constructor.
module.exports = (...args) => new Hyperdrive(...args)
// Re-export the `linux` table from `filesystem-constants` as `constants`.
module.exports.constants = require('filesystem-constants').linux
33
/**
 * Callback-style, fs-like API on top of a metadata trie (`this.db`, a
 * MountableHypertrie) and one content feed per trie, all obtained from a
 * corestore. File entries in the trie are encoded `Stat` records that point
 * at a block/byte range of the matching content feed.
 *
 * Lifecycle is handled by the Nanoresource base class: `ready`/`open` drive
 * `_open`, `close` drives `_close`.
 */
class Hyperdrive extends Nanoresource {
  /**
   * @param {*} storage - passed through to `defaultCorestore` (./lib/storage).
   * @param {Buffer|string|Object} [key] - drive key; may be omitted, in which case
   *   an options object is accepted in this position.
   * @param {Object} [opts] - options; the ones read here are `sparse`,
   *   `sparseMetadata`, `namespace`, `extensions`, `keyPair`, `_db` and `_contentStates`.
   */
  constructor (storage, key, opts) {
    super()

    if (isObject(key)) {
      opts = key
      key = null
    }
    if (!opts) opts = {}

    this.opts = opts
    this.key = key
    this.discoveryKey = null
    this.live = true
    this.sparse = opts.sparse !== false
    this.sparseMetadata = opts.sparseMetadata !== false

    this._namespace = opts.namespace
    this.corestore = defaultCorestore(storage, {
      ...opts,
      valueEncoding: 'binary',
      // TODO: Support mixed sparsity.
      sparse: this.sparse || this.sparseMetadata,
      extensions: opts.extensions
    })

    // Only forward errors when defaultCorestore handed back something other than `storage` itself.
    if (this.corestore !== storage) this.corestore.on('error', err => this.emit('error', err))
    if (opts.namespace) {
      this.corestore = this.corestore.namespace(opts.namespace)
    }

    // Set in ready.
    this.metadata = null
    this.db = opts._db
    this.tags = new TagManager(this)
    // A pre-supplied db (see `checkout`) marks this instance as a checkout.
    this.isCheckout = !!this.db

    this._contentStates = opts._contentStates || new ThunkyMap(this._contentStateFromMetadata.bind(this))
    this._fds = []
    this._writingFds = new Map()

    this._metadataOpts = {
      key,
      sparse: this.sparseMetadata,
      keyPair: opts.keyPair,
      extensions: opts.extensions
    }

    const self = this

    this.ready(onReady)

    function onReady (err) {
      if (err) return self.emit('error', err)
      self.emit('ready')

      // Kick off loading of the root content feed unless it is already known.
      if (self._contentStates.has(self.db.feed)) return
      self._getContent(self.db.feed, noop)
    }
  }

  /** Version of the underlying trie. */
  get version () {
    // TODO: The trie version starts at 1, so the empty hyperdrive version is also 1. This should be 0.
    return this.db.version
  }

  /** Whether the metadata feed is writable. Only valid after `ready` (metadata is null before). */
  get writable () {
    return this.metadata.writable
  }

  /** Whether the root content feed is already cached and writable; false when it is not cached yet. */
  get contentWritable () {
    const contentState = this._contentStates.cache.get(this.db.feed)
    if (!contentState) return false
    return contentState.feed.writable
  }

  /** Alias for the no-argument form of `open`: calls `cb` once the drive is opened. */
  ready (cb) {
    return this.open(cb)
  }

  /**
   * Open hook invoked by the base class: readies the corestore, creates the
   * metadata feed and trie, wires event forwarding, then either initializes a
   * brand new drive or restores an existing one.
   * @param {function(Error=)} cb
   */
  _open (cb) {
    const self = this
    return this.corestore.ready(err => {
      if (err) return cb(err)
      this.metadata = this.corestore.default(this._metadataOpts)
      // Paired with `ifAvailable.continue()` in `done` below.
      this.metadata.ifAvailable.wait()
      this.db = this.db || new MountableHypertrie(this.corestore, this.key, {
        feed: this.metadata,
        sparse: this.sparseMetadata
      })
      this.db.on('hypertrie', trie => {
        this.emit('metadata-feed', trie.feed)
        this.emit('mount', trie)
      })
      this.db.on('error', err => this.emit('error', err))

      self.metadata.on('error', onerror)
      self.metadata.on('append', update)
      self.metadata.on('extension', extension)
      self.metadata.on('peer-add', peeradd)
      self.metadata.on('peer-remove', peerremove)

      return self.metadata.ready(err => {
        if (err) return done(err)

        /**
         * TODO: Update comment to reflect mounts.
         *
         * If a db is provided as input, ensure that a contentFeed is also provided, then return (this is a checkout).
         * If the metadata feed is writable:
         *    If the metadata feed has length 0, then the db should be initialized with the content feed key as metadata.
         *    Else, initialize the db without metadata and load the content feed key from the header.
         * If the metadata feed is readable:
         *    Initialize the db without metadata and load the content feed key from the header.
         */
        if (self.metadata.writable && !self.metadata.length && !self.isCheckout) {
          initialize()
        } else {
          restore()
        }
      })
    })

    /**
     * The first time the hyperdrive is created, we initialize both the db (metadata feed) and the content feed here.
     */
    function initialize () {
      self._contentStateFromKey(null, (err, contentState) => {
        if (err) return done(err)
        // warm up the thunky map
        self._contentStates.cache.set(self.db.feed, contentState)
        self.db.setMetadata(contentState.feed.key)
        self.db.ready(err => {
          if (err) return done(err)
          return done(null)
        })
      })
    }

    /**
     * If the hyperdrive has already been created, wait for the db (metadata feed) to load.
     * If the metadata feed is writable, we can immediately load the content feed from its private key.
     * (Otherwise, we need to read the feed's metadata block first)
     */
    function restore () {
      self.db.ready(err => {
        if (err) return done(err)
        if (self.metadata.has(0)) self._getContent(self.db.feed, done)
        else done(null)
      })
    }

    function done (err) {
      self.metadata.ifAvailable.continue()
      if (err) return cb(err)
      self.key = self.metadata.key
      self.discoveryKey = self.metadata.discoveryKey
      return cb(null)
    }

    // NOTE(review): this invokes the open callback for every metadata 'error'
    // event, including ones raised after opening finished — confirm intended.
    function onerror (err) {
      if (err) self.emit('error', err)
      return cb(err)
    }

    function update () {
      self.emit('update')
    }

    function extension (name, message, peer) {
      self.emit('extension', name, message, peer)
    }

    function peeradd (peer) {
      self.emit('peer-add', peer)
    }

    function peerremove (peer) {
      self.emit('peer-remove', peer)
    }
  }

  /** Look up (via the thunky map) the content state belonging to a metadata feed. */
  _getContent (metadata, cb) {
    this._contentStates.get(metadata, cb)
  }

  /** Loader for the thunky map: read the content key from the trie metadata, then open that feed. */
  _contentStateFromMetadata (metadata, cb) {
    MountableHypertrie.getMetadata(metadata, (err, publicKey) => {
      if (err) return cb(err)
      this._contentStateFromKey(publicKey, cb)
    })
  }

  /**
   * Get a content feed from the corestore and wrap it in a ContentState.
   * Emits 'content-feed' once the feed is ready.
   * @param {Buffer|null} publicKey - `null` is used by `_open` when creating a new drive.
   * @param {function(Error=, ContentState=)} cb
   */
  _contentStateFromKey (publicKey, cb) {
    const contentOpts = { key: publicKey, ...contentOptions(this), cache: { data: false } }
    try {
      var feed = this.corestore.get(contentOpts)
    } catch (err) {
      return cb(err)
    }

    feed.on('error', err => this.emit('error', err))
    feed.ready(err => {
      if (err) return cb(err)
      this.emit('content-feed', feed)
      return cb(null, new ContentState(feed))
    })
  }

  /**
   * Encode `stat` and store it in the trie under `name`.
   * @param {string} name
   * @param {Stat} stat
   * @param {Object} [opts] - passed to `db.put`.
   * @param {function(Error=, Stat=)} cb - receives the (unencoded) stat on success.
   */
  _putStat (name, stat, opts, cb) {
    if (typeof opts === 'function') return this._putStat(name, stat, null, opts)
    try {
      var encoded = stat.encode()
    } catch (err) {
      return cb(err)
    }
    this.db.put(name, encoded, opts, err => {
      if (err) return cb(err)
      return cb(null, stat)
    })
  }

  /**
   * Merge `stat` into the existing entry at `name`; fails with FileNotFound
   * when there is no entry. `stat.metadata` is merged key-by-key into the old metadata.
   */
  _update (name, stat, cb) {
    name = fixName(name)

    this.db.get(name, (err, st) => {
      if (err) return cb(err)
      if (!st) return cb(new errors.FileNotFound(name))
      try {
        var decoded = Stat.decode(st.value)
      } catch (err) {
        return cb(err)
      }
      const oldMetadata = decoded.metadata
      const newStat = Object.assign(decoded, stat)
      if (stat.metadata) {
        newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
      }
      return this._putStat(name, newStat, { flags: st.flags }, cb)
    })
  }

  /** Like `_update`, but starts from a fresh `Stat.file()` when no entry exists at `name`. */
  _upsert (name, stat, cb) {
    name = fixName(name)

    this.db.get(name, (err, st) => {
      if (err) return cb(err)
      let decoded
      if (!st) {
        decoded = Stat.file()
      } else {
        try {
          decoded = Stat.decode(st.value)
        } catch (err) {
          return cb(err)
        }
      }
      const oldMetadata = decoded.metadata
      const newStat = Object.assign(decoded, stat)
      if (stat.metadata) {
        newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
      }
      return this._putStat(name, newStat, { flags: st ? st.flags : 0 }, cb)
    })
  }

  /** Yield the root content feed, or `null` when the trie has not been created yet. */
  getContent (cb) {
    if (!this.db) return cb(null, null)
    this._getContent(this.db.feed, (err, contentState) => {
      if (err) return cb(err)
      return cb(null, contentState.feed)
    })
  }

  /**
   * Two call shapes: `open(cb)` / `open()` opens the drive itself (base class),
   * `open(name, flags, cb)` opens a file and yields a numeric file descriptor.
   */
  open (name, flags, cb) {
    if (!name || typeof name === 'function') return super.open(name)
    name = fixName(name)

    this.ready(err => {
      if (err) return cb(err)
      createFileDescriptor(this, name, flags, (err, fd) => {
        if (err) return cb(err)
        // fd number = STDIO_CAP + 2 * index into `_fds`; read/write/close invert this mapping.
        cb(null, STDIO_CAP + (this._fds.push(fd) - 1) * 2)
      })
    })
  }

  /** Read through an open descriptor; `pos` may be omitted, in which case the descriptor's position is used. */
  read (fd, buf, offset, len, pos, cb) {
    if (typeof pos === 'function') {
      cb = pos
      pos = null
    }

    const desc = this._fds[(fd - STDIO_CAP) / 2]
    if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
    if (pos == null) pos = desc.position
    desc.read(buf, offset, len, pos, cb)
  }

  /** Write through an open descriptor; `pos` may be omitted, in which case the descriptor's position is used. */
  write (fd, buf, offset, len, pos, cb) {
    if (typeof pos === 'function') {
      cb = pos
      pos = null
    }

    const desc = this._fds[(fd - STDIO_CAP) / 2]
    if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
    if (pos == null) pos = desc.position
    desc.write(buf, offset, len, pos, cb)
  }

  /**
   * Create a readable byte stream over the file at `name`.
   * @param {string} name
   * @param {Object} [opts] - `start`, `end` (inclusive) or `length` select a byte
   *   range; everything is also forwarded to `hypercore-byte-stream`.
   * @returns the stream; it is destroyed with the error if lookup fails.
   */
  createReadStream (name, opts) {
    if (!opts) opts = {}
    const self = this

    name = fixName(name)

    // -1 means "no explicit length requested".
    const length = typeof opts.end === 'number' ? 1 + opts.end - (opts.start || 0) : typeof opts.length === 'number' ? opts.length : -1
    const stream = coreByteStream({
      ...opts,
      highWaterMark: opts.highWaterMark || 64 * 1024
    })

    this.ready(err => {
      if (err) return stream.destroy(err)
      return this.stat(name, { file: true }, (err, st, trie) => {
        if (err) return stream.destroy(err)
        return this._getContent(trie.feed, (err, contentState) => {
          if (err) return stream.destroy(err)
          return oncontent(st, contentState)
        })
      })
    })

    function oncontent (st, contentState) {
      let feed, byteOffset, blockOffset, blockLength

      if (st.mount && st.mount.hypercore) {
        // The entry is a mounted hypercore: read that core from its start.
        byteOffset = 0
        blockOffset = 0
        blockLength = st.blocks
        feed = self.corestore.get({
          key: st.mount.key,
          sparse: self.sparse
        })
        feed.once('ready', () => self.emit('content-feed', feed))
      } else {
        blockOffset = st.offset
        blockLength = st.blocks
        byteOffset = opts.start ? st.byteOffset + opts.start : (length === -1 ? -1 : st.byteOffset)
        feed = contentState.feed
      }

      const byteLength = length

      stream.start({
        feed,
        blockOffset,
        blockLength,
        byteOffset,
        byteLength
      })
    }

    return stream
  }

  /**
   * Object stream of differences between this drive and `other`.
   * @param {Hyperdrive|*} other - a Hyperdrive (its version is used) or a value
   *   passed straight to `db.createDiffStream`.
   * @param {string|Object} [prefix='/'] - may be omitted in favour of `opts`.
   * @param {Object} [opts]
   */
  createDiffStream (other, prefix, opts) {
    if (other instanceof Hyperdrive) other = other.version
    if (typeof prefix === 'object') return this.createDiffStream(other, '/', prefix)
    prefix = prefix || '/'

    const diffStream = this.db.createDiffStream(other, prefix, opts)
    return pumpify.obj(
      diffStream,
      through.obj((chunk, enc, cb) => {
        const entry = { type: chunk.type, name: chunk.key }
        if (chunk.left) entry.seq = chunk.left.seq
        if (chunk.right) entry.previous = { seq: chunk.right.seq }
        if (chunk.left && entry.type !== 'mount' && entry.type !== 'unmount') {
          try {
            entry.value = Stat.decode(chunk.left.value)
          } catch (err) {
            return cb(err)
          }
        } else if (chunk.left) {
          // mount/unmount entries carry their payload in `info` rather than an encoded Stat.
          entry.value = chunk.left.info
        }
        return cb(null, entry)
      })
    )
  }

  /** Stream of stats for the directory at `name` (delegates to ./lib/iterator). */
  createDirectoryStream (name, opts) {
    if (!opts) opts = {}
    name = fixName(name)
    return createStatStream(this, name, opts)
  }

  /**
   * Create a writable stream that appends to the content feed and, just before
   * finishing, stores a file Stat for `name` pointing at the appended range.
   * @param {string} name
   * @param {Object} [opts] - spread into the new Stat; `wait: false` makes the
   *   stream fail immediately when the content state is locked.
   * @returns the (write-only) duplexify proxy.
   */
  createWriteStream (name, opts) {
    if (!opts) opts = {}
    name = fixName(name)

    const self = this
    var release

    const proxy = duplexify()
    proxy.setReadable(false)

    // TODO: support piping through a "split" stream like rabin

    this.ready(err => {
      if (err) return proxy.destroy(err)
      this.stat(name, { trie: true }, (err, stat, trie) => {
        if (err && (err.errno !== 2)) return proxy.destroy(err)

        this._getContent(trie.feed, (err, contentState) => {
          if (err) return proxy.destroy(err)
          // There is no callback in this scope: report the failure through the stream.
          if (opts.wait === false && contentState.isLocked()) return proxy.destroy(new Error('Content is locked.'))
          contentState.lock(_release => {
            release = _release
            append(contentState)
          })
        })
      })
    })

    return proxy

    function append (contentState) {
      if (proxy.destroyed) return release()

      // Remember where the feed ended so the Stat can describe only the new data.
      const byteOffset = contentState.feed.byteLength
      const offset = contentState.feed.length

      self.emit('appending', name, opts)

      // TODO: revert the content feed if this fails!!!! (add an option to the write stream for this (atomic: true))
      const stream = contentState.feed.createWriteStream()

      proxy.on('close', ondone)
      proxy.on('finish', ondone)

      proxy.setWritable(stream)
      proxy.on('prefinish', function () {
        const stat = Stat.file({
          ...opts,
          offset,
          byteOffset,
          size: contentState.feed.byteLength - byteOffset,
          blocks: contentState.feed.length - offset
        })
        // Hold back 'finish' until the Stat has been stored.
        proxy.cork()
        self._putStat(name, stat, function (err) {
          if (err) return proxy.destroy(err)
          self.emit('append', name, opts)
          proxy.uncork()
        })
      })
    }

    // Runs on whichever of 'close'/'finish' comes first and releases the content lock once.
    function ondone () {
      proxy.removeListener('close', ondone)
      proxy.removeListener('finish', ondone)
      release()
    }
  }

  /**
   * Create an empty file entry at `name` unless one already exists, in which
   * case the existing stat is yielded unchanged.
   */
  create (name, opts, cb) {
    if (typeof opts === 'function') return this.create(name, null, opts)

    name = fixName(name)

    this.ready(err => {
      if (err) return cb(err)
      this.lstat(name, { file: true, trie: true }, (err, stat) => {
        if (err && err.errno !== 2) return cb(err)
        if (stat) return cb(null, stat)
        const st = Stat.file(opts)
        return this._putStat(name, st, cb)
      })
    })
  }

  /**
   * Read a whole file.
   * @param {string} name
   * @param {Object|string} [opts] - a string is treated as `{ encoding }`; with an
   *   encoding other than 'binary' the result is a string, otherwise a Buffer.
   * @param {function(Error=, (Buffer|string)=)} cb
   */
  readFile (name, opts, cb) {
    if (typeof opts === 'function') return this.readFile(name, null, opts)
    if (typeof opts === 'string') opts = { encoding: opts }
    if (!opts) opts = {}

    name = fixName(name)

    collect(this.createReadStream(name, opts), function (err, bufs) {
      if (err) return cb(err)
      const buf = bufs.length === 1 ? bufs[0] : Buffer.concat(bufs)
      cb(null, opts.encoding && opts.encoding !== 'binary' ? buf.toString(opts.encoding) : buf)
    })
  }

  /**
   * Write a whole file.
   * @param {string} name
   * @param {Buffer|string} buf - strings are encoded with `opts.encoding` (default 'utf-8').
   * @param {Object|string} [opts] - a string is treated as `{ encoding }`.
   * @param {function(Error=)} [cb]
   */
  writeFile (name, buf, opts, cb) {
    if (typeof opts === 'function') return this.writeFile(name, buf, null, opts)
    if (typeof opts === 'string') opts = { encoding: opts }
    if (!opts) opts = {}
    if (typeof buf === 'string') buf = Buffer.from(buf, opts.encoding || 'utf-8')
    if (!cb) cb = noop

    name = fixName(name)

    const stream = this.createWriteStream(name, opts)

    // TODO: Do we need to maintain the error state? What's triggering 'finish' after 'error'?
    var errored = false

    stream.on('error', err => {
      errored = true
      return cb(err)
    })
    stream.on('finish', () => {
      if (!errored) return cb(null)
    })
    stream.end(buf)
  }

  /**
   * Resize the file at `name` to `size` bytes: shrink by rewriting the first
   * `size` bytes, grow by appending zero bytes.
   */
  truncate (name, size, cb) {
    name = fixName(name)

    this.lstat(name, { file: true, trie: true }, (err, st) => {
      if (err && err.errno !== 2) return cb(err)
      // NOTE(review): for an existing file with size 0 requested, `create` yields
      // the existing stat without shrinking it — confirm this is intended.
      if (!st || !size) return this.create(name, cb)
      if (size === st.size) return cb(null)
      if (size < st.size) {
        const readStream = this.createReadStream(name, { length: size })
        const writeStream = this.createWriteStream(name)
        return pump(readStream, writeStream, cb)
      } else {
        this.open(name, 'a', (err, fd) => {
          if (err) return cb(err)
          const length = size - st.size
          this.write(fd, Buffer.alloc(length), 0, length, st.size, err => {
            if (err) return cb(err)
            this.close(fd, cb)
          })
        })
      }
    })
  }

  /** Truncate through an open file descriptor. */
  ftruncate (fd, size, cb) {
    const desc = this._fds[(fd - STDIO_CAP) / 2]
    if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
    return desc.truncate(size, cb)
  }

  /**
   * Build (but do not store) a Stat for a new entry at `name`, positioned at
   * the current end of the responsible content feed.
   * @param {string} name
   * @param {Object} [opts] - `directory` selects a directory Stat; `force` allows
   *   an existing entry; the rest is spread into the Stat.
   * @param {function(Error=, Stat=)} cb - fails with PathAlreadyExists when an
   *   entry exists and `force` is not set.
   */
  _createStat (name, opts, cb) {
    const self = this

    const statConstructor = (opts && opts.directory) ? Stat.directory : Stat.file
    // Guard `opts` here too, matching the check on the line above.
    const shouldForce = !!(opts && opts.force)

    this.ready(err => {
      if (err) return cb(err)
      this.db.get(name, (err, node, trie) => {
        if (err) return cb(err)
        if (node && !shouldForce) return cb(new errors.PathAlreadyExists(name))
        onexisting(node, trie)
      })
    })

    function onexisting (node, trie) {
      self.ready(err => {
        if (err) return cb(err)
        self._getContent(trie.feed, (err, contentState) => {
          if (err) return cb(err)
          const st = statConstructor({
            ...opts,
            offset: contentState.feed.length,
            byteOffset: contentState.feed.byteLength
          })
          return cb(null, st)
        })
      })
    }
  }

  /**
   * Create a directory entry; fails with PathAlreadyExists if `name` exists.
   * @param {string} name
   * @param {Object|number} [opts] - a number is treated as `{ mode }`.
   * @param {function(Error=, Stat=)} [cb]
   */
  mkdir (name, opts, cb) {
    if (typeof opts === 'function') return this.mkdir(name, null, opts)
    if (typeof opts === 'number') opts = { mode: opts }
    if (!cb) cb = noop

    name = fixName(name)
    // Work on a copy so the caller's options object is not modified.
    opts = { ...opts, directory: true }

    this._createStat(name, opts, (err, st) => {
      if (err) return cb(err)
      this._putStat(name, st, {
        condition: ifNotExists
      }, cb)
    })
  }

  /**
   * Stat a path that has no entry of its own: it counts as a directory when the
   * trie iterator yields at least one entry under it. '/' always exists.
   */
  _statDirectory (name, opts, cb) {
    const ite = this.db.iterator(name)
    ite.next((err, st) => {
      if (err) return cb(err)
      if (name !== '/' && !st) return cb(new errors.FileNotFound(name))
      if (name === '/') return cb(null, Stat.directory(), this.db)
      const trie = st[MountableHypertrie.Symbols.TRIE]
      const mount = st[MountableHypertrie.Symbols.MOUNT]
      const innerPath = st[MountableHypertrie.Symbols.INNER_PATH]
      try {
        st = Stat.decode(st.value)
      } catch (err) {
        return cb(err)
      }
      // Reset the child's mode before building the directory Stat from it.
      const noMode = Object.assign({}, st, { mode: 0 })
      return cb(null, Stat.directory(noMode), trie, mount, innerPath)
    })
  }

  /** Yield the `linkname` stored on the entry at `name` (no symlink resolution). */
  readlink (name, cb) {
    this.lstat(name, function (err, st) {
      if (err) return cb(err)
      cb(null, st.linkname)
    })
  }

  /**
   * Stat `name` without following symlinks.
   * @param {string} name
   * @param {Object} [opts] - forwarded to `db.get`; with `trie` a missing entry
   *   yields a null stat (plus trie info), with `file` it yields FileNotFound,
   *   otherwise the path is checked as an implicit directory.
   * @param {function(Error=, Stat=, trie, mount, mountPath)} cb
   */
  lstat (name, opts, cb) {
    if (typeof opts === 'function') return this.lstat(name, null, opts)
    if (!opts) opts = {}
    const self = this
    name = fixName(name)

    this.ready(err => {
      if (err) return cb(err)
      this.db.get(name, opts, onstat)
    })

    function onstat (err, node, trie, mount, mountPath) {
      if (err) return cb(err)
      if (!node && opts.trie) return cb(null, null, trie, mount, mountPath)
      if (!node && opts.file) return cb(new errors.FileNotFound(name))
      if (!node) return self._statDirectory(name, opts, cb)
      try {
        var st = Stat.decode(node.value)
      } catch (err) {
        return cb(err)
      }
      // A descriptor registered in `_writingFds` for this path overrides the stored size.
      const writingFd = self._writingFds.get(name)
      if (writingFd) {
        st.size = writingFd.stat.size
      }
      cb(null, st, trie, mount, mountPath)
    }
  }

  /**
   * Stat `name`, following symlinks (relative targets resolve against the
   * link's directory).
   * @param {function(Error=, Stat=, trie, name, mount, mountPath)} cb - note the
   *   extra `name` argument compared to `lstat`.
   */
  stat (name, opts, cb) {
    if (typeof opts === 'function') return this.stat(name, null, opts)
    if (!opts) opts = {}

    this.lstat(name, opts, (err, stat, trie, mount, mountPath) => {
      if (err) return cb(err)
      if (!stat) return cb(null, null, trie, name, mount, mountPath)
      if (stat.linkname) {
        if (path.isAbsolute(stat.linkname)) return this.stat(stat.linkname, opts, cb)
        const relativeStat = path.resolve('/', path.dirname(name), stat.linkname)
        return this.stat(relativeStat, opts, cb)
      }
      return cb(null, stat, trie, name, mount, mountPath)
    })
  }

  /** Calls back with the error from `stat` (if any) and nothing else. */
  access (name, opts, cb) {
    if (typeof opts === 'function') return this.access(name, null, opts)
    if (!opts) opts = {}
    name = fixName(name)

    this.stat(name, opts, err => {
      cb(err)
    })
  }

  /** Calls back with a single boolean: whether `access` succeeded. */
  exists (name, opts, cb) {
    if (typeof opts === 'function') return this.exists(name, null, opts)
    if (!opts) opts = {}

    this.access(name, opts, err => {
      cb(!err)
    })
  }

  /** Collect the readdir stream for `name` into an array. */
  readdir (name, opts, cb) {
    if (typeof opts === 'function') return this.readdir(name, null, opts)
    name = fixName(name)

    const readdirStream = createReaddirStream(this, name, opts)
    return collect(readdirStream, (err, entries) => {
      if (err) return cb(err)
      return cb(null, entries)
    })
  }

  /** Delete the trie entry at `name`; FileNotFound when nothing was deleted. */
  _del (name, cb) {
    this.ready(err => {
      if (err) return cb(err)
      this.db.del(name, (err, node) => {
        if (err) return cb(err)
        if (!node) return cb(new errors.FileNotFound(name))
        return cb(null)
      })
    })
  }

  /** Remove the entry at `name`. */
  unlink (name, cb) {
    name = fixName(name)
    this._del(name, cb || noop)
  }

  /** Remove the directory at `name`; fails with DirectoryNotEmpty if it has any entry. */
  rmdir (name, cb) {
    if (!cb) cb = noop
    name = fixName(name)
    const self = this

    const ite = readdirIterator(this, name)
    ite.next((err, val) => {
      if (err) return cb(err)
      if (val) return cb(new errors.DirectoryNotEmpty(name))
      self._del(name, cb)
    })
  }

  /**
   * Replicate through the corestore.
   * @param {boolean|Object} isInitiator - or an options object with `initiator`.
   * @param {Object} [opts] - `stream` reuses an existing protocol stream.
   * @returns the protocol stream.
   */
  replicate (isInitiator, opts) {
    // support replicate({ initiator: bool }) also
    if (typeof isInitiator === 'object' && isInitiator && !opts) {
      opts = isInitiator
      isInitiator = !!opts.initiator
    }
    const stream = (opts && opts.stream) || new HypercoreProtocol(isInitiator, { ...opts })
    this.ready(err => {
      if (err) return stream.destroy(err)
      this.corestore.replicate(isInitiator, { ...opts, stream })
    })
    return stream
  }

  /** New Hyperdrive over a checkout of the trie at `version`, sharing this drive's content states. */
  checkout (version, opts) {
    opts = {
      ...opts,
      _db: this.db.checkout(version),
      _contentStates: this._contentStates
    }
    return new Hyperdrive(this.corestore, this.key, opts)
  }

  /** Close one file descriptor and trim trailing empty slots from the table. */
  _closeFile (fd, cb) {
    const idx = (fd - STDIO_CAP) / 2
    const desc = this._fds[idx]
    if (!desc) return process.nextTick(cb, new Error('Invalid file descriptor'))
    this._fds[idx] = null
    while (this._fds.length && !this._fds[this._fds.length - 1]) this._fds.pop()
    desc.close(cb)
  }

  /** Close hook invoked by the base class: closes the corestore and emits 'close'. */
  _close (cb) {
    this.corestore.close((err) => {
      this.emit('close')
      cb(err)
    })
  }

  /** `close(fd, cb)` closes a file descriptor; `close(cb)` closes the drive itself. */
  close (fd, cb) {
    if (typeof fd === 'number') return this._closeFile(fd, cb || noop)
    super.close(false, fd)
  }

  /**
   * Collect download statistics.
   * @param {string} path
   * @param {Object} [opts] - `recursive` is read for directories.
   * @param {function(Error=, *)} cb - for a file: `{ blocks, size, downloadedBlocks }`;
   *   for a directory: a Map from path to such an object.
   */
  stats (path, opts, cb) {
    if (typeof opts === 'function') return this.stats(path, null, opts)
    const self = this
    const stats = new Map()

    this.stat(path, (err, stat) => {
      if (err) return cb(err)
      if (stat.isFile()) {
        return fileStats(path, cb)
      } else {
        const recursive = opts && (opts.recursive !== false)
        const ite = statIterator(self, path, { recursive })
        return ite.next(function loop (err, info) {
          if (err) return cb(err)
          if (!info) return cb(null, stats)
          fileStats(info.path, (err, entryStats) => {
            // Entries that vanished meanwhile (errno 2) are skipped rather than fatal.
            if (err && err.errno !== 2) return cb(err)
            if (!entryStats) return ite.next(loop)
            stats.set(info.path, entryStats)
            return ite.next(loop)
          })
        })
      }
    })

    function fileStats (path, cb) {
      const total = emptyStats()
      return self.stat(path, (err, stat, trie) => {
        if (err) return cb(err)
        return self._getContent(trie.feed, (err, contentState) => {
          if (err) return cb(err)
          const downloadedBlocks = contentState.feed.downloaded(stat.offset, stat.offset + stat.blocks)
          total.blocks = stat.blocks
          total.size = stat.size
          total.downloadedBlocks = downloadedBlocks
          // TODO: This is not possible to implement now. Need a better byte length index in hypercore.
          // total.downloadedBytes = 0
          return cb(null, total)
        })
      })
    }

    function emptyStats () {
      return {
        blocks: 0,
        size: 0,
        downloadedBlocks: 0
      }
    }
  }

  /**
   * Poll `stats(path, opts)` on an interval.
   * @param {string} path
   * @param {Object} [opts] - `statsInterval` (ms, default 2000); the historical
   *   misspelling `statsInveral` is still honoured. Also forwarded to `stats`.
   * @returns {EventEmitter} handle with a `destroy(err)` method and a `stats`
   *   property; emits 'update' after each poll and 'destroy' once when stopped
   *   (with the error if polling failed).
   */
  watchStats (path, opts) {
    const self = this
    const interval = (opts && (opts.statsInterval || opts.statsInveral)) || 2000
    const timer = setInterval(collectStats, interval)
    let collecting = false
    let destroyed = false

    const handle = new EventEmitter()
    Object.assign(handle, {
      destroy
    })
    return handle

    function collectStats () {
      // Skip the tick while a previous collection is still in flight.
      if (collecting || destroyed) return
      collecting = true
      self.stats(path, opts, (err, stats) => {
        // A result arriving after destroy() must not surface as an update.
        if (destroyed) return
        if (err) return destroy(err)
        collecting = false
        handle.stats = stats
        handle.emit('update')
      })
    }

    function destroy (err) {
      if (destroyed) return
      destroyed = true
      clearInterval(timer)
      handle.emit('destroy', err)
    }
  }

  /**
   * Request download of a file, or of the files under a directory.
   * @param {string} path
   * @param {Object} [opts] - `recursive`, `noMounts` (both on unless `false`), `maxConcurrent` (default 50).
   * @param {function(Error=)} [cb]
   * @returns {EventEmitter} handle with `destroy(err)`; emits 'finish' or 'error'.
   */
  download (path, opts, cb) {
    if (typeof opts === 'function') {
      cb = opts
      opts = null
    }
    opts = opts || {}
    const self = this
    // path -> { range, feed } for downloads that are still in flight.
    const ranges = new Map()
    var pending = 0
    var destroyed = false

    const handle = new EventEmitter()
    Object.assign(handle, {
      destroy
    })

    self.stat(path, (err, stat, trie) => {
      if (err) return destroy(err)
      if (stat.isFile()) {
        downloadFile(path, stat, trie, destroy)
      } else {
        const recursive = opts.recursive !== false
        const noMounts = opts.noMounts !== false
        const ite = statIterator(self, path, { recursive, noMounts, random: true })
        downloadNext(ite)
      }
    })

    return handle

    function downloadNext (ite) {
      if (destroyed) return
      ite.next((err, info) => {
        if (err) return destroy(err)
        if (!info) {
          // Iterator exhausted: finish only once no range is outstanding.
          if (!ranges.size) return destroy(null)
          else return
        }
        const { path, stat, trie } = info
        if (!stat.blocks || stat.mount || stat.isDirectory()) return downloadNext(ite)
        downloadFile(path, stat, trie, err => {
          if (err) return destroy(err)
          return downloadNext(ite)
        })
        if (pending < ((opts && opts.maxConcurrent) || 50)) {
          return downloadNext(ite)
        }
      })
    }

    function downloadFile (path, stat, trie, cb) {
      pending++
      self._getContent(trie.feed, (err, contentState) => {
        if (err) return destroy(err)
        const feed = contentState.feed
        const range = feed.download({
          start: stat.offset,
          end: stat.offset + stat.blocks
        }, err => {
          pending--
          if (err) return cb(err)
          ranges.delete(path)
          return cb(null)
        })
        ranges.set(path, { range, feed })
      })
    }

    function destroy (err) {
      if (destroyed) return null
      destroyed = true
      // Cancel whatever is still outstanding.
      for (const { feed, range } of ranges.values()) {
        feed.undownload(range)
      }
      if (err) {
        handle.emit('error', err)
        if (cb) return cb(err)
      } else {
        handle.emit('finish')
        if (cb) return cb(null)
      }
    }
  }

  /** Watch `name` for changes (delegates to the trie). */
  watch (name, onchange) {
    name = fixName(name)
    return this.db.watch(name, onchange)
  }

  /**
   * Mount another trie (default) or a single hypercore (`opts.hypercore`) at `path`.
   * @param {string} path
   * @param {Buffer} key
   * @param {Object} [opts] - `uid`, `gid`, `version`, `hash`, `hypercore`; also
   *   forwarded to `corestore.get` / `db.mount`.
   * @param {function(Error=)} cb
   */
  mount (path, key, opts, cb) {
    if (typeof opts === 'function') return this.mount(path, key, null, opts)
    const self = this

    path = fixName(path)
    opts = opts || {}

    const statOpts = {
      uid: opts.uid,
      gid: opts.gid
    }
    statOpts.mount = {
      key,
      version: opts.version,
      hash: opts.hash,
      hypercore: !!opts.hypercore
    }
    statOpts.directory = !opts.hypercore

    if (opts.hypercore) {
      const core = this.corestore.get({
        key,
        ...opts,
        parents: [this.key],
        sparse: this.sparse
      })
      core.ready(err => {
        if (err) return cb(err)
        this.emit('content-feed', core)
        statOpts.size = core.byteLength
        statOpts.blocks = core.length
        return mountCore()
      })
    } else {
      return process.nextTick(mountTrie, null)
    }

    function mountCore () {
      self._createStat(path, statOpts, (err, st) => {
        if (err) return cb(err)
        return self.db.put(path, st.encode(), cb)
      })
    }

    function mountTrie () {
      self._createStat(path, statOpts, (err, st) => {
        if (err) return cb(err)
        self.db.mount(path, key, { ...opts, value: st.encode() }, err => {
          // Do not try to load a mount that failed to be written.
          if (err) return cb(err)
          return self.db.loadMount(path, cb)
        })
      })
    }
  }

  /** Remove the mount at `path`; errors if the entry there is not a mount. */
  unmount (path, cb) {
    this.stat(path, (err, st) => {
      if (err) return cb(err)
      if (!st.mount) return cb(new Error('Can only unmount mounts.'))
      if (st.mount.hypercore) {
        return this.unlink(path, cb)
      } else {
        return this.db.unmount(path, cb)
      }
    })
  }

  /** Create a symlink entry at `linkName` pointing to `target`; fails if `linkName` exists. */
  symlink (target, linkName, cb) {
    target = unixify(target)
    linkName = fixName(linkName)

    this.lstat(linkName, (err, stat) => {
      if (err && (err.errno !== 2)) return cb(err)
      // No error at all means something is already there.
      if (!err) return cb(new errors.PathAlreadyExists(linkName))
      const st = Stat.symlink({
        linkname: target
      })
      return this._putStat(linkName, st, cb)
    })
  }

  /** Stream of mounts (delegates to ./lib/iterator). */
  createMountStream (opts) {
    return createMountStream(this, opts)
  }

  /** Collect the mount stream into a Map of path -> `{ metadata, content }`. */
  getAllMounts (opts, cb) {
    if (typeof opts === 'function') return this.getAllMounts(null, opts)
    const mounts = new Map()

    this.ready(err => {
      if (err) return cb(err)
      collect(this.createMountStream(opts), (err, mountList) => {
        if (err) return cb(err)
        for (const { path, metadata, content } of mountList) {
          mounts.set(path, { metadata, content })
        }
        return cb(null, mounts)
      })
    })
  }

  /** Send an extension message through the metadata feed. */
  extension (name, message) {
    this.metadata.extension(name, message)
  }

  /** Peers of the metadata feed. */
  get peers () {
    return this.metadata.peers
  }

  /** Set one metadata key on the entry at `path` (entry must exist). */
  setMetadata (path, key, value, cb) {
    const metadata = {}
    metadata[key] = value
    this._update(path, { metadata }, cb)
  }

  /** Set one metadata key on the entry at `path` to `null` (entry must exist). */
  removeMetadata (path, key, cb) {
    const metadata = {}
    metadata[key] = null
    this._update(path, { metadata }, cb)
  }

  /** Create an entry at `to` from the stat of `from`; no content is copied by this method. */
  copy (from, to, cb) {
    this.stat(from, (err, stat) => {
      if (err) return cb(err)
      this.create(to, stat, cb)
    })
  }

  // Tag-related methods. All delegate to the TagManager created in the constructor.

  createTag (name, version, cb) {
    return this.tags.create(name, version, cb)
  }

  getAllTags (cb) {
    return this.tags.getAll(cb)
  }

  deleteTag (name, cb) {
    return this.tags.delete(name, cb)
  }

  getTaggedVersion (name, cb) {
    return this.tags.get(name, cb)
  }
}
1126
/**
 * Decide whether `val` should be treated as an options object rather than a
 * key: anything truthy that is neither a string nor a Buffer qualifies.
 */
function isObject (val) {
  if (!val) return false
  if (typeof val === 'string') return false
  return !Buffer.isBuffer(val)
}
1130
/**
 * Put condition used by `mkdir`: approve the write (`cb(null, true)`) only
 * when there is no existing node, otherwise fail with PathAlreadyExists for
 * the existing node's key.
 */
function ifNotExists (oldNode, newNode, cb) {
  if (!oldNode) return cb(null, true)
  return cb(new errors.PathAlreadyExists(oldNode.key))
}
1135
/**
 * Normalize a path for use as a trie key: run it through `unixify` and make
 * sure the result starts with '/'.
 */
function fixName (name) {
  const normalized = unixify(name)
  return normalized.startsWith('/') ? normalized : '/' + normalized
}
1141
// Does nothing; used as the default callback where the caller did not supply one.
function noop () {}