UNPKG

14.3 kBJavaScriptView Raw
1'use strict'
2const EE = require('events')
3const Stream = require('stream')
4const Yallist = require('yallist')
5const SD = require('string_decoder').StringDecoder
6
7const EOF = Symbol('EOF')
8const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
9const EMITTED_END = Symbol('emittedEnd')
10const EMITTING_END = Symbol('emittingEnd')
11const CLOSED = Symbol('closed')
12const READ = Symbol('read')
13const FLUSH = Symbol('flush')
14const FLUSHCHUNK = Symbol('flushChunk')
15const ENCODING = Symbol('encoding')
16const DECODER = Symbol('decoder')
17const FLOWING = Symbol('flowing')
18const PAUSED = Symbol('paused')
19const RESUME = Symbol('resume')
20const BUFFERLENGTH = Symbol('bufferLength')
21const BUFFERPUSH = Symbol('bufferPush')
22const BUFFERSHIFT = Symbol('bufferShift')
23const OBJECTMODE = Symbol('objectMode')
24const DESTROYED = Symbol('destroyed')
25
26// TODO remove when Node v8 support drops
27const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
28const ASYNCITERATOR = doIter && Symbol.asyncIterator
29 || Symbol('asyncIterator not implemented')
30const ITERATOR = doIter && Symbol.iterator
31 || Symbol('iterator not implemented')
32
33// events that mean 'the stream is over'
34// these are treated specially, and re-emitted
35// if they are listened for after emitting.
36const isEndish = ev =>
37 ev === 'end' ||
38 ev === 'finish' ||
39 ev === 'prefinish'
40
41const isArrayBuffer = b => b instanceof ArrayBuffer ||
42 typeof b === 'object' &&
43 b.constructor &&
44 b.constructor.name === 'ArrayBuffer' &&
45 b.byteLength >= 0
46
47const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
48
49module.exports = class Minipass extends Stream {
50 constructor (options) {
51 super()
52 this[FLOWING] = false
53 // whether we're explicitly paused
54 this[PAUSED] = false
55 this.pipes = new Yallist()
56 this.buffer = new Yallist()
57 this[OBJECTMODE] = options && options.objectMode || false
58 if (this[OBJECTMODE])
59 this[ENCODING] = null
60 else
61 this[ENCODING] = options && options.encoding || null
62 if (this[ENCODING] === 'buffer')
63 this[ENCODING] = null
64 this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
65 this[EOF] = false
66 this[EMITTED_END] = false
67 this[EMITTING_END] = false
68 this[CLOSED] = false
69 this.writable = true
70 this.readable = true
71 this[BUFFERLENGTH] = 0
72 this[DESTROYED] = false
73 }
74
75 get bufferLength () { return this[BUFFERLENGTH] }
76
77 get encoding () { return this[ENCODING] }
78 set encoding (enc) {
79 if (this[OBJECTMODE])
80 throw new Error('cannot set encoding in objectMode')
81
82 if (this[ENCODING] && enc !== this[ENCODING] &&
83 (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH]))
84 throw new Error('cannot change encoding')
85
86 if (this[ENCODING] !== enc) {
87 this[DECODER] = enc ? new SD(enc) : null
88 if (this.buffer.length)
89 this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk))
90 }
91
92 this[ENCODING] = enc
93 }
94
95 setEncoding (enc) {
96 this.encoding = enc
97 }
98
99 get objectMode () { return this[OBJECTMODE] }
100 set objectMode (om) { this[OBJECTMODE] = this[OBJECTMODE] || !!om }
101
102 write (chunk, encoding, cb) {
103 if (this[EOF])
104 throw new Error('write after end')
105
106 if (this[DESTROYED]) {
107 this.emit('error', Object.assign(
108 new Error('Cannot call write after a stream was destroyed'),
109 { code: 'ERR_STREAM_DESTROYED' }
110 ))
111 return true
112 }
113
114 if (typeof encoding === 'function')
115 cb = encoding, encoding = 'utf8'
116
117 if (!encoding)
118 encoding = 'utf8'
119
120 // convert array buffers and typed array views into buffers
121 // at some point in the future, we may want to do the opposite!
122 // leave strings and buffers as-is
123 // anything else switches us into object mode
124 if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
125 if (isArrayBufferView(chunk))
126 chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
127 else if (isArrayBuffer(chunk))
128 chunk = Buffer.from(chunk)
129 else if (typeof chunk !== 'string')
130 // use the setter so we throw if we have encoding set
131 this.objectMode = true
132 }
133
134 // this ensures at this point that the chunk is a buffer or string
135 // don't buffer it up or send it to the decoder
136 if (!this.objectMode && !chunk.length) {
137 if (this[BUFFERLENGTH] !== 0)
138 this.emit('readable')
139 if (cb)
140 cb()
141 return this.flowing
142 }
143
144 // fast-path writing strings of same encoding to a stream with
145 // an empty buffer, skipping the buffer/decoder dance
146 if (typeof chunk === 'string' && !this[OBJECTMODE] &&
147 // unless it is a string already ready for us to use
148 !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
149 chunk = Buffer.from(chunk, encoding)
150 }
151
152 if (Buffer.isBuffer(chunk) && this[ENCODING])
153 chunk = this[DECODER].write(chunk)
154
155 if (this.flowing) {
156 // if we somehow have something in the buffer, but we think we're
157 // flowing, then we need to flush all that out first, or we get
158 // chunks coming in out of order. Can't emit 'drain' here though,
159 // because we're mid-write, so that'd be bad.
160 if (this[BUFFERLENGTH] !== 0)
161 this[FLUSH](true)
162 this.emit('data', chunk)
163 } else
164 this[BUFFERPUSH](chunk)
165
166 if (this[BUFFERLENGTH] !== 0)
167 this.emit('readable')
168
169 if (cb)
170 cb()
171
172 return this.flowing
173 }
174
175 read (n) {
176 if (this[DESTROYED])
177 return null
178
179 try {
180 if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
181 return null
182
183 if (this[OBJECTMODE])
184 n = null
185
186 if (this.buffer.length > 1 && !this[OBJECTMODE]) {
187 if (this.encoding)
188 this.buffer = new Yallist([
189 Array.from(this.buffer).join('')
190 ])
191 else
192 this.buffer = new Yallist([
193 Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH])
194 ])
195 }
196
197 return this[READ](n || null, this.buffer.head.value)
198 } finally {
199 this[MAYBE_EMIT_END]()
200 }
201 }
202
203 [READ] (n, chunk) {
204 if (n === chunk.length || n === null)
205 this[BUFFERSHIFT]()
206 else {
207 this.buffer.head.value = chunk.slice(n)
208 chunk = chunk.slice(0, n)
209 this[BUFFERLENGTH] -= n
210 }
211
212 this.emit('data', chunk)
213
214 if (!this.buffer.length && !this[EOF])
215 this.emit('drain')
216
217 return chunk
218 }
219
220 end (chunk, encoding, cb) {
221 if (typeof chunk === 'function')
222 cb = chunk, chunk = null
223 if (typeof encoding === 'function')
224 cb = encoding, encoding = 'utf8'
225 if (chunk)
226 this.write(chunk, encoding)
227 if (cb)
228 this.once('end', cb)
229 this[EOF] = true
230 this.writable = false
231
232 // if we haven't written anything, then go ahead and emit,
233 // even if we're not reading.
234 // we'll re-emit if a new 'end' listener is added anyway.
235 // This makes MP more suitable to write-only use cases.
236 if (this.flowing || !this[PAUSED])
237 this[MAYBE_EMIT_END]()
238 return this
239 }
240
241 // don't let the internal resume be overwritten
242 [RESUME] () {
243 if (this[DESTROYED])
244 return
245
246 this[PAUSED] = false
247 this[FLOWING] = true
248 this.emit('resume')
249 if (this.buffer.length)
250 this[FLUSH]()
251 else if (this[EOF])
252 this[MAYBE_EMIT_END]()
253 else
254 this.emit('drain')
255 }
256
257 resume () {
258 return this[RESUME]()
259 }
260
261 pause () {
262 this[FLOWING] = false
263 this[PAUSED] = true
264 }
265
266 get destroyed () {
267 return this[DESTROYED]
268 }
269
270 get flowing () {
271 return this[FLOWING]
272 }
273
274 get paused () {
275 return this[PAUSED]
276 }
277
278 [BUFFERPUSH] (chunk) {
279 if (this[OBJECTMODE])
280 this[BUFFERLENGTH] += 1
281 else
282 this[BUFFERLENGTH] += chunk.length
283 return this.buffer.push(chunk)
284 }
285
286 [BUFFERSHIFT] () {
287 if (this.buffer.length) {
288 if (this[OBJECTMODE])
289 this[BUFFERLENGTH] -= 1
290 else
291 this[BUFFERLENGTH] -= this.buffer.head.value.length
292 }
293 return this.buffer.shift()
294 }
295
296 [FLUSH] (noDrain) {
297 do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
298
299 if (!noDrain && !this.buffer.length && !this[EOF])
300 this.emit('drain')
301 }
302
303 [FLUSHCHUNK] (chunk) {
304 return chunk ? (this.emit('data', chunk), this.flowing) : false
305 }
306
307 pipe (dest, opts) {
308 if (this[DESTROYED])
309 return
310
311 const ended = this[EMITTED_END]
312 opts = opts || {}
313 if (dest === process.stdout || dest === process.stderr)
314 opts.end = false
315 else
316 opts.end = opts.end !== false
317
318 const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
319 this.pipes.push(p)
320
321 dest.on('drain', p.ondrain)
322 this[RESUME]()
323 // piping an ended stream ends immediately
324 if (ended && p.opts.end)
325 p.dest.end()
326 return dest
327 }
328
329 addListener (ev, fn) {
330 return this.on(ev, fn)
331 }
332
333 on (ev, fn) {
334 try {
335 return super.on(ev, fn)
336 } finally {
337 if (ev === 'data' && !this.pipes.length && !this.flowing)
338 this[RESUME]()
339 else if (isEndish(ev) && this[EMITTED_END]) {
340 super.emit(ev)
341 this.removeAllListeners(ev)
342 }
343 }
344 }
345
346 get emittedEnd () {
347 return this[EMITTED_END]
348 }
349
350 [MAYBE_EMIT_END] () {
351 if (!this[EMITTING_END] &&
352 !this[EMITTED_END] &&
353 !this[DESTROYED] &&
354 this.buffer.length === 0 &&
355 this[EOF]) {
356 this[EMITTING_END] = true
357 this.emit('end')
358 this.emit('prefinish')
359 this.emit('finish')
360 if (this[CLOSED])
361 this.emit('close')
362 this[EMITTING_END] = false
363 }
364 }
365
366 emit (ev, data) {
367 // error and close are only events allowed after calling destroy()
368 if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
369 return
370 else if (ev === 'data') {
371 if (!data)
372 return
373
374 if (this.pipes.length)
375 this.pipes.forEach(p =>
376 p.dest.write(data) === false && this.pause())
377 } else if (ev === 'end') {
378 // only actual end gets this treatment
379 if (this[EMITTED_END] === true)
380 return
381
382 this[EMITTED_END] = true
383 this.readable = false
384
385 if (this[DECODER]) {
386 data = this[DECODER].end()
387 if (data) {
388 this.pipes.forEach(p => p.dest.write(data))
389 super.emit('data', data)
390 }
391 }
392
393 this.pipes.forEach(p => {
394 p.dest.removeListener('drain', p.ondrain)
395 if (p.opts.end)
396 p.dest.end()
397 })
398 } else if (ev === 'close') {
399 this[CLOSED] = true
400 // don't emit close before 'end' and 'finish'
401 if (!this[EMITTED_END] && !this[DESTROYED])
402 return
403 }
404
405 // TODO: replace with a spread operator when Node v4 support drops
406 const args = new Array(arguments.length)
407 args[0] = ev
408 args[1] = data
409 if (arguments.length > 2) {
410 for (let i = 2; i < arguments.length; i++) {
411 args[i] = arguments[i]
412 }
413 }
414
415 try {
416 return super.emit.apply(this, args)
417 } finally {
418 if (!isEndish(ev))
419 this[MAYBE_EMIT_END]()
420 else
421 this.removeAllListeners(ev)
422 }
423 }
424
425 // const all = await stream.collect()
426 collect () {
427 const buf = []
428 if (!this[OBJECTMODE])
429 buf.dataLength = 0
430 // set the promise first, in case an error is raised
431 // by triggering the flow here.
432 const p = this.promise()
433 this.on('data', c => {
434 buf.push(c)
435 if (!this[OBJECTMODE])
436 buf.dataLength += c.length
437 })
438 return p.then(() => buf)
439 }
440
441 // const data = await stream.concat()
442 concat () {
443 return this[OBJECTMODE]
444 ? Promise.reject(new Error('cannot concat in objectMode'))
445 : this.collect().then(buf =>
446 this[OBJECTMODE]
447 ? Promise.reject(new Error('cannot concat in objectMode'))
448 : this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength))
449 }
450
451 // stream.promise().then(() => done, er => emitted error)
452 promise () {
453 return new Promise((resolve, reject) => {
454 this.on(DESTROYED, () => reject(new Error('stream destroyed')))
455 this.on('end', () => resolve())
456 this.on('error', er => reject(er))
457 })
458 }
459
460 // for await (let chunk of stream)
461 [ASYNCITERATOR] () {
462 const next = () => {
463 const res = this.read()
464 if (res !== null)
465 return Promise.resolve({ done: false, value: res })
466
467 if (this[EOF])
468 return Promise.resolve({ done: true })
469
470 let resolve = null
471 let reject = null
472 const onerr = er => {
473 this.removeListener('data', ondata)
474 this.removeListener('end', onend)
475 reject(er)
476 }
477 const ondata = value => {
478 this.removeListener('error', onerr)
479 this.removeListener('end', onend)
480 this.pause()
481 resolve({ value: value, done: !!this[EOF] })
482 }
483 const onend = () => {
484 this.removeListener('error', onerr)
485 this.removeListener('data', ondata)
486 resolve({ done: true })
487 }
488 const ondestroy = () => onerr(new Error('stream destroyed'))
489 return new Promise((res, rej) => {
490 reject = rej
491 resolve = res
492 this.once(DESTROYED, ondestroy)
493 this.once('error', onerr)
494 this.once('end', onend)
495 this.once('data', ondata)
496 })
497 }
498
499 return { next }
500 }
501
502 // for (let chunk of stream)
503 [ITERATOR] () {
504 const next = () => {
505 const value = this.read()
506 const done = value === null
507 return { value, done }
508 }
509 return { next }
510 }
511
512 destroy (er) {
513 if (this[DESTROYED]) {
514 if (er)
515 this.emit('error', er)
516 else
517 this.emit(DESTROYED)
518 return this
519 }
520
521 this[DESTROYED] = true
522
523 // throw away all buffered data, it's never coming out
524 this.buffer = new Yallist()
525 this[BUFFERLENGTH] = 0
526
527 if (typeof this.close === 'function' && !this[CLOSED])
528 this.close()
529
530 if (er)
531 this.emit('error', er)
532 else // if no error to emit, still reject pending promises
533 this.emit(DESTROYED)
534
535 return this
536 }
537
538 static isStream (s) {
539 return !!s && (s instanceof Minipass || s instanceof Stream ||
540 s instanceof EE && (
541 typeof s.pipe === 'function' || // readable
542 (typeof s.write === 'function' && typeof s.end === 'function') // writable
543 ))
544 }
545}