// UNPKG file viewer: 9.02 kB, JavaScript (View Raw)
'use strict'

// Dependencies: MiniPass is the base class for the readable streams below,
// EE (EventEmitter) is the base class for the writable streams.
const MiniPass = require('minipass')
const { EventEmitter: EE } = require('events')
const fs = require('fs')

// Handles into the internal fs binding; the vectored-write helper at the
// bottom of this file builds a request object and calls writeBuffers on it.
const binding = process.binding('fs')
const { writeBuffers } = binding
/* istanbul ignore next */
const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback

// Symbol keys used for the private state and private methods of the stream
// classes, so that none of it shows up as ordinary string-named properties.
const _autoClose = Symbol('_autoClose')
const _close = Symbol('_close')
const _ended = Symbol('_ended')
const _fd = Symbol('_fd')
const _finished = Symbol('_finished')
const _flags = Symbol('_flags')
const _flush = Symbol('_flush')
const _handleChunk = Symbol('_handleChunk')
const _makeBuf = Symbol('_makeBuf')
const _mode = Symbol('_mode')
const _needDrain = Symbol('_needDrain')
const _onerror = Symbol('_onerror')
const _onopen = Symbol('_onopen')
const _onread = Symbol('_onread')
const _onwrite = Symbol('_onwrite')
const _open = Symbol('_open')
const _path = Symbol('_path')
const _pos = Symbol('_pos')
const _queue = Symbol('_queue')
const _read = Symbol('_read')
const _readSize = Symbol('_readSize')
const _reading = Symbol('_reading')
const _remain = Symbol('_remain')
const _size = Symbol('_size')
const _write = Symbol('_write')
const _writing = Symbol('_writing')
const _defaultFlag = Symbol('_defaultFlag')
39
/**
 * Readable file stream built on MiniPass.
 *
 * Opens `path` for reading (unless a numeric `opt.fd` is supplied), then
 * repeatedly reads chunks with fs.read and pushes them into the underlying
 * MiniPass stream until a zero-byte read occurs or `opt.size` bytes have
 * been delivered.
 *
 * Events emitted by this class itself: 'open' (with the fd) after a
 * successful open, 'close' once the descriptor has been closed, and 'error'
 * on open/read failure.
 */
class ReadStream extends MiniPass {
  /**
   * @param {string} path - file to read; must be a string even when
   *   `opt.fd` is given.
   * @param {object} [opt] - also forwarded to the MiniPass constructor.
   * @param {number} [opt.fd] - already-open descriptor to read from.
   * @param {number} [opt.readSize] - max bytes per read (default 16 MiB).
   * @param {number} [opt.size] - total bytes to read (default Infinity).
   * @param {boolean} [opt.autoClose] - close the fd when done (default true).
   * @throws {TypeError} when `path` is not a string.
   */
  constructor (path, opt) {
    opt = opt || {}
    super(opt)

    this.writable = false

    if (typeof path !== 'string') {
      throw new TypeError('path must be a string')
    }

    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_path] = path
    this[_readSize] = opt.readSize || 16 * 1024 * 1024
    this[_reading] = false
    this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
    this[_remain] = this[_size]
    this[_autoClose] = typeof opt.autoClose === 'boolean'
      ? opt.autoClose
      : true

    // With a descriptor in hand we can start reading right away;
    // otherwise the file has to be opened first.
    if (typeof this[_fd] === 'number') {
      this[_read]()
    } else {
      this[_open]()
    }
  }

  /** Current file descriptor, or null when none is held. */
  get fd () {
    return this[_fd]
  }

  /** Path given to the constructor. */
  get path () {
    return this[_path]
  }

  /** Always throws: this stream is read-only. */
  write () {
    throw new TypeError('this is a readable stream')
  }

  /** Always throws: this stream is read-only. */
  end () {
    throw new TypeError('this is a readable stream')
  }

  // Open the file read-only and hand the result to the open handler.
  [_open] () {
    fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
  }

  // Open completion: report failure, or record the fd, announce it and
  // kick off the first read.
  [_onopen] (er, fd) {
    if (er) {
      this[_onerror](er)
      return
    }
    this[_fd] = fd
    this.emit('open', fd)
    this[_read]()
  }

  // Allocate a buffer for the next read: readSize bytes, capped by the
  // number of bytes still wanted.
  [_makeBuf] () {
    const wanted = Math.min(this[_readSize], this[_remain])
    return Buffer.allocUnsafe(wanted)
  }

  // Start one read unless a read is already outstanding.
  [_read] () {
    if (this[_reading]) {
      return
    }
    this[_reading] = true

    const buf = this[_makeBuf]()
    /* istanbul ignore if */
    if (buf.length === 0) {
      // Nothing left to ask for: report a zero-byte read on the next tick
      // instead of calling into fs.
      return process.nextTick(() => this[_onread](null, 0, buf))
    }

    fs.read(this[_fd], buf, 0, buf.length, null, (er, br, filled) =>
      this[_onread](er, br, filled))
  }

  // Read completion: report failure, or deliver the chunk and keep reading
  // for as long as the chunk handler asks for more.
  [_onread] (er, br, buf) {
    this[_reading] = false
    if (er) {
      this[_onerror](er)
      return
    }
    if (this[_handleChunk](br, buf)) {
      this[_read]()
    }
  }

  // Close the descriptor when we hold one and autoClose is on; 'close' is
  // emitted from the fs.close callback.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    fs.close(this[_fd], _ => this.emit('close'))
    this[_fd] = null
  }

  // Failure path: leaving the reading flag set blocks any further reads.
  [_onerror] (er) {
    this[_reading] = true
    this[_close]()
    this.emit('error', er)
  }

  // Push `br` bytes of `buf` downstream. Returns the result of the
  // downstream write when more data should be read, or false once the
  // stream has been ended (zero-byte read, or requested size reached).
  [_handleChunk] (br, buf) {
    // no effect if infinite
    this[_remain] -= br

    let wantMore = false
    if (br > 0) {
      const chunk = br < buf.length ? buf.slice(0, br) : buf
      wantMore = super.write(chunk)
    }

    // Checked only after the chunk above has been written downstream.
    const finished = br === 0 || this[_remain] <= 0
    if (finished) {
      this[_close]()
      super.end()
      return false
    }

    return wantMore
  }

  /**
   * Event filter: 'prefinish' and 'finish' are swallowed, 'drain' is turned
   * into a request for the next read (while a descriptor is held), and
   * every other event is forwarded with its single data argument.
   */
  emit (ev, data) {
    if (ev === 'prefinish' || ev === 'finish') {
      return
    }

    if (ev === 'drain') {
      if (typeof this[_fd] === 'number') {
        this[_read]()
      }
      return
    }

    return super.emit(ev, data)
  }
}
158
/**
 * Synchronous flavour of ReadStream: opening, reading and closing use the
 * fs *Sync calls. Exceptions raised while opening or reading propagate to
 * the caller after an attempt to close the descriptor.
 */
class ReadStreamSync extends ReadStream {
  // Open synchronously; on any exception close whatever we hold and rethrow.
  [_open] () {
    try {
      this[_onopen](null, fs.openSync(this[_path], 'r'))
    } catch (er) {
      this[_close]()
      throw er
    }
  }

  // Read chunks in a loop until the chunk handler stops asking for more.
  // A no-op while a read loop is already running.
  [_read] () {
    try {
      if (this[_reading]) {
        return
      }
      this[_reading] = true

      let wantMore = true
      while (wantMore) {
        const buf = this[_makeBuf]()
        /* istanbul ignore next */
        const br = buf.length === 0
          ? 0
          : fs.readSync(this[_fd], buf, 0, buf.length, null)
        wantMore = this[_handleChunk](br, buf)
      }

      this[_reading] = false
    } catch (er) {
      this[_close]()
      throw er
    }
  }

  // Close synchronously when we hold a descriptor and autoClose is on, then
  // emit 'close' right away.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    try {
      fs.closeSync(this[_fd])
    } catch (er) {
      // best effort: a failed close is deliberately ignored
    }
    this[_fd] = null
    this.emit('close')
  }
}
202
/**
 * Writable file stream built on EventEmitter.
 *
 * Opens `path` (unless a numeric `opt.fd` is supplied) and writes buffers
 * to it one operation at a time. Chunks that arrive while the file is still
 * opening, or while a write is outstanding, are queued and flushed later
 * (several queued chunks go out as one vectored write).
 *
 * Events: 'open' (with the fd), 'drain' once a backlog has been written,
 * 'finish' after end() when everything has been written, 'close' once the
 * descriptor has been closed, and 'error'.
 */
class WriteStream extends EE {
  /**
   * @param {string} path - file to write.
   * @param {object} [opt]
   * @param {number} [opt.fd] - already-open descriptor to write to.
   * @param {number} [opt.mode] - mode passed to open (default 0o666).
   * @param {number} [opt.start] - byte offset to start writing at.
   * @param {string} [opt.flags] - open flags; when omitted, 'r+' is used if
   *   `start` is given and 'w' otherwise.
   * @param {boolean} [opt.autoClose] - close the fd when done (default true).
   */
  constructor (path, opt) {
    opt = opt || {}
    super(opt)
    this.readable = false
    this[_writing] = false
    this[_ended] = false
    this[_finished] = false
    this[_needDrain] = false
    this[_queue] = []
    this[_path] = path
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
    this[_pos] = typeof opt.start === 'number' ? opt.start : null
    this[_autoClose] = typeof opt.autoClose === 'boolean'
      ? opt.autoClose
      : true

    // truncating makes no sense when writing into the middle
    const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
    this[_defaultFlag] = opt.flags === undefined
    this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags

    if (this[_fd] === null) {
      this[_open]()
    }
  }

  /** Current file descriptor, or null when none is held. */
  get fd () {
    return this[_fd]
  }

  /** Path given to the constructor. */
  get path () {
    return this[_path]
  }

  // Failure path: leaving the writing flag set makes later write() calls
  // queue instead of touching the (now closed) descriptor.
  [_onerror] (er) {
    this[_close]()
    this[_writing] = true
    this.emit('error', er)
  }

  // Open the file with the configured flags and mode.
  [_open] () {
    fs.open(this[_path], this[_flags], this[_mode],
      (er, fd) => this[_onopen](er, fd))
  }

  // Open completion. When the flags were defaulted to 'r+' and the file
  // does not exist, retry once with 'w' so that the file gets created.
  [_onopen] (er, fd) {
    const missingWithDefaultFlag = this[_defaultFlag] &&
      this[_flags] === 'r+' &&
      er && er.code === 'ENOENT'

    if (missingWithDefaultFlag) {
      this[_flags] = 'w'
      this[_open]()
    } else if (er) {
      this[_onerror](er)
    } else {
      this[_fd] = fd
      this.emit('open', fd)
      this[_flush]()
    }
  }

  /**
   * Mark the stream as ended, optionally writing a final chunk first.
   * 'finish' is emitted once all pending data has been written.
   *
   * @param {Buffer|string} [buf] - final chunk.
   * @param {string} [enc] - encoding when `buf` is a string.
   */
  end (buf, enc) {
    if (buf) {
      this.write(buf, enc)
    }

    this[_ended] = true

    // synthetic after-write logic, where drain/finish live
    if (!this[_writing] && !this[_queue].length &&
        typeof this[_fd] === 'number') {
      this[_onwrite](null, 0)
    }
  }

  /**
   * Write a chunk, or queue it when the file is not open yet or another
   * write is outstanding.
   *
   * @param {Buffer|string} buf - data to write.
   * @param {string} [enc] - encoding when `buf` is a string.
   * @returns {boolean} true when the chunk was handed straight to the file
   *   system; false when it was queued (wait for 'drain') or when the
   *   stream has already been ended (an 'error' is emitted).
   */
  write (buf, enc) {
    if (typeof buf === 'string') {
      // Buffer.from replaces the deprecated `new Buffer(string, encoding)`.
      buf = Buffer.from(buf, enc)
    }

    if (this[_ended]) {
      this.emit('error', new Error('write() after end()'))
      return false
    }

    if (this[_fd] === null || this[_writing] || this[_queue].length) {
      this[_queue].push(buf)
      this[_needDrain] = true
      return false
    }

    this[_writing] = true
    this[_write](buf)
    return true
  }

  // Issue a single positional write.
  [_write] (buf) {
    fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
      this[_onwrite](er, bw))
  }

  // Write completion: advance the position, then either flush the backlog
  // or go idle and emit 'finish' / 'drain' as appropriate.
  [_onwrite] (er, bw) {
    if (er) {
      this[_onerror](er)
      return
    }

    if (this[_pos] !== null) {
      this[_pos] += bw
    }

    if (this[_queue].length) {
      this[_flush]()
      return
    }

    this[_writing] = false

    if (this[_ended] && !this[_finished]) {
      this[_finished] = true
      this[_close]()
      this.emit('finish')
    } else if (this[_needDrain]) {
      this[_needDrain] = false
      this.emit('drain')
    }
  }

  // Write out everything queued so far: one chunk as a plain write,
  // several chunks as a single vectored write.
  [_flush] () {
    if (this[_queue].length === 0) {
      if (this[_ended]) {
        this[_onwrite](null, 0)
      }
      return
    }

    // The flush is itself an outstanding write. Without this flag a flush
    // started right after open would look idle, letting a concurrent
    // write() start an overlapping fs.write, or end() close the descriptor
    // and emit 'finish' before the queued data has been written.
    this[_writing] = true

    if (this[_queue].length === 1) {
      this[_write](this[_queue].pop())
    } else {
      const iovec = this[_queue]
      this[_queue] = []
      writev(this[_fd], iovec, this[_pos],
        (er, bw) => this[_onwrite](er, bw))
    }
  }

  // Close the descriptor when we hold one and autoClose is on; 'close' is
  // emitted from the fs.close callback.
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      fs.close(this[_fd], _ => this.emit('close'))
      this[_fd] = null
    }
  }
}
338
/**
 * Synchronous flavour of WriteStream: opening, single-chunk writes and
 * closing use the fs *Sync calls.
 */
class WriteStreamSync extends WriteStream {
  // Open synchronously. When the flags were defaulted to 'r+' and the file
  // does not exist, switch to 'w' and try again; any other failure is
  // thrown to the caller.
  [_open] () {
    let fd
    try {
      fd = fs.openSync(this[_path], this[_flags], this[_mode])
    } catch (er) {
      const missingWithDefaultFlag = this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er && er.code === 'ENOENT'
      if (!missingWithDefaultFlag) {
        throw er
      }
      this[_flags] = 'w'
      return this[_open]()
    }
    this[_onopen](null, fd)
  }

  // Close synchronously when we hold a descriptor and autoClose is on, then
  // emit 'close' right away.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    try {
      fs.closeSync(this[_fd])
    } catch (er) {
      // best effort: a failed close is deliberately ignored
    }
    this[_fd] = null
    this.emit('close')
  }

  // Write synchronously and run the write-completion handler inline. Any
  // exception raised inside the try (by the write or by the handler) is
  // routed back into the handler as an error.
  [_write] (buf) {
    try {
      const bw = fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])
      this[_onwrite](null, bw)
    } catch (er) {
      this[_onwrite](er, 0)
    }
  }
}
375
/**
 * Write an array of buffers to `fd` in one vectored operation.
 *
 * Uses the public fs.writev API when the running Node.js provides it, and
 * only falls back to the internal fs binding on runtimes without it.
 *
 * @param {number} fd - open file descriptor.
 * @param {Buffer[]} iovec - buffers to write, in order.
 * @param {?number} pos - file offset, or null to write at the current
 *   position.
 * @param {function(?Error, number, Buffer[])} cb - called with the error
 *   (if any), the number of bytes written, and `iovec`.
 */
const writev = (fd, iovec, pos, cb) => {
  const done = (er, bw) => cb(er, bw, iovec)

  if (typeof fs.writev === 'function') {
    fs.writev(fd, iovec, pos, done)
    return
  }

  /* istanbul ignore next */
  {
    // Legacy path: drive the internal binding directly.
    const req = new FSReqWrap()
    req.oncomplete = done
    binding.writeBuffers(fd, iovec, pos, req)
  }
}
382
// Public API: asynchronous and synchronous read/write file streams.
Object.assign(exports, {
  ReadStream,
  ReadStreamSync,
  WriteStream,
  WriteStreamSync
})