// Source: UNPKG file view (9.43 kB, JavaScript, raw view).
1const byteStream = require('byte-stream')
2const through = require('through2')
3const pumpify = require('pumpify')
4
5const errors = require('./errors')
6
7const { linux: linuxConstants, parse } = require('filesystem-constants')
8const {
9 O_RDONLY,
10 O_WRONLY,
11 O_RDWR,
12 O_CREAT,
13 O_TRUNC,
14 O_APPEND,
15 O_SYNC,
16 O_EXCL,
17 O_ACCMODE
18} = linuxConstants
19
/**
 * A file descriptor over a single path of a drive.
 *
 * Reads are served block-by-block from `contentState.feed` (via its `seek`,
 * `get`, `download`, `undownload` and `cancel` methods). Writes are strictly
 * sequential and go through a lazily created write stream; random-access
 * writes are rejected.
 */
class FileDescriptor {
  /**
   * @param {object} drive - Drive owning the file. This class uses its
   *   `createReadStream`, `_update` and `_writingFds` members.
   * @param {string} path - Path of the file inside the drive.
   * @param {object|null} stat - Stat entry of the file, or a falsy value when
   *   the file does not exist yet. The fields read here are `offset`,
   *   `blocks`, `byteOffset` and `size`.
   * @param {object} contentState - Object exposing the content `feed`.
   * @param {boolean} readable - Whether reads are allowed.
   * @param {boolean} writable - Whether writes are allowed.
   * @param {boolean} appending - Whether the descriptor was opened for append.
   * @param {boolean} creating - Whether the descriptor was opened in create mode.
   */
  constructor (drive, path, stat, contentState, readable, writable, appending, creating) {
    this.drive = drive
    this.stat = stat
    this.path = path
    this.contentState = contentState

    this.readable = readable
    this.writable = writable
    this.creating = creating
    this.appending = appending

    // Byte position of the next sequential read/write; null until known.
    this.position = null
    // Block index and intra-block byte offset of the next read.
    this.blockPosition = stat ? stat.offset : null
    this.blockOffset = 0

    this._err = null
    this._writeStream = null
    this._appendStream = null

    // An append needs the existing content replayed into the new write
    // stream. There is nothing to replay (and no size to read) when the file
    // does not exist yet, so the stat must be present.
    if (this.writable && this.appending && this.stat) {
      this._appendStream = this.drive.createReadStream(this.path)
      this.position = this.stat.size
    }

    this._batcher = byteStream({ time: 100, limit: 4096 * 16 })

    // Handle returned by `feed.download` for the currently requested range.
    this._range = null
  }

  /**
   * Reads up to `len` bytes at file position `pos` into `buffer` starting at
   * `offset`.
   *
   * @param {Buffer} buffer - Destination buffer.
   * @param {number} offset - Index in `buffer` at which to start writing.
   * @param {number} len - Maximum number of bytes to read.
   * @param {number} pos - Byte position in the file to read from.
   * @param {function} cb - Called with `(err, bytesRead, buffer)`.
   */
  read (buffer, offset, len, pos, cb) {
    if (!this.readable) return cb(new errors.BadFileDescriptor('File descriptor not open for reading.'))
    this._readAcc(buffer, offset, len, pos, 0, cb)
  }

  /**
   * Continues a read that has already accumulated `totalRead` bytes. Reads
   * directly when `pos` matches the tracked position, otherwise seeks first.
   */
  _readAcc (buffer, offset, len, pos, totalRead, cb) {
    if (this.position !== null && this.position === pos) this._read(buffer, offset, len, totalRead, cb)
    else this._seekAndRead(buffer, offset, len, pos, totalRead, cb)
  }

  /**
   * Writes `len` bytes of `buffer`, starting at `offset`, to file position
   * `pos`. Only sequential writes are accepted.
   *
   * @param {Buffer} buffer - Source buffer.
   * @param {number} offset - Index in `buffer` of the first byte to write.
   * @param {number} len - Number of bytes to write.
   * @param {number} pos - Byte position in the file to write at.
   * @param {function} cb - Called with `(err, bytesWritten, buffer)`.
   */
  write (buffer, offset, len, pos, cb) {
    if (!this.writable) return cb(new errors.BadFileDescriptor('File descriptor not open for writing.'))
    if (!this.stat && !this.creating) {
      return process.nextTick(cb, new errors.BadFileDescriptor('File descriptor not open in create mode.'))
    }
    if (this.position !== null && pos !== this.position) {
      return process.nextTick(cb, new errors.BadFileDescriptor('Random-access writes are not currently supported.'))
    }
    if (this.appending && this.stat && pos < this.stat.size) {
      return process.nextTick(cb, new errors.BadFileDescriptor('Position cannot be less than the file size when appending.'))
    }
    if (!this._writeStream && !this.appending && pos) {
      return process.nextTick(cb, new errors.BadFileDescriptor('Random-access writes are not currently supported.'))
    }

    const self = this
    // `slice` takes an end index, not a length.
    const slice = buffer.slice(offset, offset + len)
    // Guards against `cb` being invoked twice, e.g. when an 'error' arrives
    // before the completion queued with nextTick runs.
    let settled = false

    if (!this._writeStream) {
      this._writeStream = createWriteStream(this.drive, this.stat, this.path)
      this.drive._writingFds.set(this.path, this)
    }
    // Captured locally so the handlers below stay valid even after `close`
    // clears `this._writeStream`.
    const writeStream = this._writeStream

    writeStream.on('error', done)

    // TODO: This is a temporary (bad) way of supporting appends.
    if (this._appendStream) {
      // Captured locally because `this._appendStream` is cleared in doWrite
      // while the error handler below stays attached to the write stream.
      const appendStream = this._appendStream
      this.position = this.stat.size
      // pump does not support the `end` option.
      appendStream.pipe(writeStream, { end: false })

      appendStream.on('error', err => writeStream.destroy(err))
      writeStream.on('error', err => appendStream.destroy(err))

      return appendStream.on('end', doWrite)
    }

    return doWrite()

    // Completes the write exactly once and detaches the per-write listeners.
    function done (err) {
      if (settled) return
      settled = true
      writeStream.removeListener('error', done)
      writeStream.removeListener('drain', done)
      if (err) return cb(err)
      self.position += slice.length
      if (self.stat) self.stat.size += slice.length
      return cb(null, slice.length, buffer)
    }

    // Pushes the slice into the write stream, waiting for 'drain' when the
    // stream signals backpressure. Failures are routed through `done` so the
    // 'error' listener is always removed.
    function doWrite (err) {
      self._appendStream = null
      if (err) return done(err)
      if (self._err) return done(self._err)
      if (writeStream.destroyed) return done(new errors.BadFileDescriptor('Write stream was destroyed.'))

      if (writeStream.write(slice) === false) {
        writeStream.once('drain', done)
      } else {
        process.nextTick(done)
      }
    }
  }

  /**
   * Truncates the file. Only truncation to zero is supported.
   *
   * @param {number} size - Target size; any non-zero value is rejected.
   * @param {function} cb - Called with `(err)`.
   */
  truncate (size, cb) {
    if (!this.writable) return cb(new errors.BadFileDescriptor('File descriptor is not writable'))
    // TODO: Handle all the different truncation scenarios (wait for inode table).
    if (size) return cb(new errors.InvalidArgument('Non-zero sizes are not currently supported in ftruncate.'))
    this.drive._update(this.path, { size: 0, blocks: 0 }, (err, st) => {
      if (err) return cb(err)
      this.stat = st
      return cb(null)
    })
  }

  /**
   * Releases the descriptor: unregisters it as the writer for its path,
   * cancels any outstanding download range, and ends the write stream if one
   * is open.
   *
   * @param {function} cb - Called with `(err)`.
   */
  close (cb) {
    // TODO: If we ever support multiple file descriptors for one path at one time, this will need updating.
    if (this.writable) this.drive._writingFds.delete(this.path)

    // Release the download range before dealing with the write stream so it
    // is not left behind on descriptors that were both read and written.
    if (this._range) {
      this.contentState.feed.undownload(this._range)
      // TODO: Ensure that the range being cancelled isn't being read by another operation.
      this.contentState.feed.cancel(this._range.start, this._range.end)
      this._range = null
    }

    const writeStream = this._writeStream
    if (writeStream && !writeStream.destroyed) {
      return writeStream.end(err => {
        if (err) return cb(err)
        this._writeStream = null
        return cb(null)
      })
    }

    this._writeStream = null
    process.nextTick(cb, null)
  }

  /**
   * Will currently request until the end of the file linearly.
   *
   * TODO: This behavior should be more customizable in the future.
   *
   * @param {number} start - First block to request.
   * @param {function} [cb] - Passed to `feed.download`; defaults to a no-op.
   */
  _refreshDownload (start, cb) {
    const end = this.stat.offset + this.stat.blocks

    // Replace the previous range rather than stacking requests.
    if (this._range) {
      this.contentState.feed.undownload(this._range)
    }

    this._range = this.contentState.feed.download({ start, end, linear: true }, cb || noop)
  }

  /**
   * Seeks the feed to file position `pos`, records the resulting block
   * position and offset, then continues the read from there.
   */
  _seekAndRead (buffer, offset, len, pos, totalRead, cb) {
    const start = this.stat.offset
    const end = start + this.stat.blocks

    this.contentState.feed.seek(this.stat.byteOffset + pos, { start, end }, (err, blk, blockOffset) => {
      if (err) return cb(err)
      this.position = pos
      this.blockPosition = blk
      this.blockOffset = blockOffset

      this._refreshDownload(blk)
      this._read(buffer, offset, len, totalRead, cb)
    })
  }

  /**
   * Reads one block at the current position and, if fewer than `len` bytes
   * have been accumulated, continues through `_readAcc`. Finishes when `len`
   * bytes were read or a block read yields zero bytes.
   */
  _read (buffer, offset, len, totalRead, cb) {
    const self = this

    // Position before this block is read; the continuation resumes at
    // `position + bytesRead`.
    const position = this.position
    readNextBlock()

    function readNextBlock () {
      self._readBlock(buffer, offset + totalRead, Math.max(len - totalRead, 0), (err, bytesRead) => {
        if (err) return cb(err)
        if (!bytesRead) return cb(null, totalRead, buffer)

        totalRead += bytesRead

        if (totalRead < len) {
          return self._readAcc(buffer, offset, len, position + bytesRead, totalRead, cb)
        }
        return cb(null, totalRead, buffer)
      })
    }
  }

  /**
   * Copies data from the current block into `buffer[offset, offset + len)`
   * and advances the tracked position.
   *
   * @param {function} cb - Called with `(err, bytesRead, buffer)`;
   *   `bytesRead` is 0 when the block position lies outside the file.
   */
  _readBlock (buffer, offset, len, cb) {
    const buf = buffer.slice(offset, offset + len)
    const blkOffset = this.blockOffset
    const blk = this.blockPosition

    if (this._range && (blk < this._range.start || blk > this._range.end)) {
      this._refreshDownload(blk)
    }

    // Block index outside [stat.offset, stat.offset + stat.blocks): nothing to read.
    if ((this.stat.offset + this.stat.blocks) <= blk || blk < this.stat.offset) {
      return process.nextTick(cb, null, 0, buffer)
    }

    this.contentState.feed.get(blk, (err, data) => {
      if (err) return cb(err)
      if (blkOffset) data = data.slice(blkOffset)

      data.copy(buf)
      const read = Math.min(data.length, buf.length)

      // Only advance if the position was not changed while the block was
      // being fetched.
      if (blk === this.blockPosition && blkOffset === this.blockOffset) {
        this.position += read
        if (read === data.length) {
          this.blockPosition++
          this.blockOffset = 0
        } else {
          this.blockOffset = blkOffset + read
        }
      }

      cb(null, read, buffer)
    })
  }
}
241
242module.exports = function create (drive, name, flags, cb) {
243 try {
244 flags = parse(linuxConstants, flags)
245 } catch (err) {
246 return process.nextTick(cb, new errors.InvalidArgument(err.message))
247 }
248
249 const accmode = flags & O_ACCMODE
250 const writable = !!(accmode & (O_WRONLY | O_RDWR))
251 const readable = accmode === 0 || !!(accmode & O_RDWR)
252 const appending = !!(flags & O_APPEND)
253 const truncating = !!(flags & O_TRUNC)
254 const creating = !!(flags & O_CREAT)
255 const canExist = !(flags & O_EXCL)
256
257 drive.stat(name, { trie: true }, (err, st, trie) => {
258 if (err && (err.errno !== 2)) return cb(err)
259 if (st && !canExist) return cb(new errors.PathAlreadyExists(name))
260 if (!st && (!writable || !creating)) return cb(new errors.FileNotFound(name))
261
262 drive._getContent(trie.feed, (err, contentState) => {
263 if (err) return cb(err)
264 const fd = new FileDescriptor(drive, name, st, contentState, readable, writable, appending, creating)
265 if (!contentState.feed.writable && writable) return cb(new errors.InvalidArgument('Cannot open a writable fd on a read-only drive.'))
266 if (truncating) {
267 return drive._upsert(name, { size: 0, blocks: 0 }, (err, st) => {
268 if (err) return cb(err)
269 fd.stat = st
270 return cb(null, fd)
271 })
272 }
273 if (creating || (writable && !appending)) {
274 return drive.create(name, (err, st) => {
275 if (err) return cb(err)
276 fd.stat = st
277 return cb(null, fd)
278 })
279 } else {
280 return cb(null, fd)
281 }
282 })
283 })
284}
285
/**
 * Builds the write pipeline for a file: a byte batcher, a transform that
 * concatenates each batch into a single Buffer, and the drive's own write
 * stream for `path`, combined into one stream with pumpify.
 *
 * @param {object} drive - Drive providing `createWriteStream(path, opts)`.
 * @param {object} opts - Passed through unchanged to `drive.createWriteStream`.
 * @param {string} path - Path of the file to write.
 * @returns {object} The combined stream returned by `pumpify`.
 */
function createWriteStream (drive, opts, path) {
  const writeStream = drive.createWriteStream(path, opts)
  // Batches are flushed after 100 (the `time` option) or once 64 KiB is buffered.
  const batcher = byteStream({ time: 100, limit: 4096 * 16 })
  // Each batch is concatenated into one Buffer before it reaches the drive.
  const joiner = through.obj((chunk, enc, cb) => {
    cb(null, Buffer.concat(chunk))
  })
  return pumpify(batcher, joiner, writeStream)
}
293
/** Does nothing; used as the default callback where one is optional. */
function noop () {}