// UNPKG

// 14 kB · JavaScript · View Raw
'use strict'
const EE = require('events')
const Stream = require('stream')
const Yallist = require('yallist')
const SD = require('string_decoder').StringDecoder
6
// Private property and method keys. Using Symbols keeps the stream's
// internal state from being overwritten through ordinary string properties.
const EOF = Symbol('EOF')
const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
const EMITTED_END = Symbol('emittedEnd')
const EMITTING_END = Symbol('emittingEnd')
const CLOSED = Symbol('closed')
const READ = Symbol('read')
const FLUSH = Symbol('flush')
const FLUSHCHUNK = Symbol('flushChunk')
const ENCODING = Symbol('encoding')
const DECODER = Symbol('decoder')
const FLOWING = Symbol('flowing')
const PAUSED = Symbol('paused')
const RESUME = Symbol('resume')
const BUFFERLENGTH = Symbol('bufferLength')
const BUFFERPUSH = Symbol('bufferPush')
const BUFFERSHIFT = Symbol('bufferShift')
const OBJECTMODE = Symbol('objectMode')
// doubles as the name of the event emitted by destroy() when no error is given
const DESTROYED = Symbol('destroyed')

// TODO remove when Node v8 support drops
// Setting global._MP_NO_ITERATOR_SYMBOLS_ to '1' swaps the well-known
// iterator symbols for private placeholders, so the iterator methods are
// defined under keys that nothing else will look up.
const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
const ASYNCITERATOR = doIter && Symbol.asyncIterator
  || Symbol('asyncIterator not implemented')
const ITERATOR = doIter && Symbol.iterator
  || Symbol('iterator not implemented')
32
// events that mean 'the stream is over'
// these are treated specially, and re-emitted
// if they are listened for after emitting.
/**
 * @param {*} ev event name
 * @returns {boolean} true for 'end', 'finish' and 'prefinish'
 */
const isEndish = ev =>
  ev === 'end' ||
  ev === 'finish' ||
  ev === 'prefinish'
40
/**
 * Truthy when `b` is an ArrayBuffer: either a real instance, or an object
 * whose constructor is named 'ArrayBuffer' and that has a non-negative
 * byteLength (the duck-typed form, which an `instanceof` check alone would
 * reject).
 *
 * `null` is rejected explicitly: `typeof null` is 'object', so without the
 * guard the `b.constructor` lookup would throw.
 *
 * @param {*} b value to test
 * @returns {boolean|undefined} truthy for array buffers, falsy otherwise
 */
const isArrayBuffer = b => b instanceof ArrayBuffer ||
  b !== null &&
  typeof b === 'object' &&
  b.constructor &&
  b.constructor.name === 'ArrayBuffer' &&
  b.byteLength >= 0
46
/**
 * True for typed arrays and DataViews, but not for Node Buffers (which are
 * themselves views and are passed through untouched by the stream).
 *
 * @param {*} b value to test
 * @returns {boolean}
 */
const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
48
49module.exports = class Minipass extends Stream {
50 constructor (options) {
51 super()
52 this[FLOWING] = false
53 // whether we're explicitly paused
54 this[PAUSED] = false
55 this.pipes = new Yallist()
56 this.buffer = new Yallist()
57 this[OBJECTMODE] = options && options.objectMode || false
58 if (this[OBJECTMODE])
59 this[ENCODING] = null
60 else
61 this[ENCODING] = options && options.encoding || null
62 if (this[ENCODING] === 'buffer')
63 this[ENCODING] = null
64 this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
65 this[EOF] = false
66 this[EMITTED_END] = false
67 this[EMITTING_END] = false
68 this[CLOSED] = false
69 this.writable = true
70 this.readable = true
71 this[BUFFERLENGTH] = 0
72 this[DESTROYED] = false
73 }
74
75 get bufferLength () { return this[BUFFERLENGTH] }
76
77 get encoding () { return this[ENCODING] }
78 set encoding (enc) {
79 if (this[OBJECTMODE])
80 throw new Error('cannot set encoding in objectMode')
81
82 if (this[ENCODING] && enc !== this[ENCODING] &&
83 (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH]))
84 throw new Error('cannot change encoding')
85
86 if (this[ENCODING] !== enc) {
87 this[DECODER] = enc ? new SD(enc) : null
88 if (this.buffer.length)
89 this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk))
90 }
91
92 this[ENCODING] = enc
93 }
94
95 setEncoding (enc) {
96 this.encoding = enc
97 }
98
99 get objectMode () { return this[OBJECTMODE] }
100 set objectMode (ॐ ) { this[OBJECTMODE] = this[OBJECTMODE] || !!ॐ }
101
102 write (chunk, encoding, cb) {
103 if (this[EOF])
104 throw new Error('write after end')
105
106 if (this[DESTROYED]) {
107 this.emit('error', Object.assign(
108 new Error('Cannot call write after a stream was destroyed'),
109 { code: 'ERR_STREAM_DESTROYED' }
110 ))
111 return true
112 }
113
114 if (typeof encoding === 'function')
115 cb = encoding, encoding = 'utf8'
116
117 if (!encoding)
118 encoding = 'utf8'
119
120 // convert array buffers and typed array views into buffers
121 // at some point in the future, we may want to do the opposite!
122 // leave strings and buffers as-is
123 // anything else switches us into object mode
124 if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
125 if (isArrayBufferView(chunk))
126 chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
127 else if (isArrayBuffer(chunk))
128 chunk = Buffer.from(chunk)
129 else if (typeof chunk !== 'string')
130 // use the setter so we throw if we have encoding set
131 this.objectMode = true
132 }
133
134 // this ensures at this point that the chunk is a buffer or string
135 // don't buffer it up or send it to the decoder
136 if (!this.objectMode && !chunk.length) {
137 const ret = this.flowing
138 if (this[BUFFERLENGTH] !== 0)
139 this.emit('readable')
140 if (cb)
141 cb()
142 return ret
143 }
144
145 // fast-path writing strings of same encoding to a stream with
146 // an empty buffer, skipping the buffer/decoder dance
147 if (typeof chunk === 'string' && !this[OBJECTMODE] &&
148 // unless it is a string already ready for us to use
149 !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
150 chunk = Buffer.from(chunk, encoding)
151 }
152
153 if (Buffer.isBuffer(chunk) && this[ENCODING])
154 chunk = this[DECODER].write(chunk)
155
156 try {
157 return this.flowing
158 ? (this.emit('data', chunk), this.flowing)
159 : (this[BUFFERPUSH](chunk), false)
160 } finally {
161 if (this[BUFFERLENGTH] !== 0)
162 this.emit('readable')
163 if (cb)
164 cb()
165 }
166 }
167
168 read (n) {
169 if (this[DESTROYED])
170 return null
171
172 try {
173 if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
174 return null
175
176 if (this[OBJECTMODE])
177 n = null
178
179 if (this.buffer.length > 1 && !this[OBJECTMODE]) {
180 if (this.encoding)
181 this.buffer = new Yallist([
182 Array.from(this.buffer).join('')
183 ])
184 else
185 this.buffer = new Yallist([
186 Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH])
187 ])
188 }
189
190 return this[READ](n || null, this.buffer.head.value)
191 } finally {
192 this[MAYBE_EMIT_END]()
193 }
194 }
195
196 [READ] (n, chunk) {
197 if (n === chunk.length || n === null)
198 this[BUFFERSHIFT]()
199 else {
200 this.buffer.head.value = chunk.slice(n)
201 chunk = chunk.slice(0, n)
202 this[BUFFERLENGTH] -= n
203 }
204
205 this.emit('data', chunk)
206
207 if (!this.buffer.length && !this[EOF])
208 this.emit('drain')
209
210 return chunk
211 }
212
213 end (chunk, encoding, cb) {
214 if (typeof chunk === 'function')
215 cb = chunk, chunk = null
216 if (typeof encoding === 'function')
217 cb = encoding, encoding = 'utf8'
218 if (chunk)
219 this.write(chunk, encoding)
220 if (cb)
221 this.once('end', cb)
222 this[EOF] = true
223 this.writable = false
224
225 // if we haven't written anything, then go ahead and emit,
226 // even if we're not reading.
227 // we'll re-emit if a new 'end' listener is added anyway.
228 // This makes MP more suitable to write-only use cases.
229 if (this.flowing || !this[PAUSED])
230 this[MAYBE_EMIT_END]()
231 return this
232 }
233
234 // don't let the internal resume be overwritten
235 [RESUME] () {
236 if (this[DESTROYED])
237 return
238
239 this[PAUSED] = false
240 this[FLOWING] = true
241 this.emit('resume')
242 if (this.buffer.length)
243 this[FLUSH]()
244 else if (this[EOF])
245 this[MAYBE_EMIT_END]()
246 else
247 this.emit('drain')
248 }
249
250 resume () {
251 return this[RESUME]()
252 }
253
254 pause () {
255 this[FLOWING] = false
256 this[PAUSED] = true
257 }
258
259 get destroyed () {
260 return this[DESTROYED]
261 }
262
263 get flowing () {
264 return this[FLOWING]
265 }
266
267 get paused () {
268 return this[PAUSED]
269 }
270
271 [BUFFERPUSH] (chunk) {
272 if (this[OBJECTMODE])
273 this[BUFFERLENGTH] += 1
274 else
275 this[BUFFERLENGTH] += chunk.length
276 return this.buffer.push(chunk)
277 }
278
279 [BUFFERSHIFT] () {
280 if (this.buffer.length) {
281 if (this[OBJECTMODE])
282 this[BUFFERLENGTH] -= 1
283 else
284 this[BUFFERLENGTH] -= this.buffer.head.value.length
285 }
286 return this.buffer.shift()
287 }
288
289 [FLUSH] () {
290 do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
291
292 if (!this.buffer.length && !this[EOF])
293 this.emit('drain')
294 }
295
296 [FLUSHCHUNK] (chunk) {
297 return chunk ? (this.emit('data', chunk), this.flowing) : false
298 }
299
300 pipe (dest, opts) {
301 if (this[DESTROYED])
302 return
303
304 const ended = this[EMITTED_END]
305 opts = opts || {}
306 if (dest === process.stdout || dest === process.stderr)
307 opts.end = false
308 else
309 opts.end = opts.end !== false
310
311 const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
312 this.pipes.push(p)
313
314 dest.on('drain', p.ondrain)
315 this[RESUME]()
316 // piping an ended stream ends immediately
317 if (ended && p.opts.end)
318 p.dest.end()
319 return dest
320 }
321
322 addListener (ev, fn) {
323 return this.on(ev, fn)
324 }
325
326 on (ev, fn) {
327 try {
328 return super.on(ev, fn)
329 } finally {
330 if (ev === 'data' && !this.pipes.length && !this.flowing)
331 this[RESUME]()
332 else if (isEndish(ev) && this[EMITTED_END]) {
333 super.emit(ev)
334 this.removeAllListeners(ev)
335 }
336 }
337 }
338
339 get emittedEnd () {
340 return this[EMITTED_END]
341 }
342
343 [MAYBE_EMIT_END] () {
344 if (!this[EMITTING_END] &&
345 !this[EMITTED_END] &&
346 !this[DESTROYED] &&
347 this.buffer.length === 0 &&
348 this[EOF]) {
349 this[EMITTING_END] = true
350 this.emit('end')
351 this.emit('prefinish')
352 this.emit('finish')
353 if (this[CLOSED])
354 this.emit('close')
355 this[EMITTING_END] = false
356 }
357 }
358
359 emit (ev, data) {
360 // error and close are only events allowed after calling destroy()
361 if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
362 return
363 else if (ev === 'data') {
364 if (!data)
365 return
366
367 if (this.pipes.length)
368 this.pipes.forEach(p =>
369 p.dest.write(data) === false && this.pause())
370 } else if (ev === 'end') {
371 // only actual end gets this treatment
372 if (this[EMITTED_END] === true)
373 return
374
375 this[EMITTED_END] = true
376 this.readable = false
377
378 if (this[DECODER]) {
379 data = this[DECODER].end()
380 if (data) {
381 this.pipes.forEach(p => p.dest.write(data))
382 super.emit('data', data)
383 }
384 }
385
386 this.pipes.forEach(p => {
387 p.dest.removeListener('drain', p.ondrain)
388 if (p.opts.end)
389 p.dest.end()
390 })
391 } else if (ev === 'close') {
392 this[CLOSED] = true
393 // don't emit close before 'end' and 'finish'
394 if (!this[EMITTED_END] && !this[DESTROYED])
395 return
396 }
397
398 // TODO: replace with a spread operator when Node v4 support drops
399 const args = new Array(arguments.length)
400 args[0] = ev
401 args[1] = data
402 if (arguments.length > 2) {
403 for (let i = 2; i < arguments.length; i++) {
404 args[i] = arguments[i]
405 }
406 }
407
408 try {
409 return super.emit.apply(this, args)
410 } finally {
411 if (!isEndish(ev))
412 this[MAYBE_EMIT_END]()
413 else
414 this.removeAllListeners(ev)
415 }
416 }
417
418 // const all = await stream.collect()
419 collect () {
420 const buf = []
421 if (!this[OBJECTMODE])
422 buf.dataLength = 0
423 // set the promise first, in case an error is raised
424 // by triggering the flow here.
425 const p = this.promise()
426 this.on('data', c => {
427 buf.push(c)
428 if (!this[OBJECTMODE])
429 buf.dataLength += c.length
430 })
431 return p.then(() => buf)
432 }
433
434 // const data = await stream.concat()
435 concat () {
436 return this[OBJECTMODE]
437 ? Promise.reject(new Error('cannot concat in objectMode'))
438 : this.collect().then(buf =>
439 this[OBJECTMODE]
440 ? Promise.reject(new Error('cannot concat in objectMode'))
441 : this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength))
442 }
443
444 // stream.promise().then(() => done, er => emitted error)
445 promise () {
446 return new Promise((resolve, reject) => {
447 this.on(DESTROYED, () => reject(new Error('stream destroyed')))
448 this.on('end', () => resolve())
449 this.on('error', er => reject(er))
450 })
451 }
452
453 // for await (let chunk of stream)
454 [ASYNCITERATOR] () {
455 const next = () => {
456 const res = this.read()
457 if (res !== null)
458 return Promise.resolve({ done: false, value: res })
459
460 if (this[EOF])
461 return Promise.resolve({ done: true })
462
463 let resolve = null
464 let reject = null
465 const onerr = er => {
466 this.removeListener('data', ondata)
467 this.removeListener('end', onend)
468 reject(er)
469 }
470 const ondata = value => {
471 this.removeListener('error', onerr)
472 this.removeListener('end', onend)
473 this.pause()
474 resolve({ value: value, done: !!this[EOF] })
475 }
476 const onend = () => {
477 this.removeListener('error', onerr)
478 this.removeListener('data', ondata)
479 resolve({ done: true })
480 }
481 const ondestroy = () => onerr(new Error('stream destroyed'))
482 return new Promise((res, rej) => {
483 reject = rej
484 resolve = res
485 this.once(DESTROYED, ondestroy)
486 this.once('error', onerr)
487 this.once('end', onend)
488 this.once('data', ondata)
489 })
490 }
491
492 return { next }
493 }
494
495 // for (let chunk of stream)
496 [ITERATOR] () {
497 const next = () => {
498 const value = this.read()
499 const done = value === null
500 return { value, done }
501 }
502 return { next }
503 }
504
505 destroy (er) {
506 if (this[DESTROYED]) {
507 if (er)
508 this.emit('error', er)
509 else
510 this.emit(DESTROYED)
511 return this
512 }
513
514 this[DESTROYED] = true
515
516 // throw away all buffered data, it's never coming out
517 this.buffer = new Yallist()
518 this[BUFFERLENGTH] = 0
519
520 if (typeof this.close === 'function' && !this[CLOSED])
521 this.close()
522
523 if (er)
524 this.emit('error', er)
525 else // if no error to emit, still reject pending promises
526 this.emit(DESTROYED)
527
528 return this
529 }
530
531 static isStream (s) {
532 return !!s && (s instanceof Minipass || s instanceof Stream ||
533 s instanceof EE && (
534 typeof s.pipe === 'function' || // readable
535 (typeof s.write === 'function' && typeof s.end === 'function') // writable
536 ))
537 }
538}