1 |
|
2 | import fs from 'fs'
|
3 | import os from 'os'
|
4 | import parallel from 'run-parallel'
|
5 | import path from 'path'
|
6 | import queueMicrotask from 'queue-microtask'
|
7 | import RAF from 'random-access-file'
|
8 | import randombytes from 'randombytes'
|
9 | import thunky from 'thunky'
|
10 | import getFileRegex from 'filename-reserved-regex'
|
11 |
|
12 | const RESERVED_FILENAME_REGEX = getFileRegex()
|
13 |
|
14 | let TMP
|
15 | try {
|
16 | TMP = fs.statSync('/tmp') && '/tmp'
|
17 | } catch (err) {
|
18 | TMP = os.tmpdir()
|
19 | }
|
20 |
|
21 | export default class Storage {
|
22 | constructor (chunkLength, opts = {}) {
|
23 | this.chunkLength = Number(chunkLength)
|
24 | if (!this.chunkLength) throw new Error('First argument must be a chunk length')
|
25 | this.name = opts.name || path.join('fs-chunk-store', randombytes(20).toString('hex'))
|
26 | this.addUID = opts.addUID
|
27 |
|
28 | if (opts.files) {
|
29 | this.path = opts.path
|
30 | if (!Array.isArray(opts.files)) {
|
31 | throw new Error('`files` option must be an array')
|
32 | }
|
33 | this.files = opts.files.map((file, i, files) => {
|
34 | if (file.path == null) throw new Error('File is missing `path` property')
|
35 | if (file.length == null) throw new Error('File is missing `length` property')
|
36 | if (file.offset == null) {
|
37 | if (i === 0) {
|
38 | file.offset = 0
|
39 | } else {
|
40 | const prevFile = files[i - 1]
|
41 | file.offset = prevFile.offset + prevFile.length
|
42 | }
|
43 | }
|
44 | let newPath = path.dirname(file.path)
|
45 | const filename = path.basename(file.path)
|
46 | if (this.path) {
|
47 | newPath = this.addUID ? path.resolve(path.join(this.path, this.name, newPath)) : path.resolve(path.join(this.path, newPath))
|
48 | }
|
49 | newPath = path.join(newPath, filename.replace(RESERVED_FILENAME_REGEX, ''))
|
50 | return { path: newPath, length: file.length, offset: file.offset }
|
51 | })
|
52 | this.length = this.files.reduce((sum, file) => { return sum + file.length }, 0)
|
53 | if (opts.length != null && opts.length !== this.length) {
|
54 | throw new Error('total `files` length is not equal to explicit `length` option')
|
55 | }
|
56 | } else {
|
57 | const len = Number(opts.length) || Infinity
|
58 | this.files = [{
|
59 | offset: 0,
|
60 | path: path.resolve(opts.path || path.join(TMP, this.name)),
|
61 | length: len
|
62 | }]
|
63 | this.length = len
|
64 | }
|
65 |
|
66 | this.chunkMap = []
|
67 | this.closed = false
|
68 |
|
69 | this.files.forEach(file => {
|
70 | file.open = thunky(cb => {
|
71 | if (this.closed) return cb(new Error('Storage is closed'))
|
72 | fs.mkdir(path.dirname(file.path), { recursive: true }, err => {
|
73 | if (err) return cb(err)
|
74 | if (this.closed) return cb(new Error('Storage is closed'))
|
75 | cb(null, new RAF(file.path))
|
76 | })
|
77 | })
|
78 | })
|
79 |
|
80 |
|
81 |
|
82 |
|
83 | if (this.length !== Infinity) {
|
84 | this.lastChunkLength = (this.length % this.chunkLength) || this.chunkLength
|
85 | this.lastChunkIndex = Math.ceil(this.length / this.chunkLength) - 1
|
86 |
|
87 | this.files.forEach(file => {
|
88 | const fileStart = file.offset
|
89 | const fileEnd = file.offset + file.length
|
90 |
|
91 | const firstChunk = Math.floor(fileStart / this.chunkLength)
|
92 | const lastChunk = Math.floor((fileEnd - 1) / this.chunkLength)
|
93 |
|
94 | for (let p = firstChunk; p <= lastChunk; ++p) {
|
95 | const chunkStart = p * this.chunkLength
|
96 | const chunkEnd = chunkStart + this.chunkLength
|
97 |
|
98 | const from = (fileStart < chunkStart) ? 0 : fileStart - chunkStart
|
99 | const to = (fileEnd > chunkEnd) ? this.chunkLength : fileEnd - chunkStart
|
100 | const offset = (fileStart > chunkStart) ? 0 : chunkStart - fileStart
|
101 |
|
102 | if (!this.chunkMap[p]) this.chunkMap[p] = []
|
103 |
|
104 | this.chunkMap[p].push({
|
105 | from,
|
106 | to,
|
107 | offset,
|
108 | file
|
109 | })
|
110 | }
|
111 | })
|
112 | }
|
113 | }
|
114 |
|
115 | put (index, buf, cb) {
|
116 | if (typeof cb !== 'function') cb = noop
|
117 | if (this.closed) return nextTick(cb, new Error('Storage is closed'))
|
118 |
|
119 | const isLastChunk = (index === this.lastChunkIndex)
|
120 | if (isLastChunk && buf.length !== this.lastChunkLength) {
|
121 | return nextTick(cb, new Error('Last chunk length must be ' + this.lastChunkLength))
|
122 | }
|
123 | if (!isLastChunk && buf.length !== this.chunkLength) {
|
124 | return nextTick(cb, new Error('Chunk length must be ' + this.chunkLength))
|
125 | }
|
126 |
|
127 | if (this.length === Infinity) {
|
128 | this.files[0].open((err, file) => {
|
129 | if (err) return cb(err)
|
130 | file.write(index * this.chunkLength, buf, cb)
|
131 | })
|
132 | } else {
|
133 | const targets = this.chunkMap[index]
|
134 | if (!targets) return nextTick(cb, new Error('no files matching the request range'))
|
135 | const tasks = targets.map((target) => {
|
136 | return (cb) => {
|
137 | target.file.open((err, file) => {
|
138 | if (err) return cb(err)
|
139 | file.write(target.offset, buf.slice(target.from, target.to), cb)
|
140 | })
|
141 | }
|
142 | })
|
143 | parallel(tasks, cb)
|
144 | }
|
145 | }
|
146 |
|
147 | get (index, opts, cb) {
|
148 | if (typeof opts === 'function') return this.get(index, null, opts)
|
149 | if (this.closed) return nextTick(cb, new Error('Storage is closed'))
|
150 |
|
151 | const chunkLength = (index === this.lastChunkIndex)
|
152 | ? this.lastChunkLength
|
153 | : this.chunkLength
|
154 |
|
155 | const rangeFrom = (opts && opts.offset) || 0
|
156 | const rangeTo = (opts && opts.length) ? rangeFrom + opts.length : chunkLength
|
157 |
|
158 | if (rangeFrom < 0 || rangeFrom < 0 || rangeTo > chunkLength) {
|
159 | return nextTick(cb, new Error('Invalid offset and/or length'))
|
160 | }
|
161 |
|
162 | if (this.length === Infinity) {
|
163 | if (rangeFrom === rangeTo) return nextTick(cb, null, Buffer.from(0))
|
164 | this.files[0].open((err, file) => {
|
165 | if (err) return cb(err)
|
166 | const offset = (index * this.chunkLength) + rangeFrom
|
167 | file.read(offset, rangeTo - rangeFrom, cb)
|
168 | })
|
169 | } else {
|
170 | let targets = this.chunkMap[index]
|
171 | if (!targets) return nextTick(cb, new Error('no files matching the request range'))
|
172 | if (opts) {
|
173 | targets = targets.filter((target) => {
|
174 | return target.to > rangeFrom && target.from < rangeTo
|
175 | })
|
176 | if (targets.length === 0) {
|
177 | return nextTick(cb, new Error('no files matching the requested range'))
|
178 | }
|
179 | }
|
180 | if (rangeFrom === rangeTo) return nextTick(cb, null, Buffer.from(0))
|
181 |
|
182 | const tasks = targets.map((target) => {
|
183 | return (cb) => {
|
184 | let from = target.from
|
185 | let to = target.to
|
186 | let offset = target.offset
|
187 |
|
188 | if (opts) {
|
189 | if (to > rangeTo) to = rangeTo
|
190 | if (from < rangeFrom) {
|
191 | offset += (rangeFrom - from)
|
192 | from = rangeFrom
|
193 | }
|
194 | }
|
195 |
|
196 | target.file.open((err, file) => {
|
197 | if (err) return cb(err)
|
198 | file.read(offset, to - from, cb)
|
199 | })
|
200 | }
|
201 | })
|
202 |
|
203 | parallel(tasks, (err, buffers) => {
|
204 | if (err) return cb(err)
|
205 | cb(null, Buffer.concat(buffers))
|
206 | })
|
207 | }
|
208 | }
|
209 |
|
210 | close (cb) {
|
211 | if (this.closed) return nextTick(cb, new Error('Storage is closed'))
|
212 | this.closed = true
|
213 |
|
214 | const tasks = this.files.map((file) => {
|
215 | return (cb) => {
|
216 | file.open((err, file) => {
|
217 |
|
218 | if (err) return cb(null)
|
219 | file.close(cb)
|
220 | })
|
221 | }
|
222 | })
|
223 | parallel(tasks, cb)
|
224 | }
|
225 |
|
226 | destroy (cb) {
|
227 | this.close(() => {
|
228 | if (this.addUID && this.path) {
|
229 | fs.rm(path.resolve(path.join(this.path, this.name)), { recursive: true, maxBusyTries: 10 }, cb)
|
230 | } else {
|
231 | const tasks = this.files.map((file) => {
|
232 | return (cb) => {
|
233 | fs.rm(file.path, { recursive: true, maxRetries: 10 }, err => {
|
234 | err && err.code === 'ENOENT' ? cb() : cb(err)
|
235 | })
|
236 | }
|
237 | })
|
238 | parallel(tasks, cb)
|
239 | }
|
240 | })
|
241 | }
|
242 | }
|
243 |
|
244 | function nextTick (cb, err, val) {
|
245 | queueMicrotask(() => {
|
246 | if (cb) cb(err, val)
|
247 | })
|
248 | }
|
249 |
|
250 | function noop () {}
|