UNPKG

34.9 kBJavaScriptView Raw
1const pathRoot = require('path')
2const path = pathRoot.posix || pathRoot
3const { EventEmitter } = require('events')
4
5const collect = require('stream-collector')
6const thunky = require('thunky')
7const ThunkyMap = require('thunky-map')
8const unixify = require('unixify')
9const duplexify = require('duplexify')
10const pump = require('pump')
11const { Transform } = require('streamx')
12
13const coreByteStream = require('hypercore-byte-stream')
14const Nanoresource = require('nanoresource/emitter')
15const HypercoreProtocol = require('hypercore-protocol')
16const MountableHypertrie = require('mountable-hypertrie')
17const Corestore = require('corestore')
18const { Stat } = require('hyperdrive-schemas')
19
20const createFileDescriptor = require('./lib/fd')
21const errors = require('./lib/errors')
22const defaultCorestore = require('./lib/storage')
23const TagManager = require('./lib/tagging')
24const HyperdrivePromises = require('./promises')
25const { contentKeyPair, contentOptions, ContentState } = require('./lib/content')
26const { statIterator, createStatStream, createMountStream, createReaddirStream, readdirIterator } = require('./lib/iterator')
27
// File descriptors returned by open() start at this value so they sit above
// the stdio descriptors; the exact number (20) is arbitrary.
const STDIO_CAP = 20
// Passed as `maxBlockSize` to the content feed's write stream (512 KiB).
const WRITE_STREAM_BLOCK_SIZE = 512 * 1024
// Path segment appended by info() to build a probe path under a directory.
const NOOP_FILE_PATH = ' '

// The module exports a factory, so callers never need `new`.
module.exports = (...ctorArgs) => new Hyperdrive(...ctorArgs)
module.exports.constants = require('filesystem-constants').linux
35
/**
 * Filesystem-like API built on a MountableHypertrie (file stats, `this.db`)
 * and content feeds (file data) that are all obtained from a corestore.
 *
 * Events emitted from code in this class: 'ready', 'error', 'update',
 * 'extension', 'peer-add', 'peer-open', 'peer-remove', 'metadata-feed',
 * 'mount', 'content-feed', 'appending', 'append' and 'close'.
 */
class Hyperdrive extends Nanoresource {
  /**
   * @param {*} storage - Handed to `defaultCorestore` (./lib/storage).
   * @param {Buffer|string|null} [key] - Drive key. If this argument is an
   *   options object (see `isObject`) it is used as `opts` and the key is null.
   * @param {object} [opts] - Options; also spread into the corestore options.
   */
  constructor (storage, key, opts) {
    super()

    if (isObject(key)) {
      opts = key
      key = null
    }
    if (!opts) opts = {}

    this.opts = opts
    this.key = key
    this.discoveryKey = null
    this.live = true
    this.sparse = opts.sparse !== false
    this.sparseMetadata = opts.sparseMetadata !== false

    this.promises = new HyperdrivePromises(this)

    this._namespace = opts.namespace
    this.corestore = defaultCorestore(storage, {
      ...opts,
      valueEncoding: 'binary',
      // TODO: Support mixed sparsity.
      sparse: this.sparse || this.sparseMetadata,
      extensions: opts.extensions
    })

    // Only forward errors when defaultCorestore returned something other than
    // the `storage` argument itself.
    if (this.corestore !== storage) this.corestore.on('error', err => this.emit('error', err))
    if (opts.namespace) {
      this.corestore = this.corestore.namespace(opts.namespace)
    }

    // Set in ready.
    this.metadata = null
    this.db = opts._db
    this.tags = new TagManager(this)
    // A db passed in through opts._db marks this instance as a checkout.
    this.isCheckout = !!this.db

    this._contentStates = opts._contentStates || new ThunkyMap(this._contentStateFromMetadata.bind(this))
    this._fds = []
    this._writingFds = new Map()
    this._unlistens = []
    this._unmirror = null

    this._metadataOpts = {
      key,
      sparse: this.sparseMetadata,
      keyPair: opts.keyPair,
      extensions: opts.extensions
    }

    // Arrow function: no dependency on a `self` binding declared after this call.
    this.ready(err => {
      if (err) return this.emit('error', err)
      this.emit('ready')

      if (this._contentStates.has(this.db.feed)) return
      this._getContent(this.db.feed, noop)
    })
  }

  /** Version of the underlying db. */
  get version () {
    // TODO: The trie version starts at 1, so the empty hyperdrive version is also 1. This should be 0.
    return this.db.version
  }

  /** Whether the metadata feed is writable. Requires the drive to be ready. */
  get writable () {
    return this.metadata.writable
  }

  /** Whether the already-loaded content feed of the root db is writable (false if not loaded yet). */
  get contentWritable () {
    const contentState = this._contentStates.cache.get(this.db.feed)
    if (!contentState) return false
    return contentState.feed.writable
  }

  /**
   * Opens the drive if necessary and calls `cb` once it is open.
   * @param {function(Error=)} [cb]
   */
  ready (cb) {
    return this.open(cb)
  }

  /**
   * Open hook (invoked through `open()` with no file name). Loads the metadata
   * feed and the db, wires event forwarding, then either initializes a fresh
   * drive or restores an existing one.
   * @param {function(Error=)} cb
   */
  _open (cb) {
    const self = this
    return this.corestore.ready(err => {
      if (err) return cb(err)
      this.metadata = this.corestore.default(this._metadataOpts)
      this.db = this.db || new MountableHypertrie(this.corestore, this.key, {
        feed: this.metadata,
        sparse: this.sparseMetadata,
        extension: this.opts.extension !== false
      })
      this.db.on('hypertrie', onhypertrie)
      this.db.on('error', onerror)

      self.metadata.on('error', onerror)
      self.metadata.on('append', update)
      self.metadata.on('extension', extension)
      self.metadata.on('peer-add', peeradd)
      self.metadata.on('peer-open', peeropen)
      self.metadata.on('peer-remove', peerremove)

      // Remembered so that _close() can detach every listener added above.
      this._unlistens.push(() => {
        self.db.removeListener('error', onerror)
        self.db.removeListener('hypertrie', onhypertrie)
        self.metadata.removeListener('error', onerror)
        self.metadata.removeListener('append', update)
        self.metadata.removeListener('extension', extension)
        self.metadata.removeListener('peer-add', peeradd)
        self.metadata.removeListener('peer-open', peeropen)
        self.metadata.removeListener('peer-remove', peerremove)
      })

      return self.metadata.ready(err => {
        if (err) return done(err)

        /**
         * TODO: Update comment to reflect mounts.
         *
         * If a db is provided as input, ensure that a contentFeed is also provided, then return (this is a checkout).
         * If the metadata feed is writable:
         *    If the metadata feed has length 0, then the db should be initialized with the content feed key as metadata.
         *    Else, initialize the db without metadata and load the content feed key from the header.
         * If the metadata feed is readable:
         *    Initialize the db without metadata and load the content feed key from the header.
         */
        if (self.metadata.writable && !self.metadata.length && !self.isCheckout) {
          initialize()
        } else {
          restore()
        }
      })
    })

    /**
     * The first time the hyperdrive is created, we initialize both the db (metadata feed) and the content feed here.
     */
    function initialize () {
      self._contentStateFromKey(null, (err, contentState) => {
        if (err) return done(err)
        // warm up the thunky map
        self._contentStates.cache.set(self.db.feed, contentState)
        self.db.setMetadata(contentState.feed.key)
        self.db.ready(err => {
          if (err) return done(err)
          return done(null)
        })
      })
    }

    /**
     * If the hyperdrive has already been created, wait for the db (metadata feed) to load.
     * The content feed is only loaded here when metadata block 0 is already available locally.
     */
    function restore () {
      self.db.ready(err => {
        if (err) return done(err)
        self.metadata.has(0, (err, hasMetadataBlock) => {
          if (err) return done(err)
          if (hasMetadataBlock) self._getContent(self.db.feed, done)
          else done(null)
        })
      })
    }

    function done (err) {
      if (err) return cb(err)
      self.key = self.metadata.key
      self.discoveryKey = self.metadata.discoveryKey
      return cb(null)
    }

    // NOTE(review): this handler also invokes the open callback on every
    // 'error' event from the db or metadata feed, which may be after `cb`
    // has already been called — behavior kept as-is; confirm it is intended.
    function onerror (err) {
      if (err) self.emit('error', err)
      return cb(err)
    }

    function update () {
      self.emit('update')
    }

    function extension (name, message, peer) {
      self.emit('extension', name, message, peer)
    }

    function peeradd (peer) {
      self.emit('peer-add', peer)
    }

    function peeropen (peer) {
      self.emit('peer-open', peer)
    }

    function peerremove (peer) {
      self.emit('peer-remove', peer)
    }

    function onhypertrie (trie) {
      self.emit('metadata-feed', trie.feed)
      self.emit('mount', trie)
    }
  }

  /**
   * Looks up (loading on first use) the content state for a metadata feed.
   * @param {object} metadata - Metadata feed used as the map key.
   * @param {function(Error=, ContentState=)} cb
   */
  _getContent (metadata, cb) {
    this._contentStates.get(metadata, cb)
  }

  /** Loader for the content-state map: reads the content key from the metadata feed, then opens that feed. */
  _contentStateFromMetadata (metadata, cb) {
    MountableHypertrie.getMetadata(metadata, (err, publicKey) => {
      if (err) return cb(err)
      this._contentStateFromKey(publicKey, cb)
    })
  }

  /**
   * Opens a content feed from the corestore and wraps it in a ContentState.
   * Emits 'content-feed' once the feed is ready.
   * @param {Buffer|null} publicKey - Content feed key (null is passed when initializing a new drive).
   * @param {function(Error=, ContentState=)} cb
   */
  _contentStateFromKey (publicKey, cb) {
    const contentOpts = { key: publicKey, ...contentOptions(this), cache: { data: false } }
    let feed
    try {
      feed = this.corestore.get(contentOpts)
    } catch (err) {
      return cb(err)
    }

    const contentErrorListener = err => this.emit('error', err)
    feed.on('error', contentErrorListener)
    this._unlistens.push(() => feed.removeListener('error', contentErrorListener))
    feed.ready(err => {
      if (err) return cb(err)
      this.emit('content-feed', feed)
      return cb(null, new ContentState(feed))
    })
  }

  /**
   * Encodes `stat` and stores it in the db under `name`.
   * @param {string} name
   * @param {object} stat - Must provide `encode()`.
   * @param {object|null} [opts] - Forwarded to `db.put`.
   * @param {function(Error=, object=)} cb - Receives the stored stat.
   */
  _putStat (name, stat, opts, cb) {
    if (typeof opts === 'function') return this._putStat(name, stat, null, opts)
    let encoded
    try {
      encoded = stat.encode()
    } catch (err) {
      return cb(err)
    }
    this.db.put(name, encoded, opts, err => {
      if (err) return cb(err)
      return cb(null, stat)
    })
  }

  /**
   * Merges `stat` into the existing entry for `name`; fails with FileNotFound
   * when there is no entry. `stat.metadata` is merged key-by-key into the
   * existing metadata rather than replacing it.
   */
  _update (name, stat, cb) {
    name = fixName(name)

    this.db.get(name, (err, st) => {
      if (err) return cb(err)
      if (!st) return cb(new errors.FileNotFound(name))
      let decoded
      try {
        decoded = Stat.decode(st.value)
      } catch (err) {
        return cb(err)
      }
      const oldMetadata = decoded.metadata
      const newStat = Object.assign(decoded, stat)
      if (stat.metadata) {
        newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
      }
      return this._putStat(name, newStat, { flags: st.flags }, cb)
    })
  }

  /** Like `_update`, but starts from an empty file stat when `name` has no entry. */
  _upsert (name, stat, cb) {
    name = fixName(name)

    this.db.get(name, (err, st) => {
      if (err) return cb(err)
      let decoded
      if (!st) {
        decoded = Stat.file()
      } else {
        try {
          decoded = Stat.decode(st.value)
        } catch (err) {
          return cb(err)
        }
      }
      const oldMetadata = decoded.metadata
      const newStat = Object.assign(decoded, stat)
      if (stat.metadata) {
        newStat.metadata = Object.assign({}, oldMetadata || {}, stat.metadata)
      }
      return this._putStat(name, newStat, { flags: st ? st.flags : 0 }, cb)
    })
  }

  /**
   * Yields the content feed belonging to the root db (or null if there is no db yet).
   * @param {function(Error=, object=)} cb
   */
  getContent (cb) {
    if (!this.db) return cb(null, null)
    this._getContent(this.db.feed, (err, contentState) => {
      if (err) return cb(err)
      return cb(null, contentState.feed)
    })
  }

  /**
   * Two call shapes:
   *  - `open(cb)` / `open()` — delegates to Nanoresource's open (drive lifecycle).
   *  - `open(name, flags, cb)` — opens a file descriptor for `name`.
   * Returned descriptors are `STDIO_CAP + 2 * index` into the internal fd table.
   */
  open (name, flags, cb) {
    if (!name || typeof name === 'function') return super.open(name)
    name = fixName(name)

    this.ready(err => {
      if (err) return cb(err)
      createFileDescriptor(this, name, flags, (err, fd) => {
        if (err) return cb(err)
        cb(null, STDIO_CAP + (this._fds.push(fd) - 1) * 2)
      })
    })
  }

  /**
   * Reads through an open descriptor. `pos` may be omitted, in which case the
   * descriptor's current position is used.
   */
  read (fd, buf, offset, len, pos, cb) {
    if (typeof pos === 'function') {
      cb = pos
      pos = null
    }

    const desc = this._fds[(fd - STDIO_CAP) / 2]
    if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
    if (pos == null) pos = desc.position
    desc.read(buf, offset, len, pos, cb)
  }

  /**
   * Writes through an open descriptor. `pos` may be omitted, in which case the
   * descriptor's current position is used.
   */
  write (fd, buf, offset, len, pos, cb) {
    if (typeof pos === 'function') {
      cb = pos
      pos = null
    }

    const desc = this._fds[(fd - STDIO_CAP) / 2]
    if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
    if (pos == null) pos = desc.position
    desc.write(buf, offset, len, pos, cb)
  }

  /**
   * Returns a readable byte stream for the file at `name`.
   * @param {string} name
   * @param {object} [opts] - `start`, `end` (inclusive), `length`, `highWaterMark`;
   *   the whole object is also spread into the byte-stream options.
   */
  createReadStream (name, opts) {
    if (!opts) opts = {}
    const self = this

    name = fixName(name)

    // -1 means "no explicit length requested".
    const length = typeof opts.end === 'number' ? 1 + opts.end - (opts.start || 0) : typeof opts.length === 'number' ? opts.length : -1
    const stream = coreByteStream({
      ...opts,
      highWaterMark: opts.highWaterMark || 64 * 1024
    })

    this.ready(err => {
      if (err) return stream.destroy(err)
      return this.stat(name, { file: true }, (err, st, trie) => {
        if (err) return stream.destroy(err)
        if (st.mount && st.mount.hypercore) {
          // Hypercore mounts are read straight from the mounted feed.
          const feed = self.corestore.get({
            key: st.mount.key,
            sparse: self.sparse
          })
          return feed.ready(err => {
            if (err) return stream.destroy(err)
            st.blocks = feed.length
            return oncontent(st, { feed })
          })
        }
        return this._getContent(trie.feed, (err, contentState) => {
          if (err) return stream.destroy(err)
          return oncontent(st, contentState)
        })
      })
    })

    function oncontent (st, contentState) {
      // Defaults apply to hypercore mounts: the whole feed from the beginning.
      let byteOffset = 0
      let blockOffset = 0
      const blockLength = st.blocks

      if (!(st.mount && st.mount.hypercore)) {
        blockOffset = st.offset
        byteOffset = opts.start ? st.byteOffset + opts.start : (length === -1 ? -1 : st.byteOffset)
      }

      stream.start({
        feed: contentState.feed,
        blockOffset,
        blockLength,
        byteOffset,
        byteLength: length
      })
    }

    return stream
  }

  /**
   * Streams the differences between this drive and `other`.
   * @param {Hyperdrive|*} other - Another drive (its version is used) or a value passed to the db as-is.
   * @param {string|object} [prefix='/'] - Path prefix; an object here is treated as `opts`.
   * @param {object} [opts] - Forwarded to `db.createDiffStream`.
   */
  createDiffStream (other, prefix, opts) {
    if (other instanceof Hyperdrive) other = other.version
    if (typeof prefix === 'object') return this.createDiffStream(other, '/', prefix)
    prefix = prefix || '/'

    const diffStream = this.db.createDiffStream(other, prefix, opts)
    return pump(
      diffStream,
      new Transform({
        transform (chunk, cb) {
          const entry = { type: chunk.type, name: chunk.key }
          if (chunk.left) entry.seq = chunk.left.seq
          if (chunk.right) entry.previous = { seq: chunk.right.seq }
          if (chunk.left && entry.type !== 'mount' && entry.type !== 'unmount') {
            try {
              entry.value = Stat.decode(chunk.left.value)
            } catch (err) {
              return cb(err)
            }
          } else if (chunk.left) {
            // mount / unmount entries carry their info instead of an encoded stat
            entry.value = chunk.left.info
          }
          return cb(null, entry)
        }
      })
    )
  }

  /** Readdir stream for `name` with `includeStats` forced on. */
  createDirectoryStream (name, opts) {
    if (!opts) opts = {}
    name = fixName(name)
    return createReaddirStream(this, name, {
      ...opts,
      includeStats: true
    })
  }

  /**
   * Returns a writable stream that appends data to the content feed and, on
   * 'prefinish', stores a file stat for `name` pointing at the appended range.
   * The content state is locked while the stream is open.
   * @param {string} name
   * @param {object} [opts] - Spread into the stored file stat. With
   *   `wait: false` the stream is destroyed with an error instead of waiting
   *   when the content is already locked.
   */
  createWriteStream (name, opts) {
    if (!opts) opts = {}
    name = fixName(name)

    const self = this
    let release

    const proxy = duplexify()
    proxy.setReadable(false)

    // TODO: support piping through a "split" stream like rabin

    this.ready(err => {
      if (err) return proxy.destroy(err)
      this.stat(name, { trie: true }, (err, stat, trie) => {
        if (err && (err.errno !== 2)) return proxy.destroy(err)
        this._getContent(trie.feed, (err, contentState) => {
          if (err) return proxy.destroy(err)
          // There is no callback in this method: surface the failure on the stream.
          if (opts.wait === false && contentState.isLocked()) return proxy.destroy(new Error('Content is locked.'))
          contentState.lock(_release => {
            release = _release
            append(contentState)
          })
        })
      })
    })

    return proxy

    function append (contentState) {
      if (proxy.destroyed) return release()

      // Remember where the feed ended so the stat can describe the appended range.
      const byteOffset = contentState.feed.byteLength
      const offset = contentState.feed.length

      self.emit('appending', name, opts)

      // TODO: revert the content feed if this fails!!!! (add an option to the write stream for this (atomic: true))
      const stream = contentState.feed.createWriteStream({ maxBlockSize: WRITE_STREAM_BLOCK_SIZE })

      proxy.on('close', ondone)
      proxy.on('finish', ondone)

      proxy.setWritable(stream)
      proxy.on('prefinish', function () {
        const stat = Stat.file({
          ...opts,
          offset,
          byteOffset,
          size: contentState.feed.byteLength - byteOffset,
          blocks: contentState.feed.length - offset
        })
        // Hold back 'finish' until the stat has been stored.
        proxy.cork()
        self._putStat(name, stat, function (err) {
          if (err) return proxy.destroy(err)
          self.emit('append', name, opts)
          proxy.uncork()
        })
      })
    }

    function ondone () {
      proxy.removeListener('close', ondone)
      proxy.removeListener('finish', ondone)
      release()
    }
  }

  /**
   * Creates an empty file entry at `name` unless an entry already exists, in
   * which case the existing stat is returned unchanged.
   */
  create (name, opts, cb) {
    if (typeof opts === 'function') return this.create(name, null, opts)

    name = fixName(name)

    this.ready(err => {
      if (err) return cb(err)
      this.lstat(name, { file: true, trie: true }, (err, stat) => {
        if (err && err.errno !== 2) return cb(err)
        if (stat) return cb(null, stat)
        const st = Stat.file(opts)
        return this._putStat(name, st, cb)
      })
    })
  }

  /**
   * Reads a whole file into memory.
   * @param {string} name
   * @param {object|string} [opts] - Options or an encoding name. With an
   *   encoding other than 'binary' the result is a string, otherwise a Buffer.
   * @param {function(Error=, (Buffer|string)=)} cb
   */
  readFile (name, opts, cb) {
    if (typeof opts === 'function') return this.readFile(name, null, opts)
    if (typeof opts === 'string') opts = { encoding: opts }
    if (!opts) opts = {}

    name = fixName(name)

    collect(this.createReadStream(name, opts), function (err, bufs) {
      if (err) return cb(err)
      const buf = bufs.length === 1 ? bufs[0] : Buffer.concat(bufs)
      cb(null, opts.encoding && opts.encoding !== 'binary' ? buf.toString(opts.encoding) : buf)
    })
  }

  /**
   * Writes `buf` as the full contents of `name`.
   * @param {string} name
   * @param {Buffer|string} buf - Strings are encoded with `opts.encoding` (default 'utf-8').
   * @param {object|string} [opts] - Options or an encoding name.
   * @param {function(Error=)} [cb]
   */
  writeFile (name, buf, opts, cb) {
    if (typeof opts === 'function') return this.writeFile(name, buf, null, opts)
    if (typeof opts === 'string') opts = { encoding: opts }
    if (!opts) opts = {}
    if (typeof buf === 'string') buf = Buffer.from(buf, opts.encoding || 'utf-8')
    if (!cb) cb = noop

    name = fixName(name)

    // Make sure that the buffer is chunked into blocks of 0.5MB.
    const stream = this.createWriteStream(name, opts)

    // TODO: Do we need to maintain the error state? What's triggering 'finish' after 'error'?
    let errored = false

    stream.on('error', err => {
      errored = true
      return cb(err)
    })
    stream.on('finish', () => {
      if (!errored) return cb(null)
    })
    stream.end(buf)
  }

  /**
   * Resizes the file at `name` to `size` bytes.
   *  - missing file: an empty file is created;
   *  - size 0 on an existing file: the stat is rewritten as empty;
   *  - shrinking: the first `size` bytes are rewritten through a write stream;
   *  - growing: zero bytes are appended through a file descriptor.
   */
  truncate (name, size, cb) {
    name = fixName(name)

    this.lstat(name, { file: true, trie: true }, (err, st) => {
      if (err && err.errno !== 2) return cb(err)
      if (!st) return this.create(name, cb)
      // create() returns an existing stat untouched, so it cannot empty an
      // existing file; rewrite the stat with no size/blocks instead.
      if (size === 0) return this._update(name, { size: 0, blocks: 0 }, cb)
      // Any other falsy size (undefined, NaN, ...) keeps the previous behavior.
      if (!size) return this.create(name, cb)
      if (size === st.size) return cb(null)
      if (size < st.size) {
        const readStream = this.createReadStream(name, { length: size })
        const writeStream = this.createWriteStream(name)
        return pump(readStream, writeStream, cb)
      } else {
        this.open(name, 'a', (err, fd) => {
          if (err) return cb(err)
          const length = size - st.size
          this.write(fd, Buffer.alloc(length), 0, length, st.size, err => {
            if (err) return cb(err)
            this.close(fd, cb)
          })
        })
      }
    })
  }

  /** Truncates through an open descriptor. */
  ftruncate (fd, size, cb) {
    const desc = this._fds[(fd - STDIO_CAP) / 2]
    if (!desc) return process.nextTick(cb, new errors.BadFileDescriptor(`Bad file descriptor: ${fd}`))
    return desc.truncate(size, cb)
  }

  /**
   * Builds (but does not store) a stat for a new entry at `name`, with its
   * offsets set to the current end of the relevant content feed.
   * Fails with PathAlreadyExists when an entry exists and `opts.force` is not set.
   * @param {string} name
   * @param {object} [opts] - `directory` selects a directory stat, `force`
   *   allows an existing entry; the object is also spread into the stat.
   * @param {function(Error=, object=)} cb
   */
  _createStat (name, opts, cb) {
    const self = this

    const statConstructor = (opts && opts.directory) ? Stat.directory : Stat.file
    // Guarded like `directory` above so a missing opts object does not throw.
    const shouldForce = !!(opts && opts.force)

    this.ready(err => {
      if (err) return cb(err)
      this.db.get(name, (err, node, trie) => {
        if (err) return cb(err)
        if (node && !shouldForce) return cb(new errors.PathAlreadyExists(name))
        onexisting(node, trie)
      })
    })

    function onexisting (node, trie) {
      self.ready(err => {
        if (err) return cb(err)
        self._getContent(trie.feed, (err, contentState) => {
          if (err) return cb(err)
          const st = statConstructor({
            ...opts,
            offset: contentState.feed.length,
            byteOffset: contentState.feed.byteLength
          })
          return cb(null, st)
        })
      })
    }
  }

  /**
   * Creates a directory entry; fails if the path already exists.
   * @param {string} name
   * @param {object|number} [opts] - Options, or a numeric mode.
   * @param {function(Error=, object=)} [cb]
   */
  mkdir (name, opts, cb) {
    if (typeof opts === 'function') return this.mkdir(name, null, opts)
    if (typeof opts === 'number') opts = { mode: opts }
    if (!cb) cb = noop

    name = fixName(name)
    // Copy instead of mutating the caller's options object.
    opts = { ...opts, directory: true }

    this._createStat(name, opts, (err, st) => {
      if (err) return cb(err)
      this._putStat(name, st, {
        condition: ifNotExists
      }, cb)
    })
  }

  /**
   * Produces a directory stat for a path that has no entry of its own, based
   * on the first entry the db iterator yields for `name`. The root always
   * resolves to a plain directory stat.
   */
  _statDirectory (name, opts, cb) {
    const ite = this.db.iterator(name)
    ite.next((err, st) => {
      if (err) return cb(err)
      if (name !== '/' && !st) return cb(new errors.FileNotFound(name))
      if (name === '/') return cb(null, Stat.directory(), this.db)
      const trie = st[MountableHypertrie.Symbols.TRIE]
      const mount = st[MountableHypertrie.Symbols.MOUNT]
      const innerPath = st[MountableHypertrie.Symbols.INNER_PATH]
      try {
        st = Stat.decode(st.value)
      } catch (err) {
        return cb(err)
      }
      // Reset the mode taken from the child entry before building the directory stat.
      const noMode = Object.assign({}, st, { mode: 0 })
      return cb(null, Stat.directory(noMode), trie, mount, innerPath)
    })
  }

  /** Yields the `linkname` stored in the (non-followed) stat for `name`. */
  readlink (name, cb) {
    this.lstat(name, function (err, st) {
      if (err) return cb(err)
      cb(null, st.linkname)
    })
  }

  /**
   * Stats `name` without following symlinks.
   * @param {string} name
   * @param {object} [opts] - Forwarded to `db.get`. `trie: true` reports a
   *   missing entry as a null stat (plus trie info); `file: true` reports it
   *   as FileNotFound; otherwise the path is treated as an implicit directory.
   * @param {function(Error=, object=, object=, object=, string=)} cb -
   *   `(err, stat, trie, mount, mountPath)`.
   */
  lstat (name, opts, cb) {
    if (typeof opts === 'function') return this.lstat(name, null, opts)
    if (!opts) opts = {}
    const self = this
    name = fixName(name)

    this.ready(err => {
      if (err) return cb(err)
      this.db.get(name, opts, onstat)
    })

    function onstat (err, node, trie, mount, mountPath) {
      if (err) return cb(err)
      if (!node && opts.trie) return cb(null, null, trie, mount, mountPath)
      if (!node && opts.file) return cb(new errors.FileNotFound(name))
      if (!node) return self._statDirectory(name, opts, cb)
      let st
      try {
        st = Stat.decode(node.value)
      } catch (err) {
        return cb(err)
      }
      // A descriptor currently writing this file knows a more recent size.
      const writingFd = self._writingFds.get(name)
      if (writingFd) {
        st.size = writingFd.stat.size
      }
      cb(null, st, trie, mount, mountPath)
    }
  }

  /**
   * Stats `name`, following symlinks (relative link targets are resolved
   * against the directory of `name`).
   * @param {function(Error=, object=, object=, string=, object=, string=)} cb -
   *   `(err, stat, trie, name, mount, mountPath)`.
   */
  stat (name, opts, cb) {
    if (typeof opts === 'function') return this.stat(name, null, opts)
    if (!opts) opts = {}

    this.lstat(name, opts, (err, stat, trie, mount, mountPath) => {
      if (err) return cb(err)
      if (!stat) return cb(null, null, trie, name, mount, mountPath)
      if (stat.linkname) {
        if (path.isAbsolute(stat.linkname)) return this.stat(stat.linkname, opts, cb)
        const relativeStat = path.resolve('/', path.dirname(name), stat.linkname)
        return this.stat(relativeStat, opts, cb)
      }
      return cb(null, stat, trie, name, mount, mountPath)
    })
  }

  /**
   * Reports which feed and mount a path belongs to by stat-ing a probe path
   * (`NOOP_FILE_PATH`) underneath it.
   * @param {function(Error=, {feed: object, mountPath: string, mountInfo: object}=)} cb
   */
  info (name, cb) {
    name = fixName(name)
    const noopPath = path.join(name, NOOP_FILE_PATH)
    this.stat(noopPath, { trie: true }, (err, stat, trie, _, mountInfo, mountPath) => {
      if (err) return cb(err)
      return cb(null, {
        feed: trie.feed,
        mountPath: fixName(noopPath.slice(0, noopPath.length - mountPath.length)),
        mountInfo
      })
    })
  }

  /** Calls back with the error (if any) from stat-ing `name`. */
  access (name, opts, cb) {
    if (typeof opts === 'function') return this.access(name, null, opts)
    if (!opts) opts = {}
    name = fixName(name)

    this.stat(name, opts, err => {
      cb(err)
    })
  }

  /** Calls back with a single boolean: whether `access` succeeded. */
  exists (name, opts, cb) {
    if (typeof opts === 'function') return this.exists(name, null, opts)
    if (!opts) opts = {}

    this.access(name, opts, err => {
      cb(!err)
    })
  }

  /** Collects the readdir stream for `name` into an array. */
  readdir (name, opts, cb) {
    if (typeof opts === 'function') return this.readdir(name, null, opts)
    name = fixName(name)

    const readdirStream = createReaddirStream(this, name, opts)
    return collect(readdirStream, (err, entries) => {
      if (err) return cb(err)
      return cb(null, entries)
    })
  }

  /** Deletes the db entry for `name`; FileNotFound if nothing was deleted. */
  _del (name, cb) {
    this.ready(err => {
      if (err) return cb(err)
      this.db.del(name, (err, node) => {
        if (err) return cb(err)
        if (!node) return cb(new errors.FileNotFound(name))
        return cb(null)
      })
    })
  }

  /** Removes the entry at `name`. */
  unlink (name, cb) {
    name = fixName(name)
    this._del(name, cb || noop)
  }

  /** Removes the directory entry at `name`; fails with DirectoryNotEmpty if it has children. */
  rmdir (name, cb) {
    if (!cb) cb = noop
    name = fixName(name)
    const self = this

    const ite = readdirIterator(this, name)
    ite.next((err, val) => {
      if (err) return cb(err)
      if (val) return cb(new errors.DirectoryNotEmpty(name))
      self._del(name, cb)
    })
  }

  /**
   * Starts replication over `opts.stream` or a new HypercoreProtocol stream.
   * @param {boolean|object} isInitiator - Or an options object carrying `initiator`.
   * @param {object} [opts]
   * @returns {object} The protocol stream.
   */
  replicate (isInitiator, opts) {
    // support replicate({ initiator: bool }) also
    if (typeof isInitiator === 'object' && isInitiator && !opts) {
      opts = isInitiator
      isInitiator = !!opts.initiator
    }
    const stream = (opts && opts.stream) || new HypercoreProtocol(isInitiator, { ...opts })
    this.ready(err => {
      if (err) return stream.destroy(err)
      this.corestore.replicate(isInitiator, { ...opts, stream })
    })
    return stream
  }

  /** Returns a new Hyperdrive over a checkout of the db at `version`, sharing this drive's content states. */
  checkout (version, opts) {
    opts = {
      ...opts,
      _db: this.db.checkout(version),
      _contentStates: this._contentStates
    }
    return new Hyperdrive(this.corestore, this.key, opts)
  }

  /** Closes one file descriptor and trims empty slots from the end of the fd table. */
  _closeFile (fd, cb) {
    const idx = (fd - STDIO_CAP) / 2
    const desc = this._fds[idx]
    if (!desc) return process.nextTick(cb, new Error('Invalid file descriptor'))
    this._fds[idx] = null
    while (this._fds.length && !this._fds[this._fds.length - 1]) this._fds.pop()
    desc.close(cb)
  }

  /** Close hook: closes the db, detaches all registered listeners and emits 'close'. */
  _close (cb) {
    this.db.close(err => {
      for (const unlisten of this._unlistens) {
        unlisten()
      }
      this._unlistens = []
      this.emit('close')
      cb(err)
    })
  }

  /**
   * `close(fd, cb)` closes a file descriptor; `close(cb)` closes the drive
   * through Nanoresource's close.
   */
  close (fd, cb) {
    if (typeof fd === 'number') return this._closeFile(fd, cb || noop)
    super.close(false, fd)
  }

  /** Destroys the storage of the root metadata and content feeds, then closes the drive. */
  destroyStorage (cb) {
    const metadata = this.db.feed
    this._getContent(metadata, (err, contentState) => {
      // Without a content state there is nothing to dereference below.
      if (err) return cb(err)
      const content = contentState.feed
      metadata.destroyStorage(() => {
        content.destroyStorage(() => {
          this.close(cb)
        })
      })
    })
  }

  /**
   * Collects download statistics. For a file, `cb` receives one stats object
   * (`blocks`, `size`, `downloadedBlocks`); for anything else it receives a
   * Map from path to such objects.
   * @param {string} path
   * @param {object} [opts] - `recursive` is forwarded to the stat iterator.
   * @param {function(Error=, (object|Map)=)} cb
   */
  stats (path, opts, cb) {
    if (typeof opts === 'function') return this.stats(path, null, opts)
    const self = this
    const stats = new Map()

    this.stat(path, (err, stat) => {
      if (err) return cb(err)
      if (stat.isFile()) return fileStats(path, cb)

      const recursive = opts && (opts.recursive !== false)
      const ite = statIterator(self, path, { recursive })
      return ite.next(function loop (err, info) {
        if (err) return cb(err)
        if (!info) return cb(null, stats)
        fileStats(info.path, (err, entryStats) => {
          // errno 2 (not found) entries are skipped rather than failing the walk
          if (err && err.errno !== 2) return cb(err)
          if (!entryStats) return ite.next(loop)
          stats.set(info.path, entryStats)
          return ite.next(loop)
        })
      })
    })

    function fileStats (path, cb) {
      const total = emptyStats()
      return self.stat(path, (err, stat, trie) => {
        if (err) return cb(err)
        return self._getContent(trie.feed, (err, contentState) => {
          if (err) return cb(err)
          contentState.feed.downloaded(stat.offset, stat.offset + stat.blocks, (err, downloadedBlocks) => {
            if (err) return cb(err)
            total.blocks = stat.blocks
            total.size = stat.size
            total.downloadedBlocks = downloadedBlocks
            // TODO: This is not possible to implement now. Need a better byte length index in hypercore.
            // total.downloadedBytes = 0
            return cb(null, total)
          })
        })
      })
    }

    function emptyStats () {
      return {
        blocks: 0,
        size: 0,
        downloadedBlocks: 0
      }
    }
  }

  /**
   * Polls `stats(path, opts)` on an interval and publishes the latest result
   * on the returned handle (`handle.stats`, 'update' event). A failed poll or
   * `handle.destroy()` stops the polling and emits 'destroy'.
   * @param {string} path
   * @param {object} [opts] - `statsInterval` (ms, default 2000); the historical
   *   misspelling `statsInveral` is still honoured. Also forwarded to `stats`.
   * @returns {EventEmitter} Handle with a `destroy(err)` method.
   */
  watchStats (path, opts) {
    const self = this
    const interval = (opts && (opts.statsInterval || opts.statsInveral)) || 2000
    const timer = setInterval(collectStats, interval)
    let collecting = false
    let destroyed = false

    const handle = new EventEmitter()
    Object.assign(handle, {
      destroy
    })
    return handle

    function collectStats () {
      if (collecting) return
      collecting = true
      self.stats(path, opts, (err, stats) => {
        // A poll that was in flight when the handle was destroyed is dropped.
        if (destroyed) return
        if (err) return destroy(err)
        collecting = false
        handle.stats = stats
        handle.emit('update')
      })
    }

    function destroy (err) {
      if (destroyed) return
      destroyed = true
      clearInterval(timer)
      handle.emit('destroy', err)
    }
  }

  /**
   * Starts downloading every known metadata/content feed in full (including
   * feeds announced later through 'content-feed' / 'metadata-feed').
   * @returns {function()} Function that stops mirroring; repeated calls to
   *   `mirror()` return the same function while mirroring is active.
   */
  mirror () {
    const self = this
    if (this._unmirror) return this._unmirror
    const mirrorRanges = new Map()

    this.on('content-feed', oncore)
    this.on('metadata-feed', oncore)
    this.getAllMounts({ content: true }, (err, mounts) => {
      if (err) return this.emit('error', err)
      for (const { metadata, content } of mounts.values()) {
        oncore(metadata)
        oncore(content)
      }
    })

    this._unmirror = unmirror
    return this._unmirror

    function unmirror () {
      if (!self._unmirror) return
      self._unmirror = null
      self.removeListener('content-feed', oncore)
      self.removeListener('metadata-feed', oncore)
      for (const [core, range] of mirrorRanges) {
        core.undownload(range)
      }
    }

    function oncore (core) {
      if (!core) return
      // Ignore feeds once this mirror session has been stopped or replaced.
      if (!self._unmirror || self._unmirror !== unmirror || mirrorRanges.has(core)) return
      mirrorRanges.set(core, core.download({ start: 0, end: -1 }))
    }
  }

  /**
   * Downloads the content blocks of a file, or of the files found under a
   * directory.
   * @param {string} path
   * @param {object} [opts] - `recursive`, `noMounts` (both default to true
   *   unless set to false) and `maxConcurrent` (default 50).
   * @param {function(Error=)} [cb] - Called once, on completion, error or destroy.
   * @returns {EventEmitter} Handle with a `destroy(err)` method that cancels outstanding ranges.
   */
  download (path, opts, cb) {
    if (typeof opts === 'function') {
      cb = opts
      opts = null
    }
    opts = opts || {}
    if (!cb) cb = noop

    const self = this
    const ranges = new Map()
    let pending = 0
    let destroyed = false

    const handle = new EventEmitter()
    Object.assign(handle, {
      destroy
    })

    self.stat(path, (err, stat, trie) => {
      if (err) return destroy(err)
      if (stat.isFile()) {
        downloadFile(path, stat, trie, destroy)
      } else {
        const recursive = opts.recursive !== false
        const noMounts = opts.noMounts !== false
        const ite = statIterator(self, path, { recursive, noMounts, random: true })
        downloadNext(ite)
      }
    })

    return handle

    function downloadNext (ite) {
      if (destroyed) return
      ite.next((err, info) => {
        if (err) return destroy(err)
        if (!info) {
          // Iterator exhausted: finish now only if nothing is still downloading.
          if (!ranges.size) return destroy(null)
          else return
        }
        const { path, stat, trie } = info
        if (!stat.blocks || stat.mount || stat.isDirectory()) return downloadNext(ite)
        downloadFile(path, stat, trie, err => {
          if (err) return destroy(err)
          return downloadNext(ite)
        })
        // Keep pulling entries while below the concurrency limit.
        if (pending < ((opts && opts.maxConcurrent) || 50)) {
          return downloadNext(ite)
        }
      })
    }

    function downloadFile (path, stat, trie, cb) {
      pending++
      self._getContent(trie.feed, (err, contentState) => {
        if (err) return destroy(err)
        const feed = contentState.feed
        const range = feed.download({
          start: stat.offset,
          end: stat.offset + stat.blocks
        }, err => {
          pending--
          if (err) return cb(err)
          ranges.delete(path)
          return cb(null)
        })
        ranges.set(path, { range, feed })
      })
    }

    function destroy (err) {
      if (destroyed) return null
      destroyed = true
      for (const { feed, range } of ranges.values()) {
        feed.undownload(range)
      }
      if (err) return cb(err)
      return cb(null)
    }
  }

  /** Watches `name` for changes through the db. */
  watch (name, onchange) {
    name = fixName(name)
    return this.db.watch(name, onchange)
  }

  /**
   * Mounts another drive (default) or a single hypercore (`opts.hypercore`)
   * at `path`.
   * @param {string} path
   * @param {Buffer} key - Key of the drive/feed being mounted.
   * @param {object} [opts] - `uid`, `gid`, `version`, `hash`, `hypercore`;
   *   also spread into the corestore / db mount options.
   * @param {function(Error=)} cb
   */
  mount (path, key, opts, cb) {
    if (typeof opts === 'function') return this.mount(path, key, null, opts)
    const self = this

    path = fixName(path)
    opts = opts || {}

    const statOpts = {
      uid: opts.uid,
      gid: opts.gid
    }
    statOpts.mount = {
      key,
      version: opts.version,
      hash: opts.hash,
      hypercore: !!opts.hypercore
    }
    statOpts.directory = !opts.hypercore

    if (opts.hypercore) {
      const core = this.corestore.get({
        key,
        ...opts,
        parents: [this.key],
        sparse: this.sparse
      })
      core.ready(err => {
        if (err) return cb(err)
        this.emit('content-feed', core)
        statOpts.size = core.byteLength
        statOpts.blocks = core.length
        return mountCore()
      })
    } else {
      return process.nextTick(mountTrie, null)
    }

    function mountCore () {
      self._createStat(path, statOpts, (err, st) => {
        if (err) return cb(err)
        return self.db.put(path, st.encode(), cb)
      })
    }

    function mountTrie () {
      self._createStat(path, statOpts, (err, st) => {
        if (err) return cb(err)
        self.db.mount(path, key, { ...opts, value: st.encode() }, err => {
          // Do not try to load a mount that failed to be created.
          if (err) return cb(err)
          return self.db.loadMount(path, cb)
        })
      })
    }
  }

  /** Removes a mount created by `mount`; fails if `path` is not a mount. */
  unmount (path, cb) {
    this.stat(path, (err, st) => {
      if (err) return cb(err)
      if (!st.mount) return cb(new Error('Can only unmount mounts.'))
      if (st.mount.hypercore) {
        return this.unlink(path, cb)
      } else {
        return this.db.unmount(path, cb)
      }
    })
  }

  /** Creates a symlink entry at `linkName` pointing to `target`; fails if `linkName` already exists. */
  symlink (target, linkName, cb) {
    target = unixify(target)
    linkName = fixName(linkName)

    this.lstat(linkName, (err, stat) => {
      if (err && (err.errno !== 2)) return cb(err)
      if (!err) return cb(new errors.PathAlreadyExists(linkName))
      const st = Stat.symlink({
        linkname: target
      })
      return this._putStat(linkName, st, cb)
    })
  }

  /** Stream of mounts (see ./lib/iterator). */
  createMountStream (opts) {
    return createMountStream(this, opts)
  }

  /**
   * Collects the mount stream into a Map of `path -> { metadata, content }`.
   * @param {object} [opts] - Forwarded to `createMountStream`.
   * @param {function(Error=, Map=)} cb
   */
  getAllMounts (opts, cb) {
    if (typeof opts === 'function') return this.getAllMounts(null, opts)
    const mounts = new Map()

    this.ready(err => {
      if (err) return cb(err)
      collect(this.createMountStream(opts), (err, mountList) => {
        if (err) return cb(err)
        for (const { path, metadata, content } of mountList) {
          mounts.set(path, { metadata, content })
        }
        return cb(null, mounts)
      })
    })
  }

  /** Sends an extension message through the metadata feed. */
  extension (name, message) {
    this.metadata.extension(name, message)
  }

  /** Registers an extension on the metadata feed. */
  registerExtension (name, handlers) {
    return this.metadata.registerExtension(name, handlers)
  }

  /** Peers of the metadata feed. */
  get peers () {
    return this.metadata.peers
  }

  /** Sets one metadata key on the stat stored at `path`. */
  setMetadata (path, key, value, cb) {
    const metadata = {}
    metadata[key] = value
    this._update(path, { metadata }, cb)
  }

  /** Sets one metadata key on the stat stored at `path` to null. */
  removeMetadata (path, key, cb) {
    const metadata = {}
    metadata[key] = null
    this._update(path, { metadata }, cb)
  }

  /** Creates `to` with the stat of `from` (no content is rewritten here). */
  copy (from, to, cb) {
    this.stat(from, (err, stat) => {
      if (err) return cb(err)
      this.create(to, stat, cb)
    })
  }

  // Tag-related methods.

  createTag (name, version, cb) {
    return this.tags.create(name, version, cb)
  }

  getAllTags (cb) {
    return this.tags.getAll(cb)
  }

  deleteTag (name, cb) {
    return this.tags.delete(name, cb)
  }

  getTaggedVersion (name, cb) {
    return this.tags.get(name, cb)
  }
}
1231
/**
 * Decides whether a constructor argument should be treated as an options
 * object: true for any truthy value that is neither a string nor a Buffer.
 */
function isObject (val) {
  if (!val) return false
  if (typeof val === 'string') return false
  return !Buffer.isBuffer(val)
}
1235
/**
 * Put condition: allows the write (`cb(null, true)`) only when no node exists
 * yet, otherwise fails with PathAlreadyExists for the existing node's key.
 */
function ifNotExists (oldNode, newNode, cb) {
  if (!oldNode) return cb(null, true)
  return cb(new errors.PathAlreadyExists(oldNode.key))
}
1240
/**
 * Normalizes a path with `unixify` and makes sure the result starts with '/'.
 */
function fixName (name) {
  const normalized = unixify(name)
  return normalized.startsWith('/') ? normalized : '/' + normalized
}
1246
/** Do-nothing callback used wherever a caller omitted `cb`. */
function noop () {
  // Intentionally empty.
}