UNPKG

34 kBJavaScriptView Raw
1const pathRoot = require('path')
2const path = pathRoot.posix || pathRoot
3const { EventEmitter } = require('events')
4
5const collect = require('stream-collector')
6const thunky = require('thunky')
7const ThunkyMap = require('thunky-map')
8const unixify = require('unixify')
9const duplexify = require('duplexify')
10const through = require('through2')
11const pumpify = require('pumpify')
12const pump = require('pump')
13
14const coreByteStream = require('hypercore-byte-stream')
15const Nanoresource = require('nanoresource/emitter')
16const HypercoreProtocol = require('hypercore-protocol')
17const MountableHypertrie = require('mountable-hypertrie')
18const Corestore = require('corestore')
19const { Stat } = require('hyperdrive-schemas')
20
21const createFileDescriptor = require('./lib/fd')
22const errors = require('./lib/errors')
23const defaultCorestore = require('./lib/storage')
24const TagManager = require('./lib/tagging')
25const { contentKeyPair, contentOptions, ContentState } = require('./lib/content')
26const { statIterator, createStatStream, createMountStream, createReaddirStream, readdirIterator } = require('./lib/iterator')
27
// File descriptors returned by open() are offset by this amount so they sit
// above stdio and other low-numbered descriptors. The value 20 is arbitrary.
const STDIO_CAP = 20
// Passed as `maxBlockSize` to the content feed's write stream (512 KiB).
const WRITE_STREAM_BLOCK_SIZE = 512 * 1024

// The module itself is a factory: calling it constructs a new Hyperdrive.
module.exports = (...args) => new Hyperdrive(...args)
// Filesystem constants (linux flavour) re-exported for consumers.
module.exports.constants = require('filesystem-constants').linux
34
35class Hyperdrive extends Nanoresource {
36 constructor (storage, key, opts) {
37 super()
38
39 if (isObject(key)) {
40 opts = key
41 key = null
42 }
43 if (!opts) opts = {}
44
45 this.opts = opts
46 this.key = key
47 this.discoveryKey = null
48 this.live = true
49 this.sparse = opts.sparse !== false
50 this.sparseMetadata = opts.sparseMetadata !== false
51
52 this._namespace = opts.namespace
53 this.corestore = defaultCorestore(storage, {
54 ...opts,
55 valueEncoding: 'binary',
56 // TODO: Support mixed sparsity.
57 sparse: this.sparse || this.sparseMetadata,
58 extensions: opts.extensions
59 })
60
61 if (this.corestore !== storage) this.corestore.on('error', err => this.emit('error', err))
62 if (opts.namespace) {
63 this.corestore = this.corestore.namespace(opts.namespace)
64 }
65
66 // Set in ready.
67 this.metadata = null
68 this.db = opts._db
69 this.tags = new TagManager(this)
70 this.isCheckout = !!this.db
71
72 this._contentStates = opts._contentStates || new ThunkyMap(this._contentStateFromMetadata.bind(this))
73 this._fds = []
74 this._writingFds = new Map()
75 this._unlistens = []
76 this._unmirror = null
77
78 this._metadataOpts = {
79 key,
80 sparse: this.sparseMetadata,
81 keyPair: opts.keyPair,
82 extensions: opts.extensions
83 }
84
85 this.ready(onReady)
86
87 const self = this
88
89 function onReady (err) {
90 if (err) return self.emit('error', err)
91 self.emit('ready')
92
93 if (self._contentStates.has(self.db.feed)) return
94 self._getContent(self.db.feed, noop)
95 }
96 }
97
98 get version () {
99 // TODO: The trie version starts at 1, so the empty hyperdrive version is also 1. This should be 0.
100 return this.db.version
101 }
102
103 get writable () {
104 return this.metadata.writable
105 }
106
107 get contentWritable () {
108 const contentState = this._contentStates.cache.get(this.db.feed)
109 if (!contentState) return false
110 return contentState.feed.writable
111 }
112
113 ready (cb) {
114 return this.open(cb)
115 }
116
117 _open (cb) {
118 const self = this
119 return this.corestore.ready(err => {
120 if (err) return cb(err)
121 this.metadata = this.corestore.default(this._metadataOpts)
122 this.metadata.ifAvailable.wait()
123 this.db = this.db || new MountableHypertrie(this.corestore, this.key, {
124 feed: this.metadata,
125 sparse: this.sparseMetadata
126 })
127 this.db.on('hypertrie', onhypertrie)
128 this.db.on('error', onerror)
129
130 self.metadata.on('error', onerror)
131 self.metadata.on('append', update)
132 self.metadata.on('extension', extension)
133 self.metadata.on('peer-add', peeradd)
134 self.metadata.on('peer-remove', peerremove)
135
136 this._unlistens.push(() => {
137 self.db.removeListener('error', onerror)
138 self.db.removeListener('hypertrie', onhypertrie)
139 self.metadata.removeListener('error', onerror)
140 self.metadata.removeListener('append', update)
141 self.metadata.removeListener('extension', extension)
142 self.metadata.removeListener('peer-add', peeradd)
143 self.metadata.removeListener('peer-remove', peerremove)
144 })
145
146 return self.metadata.ready(err => {
147 if (err) return done(err)
148
149 /**
150 * TODO: Update comment to reflect mounts.
151 *
152 * If a db is provided as input, ensure that a contentFeed is also provided, then return (this is a checkout).
153 * If the metadata feed is writable:
154 * If the metadata feed has length 0, then the db should be initialized with the content feed key as metadata.
155 * Else, initialize the db without metadata and load the content feed key from the header.
156 * If the metadata feed is readable:
157 * Initialize the db without metadata and load the content feed key from the header.
158 */
159 if (self.metadata.writable && !self.metadata.length && !self.isCheckout) {
160 initialize()
161 } else {
162 restore()
163 }
164 })
165 })
166
167 /**
168 * The first time the hyperdrive is created, we initialize both the db (metadata feed) and the content feed here.
169 */
170 function initialize () {
171 self._contentStateFromKey(null, (err, contentState) => {
172 if (err) return done(err)
173 // warm up the thunky map
174 self._contentStates.cache.set(self.db.feed, contentState)
175 self.db.setMetadata(contentState.feed.key)
176 self.db.ready(err => {
177 if (err) return done(err)
178 return done(null)
179 })
180 })
181 }
182
183 /**
184 * If the hyperdrive has already been created, wait for the db (metadata feed) to load.
185 * If the metadata feed is writable, we can immediately load the content feed from its private key.
186 * (Otherwise, we need to read the feed's metadata block first)
187 */
188 function restore (keyPair) {
189 self.db.ready(err => {
190 if (err) return done(err)
191 if (self.metadata.has(0)) self._getContent(self.db.feed, done)
192 else done(null)
193 })
194 }
195
196 function done (err) {
197 self.metadata.ifAvailable.continue()
198 if (err) return cb(err)
199 self.key = self.metadata.key
200 self.discoveryKey = self.metadata.discoveryKey
201 return cb(null)
202 }
203
204 function onerror (err) {
205 if (err) self.emit('error', err)
206 return cb(err)
207 }
208
209 function update () {
210 self.emit('update')
211 }
212
213 function extension(name, message, peer) {
214 self.emit('extension', name, message, peer)
215 }
216
217 function peeradd(peer) {
218 self.emit('peer-add', peer)
219 }
220
221 function peerremove(peer) {
222 self.emit('peer-remove', peer)
223 }
224
225 function onhypertrie (trie) {
226 self.emit('metadata-feed', trie.feed)
227 self.emit('mount', trie)
228 }
229 }
230
231 _getContent (metadata, cb) {
232 this._contentStates.get(metadata, cb)
233 }
234
235 _contentStateFromMetadata (metadata, cb) {
236 MountableHypertrie.getMetadata(metadata, (err, publicKey) => {
237 if (err) return cb(err)
238 this._contentStateFromKey(publicKey, cb)
239 })
240 }
241
242 _contentStateFromKey (publicKey, cb) {
243 const contentOpts = { key: publicKey, ...contentOptions(this), cache: { data: false } }
244 try {
245 var feed = this.corestore.get(contentOpts)
246 } catch (err) {
247 return cb(err)
248 }
249
250 const contentErrorListener = err => this.emit('error', err)
251 feed.on('error', contentErrorListener)
252 this._unlistens.push(() => feed.removeListener('error', contentErrorListener))
253 feed.ready(err => {
254 if (err) return cb(err)
255 this.emit('content-feed', feed)
256 return cb(null, new ContentState(feed))
257 })
258 }
259
260 _putStat (name, stat, opts, cb) {
261 if (typeof opts === 'function') return this._putStat(name, stat, null, opts)
262 try {
263 var encoded = stat.encode()
264 } catch (err) {
265 return cb(err)
266 }
267 this.db.put(name, encoded, opts, err => {
268 if (err) return cb(err)
269 return cb(null, stat)
270 })
271 }
272
273 _update (name, stat, cb) {
274 name = fixName(name)
275
276 this.db.get(name, (err, st) => {
277 if (err) return cb(err)
278 if (!st) return cb(new errors.FileNotFound(name))
279 try {
280 var decoded = Stat.decode(st.value)
281 } catch (err) {
282 return cb(err)
283 }
284 const oldMetadata = decoded.metadata
285 const newStat = Object.assign(decoded, stat)
286 if (stat.metadata) {
287 newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
288 }
289 return this._putStat(name, newStat, { flags: st.flags }, cb)
290 })
291 }
292
293 _upsert (name, stat, cb) {
294 name = fixName(name)
295
296 this.db.get(name, (err, st) => {
297 if (err) return cb(err)
298 if (!st) {
299 var decoded = Stat.file()
300 }
301 else {
302 try {
303 var decoded = Stat.decode(st.value)
304 } catch (err) {
305 return cb(err)
306 }
307 }
308 const oldMetadata = decoded.metadata
309 const newStat = Object.assign(decoded, stat)
310 if (stat.metadata) {
311 newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
312 }
313 return this._putStat(name, newStat, { flags: st ? st.flags : 0 }, cb)
314 })
315 }
316
317 getContent (cb) {
318 if (!this.db) return cb(null, null)
319 this._getContent(this.db.feed, (err, contentState) => {
320 if (err) return cb(err)
321 return cb(null, contentState.feed)
322 })
323 }
324
325 open (name, flags, cb) {
326 if (!name || typeof name === 'function') return super.open(name)
327 name = fixName(name)
328
329 this.ready(err => {
330 if (err) return cb(err)
331 createFileDescriptor(this, name, flags, (err, fd) => {
332 if (err) return cb(err)
333 cb(null, STDIO_CAP + (this._fds.push(fd) - 1) * 2)
334 })
335 })
336 }
337
338 read (fd, buf, offset, len, pos, cb) {
339 if (typeof pos === 'function') {
340 cb = pos
341 pos = null
342 }
343
344 const desc = this._fds[(fd - STDIO_CAP) / 2]
345 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
346 if (pos == null) pos = desc.position
347 desc.read(buf, offset, len, pos, cb)
348 }
349
350 write (fd, buf, offset, len, pos, cb) {
351 if (typeof pos === 'function') {
352 cb = pos
353 pos = null
354 }
355
356 const desc = this._fds[(fd - STDIO_CAP) / 2]
357 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
358 if (pos == null) pos = desc.position
359 desc.write(buf, offset, len, pos, cb)
360 }
361
362 createReadStream (name, opts) {
363 if (!opts) opts = {}
364 const self = this
365
366 name = fixName(name)
367
368 const length = typeof opts.end === 'number' ? 1 + opts.end - (opts.start || 0) : typeof opts.length === 'number' ? opts.length : -1
369 const stream = coreByteStream({
370 ...opts,
371 highWaterMark: opts.highWaterMark || 64 * 1024
372 })
373
374 this.ready(err => {
375 if (err) return stream.destroy(err)
376 return this.stat(name, { file: true }, (err, st, trie) => {
377 if (err) return stream.destroy(err)
378 if (st.mount && st.mount.hypercore) {
379 const feed = self.corestore.get({
380 key: st.mount.key,
381 sparse: self.sparse
382 })
383 return feed.ready(err => {
384 if (err) return stream.destroy(err)
385 st.blocks = feed.length
386 return oncontent(st, { feed })
387 })
388 }
389 return this._getContent(trie.feed, (err, contentState) => {
390 if (err) return stream.destroy(err)
391 return oncontent(st, contentState)
392 })
393 })
394 })
395
396 function oncontent (st, contentState) {
397 if (st.mount && st.mount.hypercore) {
398 var byteOffset = 0
399 var blockOffset = 0
400 var blockLength = st.blocks
401 } else {
402 blockOffset = st.offset
403 blockLength = st.blocks
404 byteOffset = opts.start ? st.byteOffset + opts.start : (length === -1 ? -1 : st.byteOffset)
405 }
406
407 const byteLength = length
408
409 stream.start({
410 feed: contentState.feed,
411 blockOffset,
412 blockLength,
413 byteOffset,
414 byteLength
415 })
416 }
417
418 return stream
419 }
420
421 createDiffStream (other, prefix, opts) {
422 if (other instanceof Hyperdrive) other = other.version
423 if (typeof prefix === 'object') return this.createDiffStream(other, '/', prefix)
424 prefix = prefix || '/'
425
426 const diffStream = this.db.createDiffStream(other, prefix, opts)
427 return pumpify.obj(
428 diffStream,
429 through.obj((chunk, enc, cb) => {
430 const entry = { type: chunk.type, name: chunk.key }
431 if (chunk.left) entry.seq = chunk.left.seq
432 if (chunk.right) entry.previous = { seq: chunk.right.seq }
433 if (chunk.left && entry.type !== 'mount' && entry.type !== 'unmount') {
434 try {
435 entry.value = Stat.decode(chunk.left.value)
436 } catch (err) {
437 return cb(err)
438 }
439 } else if (chunk.left) {
440 entry.value = chunk.left.info
441 }
442 return cb(null, entry)
443 })
444 )
445 }
446
447 createDirectoryStream (name, opts) {
448 if (!opts) opts = {}
449 name = fixName(name)
450 return createStatStream(this, name, opts)
451 }
452
453 createWriteStream (name, opts) {
454 if (!opts) opts = {}
455 name = fixName(name)
456
457 const self = this
458 var release
459
460 const proxy = duplexify()
461 proxy.setReadable(false)
462
463 // TODO: support piping through a "split" stream like rabin
464
465 this.ready(err => {
466 if (err) return proxy.destroy(err)
467 this.stat(name, { trie: true }, (err, stat, trie) => {
468 if (err && (err.errno !== 2)) return proxy.destroy(err)
469 this._getContent(trie.feed, (err, contentState) => {
470 if (err) return proxy.destroy(err)
471 if (opts.wait === false && contentState.isLocked()) return cb(new Error('Content is locked.'))
472 contentState.lock(_release => {
473 release = _release
474 append(contentState)
475 })
476 })
477 })
478 })
479
480 return proxy
481
482 function append (contentState) {
483 if (proxy.destroyed) return release()
484
485 const byteOffset = contentState.feed.byteLength
486 const offset = contentState.feed.length
487
488 self.emit('appending', name, opts)
489
490 // TODO: revert the content feed if this fails!!!! (add an option to the write stream for this (atomic: true))
491 const stream = contentState.feed.createWriteStream({ maxBlockSize: WRITE_STREAM_BLOCK_SIZE })
492
493 proxy.on('close', ondone)
494 proxy.on('finish', ondone)
495
496 proxy.setWritable(stream)
497 proxy.on('prefinish', function () {
498 const stat = Stat.file({
499 ...opts,
500 offset,
501 byteOffset,
502 size: contentState.feed.byteLength - byteOffset,
503 blocks: contentState.feed.length - offset
504 })
505 proxy.cork()
506 self._putStat(name, stat, function (err) {
507 if (err) return proxy.destroy(err)
508 self.emit('append', name, opts)
509 proxy.uncork()
510 })
511 })
512 }
513
514 function ondone () {
515 proxy.removeListener('close', ondone)
516 proxy.removeListener('finish', ondone)
517 release()
518 }
519 }
520
521 create (name, opts, cb) {
522 if (typeof opts === 'function') return this.create(name, null, opts)
523
524 name = fixName(name)
525
526 this.ready(err => {
527 if (err) return cb(err)
528 this.lstat(name, { file: true, trie: true }, (err, stat) => {
529 if (err && err.errno !== 2) return cb(err)
530 if (stat) return cb(null, stat)
531 const st = Stat.file(opts)
532 return this._putStat(name, st, cb)
533 })
534 })
535 }
536
537 readFile (name, opts, cb) {
538 if (typeof opts === 'function') return this.readFile(name, null, opts)
539 if (typeof opts === 'string') opts = { encoding: opts }
540 if (!opts) opts = {}
541
542 name = fixName(name)
543
544 collect(this.createReadStream(name, opts), function (err, bufs) {
545 if (err) return cb(err)
546 let buf = bufs.length === 1 ? bufs[0] : Buffer.concat(bufs)
547 cb(null, opts.encoding && opts.encoding !== 'binary' ? buf.toString(opts.encoding) : buf)
548 })
549 }
550
551 writeFile (name, buf, opts, cb) {
552 if (typeof opts === 'function') return this.writeFile(name, buf, null, opts)
553 if (typeof opts === 'string') opts = { encoding: opts }
554 if (!opts) opts = {}
555 if (typeof buf === 'string') buf = Buffer.from(buf, opts.encoding || 'utf-8')
556 if (!cb) cb = noop
557
558 name = fixName(name)
559
560 // Make sure that the buffer is chunked into blocks of 0.5MB.
561 let stream = this.createWriteStream(name, opts)
562
563 // TODO: Do we need to maintain the error state? What's triggering 'finish' after 'error'?
564 var errored = false
565
566 stream.on('error', err => {
567 errored = true
568 return cb(err)
569 })
570 stream.on('finish', () => {
571 if (!errored) return cb(null)
572 })
573 stream.end(buf)
574 }
575
576 truncate (name, size, cb) {
577 name = fixName(name)
578
579 this.lstat(name, { file: true, trie: true }, (err, st) => {
580 if (err && err.errno !== 2) return cb(err)
581 if (!st || !size) return this.create(name, cb)
582 if (size === st.size) return cb(null)
583 if (size < st.size) {
584 const readStream = this.createReadStream(name, { length: size })
585 const writeStream = this.createWriteStream(name)
586 return pump(readStream, writeStream, cb)
587 } else {
588 this.open(name, 'a', (err, fd) => {
589 if (err) return cb(err)
590 const length = size - st.size
591 this.write(fd, Buffer.alloc(length), 0, length, st.size, err => {
592 if (err) return cb(err)
593 this.close(fd, cb)
594 })
595 })
596 }
597 })
598 }
599
600 ftruncate (fd, size, cb) {
601 const desc = this._fds[(fd - STDIO_CAP) / 2]
602 if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
603 return desc.truncate(size, cb)
604 }
605
606 _createStat (name, opts, cb) {
607 const self = this
608
609 const statConstructor = (opts && opts.directory) ? Stat.directory : Stat.file
610 const shouldForce = !!opts.force
611
612 this.ready(err => {
613 if (err) return cb(err)
614 this.db.get(name, (err, node, trie) => {
615 if (err) return cb(err)
616 if (node && !shouldForce) return cb(new errors.PathAlreadyExists(name))
617 onexisting(node, trie)
618 })
619 })
620
621 function onexisting (node, trie) {
622 self.ready(err => {
623 if (err) return cb(err)
624 self._getContent(trie.feed, (err, contentState) => {
625 if (err) return cb(err)
626 const st = statConstructor({
627 ...opts,
628 offset: contentState.feed.length,
629 byteOffset: contentState.feed.byteLength
630 })
631 return cb(null, st)
632 })
633 })
634 }
635 }
636
637 mkdir (name, opts, cb) {
638 if (typeof opts === 'function') return this.mkdir(name, null, opts)
639 if (typeof opts === 'number') opts = { mode: opts }
640 if (!opts) opts = {}
641 if (!cb) cb = noop
642
643 name = fixName(name)
644 opts.directory = true
645
646 this._createStat(name, opts, (err, st) => {
647 if (err) return cb(err)
648 this._putStat(name, st, {
649 condition: ifNotExists
650 }, cb)
651 })
652 }
653
654 _statDirectory (name, opts, cb) {
655 const ite = this.db.iterator(name)
656 ite.next((err, st) => {
657 if (err) return cb(err)
658 if (name !== '/' && !st) return cb(new errors.FileNotFound(name))
659 if (name === '/') return cb(null, Stat.directory(), this.db)
660 const trie = st[MountableHypertrie.Symbols.TRIE]
661 const mount = st[MountableHypertrie.Symbols.MOUNT]
662 const innerPath = st[MountableHypertrie.Symbols.INNER_PATH]
663 try {
664 st = Stat.decode(st.value)
665 } catch (err) {
666 return cb(err)
667 }
668 const noMode = Object.assign({}, st, { mode: 0 })
669 return cb(null, Stat.directory(noMode), trie, mount, innerPath)
670 })
671 }
672
673 readlink (name, cb) {
674 this.lstat(name, function (err, st) {
675 if (err) return cb(err)
676 cb(null, st.linkname)
677 })
678 }
679
680 lstat (name, opts, cb) {
681 if (typeof opts === 'function') return this.lstat(name, null, opts)
682 if (!opts) opts = {}
683 const self = this
684 name = fixName(name)
685
686 this.ready(err => {
687 if (err) return cb(err)
688 this.db.get(name, opts, onstat)
689 })
690
691 function onstat (err, node, trie, mount, mountPath) {
692 if (err) return cb(err)
693 if (!node && opts.trie) return cb(null, null, trie, mount, mountPath)
694 if (!node && opts.file) return cb(new errors.FileNotFound(name))
695 if (!node) return self._statDirectory(name, opts, cb)
696 try {
697 var st = Stat.decode(node.value)
698 } catch (err) {
699 return cb(err)
700 }
701 const writingFd = self._writingFds.get(name)
702 if (writingFd) {
703 st.size = writingFd.stat.size
704 }
705 cb(null, st, trie, mount, mountPath)
706 }
707 }
708
709 stat (name, opts, cb) {
710 if (typeof opts === 'function') return this.stat(name, null, opts)
711 if (!opts) opts = {}
712
713 this.lstat(name, opts, (err, stat, trie, mount, mountPath) => {
714 if (err) return cb(err)
715 if (!stat) return cb(null, null, trie, name, mount, mountPath)
716 if (stat.linkname) {
717 if (path.isAbsolute(stat.linkname)) return this.stat(stat.linkname, opts, cb)
718 const relativeStat = path.resolve('/', path.dirname(name), stat.linkname)
719 return this.stat(relativeStat, opts, cb)
720 }
721 return cb(null, stat, trie, name, mount, mountPath)
722 })
723 }
724
725 access (name, opts, cb) {
726 if (typeof opts === 'function') return this.access(name, null, opts)
727 if (!opts) opts = {}
728 name = fixName(name)
729
730 this.stat(name, opts, err => {
731 cb(err)
732 })
733 }
734
735 exists (name, opts, cb) {
736 if (typeof opts === 'function') return this.exists(name, null, opts)
737 if (!opts) opts = {}
738
739 this.access(name, opts, err => {
740 cb(!err)
741 })
742 }
743
744 readdir (name, opts, cb) {
745 if (typeof opts === 'function') return this.readdir(name, null, opts)
746 name = fixName(name)
747
748 const readdirStream = createReaddirStream(this, name, opts)
749 return collect(readdirStream, (err, entries) => {
750 if (err) return cb(err)
751 return cb(null, entries)
752 })
753 }
754
755 _del (name, cb) {
756 this.ready(err => {
757 if (err) return cb(err)
758 this.db.del(name, (err, node) => {
759 if (err) return cb(err)
760 if (!node) return cb(new errors.FileNotFound(name))
761 return cb(null)
762 })
763 })
764 }
765
766 unlink (name, cb) {
767 name = fixName(name)
768 this._del(name, cb || noop)
769 }
770
771 rmdir (name, cb) {
772 if (!cb) cb = noop
773 name = fixName(name)
774 const self = this
775
776 const ite = readdirIterator(this, name)
777 ite.next((err, val) => {
778 if (err) return cb(err)
779 if (val) return cb(new errors.DirectoryNotEmpty(name))
780 self._del(name, cb)
781 })
782 }
783
784 replicate (isInitiator, opts) {
785 // support replicate({ initiator: bool }) also
786 if (typeof isInitiator === 'object' && isInitiator && !opts) {
787 opts = isInitiator
788 isInitiator = !!opts.initiator
789 }
790 const stream = (opts && opts.stream) || new HypercoreProtocol(isInitiator, { ...opts })
791 this.ready(err => {
792 if (err) return stream.destroy(err)
793 this.corestore.replicate(isInitiator, { ...opts, stream })
794 })
795 return stream
796 }
797
798 checkout (version, opts) {
799 opts = {
800 ...opts,
801 _db: this.db.checkout(version),
802 _contentStates: this._contentStates,
803 }
804 return new Hyperdrive(this.corestore, this.key, opts)
805 }
806
807 _closeFile (fd, cb) {
808 const idx = (fd - STDIO_CAP) / 2
809 const desc = this._fds[idx]
810 if (!desc) return process.nextTick(cb, new Error('Invalid file descriptor'))
811 this._fds[idx] = null
812 while (this._fds.length && !this._fds[this._fds.length - 1]) this._fds.pop()
813 desc.close(cb)
814 }
815
816 _close (cb) {
817 this.db.close(err => {
818 for (const unlisten of this._unlistens) {
819 unlisten()
820 }
821 this._unlistens = []
822 this.emit('close')
823 cb(err)
824 })
825 }
826
827 close (fd, cb) {
828 if (typeof fd === 'number') return this._closeFile(fd, cb || noop)
829 super.close(false, fd)
830 }
831
832 destroyStorage (cb) {
833 const metadata = this.db.feed
834 this._getContent(metadata, (err, contentState) => {
835 const content = contentState.feed
836 metadata.destroyStorage(() => {
837 content.destroyStorage(() => {
838 this.close(cb)
839 })
840 })
841 })
842 }
843
844 stats (path, opts, cb) {
845 if (typeof opts === 'function') return this.stats(path, null, opts)
846 const self = this
847 const stats = new Map()
848
849 this.stat(path, (err, stat, trie) => {
850 if (err) return cb(err)
851 if (stat.isFile()) {
852 return fileStats(path, cb)
853 } else {
854 const recursive = opts && (opts.recursive !== false)
855 const ite = statIterator(self, path, { recursive })
856 return ite.next(function loop (err, info) {
857 if (err) return cb(err)
858 if (!info) return cb(null, stats)
859 fileStats(info.path, (err, fileStats) => {
860 if (err && err.errno !== 2) return cb(err)
861 if (!fileStats) return ite.next(loop)
862 stats.set(info.path, fileStats)
863 return ite.next(loop)
864 })
865 })
866 }
867 })
868
869 function onstats (err, path, fileStats) {
870 if (err) return cb(err)
871 stats.set(path, fileStats)
872 }
873
874 function fileStats (path, cb) {
875 const total = emptyStats()
876 return self.stat(path, (err, stat, trie) => {
877 if (err) return cb(err)
878 return self._getContent(trie.feed, (err, contentState) => {
879 if (err) return cb(err)
880 const downloadedBlocks = contentState.feed.downloaded(stat.offset, stat.offset + stat.blocks)
881 total.blocks = stat.blocks
882 total.size = stat.size
883 total.downloadedBlocks = downloadedBlocks
884 // TODO: This is not possible to implement now. Need a better byte length index in hypercore.
885 // total.downloadedBytes = 0
886 return cb(null, total)
887 })
888 })
889 }
890
891 function emptyStats () {
892 return {
893 blocks: 0,
894 size: 0,
895 downloadedBlocks: 0,
896 }
897 }
898 }
899
900 watchStats (path, opts) {
901 const self = this
902 var timer = setInterval(collectStats, (opts && opts.statsInveral) || 2000)
903 var collecting = false
904 var destroyed = false
905
906 const handle = new EventEmitter()
907 Object.assign(handle, {
908 destroy
909 })
910 return handle
911
912 function collectStats () {
913 if (collecting) return
914 collecting = true
915 self.stats(path, opts, (err, stats) => {
916 if (err) return destroy(err)
917 collecting = false
918 handle.stats = stats
919 handle.emit('update')
920 })
921 }
922
923 function destroy (err) {
924 handle.emit('destroy', err)
925 clearInterval(timer)
926 destroyed = true
927 }
928 }
929
930 mirror () {
931 const self = this
932 if (this._unmirror) return this._unmirror
933 const mirrorRanges = new Map()
934
935 this.on('content-feed', oncore)
936 this.on('metadata-feed', oncore)
937 this.getAllMounts({ content: true }, (err, mounts) => {
938 if (err) return this.emit('error', err)
939 for (const { metadata, content } of mounts.values()) {
940 oncore(metadata)
941 oncore(content)
942 }
943 })
944
945 this._unmirror = unmirror
946 return this._unmirror
947
948 function unmirror () {
949 if (!self._unmirror) return
950 self._unmirror = null
951 self.removeListener('content-feed', oncore)
952 self.removeListener('metadata-feed', oncore)
953 for (const [ core, range ] of mirrorRanges) {
954 core.undownload(range)
955 }
956 }
957
958 function oncore (core) {
959 if (!core) return
960 if (!self._unmirror || self._unmirror !== unmirror || mirrorRanges.has(core)) return
961 mirrorRanges.set(core, core.download({ start: 0, end: -1 }))
962 }
963 }
964
965 download (path, opts, cb) {
966 if (typeof opts === 'function') {
967 cb = opts
968 opts = null
969 }
970 opts = opts || {}
971 const self = this
972 const ranges = new Map()
973 var pending = 0
974 var destroyed = false
975
976 const handle = new EventEmitter()
977 Object.assign(handle, {
978 destroy
979 })
980
981 self.stat(path, (err, stat, trie) => {
982 if (err) return destroy(err)
983 if (stat.isFile()) {
984 downloadFile(path, stat, trie, destroy)
985 } else {
986 const recursive = opts.recursive !== false
987 const noMounts = opts.noMounts !== false
988 const ite = statIterator(self, path, { recursive, noMounts, random: true })
989 downloadNext(ite)
990 }
991 })
992
993 return handle
994
995 function downloadNext (ite) {
996 if (destroyed) return
997 ite.next((err, info) => {
998 if (err) return destroy(err)
999 if (!info) {
1000 if (!ranges.size) return destroy(null)
1001 else return
1002 }
1003 const { path, stat, trie } = info
1004 if (!stat.blocks || stat.mount || stat.isDirectory()) return downloadNext(ite)
1005 downloadFile(path, stat, trie, err => {
1006 if (err) return destroy(err)
1007 return downloadNext(ite)
1008 })
1009 if (pending < ((opts && opts.maxConcurrent) || 50)) {
1010 return downloadNext(ite)
1011 }
1012 })
1013 }
1014
1015 function downloadFile (path, stat, trie, cb) {
1016 pending++
1017 self._getContent(trie.feed, (err, contentState) => {
1018 if (err) return destroy(err)
1019 const feed = contentState.feed
1020 const range = feed.download({
1021 start: stat.offset,
1022 end: stat.offset + stat.blocks
1023 }, err => {
1024 pending--
1025 if (err) return cb(err)
1026 ranges.delete(path)
1027 return cb(null)
1028 })
1029 ranges.set(path, { range, feed })
1030 })
1031 }
1032
1033 function destroy (err) {
1034 if (destroyed) return null
1035 destroyed = true
1036 for (const [path, { feed, range }] of ranges) {
1037 feed.undownload(range)
1038 }
1039 if (err) {
1040 handle.emit('error', err)
1041 if (cb) return cb(err)
1042 } else {
1043 handle.emit('finish')
1044 if (cb) return cb(null)
1045 }
1046 }
1047 }
1048
1049 watch (name, onchange) {
1050 name = fixName(name)
1051 return this.db.watch(name, onchange)
1052 }
1053
1054 mount (path, key, opts, cb) {
1055 if (typeof opts === 'function') return this.mount(path, key, null, opts)
1056 const self = this
1057
1058 path = fixName(path)
1059 opts = opts || {}
1060
1061 const statOpts = {
1062 uid: opts.uid,
1063 gid: opts.gid
1064 }
1065 statOpts.mount = {
1066 key,
1067 version: opts.version,
1068 hash: opts.hash,
1069 hypercore: !!opts.hypercore
1070 }
1071 statOpts.directory = !opts.hypercore
1072
1073 if (opts.hypercore) {
1074 const core = this.corestore.get({
1075 key,
1076 ...opts,
1077 parents: [this.key],
1078 sparse: this.sparse
1079 })
1080 core.ready(err => {
1081 if (err) return cb(err)
1082 this.emit('content-feed', core)
1083 statOpts.size = core.byteLength
1084 statOpts.blocks = core.length
1085 return mountCore()
1086 })
1087 } else {
1088 return process.nextTick(mountTrie, null)
1089 }
1090
1091 function mountCore () {
1092 self._createStat(path, statOpts, (err, st) => {
1093 if (err) return cb(err)
1094 return self.db.put(path, st.encode(), cb)
1095 })
1096 }
1097
1098 function mountTrie () {
1099 self._createStat(path, statOpts, (err, st) => {
1100 if (err) return cb(err)
1101 self.db.mount(path, key, { ...opts, value: st.encode() }, err => {
1102 return self.db.loadMount(path, cb)
1103 })
1104 })
1105 }
1106 }
1107
1108 unmount (path, cb) {
1109 this.stat(path, (err, st) => {
1110 if (err) return cb(err)
1111 if (!st.mount) return cb(new Error('Can only unmount mounts.'))
1112 if (st.mount.hypercore) {
1113 return this.unlink(path, cb)
1114 } else {
1115 return this.db.unmount(path, cb)
1116 }
1117 })
1118 }
1119
1120 symlink (target, linkName, cb) {
1121 target = unixify(target)
1122 linkName = fixName(linkName)
1123
1124 this.lstat(linkName, (err, stat) => {
1125 if (err && (err.errno !== 2)) return cb(err)
1126 if (!err) return cb(new errors.PathAlreadyExists(linkName))
1127 const st = Stat.symlink({
1128 linkname: target
1129 })
1130 return this._putStat(linkName, st, cb)
1131 })
1132 }
1133
1134 createMountStream (opts) {
1135 return createMountStream(this, opts)
1136 }
1137
1138 getAllMounts (opts, cb) {
1139 if (typeof opts === 'function') return this.getAllMounts(null, opts)
1140 const mounts = new Map()
1141
1142 this.ready(err => {
1143 if (err) return cb(err)
1144 collect(this.createMountStream(opts), (err, mountList) => {
1145 if (err) return cb(err)
1146 for (const { path, metadata, content } of mountList) {
1147 mounts.set(path, { metadata, content })
1148 }
1149 return cb(null, mounts)
1150 })
1151 })
1152 }
1153
1154 extension (name, message) {
1155 this.metadata.extension(name, message)
1156 }
1157
1158 get peers () {
1159 return this.metadata.peers
1160 }
1161
1162 setMetadata (path, key, value, cb) {
1163 const metadata = {}
1164 metadata[key] = value
1165 this._update(path, { metadata }, cb)
1166 }
1167
1168 removeMetadata (path, key, cb) {
1169 const metadata = {}
1170 metadata[key] = null
1171 this._update(path, { metadata }, cb)
1172 }
1173
1174 copy (from, to, cb) {
1175 this.stat(from, (err, stat) => {
1176 if (err) return cb(err)
1177 this.create(to, stat, cb)
1178 })
1179 }
1180
1181 // Tag-related methods.
1182
1183 createTag (name, version, cb) {
1184 return this.tags.create(name, version, cb)
1185 }
1186
1187 getAllTags (cb) {
1188 return this.tags.getAll(cb)
1189 }
1190
1191 deleteTag (name, cb) {
1192 return this.tags.delete(name, cb)
1193 }
1194
1195 getTaggedVersion (name, cb) {
1196 return this.tags.get(name, cb)
1197 }
1198}
1199
/**
 * True for any truthy value that is neither a string nor a Buffer. Used by
 * the constructor to tell an options object apart from a key.
 */
function isObject (val) {
  if (!val) return false
  if (typeof val === 'string') return false
  return !Buffer.isBuffer(val)
}
1203
/**
 * Put-condition: approve the write only when no node exists yet, otherwise
 * fail with PathAlreadyExists for the existing node's key.
 */
function ifNotExists (oldNode, newNode, cb) {
  return oldNode
    ? cb(new errors.PathAlreadyExists(oldNode.key))
    : cb(null, true)
}
1208
/** Normalize a path with unixify and make sure it starts with '/'. */
function fixName (name) {
  const normalized = unixify(name)
  return normalized.startsWith('/') ? normalized : '/' + normalized
}
1214
/** Default callback used when the caller does not supply one. */
function noop () {
  // intentionally empty
}