1 | const byteStream = require('byte-stream')
|
2 | const through = require('through2')
|
3 | const pumpify = require('pumpify')
|
4 |
|
5 | const errors = require('./errors')
|
6 |
|
7 | const { linux: linuxConstants, parse } = require('filesystem-constants')
|
8 | const {
|
9 | O_RDONLY,
|
10 | O_WRONLY,
|
11 | O_RDWR,
|
12 | O_CREAT,
|
13 | O_TRUNC,
|
14 | O_APPEND,
|
15 | O_SYNC,
|
16 | O_EXCL,
|
17 | O_ACCMODE
|
18 | } = linuxConstants
|
19 |
|
20 | class FileDescriptor {
|
21 | constructor (drive, path, stat, contentState, readable, writable, appending, creating) {
|
22 | this.drive = drive
|
23 | this.stat = stat
|
24 | this.path = path
|
25 | this.contentState = contentState
|
26 |
|
27 | this.readable = readable
|
28 | this.writable = writable
|
29 | this.creating = creating
|
30 | this.appending = appending
|
31 |
|
32 | this.position = null
|
33 | this.blockPosition = stat ? stat.offset : null
|
34 | this.blockOffset = 0
|
35 |
|
36 | this._err = null
|
37 | if (this.writable) {
|
38 | if (this.appending) {
|
39 | this._appendStream = this.drive.createReadStream(this.path)
|
40 | this.position = this.stat.size
|
41 | }
|
42 | }
|
43 |
|
44 | this._batcher = byteStream({ time: 100, limit: 4096 * 16 })
|
45 |
|
46 | this._range = null
|
47 | }
|
48 |
|
49 | read (buffer, offset, len, pos, cb) {
|
50 | if (!this.readable) return cb(new errors.BadFileDescriptor('File descriptor not open for reading.'))
|
51 | this._readAcc(buffer, offset, len, pos, 0, cb)
|
52 | }
|
53 |
|
54 | _readAcc (buffer, offset, len, pos, totalRead, cb) {
|
55 | if (this.position !== null && this.position === pos) this._read(buffer, offset, len, totalRead, cb)
|
56 | else this._seekAndRead(buffer, offset, len, pos, totalRead, cb)
|
57 | }
|
58 |
|
59 | write (buffer, offset, len, pos, cb) {
|
60 | if (!this.writable) return cb(new errors.BadFileDescriptor('File descriptor not open for writing.'))
|
61 | if (!this.stat && !this.creating) {
|
62 | return process.nextTick(cb, new errors.BadFileDescriptor('File descriptor not open in create mode.'))
|
63 | }
|
64 | if (this.position !== null && pos !== this.position) {
|
65 | return process.nextTick(cb, new errors.BadFileDescriptor('Random-access writes are not currently supported.'))
|
66 | }
|
67 | if (this.appending && pos < this.stat.size) {
|
68 | return process.nextTick(cb, new errors.BadFileDescriptor('Position cannot be less than the file size when appending.'))
|
69 | }
|
70 | if (!this._writeStream && !this.appending && pos) {
|
71 | return process.nextTick(cb, new errors.BadFileDescriptor('Random-access writes are not currently supported.'))
|
72 | }
|
73 |
|
74 | const self = this
|
75 | const slice = buffer.slice(offset, len)
|
76 |
|
77 | if (!this._writeStream) {
|
78 | this._writeStream = createWriteStream(this.drive, this.stat, this.path)
|
79 | this.drive._writingFds.set(this.path, this)
|
80 | }
|
81 |
|
82 | this._writeStream.on('error', done)
|
83 |
|
84 |
|
85 | if (this._appendStream) {
|
86 | this.position = this.stat.size
|
87 |
|
88 | this._appendStream.pipe(this._writeStream, { end: false })
|
89 |
|
90 | this._appendStream.on('error', err => this._writeStream.destroy(err))
|
91 | this._writeStream.on('error', err => this._appendStream.destroy(err))
|
92 |
|
93 | return this._appendStream.on('end', doWrite)
|
94 | }
|
95 |
|
96 | return doWrite()
|
97 |
|
98 | function done (err) {
|
99 | self._writeStream.removeListener('error', done)
|
100 | self._writeStream.removeListener('drain', done)
|
101 | if (err) return cb(err)
|
102 | self.position += slice.length
|
103 | self.stat.size += slice.length
|
104 | return cb(null, slice.length, buffer)
|
105 | }
|
106 |
|
107 | function doWrite (err) {
|
108 | self._appendStream = null
|
109 | if (err) return cb(err)
|
110 | if (self._err) return cb(self._err)
|
111 | if (self._writeStream.destroyed) return cb(new errors.BadFileDescriptor('Write stream was destroyed.'))
|
112 |
|
113 | if (self._writeStream.write(slice) === false) {
|
114 | self._writeStream.once('drain', done)
|
115 | } else {
|
116 | process.nextTick(done)
|
117 | }
|
118 | }
|
119 | }
|
120 |
|
121 | truncate (size, cb) {
|
122 | if (!this.writable) return cb(new errors.BadFileDescriptor('File descriptor is not writable'))
|
123 |
|
124 | if (size) return cb(new errors.InvalidArgument('Non-zero sizes are not currently supported in ftruncate.'))
|
125 | this.drive._update(this.path, { size: 0, blocks: 0 }, (err, st) => {
|
126 | if (err) return cb(err)
|
127 | this.stat = st
|
128 | return cb(null)
|
129 | })
|
130 | }
|
131 |
|
132 | close (cb) {
|
133 |
|
134 | if (this.writable) this.drive._writingFds.delete(this.path)
|
135 | if (this._writeStream) {
|
136 | if (this._writeStream.destroyed) {
|
137 | this._writeStream = null
|
138 | } else {
|
139 | return this._writeStream.end(err => {
|
140 | if (err) return cb(err)
|
141 | this._writeStream = null
|
142 | return cb(null)
|
143 | })
|
144 | }
|
145 | }
|
146 | if (this._range) {
|
147 | this.contentState.feed.undownload(this._range)
|
148 |
|
149 | this.contentState.feed.cancel(this._range.start, this._range.end)
|
150 | this._range = null
|
151 | }
|
152 | process.nextTick(cb, null)
|
153 | }
|
154 |
|
155 | |
156 |
|
157 |
|
158 |
|
159 |
|
160 | _refreshDownload (start, cb) {
|
161 |
|
162 | const end = this.stat.offset + this.stat.blocks
|
163 |
|
164 | if (this._range) {
|
165 | this.contentState.feed.undownload(this._range)
|
166 | }
|
167 |
|
168 | this._range = this.contentState.feed.download({ start, end, linear: true }, cb || noop)
|
169 | }
|
170 |
|
171 | _seekAndRead (buffer, offset, len, pos, totalRead, cb) {
|
172 | const start = this.stat.offset
|
173 | const end = start + this.stat.blocks
|
174 |
|
175 | this.contentState.feed.seek(this.stat.byteOffset + pos, { start, end }, (err, blk, blockOffset) => {
|
176 | if (err) return cb(err)
|
177 | this.position = pos
|
178 | this.blockPosition = blk
|
179 | this.blockOffset = blockOffset
|
180 |
|
181 | this._refreshDownload(blk)
|
182 | this._read(buffer, offset, len, totalRead, cb)
|
183 | })
|
184 | }
|
185 |
|
186 | _read (buffer, offset, len, totalRead, cb) {
|
187 | const self = this
|
188 |
|
189 | const position = this.position
|
190 | readNextBlock()
|
191 |
|
192 | function readNextBlock () {
|
193 | self._readBlock(buffer, offset + totalRead, Math.max(len - totalRead, 0), (err, bytesRead) => {
|
194 | if (err) return cb(err)
|
195 | if (!bytesRead) return cb(null, totalRead, buffer)
|
196 |
|
197 | totalRead += bytesRead
|
198 |
|
199 | if (totalRead < len) {
|
200 | return self._readAcc(buffer, offset, len, position + bytesRead, totalRead, cb)
|
201 | }
|
202 | return cb(null, totalRead, buffer)
|
203 | })
|
204 | }
|
205 | }
|
206 |
|
207 | _readBlock (buffer, offset, len, cb) {
|
208 | const buf = buffer.slice(offset, offset + len)
|
209 | const blkOffset = this.blockOffset
|
210 | const blk = this.blockPosition
|
211 |
|
212 | if (this._range && (blk < this._range.start || blk > this._range.end)) {
|
213 | this._refreshDownload(blk)
|
214 | }
|
215 |
|
216 | if ((this.stat.offset + this.stat.blocks) <= blk || blk < this.stat.offset) {
|
217 | return process.nextTick(cb, null, 0, buffer)
|
218 | }
|
219 |
|
220 | this.contentState.feed.get(blk, (err, data) => {
|
221 | if (err) return cb(err)
|
222 | if (blkOffset) data = data.slice(blkOffset)
|
223 |
|
224 | data.copy(buf)
|
225 | const read = Math.min(data.length, buf.length)
|
226 |
|
227 | if (blk === this.blockPosition && blkOffset === this.blockOffset) {
|
228 | this.position += read
|
229 | if (read === data.length) {
|
230 | this.blockPosition++
|
231 | this.blockOffset = 0
|
232 | } else {
|
233 | this.blockOffset = blkOffset + read
|
234 | }
|
235 | }
|
236 |
|
237 | cb(null, read, buffer)
|
238 | })
|
239 | }
|
240 | }
|
241 |
|
242 | module.exports = function create (drive, name, flags, cb) {
|
243 | try {
|
244 | flags = parse(linuxConstants, flags)
|
245 | } catch (err) {
|
246 | return process.nextTick(cb, new errors.InvalidArgument(err.message))
|
247 | }
|
248 |
|
249 | const accmode = flags & O_ACCMODE
|
250 | const writable = !!(accmode & (O_WRONLY | O_RDWR))
|
251 | const readable = accmode === 0 || !!(accmode & O_RDWR)
|
252 | const appending = !!(flags & O_APPEND)
|
253 | const truncating = !!(flags & O_TRUNC)
|
254 | const creating = !!(flags & O_CREAT)
|
255 | const canExist = !(flags & O_EXCL)
|
256 |
|
257 | drive.stat(name, { trie: true }, (err, st, trie) => {
|
258 | if (err && (err.errno !== 2)) return cb(err)
|
259 | if (st && !canExist) return cb(new errors.PathAlreadyExists(name))
|
260 | if (!st && (!writable || !creating)) return cb(new errors.FileNotFound(name))
|
261 |
|
262 | drive._getContent(trie.feed, (err, contentState) => {
|
263 | if (err) return cb(err)
|
264 | const fd = new FileDescriptor(drive, name, st, contentState, readable, writable, appending, creating)
|
265 | if (!contentState.feed.writable && writable) return cb(new errors.InvalidArgument('Cannot open a writable fd on a read-only drive.'))
|
266 | if (truncating) {
|
267 | return drive._upsert(name, { size: 0, blocks: 0 }, (err, st) => {
|
268 | if (err) return cb(err)
|
269 | fd.stat = st
|
270 | return cb(null, fd)
|
271 | })
|
272 | }
|
273 | if (creating || (writable && !appending)) {
|
274 | return drive.create(name, (err, st) => {
|
275 | if (err) return cb(err)
|
276 | fd.stat = st
|
277 | return cb(null, fd)
|
278 | })
|
279 | } else {
|
280 | return cb(null, fd)
|
281 | }
|
282 | })
|
283 | })
|
284 | }
|
285 |
|
286 | function createWriteStream (drive, opts, path) {
|
287 | const writeStream = drive.createWriteStream(path, opts)
|
288 | const batcher = byteStream({ time: 100, limit: 4096 * 16 })
|
289 | return pumpify(batcher, through.obj((chunk, enc, cb) => {
|
290 | cb(null, Buffer.concat(chunk))
|
291 | }), writeStream)
|
292 | }
|
293 |
|
294 | function noop () {}
|