1 | 'use strict'
|
2 | const MiniPass = require('minipass')
|
3 | const EE = require('events').EventEmitter
|
4 | const fs = require('fs')
|
5 |
|
6 |
|
7 | const binding = process.binding('fs')
|
8 | const writeBuffers = binding.writeBuffers
|
9 |
|
10 | const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback
|
11 |
|
12 | const _autoClose = Symbol('_autoClose')
|
13 | const _close = Symbol('_close')
|
14 | const _ended = Symbol('_ended')
|
15 | const _fd = Symbol('_fd')
|
16 | const _finished = Symbol('_finished')
|
17 | const _flags = Symbol('_flags')
|
18 | const _flush = Symbol('_flush')
|
19 | const _handleChunk = Symbol('_handleChunk')
|
20 | const _makeBuf = Symbol('_makeBuf')
|
21 | const _mode = Symbol('_mode')
|
22 | const _needDrain = Symbol('_needDrain')
|
23 | const _onerror = Symbol('_onerror')
|
24 | const _onopen = Symbol('_onopen')
|
25 | const _onread = Symbol('_onread')
|
26 | const _onwrite = Symbol('_onwrite')
|
27 | const _open = Symbol('_open')
|
28 | const _path = Symbol('_path')
|
29 | const _pos = Symbol('_pos')
|
30 | const _queue = Symbol('_queue')
|
31 | const _read = Symbol('_read')
|
32 | const _readSize = Symbol('_readSize')
|
33 | const _reading = Symbol('_reading')
|
34 | const _remain = Symbol('_remain')
|
35 | const _size = Symbol('_size')
|
36 | const _write = Symbol('_write')
|
37 | const _writing = Symbol('_writing')
|
38 | const _defaultFlag = Symbol('_defaultFlag')
|
39 |
|
40 | class ReadStream extends MiniPass {
|
41 | constructor (path, opt) {
|
42 | opt = opt || {}
|
43 | super(opt)
|
44 |
|
45 | this.writable = false
|
46 |
|
47 | if (typeof path !== 'string')
|
48 | throw new TypeError('path must be a string')
|
49 |
|
50 | this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
|
51 | this[_path] = path
|
52 | this[_readSize] = opt.readSize || 16*1024*1024
|
53 | this[_reading] = false
|
54 | this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
|
55 | this[_remain] = this[_size]
|
56 | this[_autoClose] = typeof opt.autoClose === 'boolean' ?
|
57 | opt.autoClose : true
|
58 |
|
59 | if (typeof this[_fd] === 'number')
|
60 | this[_read]()
|
61 | else
|
62 | this[_open]()
|
63 | }
|
64 |
|
65 | get fd () { return this[_fd] }
|
66 | get path () { return this[_path] }
|
67 |
|
68 | write () {
|
69 | throw new TypeError('this is a readable stream')
|
70 | }
|
71 |
|
72 | end () {
|
73 | throw new TypeError('this is a readable stream')
|
74 | }
|
75 |
|
76 | [_open] () {
|
77 | fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
|
78 | }
|
79 |
|
80 | [_onopen] (er, fd) {
|
81 | if (er)
|
82 | this[_onerror](er)
|
83 | else {
|
84 | this[_fd] = fd
|
85 | this.emit('open', fd)
|
86 | this[_read]()
|
87 | }
|
88 | }
|
89 |
|
90 | [_makeBuf] () {
|
91 | return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain]))
|
92 | }
|
93 |
|
94 | [_read] () {
|
95 | if (!this[_reading]) {
|
96 | this[_reading] = true
|
97 | const buf = this[_makeBuf]()
|
98 |
|
99 | if (buf.length === 0) return process.nextTick(() => this[_onread](null, 0, buf))
|
100 | fs.read(this[_fd], buf, 0, buf.length, null, (er, br, buf) =>
|
101 | this[_onread](er, br, buf))
|
102 | }
|
103 | }
|
104 |
|
105 | [_onread] (er, br, buf) {
|
106 | this[_reading] = false
|
107 | if (er)
|
108 | this[_onerror](er)
|
109 | else if (this[_handleChunk](br, buf))
|
110 | this[_read]()
|
111 | }
|
112 |
|
113 | [_close] () {
|
114 | if (this[_autoClose] && typeof this[_fd] === 'number') {
|
115 | fs.close(this[_fd], _ => this.emit('close'))
|
116 | this[_fd] = null
|
117 | }
|
118 | }
|
119 |
|
120 | [_onerror] (er) {
|
121 | this[_reading] = true
|
122 | this[_close]()
|
123 | this.emit('error', er)
|
124 | }
|
125 |
|
126 | [_handleChunk] (br, buf) {
|
127 | let ret = false
|
128 |
|
129 | this[_remain] -= br
|
130 | if (br > 0)
|
131 | ret = super.write(br < buf.length ? buf.slice(0, br) : buf)
|
132 |
|
133 | if (br === 0 || this[_remain] <= 0) {
|
134 | ret = false
|
135 | this[_close]()
|
136 | super.end()
|
137 | }
|
138 |
|
139 | return ret
|
140 | }
|
141 |
|
142 | emit (ev, data) {
|
143 | switch (ev) {
|
144 | case 'prefinish':
|
145 | case 'finish':
|
146 | break
|
147 |
|
148 | case 'drain':
|
149 | if (typeof this[_fd] === 'number')
|
150 | this[_read]()
|
151 | break
|
152 |
|
153 | default:
|
154 | return super.emit(ev, data)
|
155 | }
|
156 | }
|
157 | }
|
158 |
|
159 | class ReadStreamSync extends ReadStream {
|
160 | [_open] () {
|
161 | let threw = true
|
162 | try {
|
163 | this[_onopen](null, fs.openSync(this[_path], 'r'))
|
164 | threw = false
|
165 | } finally {
|
166 | if (threw)
|
167 | this[_close]()
|
168 | }
|
169 | }
|
170 |
|
171 | [_read] () {
|
172 | let threw = true
|
173 | try {
|
174 | if (!this[_reading]) {
|
175 | this[_reading] = true
|
176 | do {
|
177 | const buf = this[_makeBuf]()
|
178 |
|
179 | const br = buf.length === 0 ? 0 : fs.readSync(this[_fd], buf, 0, buf.length, null)
|
180 | if (!this[_handleChunk](br, buf))
|
181 | break
|
182 | } while (true)
|
183 | this[_reading] = false
|
184 | }
|
185 | threw = false
|
186 | } finally {
|
187 | if (threw)
|
188 | this[_close]()
|
189 | }
|
190 | }
|
191 |
|
192 | [_close] () {
|
193 | if (this[_autoClose] && typeof this[_fd] === 'number') {
|
194 | try {
|
195 | fs.closeSync(this[_fd])
|
196 | } catch (er) {}
|
197 | this[_fd] = null
|
198 | this.emit('close')
|
199 | }
|
200 | }
|
201 | }
|
202 |
|
203 | class WriteStream extends EE {
|
204 | constructor (path, opt) {
|
205 | opt = opt || {}
|
206 | super(opt)
|
207 | this.readable = false
|
208 | this[_writing] = false
|
209 | this[_ended] = false
|
210 | this[_needDrain] = false
|
211 | this[_queue] = []
|
212 | this[_path] = path
|
213 | this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
|
214 | this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
|
215 | this[_pos] = typeof opt.start === 'number' ? opt.start : null
|
216 | this[_autoClose] = typeof opt.autoClose === 'boolean' ?
|
217 | opt.autoClose : true
|
218 |
|
219 |
|
220 | const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
|
221 | this[_defaultFlag] = opt.flags === undefined
|
222 | this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags
|
223 |
|
224 | if (this[_fd] === null)
|
225 | this[_open]()
|
226 | }
|
227 |
|
228 | get fd () { return this[_fd] }
|
229 | get path () { return this[_path] }
|
230 |
|
231 | [_onerror] (er) {
|
232 | this[_close]()
|
233 | this[_writing] = true
|
234 | this.emit('error', er)
|
235 | }
|
236 |
|
237 | [_open] () {
|
238 | fs.open(this[_path], this[_flags], this[_mode],
|
239 | (er, fd) => this[_onopen](er, fd))
|
240 | }
|
241 |
|
242 | [_onopen] (er, fd) {
|
243 | if (this[_defaultFlag] &&
|
244 | this[_flags] === 'r+' &&
|
245 | er && er.code === 'ENOENT') {
|
246 | this[_flags] = 'w'
|
247 | this[_open]()
|
248 | } else if (er)
|
249 | this[_onerror](er)
|
250 | else {
|
251 | this[_fd] = fd
|
252 | this.emit('open', fd)
|
253 | this[_flush]()
|
254 | }
|
255 | }
|
256 |
|
257 | end (buf, enc) {
|
258 | if (buf)
|
259 | this.write(buf, enc)
|
260 |
|
261 | this[_ended] = true
|
262 |
|
263 |
|
264 | if (!this[_writing] && !this[_queue].length &&
|
265 | typeof this[_fd] === 'number')
|
266 | this[_onwrite](null, 0)
|
267 | }
|
268 |
|
269 | write (buf, enc) {
|
270 | if (typeof buf === 'string')
|
271 | buf = new Buffer(buf, enc)
|
272 |
|
273 | if (this[_ended]) {
|
274 | this.emit('error', new Error('write() after end()'))
|
275 | return false
|
276 | }
|
277 |
|
278 | if (this[_fd] === null || this[_writing] || this[_queue].length) {
|
279 | this[_queue].push(buf)
|
280 | this[_needDrain] = true
|
281 | return false
|
282 | }
|
283 |
|
284 | this[_writing] = true
|
285 | this[_write](buf)
|
286 | return true
|
287 | }
|
288 |
|
289 | [_write] (buf) {
|
290 | fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
|
291 | this[_onwrite](er, bw))
|
292 | }
|
293 |
|
294 | [_onwrite] (er, bw) {
|
295 | if (er)
|
296 | this[_onerror](er)
|
297 | else {
|
298 | if (this[_pos] !== null)
|
299 | this[_pos] += bw
|
300 | if (this[_queue].length)
|
301 | this[_flush]()
|
302 | else {
|
303 | this[_writing] = false
|
304 |
|
305 | if (this[_ended] && !this[_finished]) {
|
306 | this[_finished] = true
|
307 | this[_close]()
|
308 | this.emit('finish')
|
309 | } else if (this[_needDrain]) {
|
310 | this[_needDrain] = false
|
311 | this.emit('drain')
|
312 | }
|
313 | }
|
314 | }
|
315 | }
|
316 |
|
317 | [_flush] () {
|
318 | if (this[_queue].length === 0) {
|
319 | if (this[_ended])
|
320 | this[_onwrite](null, 0)
|
321 | } else if (this[_queue].length === 1)
|
322 | this[_write](this[_queue].pop())
|
323 | else {
|
324 | const iovec = this[_queue]
|
325 | this[_queue] = []
|
326 | writev(this[_fd], iovec, this[_pos],
|
327 | (er, bw) => this[_onwrite](er, bw))
|
328 | }
|
329 | }
|
330 |
|
331 | [_close] () {
|
332 | if (this[_autoClose] && typeof this[_fd] === 'number') {
|
333 | fs.close(this[_fd], _ => this.emit('close'))
|
334 | this[_fd] = null
|
335 | }
|
336 | }
|
337 | }
|
338 |
|
339 | class WriteStreamSync extends WriteStream {
|
340 | [_open] () {
|
341 | let fd
|
342 | try {
|
343 | fd = fs.openSync(this[_path], this[_flags], this[_mode])
|
344 | } catch (er) {
|
345 | if (this[_defaultFlag] &&
|
346 | this[_flags] === 'r+' &&
|
347 | er && er.code === 'ENOENT') {
|
348 | this[_flags] = 'w'
|
349 | return this[_open]()
|
350 | } else
|
351 | throw er
|
352 | }
|
353 | this[_onopen](null, fd)
|
354 | }
|
355 |
|
356 | [_close] () {
|
357 | if (this[_autoClose] && typeof this[_fd] === 'number') {
|
358 | try {
|
359 | fs.closeSync(this[_fd])
|
360 | } catch (er) {}
|
361 | this[_fd] = null
|
362 | this.emit('close')
|
363 | }
|
364 | }
|
365 |
|
366 | [_write] (buf) {
|
367 | try {
|
368 | this[_onwrite](null,
|
369 | fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos]))
|
370 | } catch (er) {
|
371 | this[_onwrite](er, 0)
|
372 | }
|
373 | }
|
374 | }
|
375 |
|
376 | const writev = (fd, iovec, pos, cb) => {
|
377 | const done = (er, bw) => cb(er, bw, iovec)
|
378 | const req = new FSReqWrap()
|
379 | req.oncomplete = done
|
380 | binding.writeBuffers(fd, iovec, pos, req)
|
381 | }
|
382 |
|
383 | exports.ReadStream = ReadStream
|
384 | exports.ReadStreamSync = ReadStreamSync
|
385 |
|
386 | exports.WriteStream = WriteStream
|
387 | exports.WriteStreamSync = WriteStreamSync
|