# minipass

A _very_ minimal implementation of a [PassThrough
stream](https://nodejs.org/api/stream.html#stream_class_stream_passthrough)

[It's very
fast](https://docs.google.com/spreadsheets/d/1oObKSrVwLX_7Ut4Z6g3fZW-AX1j1-k6w-cDsrkaSbHM/edit#gid=0)
for objects, strings, and buffers.

Supports pipe()ing (including multi-pipe() and backpressure transmission),
buffering data until either a `data` event handler or `pipe()` is added (so
you don't lose the first chunk), and most other cases where PassThrough is
a good idea.

There is a `read()` method, but it's much more efficient to consume data
from this stream via `'data'` events or by calling `pipe()` into some other
stream. Calling `read()` requires the buffer to be flattened in some
cases, which requires copying memory.

There is also no `unpipe()` method. Once you start piping, there is no
stopping it!

If you set `objectMode: true` in the options, then whatever is written will
be emitted. Otherwise, it'll do a minimal amount of Buffer copying to
ensure proper Streams semantics when `read(n)` is called.

`objectMode` can also be set by doing `stream.objectMode = true`, or by
writing any non-string/non-buffer data. `objectMode` cannot be set to
false once it is set.

This is not a `through` or `through2` stream. It doesn't transform the
data, it just passes it right through. If you want to transform the data,
extend the class, and override the `write()` method. Once you're done
transforming the data however you want, call `super.write()` with the
transform output.

For some examples of streams that extend Minipass in various ways, check
out:

- [minizlib](http://npm.im/minizlib)
- [fs-minipass](http://npm.im/fs-minipass)
- [tar](http://npm.im/tar)
- [minipass-collect](http://npm.im/minipass-collect)
- [minipass-flush](http://npm.im/minipass-flush)
- [minipass-pipeline](http://npm.im/minipass-pipeline)
- [tap](http://npm.im/tap)
- [tap-parser](http://npm.im/tap-parser)
- [treport](http://npm.im/treport)
- [minipass-fetch](http://npm.im/minipass-fetch)
- [pacote](http://npm.im/pacote)
- [make-fetch-happen](http://npm.im/make-fetch-happen)
- [cacache](http://npm.im/cacache)
- [ssri](http://npm.im/ssri)
- [npm-registry-fetch](http://npm.im/npm-registry-fetch)
- [minipass-json-stream](http://npm.im/minipass-json-stream)
- [minipass-sized](http://npm.im/minipass-sized)

## Differences from Node.js Streams

There are several things that make Minipass streams different from (and in
some ways superior to) Node.js core streams.

Please read these caveats if you are familiar with node-core streams and
intend to use Minipass streams in your programs.

### Timing

Minipass streams are designed to support synchronous use-cases. Thus, data
is emitted as soon as it is available, always. It is buffered until read,
but no longer. Another way to look at it is that Minipass streams are
exactly as synchronous as the logic that writes into them.

This can be surprising if your code relies on `PassThrough.write()` always
providing data on the next tick rather than the current one, or being able
to call `resume()` and not have the entire buffer disappear immediately.

However, without this synchronicity guarantee, there would be no way for
Minipass to achieve the speeds it does, or support the synchronous use
cases that it does. Simply put, waiting takes time.

This non-deferring approach makes Minipass streams much easier to reason
about, especially in the context of Promises and other flow-control
mechanisms.

### No High/Low Water Marks

Node.js core streams will optimistically fill up a buffer, returning `true`
on all writes until the limit is hit, even if the data has nowhere to go.
Then, they will not attempt to draw more data in until the buffer size dips
below a minimum value.

Minipass streams are much simpler. The `write()` method will return `true`
if the data has somewhere to go (which is to say, given the timing
guarantees, that the data is already there by the time `write()` returns).

If the data has nowhere to go, then `write()` returns false, and the data
sits in a buffer, to be drained out immediately as soon as anyone consumes
it.
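
The node-core half of this comparison can be observed with nothing but the built-in `stream` module. The following is a minimal sketch, not part of the Minipass API: the variable names are illustrative, and neither stream has a consumer attached, so the written data has nowhere to go.

```js
const { PassThrough } = require('stream')

// A write that stays under the high water mark returns true,
// even though nothing is reading from the stream.
const small = new PassThrough({ highWaterMark: 1024 })
const firstWrite = small.write(Buffer.alloc(512))

// A single write larger than the high water mark returns false.
const big = new PassThrough({ highWaterMark: 1024 })
const oversized = big.write(Buffer.alloc(2048))

console.log(firstWrite) // true
console.log(oversized) // false
```

In the same situation (no `data` listener and no `pipe()` destination), the text above says a Minipass stream would return `false` from the very first write, because the return value reflects whether the data actually went somewhere.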

### Hazards of Buffering (or: Why Minipass Is So Fast)

Since data written to a Minipass stream is immediately written all the way
through the pipeline, and `write()` always returns true/false based on
whether the data was fully flushed, backpressure is communicated
immediately to the upstream caller. This minimizes buffering.

Consider this case:

```js
const {PassThrough} = require('stream')
const p1 = new PassThrough({ highWaterMark: 1024 })
const p2 = new PassThrough({ highWaterMark: 1024 })
const p3 = new PassThrough({ highWaterMark: 1024 })
const p4 = new PassThrough({ highWaterMark: 1024 })

p1.pipe(p2).pipe(p3).pipe(p4)
p4.on('data', () => console.log('made it through'))

// this returns false and buffers, then writes to p2 on next tick (1)
// p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
// p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
// p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
// on next tick (4)
// p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
// 'drain' on next tick (5)
// p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
// p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
// tick (7)

p1.write(Buffer.alloc(2048)) // returns false
```

Along the way, the data was buffered and deferred at each stage, and
multiple event deferrals happened, for an unblocked pipeline where it was
perfectly safe to write all the way through!

Furthermore, setting a `highWaterMark` of `1024` might lead someone reading
the code to think an advisory maximum of 1KiB is being set for the
pipeline. However, the actual advisory buffering level is the _sum_ of
`highWaterMark` values, since each one has its own bucket.

Consider the Minipass case:

```js
const Minipass = require('minipass')
const m1 = new Minipass()
const m2 = new Minipass()
const m3 = new Minipass()
const m4 = new Minipass()

m1.pipe(m2).pipe(m3).pipe(m4)
m4.on('data', () => console.log('made it through'))

// m1 is flowing, so it writes the data to m2 immediately
// m2 is flowing, so it writes the data to m3 immediately
// m3 is flowing, so it writes the data to m4 immediately
// m4 is flowing, so it fires the 'data' event immediately, returns true
// m4's write returned true, so m3 is still flowing, returns true
// m3's write returned true, so m2 is still flowing, returns true
// m2's write returned true, so m1 is still flowing, returns true
// No event deferrals or buffering along the way!

m1.write(Buffer.alloc(2048)) // returns true
```

It is extremely unlikely that you _don't_ want written data to be buffered
when it has nowhere to go, or that you _ever_ want to buffer data that can
be flushed all the way through. Neither node-core streams nor Minipass
ever fail to buffer written data, but node-core streams do a lot of
unnecessary buffering and pausing.

As always, the faster implementation is the one that does less stuff and
waits less time to do it.

### Immediately emit `end` for empty streams (when not paused)

If a stream is not paused, and `end()` is called before writing any data
into it, then it will emit `end` immediately.

If you have logic that occurs on the `end` event which you don't want to
potentially happen immediately (for example, closing file descriptors,
moving on to the next entry in an archive parse stream, etc.) then be sure
to call `stream.pause()` on creation, and then `stream.resume()` once you
are ready to respond to the `end` event.

### Emit `end` When Asked

One hazard of immediately emitting `'end'` is that you may not yet have had
a chance to add a listener. In order to avoid this hazard, Minipass
streams safely re-emit the `'end'` event if a new listener is added after
`'end'` has been emitted.

I.e., if you do `stream.on('end', someFunction)`, and the stream has already
emitted `end`, then it will call the handler right away. (You can think of
this somewhat like attaching a new `.then(fn)` to a previously-resolved
Promise.)

To avoid calling handlers more than once when they do not expect multiple
`'end'` events, all listeners are removed from the `'end'` event whenever it
is emitted.
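
The pattern described above can be sketched on top of a plain `EventEmitter`. This is only an illustration of the idea, not Minipass's actual source: the `ReplayEnd` class and its `ended` flag are hypothetical names invented for this example.

```js
const EventEmitter = require('events')

// Hypothetical class illustrating the pattern only.
class ReplayEnd extends EventEmitter {
  constructor () {
    super()
    this.ended = false
  }

  emit (ev, ...args) {
    if (ev !== 'end') return super.emit(ev, ...args)
    this.ended = true
    const hadListeners = super.emit('end', ...args)
    // Drop the handlers so that none of them can run a second time.
    this.removeAllListeners('end')
    return hadListeners
  }

  on (ev, fn) {
    if (ev === 'end' && this.ended) {
      // Already ended: call the late handler right away and do not keep it.
      fn.call(this)
      return this
    }
    return super.on(ev, fn)
  }
}

const calls = []
const s = new ReplayEnd()
s.on('end', () => calls.push('early'))
s.emit('end')
s.on('end', () => calls.push('late'))
console.log(calls) // [ 'early', 'late' ]
```

Each handler runs exactly once: the early one when `'end'` is emitted, the late one at the moment it is attached.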

### Impact of "immediate flow" on Tee-streams

A "tee stream" is a stream piping to multiple destinations:

```js
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
tee.write('foo') // goes to both destinations
```

Since Minipass streams _immediately_ process any pending data through the
pipeline when a new pipe destination is added, this can have surprising
effects, especially when a stream comes in from some other function and may
or may not have data in its buffer.

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
src.pipe(dest2) // gets nothing!
```

The solution is to create a dedicated tee-stream junction that pipes to
both locations, and then pipe to _that_ instead.

```js
// Safe example: tee to both places
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
src.pipe(tee) // tee gets 'foo', pipes to both locations
```

The same caveat applies to `on('data')` event listeners. The first one
added will _immediately_ receive all of the data, leaving nothing for the
second:

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.on('data', handler1) // receives 'foo' right away
src.on('data', handler2) // nothing to see here!
```

A dedicated tee-stream can be used in this case as well:

```js
// Safe example: tee to both data handlers
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.on('data', handler1)
tee.on('data', handler2)
src.pipe(tee)
```

## USAGE

It's a stream! Use it like a stream and it'll most likely do what you
want.

```js
const Minipass = require('minipass')
const mp = new Minipass(options) // optional: { encoding, objectMode }
mp.write('foo')
mp.pipe(someOtherStream)
mp.end('bar')
```

### OPTIONS

* `encoding` How would you like the data coming _out_ of the stream to be
  encoded? Accepts any values that can be passed to `Buffer.toString()`.
* `objectMode` Emit data exactly as it comes in. This will be flipped on
  by default if you write() something other than a string or Buffer at any
  point. Setting `objectMode: true` will prevent setting any encoding
  value.

### API

Implements the user-facing portions of Node.js's `Readable` and `Writable`
streams.

### Methods

* `write(chunk, [encoding], [callback])` - Put data in. (Note that, in the
  base Minipass class, the same data will come out.) Returns `false` if
  the stream will buffer the next write, or `true` if it's still in
  "flowing" mode.
* `end([chunk, [encoding]], [callback])` - Signal that you have no more
  data to write. This will queue an `end` event to be fired when all the
  data has been consumed.
* `setEncoding(encoding)` - Set the encoding for data coming out of the
  stream. This can only be done once.
* `pause()` - No more data for a while, please. This also prevents `end`
  from being emitted for empty streams until the stream is resumed.
* `resume()` - Resume the stream. If there's data in the buffer, it is all
  emitted immediately, and discarded if nothing is consuming it. Any
  buffered events are immediately emitted.
* `pipe(dest)` - Send all output to the stream provided. There is no way
  to unpipe. When data is emitted, it is immediately written to any and
  all pipe destinations.
* `on(ev, fn)`, `emit(ev, ...data)` - Minipass streams are EventEmitters.
  Some events are given special treatment, however. (See below under
  "events".)
* `promise()` - Returns a Promise that resolves when the stream emits
  `end`, or rejects if the stream emits `error`.
* `collect()` - Return a Promise that resolves on `end` with an array
  containing each chunk of data that was emitted, or rejects if the stream
  emits `error`. Note that this consumes the stream data.
* `concat()` - Same as `collect()`, but concatenates the data into a single
  Buffer object. Will reject the returned promise if the stream is in
  objectMode, or if it goes into objectMode by the end of the data.
* `read(n)` - Consume `n` bytes of data out of the buffer. If `n` is not
  provided, then consume all of it. If `n` bytes are not available, then
  it returns null. **Note** consuming streams in this way is less
  efficient, and can lead to unnecessary Buffer copying.
* `destroy([er])` - Destroy the stream. If an error is provided, then an
  `'error'` event is emitted. If the stream has a `close()` method, and
  has not emitted a `'close'` event yet, then `stream.close()` will be
  called. Any Promises returned by `.promise()`, `.collect()` or
  `.concat()` will be rejected. After being destroyed, writing to the
  stream will emit an error. No more data will be emitted if the stream is
  destroyed, even if it was previously buffered.

### Properties

* `bufferLength` Read-only. Total number of bytes buffered, or in the case
  of objectMode, the total number of objects.
* `encoding` The encoding that has been set. (Setting this is equivalent
  to calling `setEncoding(enc)` and has the same prohibition against
  setting multiple times.)
* `flowing` Read-only. Boolean indicating whether a chunk written to the
  stream will be immediately emitted.
* `emittedEnd` Read-only. Boolean indicating whether the end-ish events
  (i.e., `end`, `prefinish`, `finish`) have been emitted. Note that
  listening on any end-ish event will immediately re-emit it if it has
  already been emitted.
* `writable` Whether the stream is writable. Default `true`. Set to
  `false` when `end()` is called.
* `readable` Whether the stream is readable. Default `true`.
* `buffer` A [yallist](http://npm.im/yallist) linked list of chunks written
  to the stream that have not yet been emitted. (It's probably a bad idea
  to mess with this.)
* `pipes` A [yallist](http://npm.im/yallist) linked list of streams that
  this stream is piping into. (It's probably a bad idea to mess with
  this.)
* `destroyed` A getter that indicates whether the stream was destroyed.
* `paused` True if the stream has been explicitly paused, otherwise false.
* `objectMode` Indicates whether the stream is in `objectMode`. Once set
  to `true`, it cannot be set to `false`.

### Events

* `data` Emitted when there's data to read. Argument is the data to read.
  This is never emitted while not flowing. If a listener is attached, that
  will resume the stream.
* `end` Emitted when there's no more data to read. This will be emitted
  immediately for empty streams when `end()` is called. If a listener is
  attached, and `end` was already emitted, then it will be emitted again.
  All listeners are removed when `end` is emitted.
* `prefinish` An end-ish event that follows the same logic as `end` and is
  emitted in the same conditions where `end` is emitted. Emitted after
  `'end'`.
* `finish` An end-ish event that follows the same logic as `end` and is
  emitted in the same conditions where `end` is emitted. Emitted after
  `'prefinish'`.
* `close` An indication that an underlying resource has been released.
  Minipass does not emit this event itself, but if it is emitted before
  `end`, it is deferred until after `end` has been emitted, since emitting
  it earlier throws off some stream libraries.
* `drain` Emitted when the internal buffer empties, and it is again
  suitable to `write()` into the stream.
* `readable` Emitted when data is buffered and ready to be read by a
  consumer.
* `resume` Emitted when stream changes state from buffering to flowing
  mode. (I.e., when `resume` is called, `pipe` is called, or a `data` event
  listener is added.)
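
As an illustration of how the `write()` return value and the `drain` event fit together, here is a minimal sketch of a writer that respects backpressure. The `writeAll` helper is a hypothetical name, not part of Minipass, and a node-core `PassThrough` stands in for the destination so that the example runs without installing anything. It assumes only the `write()`/`drain` contract described above, so a Minipass stream should be usable in its place.

```js
const { PassThrough } = require('stream')

// Hypothetical helper: write every chunk, stopping whenever write()
// returns false and continuing on the next 'drain' event.
function writeAll (dest, chunks) {
  return new Promise((resolve, reject) => {
    let i = 0
    dest.once('error', reject)
    const next = () => {
      while (i < chunks.length) {
        if (!dest.write(chunks[i++])) {
          dest.once('drain', next)
          return
        }
      }
      dest.removeListener('error', reject)
      resolve(i)
    }
    next()
  })
}

// Stand-in destination with a tiny buffer, so backpressure kicks in.
const dest = new PassThrough({ highWaterMark: 4 })
const received = []
dest.on('data', chunk => received.push(chunk.toString()))

const done = writeAll(dest, ['aaaa', 'bbbb', 'cccc'])
done.then(count => console.log('wrote', count, 'chunks'))
```

The helper never writes again after a `false` return until `drain` fires, which keeps at most one rejected chunk's worth of data waiting in the destination's buffer.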

### Static Methods

* `Minipass.isStream(stream)` Returns `true` if the argument is a stream,
  and false otherwise. To be considered a stream, the object must be
  either an instance of Minipass, or an EventEmitter that has either a
  `pipe()` method, or both `write()` and `end()` methods. (Pretty much any
  stream in node-land will return `true` for this.)
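
The rule described for `isStream()` can be approximated in a few lines. This is a sketch of the described check rather than the library's code: `looksLikeStream` is a hypothetical name, and the `instanceof Minipass` branch is left out so the example runs with only Node.js built-ins.

```js
const EventEmitter = require('events')
const { PassThrough } = require('stream')

// Hypothetical stand-in for the described check: an EventEmitter with
// either pipe(), or both write() and end().
const looksLikeStream = s =>
  !!s && s instanceof EventEmitter && (
    typeof s.pipe === 'function' ||
    (typeof s.write === 'function' && typeof s.end === 'function')
  )

// An emitter that is writable-only still counts.
const writableOnly = Object.assign(new EventEmitter(), {
  write () {},
  end () {}
})

console.log(looksLikeStream(new PassThrough())) // true
console.log(looksLikeStream(writableOnly)) // true
console.log(looksLikeStream(new EventEmitter())) // false
console.log(looksLikeStream({ pipe () {} })) // false, not an EventEmitter
```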

## EXAMPLES

Here are some examples of things you can do with Minipass streams.

### simple "are you done yet" promise

```js
mp.promise().then(() => {
  // stream is finished
}, er => {
  // stream emitted an error
})
```

### collecting

```js
mp.collect().then(all => {
  // all is an array of all the data emitted
  // encoding is supported in this case, so
  // the result will be a collection of strings if
  // an encoding is specified, or buffers/objects if not.
  //
  // In an async function, you may do
  // const data = await stream.collect()
})
```

### collecting into a single blob

This is a bit slower because it concatenates the data into one chunk for
you, but if you're going to do it yourself anyway, it's convenient this
way:

```js
mp.concat().then(onebigchunk => {
  // onebigchunk is a string if the stream
  // had an encoding set, or a buffer otherwise.
})
```

### iteration

You can iterate over streams synchronously or asynchronously in platforms
that support it.

Synchronous iteration will end when the currently available data is
consumed, even if the `end` event has not been reached. In string and
buffer mode, the data is concatenated, so unless multiple writes are
occurring in the same tick as the `read()`, sync iteration loops will
generally only have a single iteration.

To consume chunks in this way exactly as they have been written, with no
flattening, create the stream with the `{ objectMode: true }` option.

```js
const mp = new Minipass({ objectMode: true })
mp.write('a')
mp.write('b')
for (let letter of mp) {
  console.log(letter) // a, b
}
mp.write('c')
mp.write('d')
for (let letter of mp) {
  console.log(letter) // c, d
}
mp.write('e')
mp.end()
for (let letter of mp) {
  console.log(letter) // e
}
for (let letter of mp) {
  console.log(letter) // nothing
}
```

Asynchronous iteration will continue until the end event is reached,
consuming all of the data.

```js
const mp = new Minipass({ encoding: 'utf8' })

// some source of some data
let i = 5
const inter = setInterval(() => {
  if (i-- > 0) {
    mp.write(Buffer.from('foo\n', 'utf8'))
  } else {
    mp.end()
    clearInterval(inter)
  }
}, 100)

// consume the data with asynchronous iteration
async function consume () {
  for await (let chunk of mp) {
    console.log(chunk)
  }
  return 'ok'
}

consume().then(res => console.log(res))
// logs `foo\n` 5 times, and then `ok`
```

### subclass that `console.log()`s everything written into it

```js
class Logger extends Minipass {
  write (chunk, encoding, callback) {
    console.log('WRITE', chunk, encoding)
    return super.write(chunk, encoding, callback)
  }
  end (chunk, encoding, callback) {
    console.log('END', chunk, encoding)
    return super.end(chunk, encoding, callback)
  }
}

someSource.pipe(new Logger()).pipe(someDest)
```

### same thing, but using an inline anonymous class

```js
// js classes are fun
someSource
  .pipe(new (class extends Minipass {
    emit (ev, ...data) {
      // let's also log events, because debugging some weird thing
      console.log('EMIT', ev)
      return super.emit(ev, ...data)
    }
    write (chunk, encoding, callback) {
      console.log('WRITE', chunk, encoding)
      return super.write(chunk, encoding, callback)
    }
    end (chunk, encoding, callback) {
      console.log('END', chunk, encoding)
      return super.end(chunk, encoding, callback)
    }
  }))
  .pipe(someDest)
```

### subclass that defers 'end' for some reason

```js
class SlowEnd extends Minipass {
  emit (ev, ...args) {
    if (ev === 'end') {
      console.log('going to end, hold on a sec')
      setTimeout(() => {
        console.log('ok, ready to end now')
        super.emit('end', ...args)
      }, 100)
    } else {
      return super.emit(ev, ...args)
    }
  }
}
```

### transform that creates newline-delimited JSON

```js
class NDJSONEncode extends Minipass {
  write (obj, cb) {
    try {
      // JSON.stringify can throw, emit an error on that
      return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
    } catch (er) {
      this.emit('error', er)
    }
  }
  end (obj, cb) {
    if (typeof obj === 'function') {
      cb = obj
      obj = undefined
    }
    if (obj !== undefined) {
      this.write(obj)
    }
    return super.end(cb)
  }
}
```

### transform that parses newline-delimited JSON

```js
class NDJSONDecode extends Minipass {
  constructor (options) {
    // always be in object mode, as far as Minipass is concerned
    super({ objectMode: true })
    this._jsonBuffer = ''
  }
  write (chunk, encoding, cb) {
    if (typeof chunk === 'string' &&
        typeof encoding === 'string' &&
        encoding !== 'utf8') {
      chunk = Buffer.from(chunk, encoding).toString()
    } else if (Buffer.isBuffer(chunk)) {
      chunk = chunk.toString()
    }
    if (typeof encoding === 'function') {
      cb = encoding
    }
    // split on newlines, keeping any trailing partial line for next time
    const jsonData = (this._jsonBuffer + chunk).split('\n')
    this._jsonBuffer = jsonData.pop()
    for (let i = 0; i < jsonData.length; i++) {
      let parsed
      try {
        // JSON.parse can throw, emit an error on that
        parsed = JSON.parse(jsonData[i])
      } catch (er) {
        this.emit('error', er)
        continue
      }
      super.write(parsed)
    }
    if (cb)
      cb()
    // report whether the stream is still flowing, like write() should
    return this.flowing
  }
}
```
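
The carry-over logic in the decoder's `write()` is easier to follow in isolation. The sketch below pulls it out into a standalone function; `feed` is a hypothetical helper name used only for this illustration, and it mirrors the `_jsonBuffer` handling above without any stream involved.

```js
// Hypothetical helper: join the carried-over partial line with the new
// chunk, return the complete lines, and keep the unfinished tail.
function feed (carry, chunk) {
  const lines = (carry + chunk).split('\n')
  const rest = lines.pop()
  return { lines, rest }
}

// The first chunk ends in the middle of the second JSON document.
let state = feed('', '{"a":1}\n{"b"')
const firstLines = state.lines
const firstRest = state.rest

// The second chunk completes it.
state = feed(firstRest, ':2}\n')
const secondLines = state.lines

console.log(firstLines) // [ '{"a":1}' ]
console.log(firstRest) // {"b"
console.log(secondLines) // [ '{"b":2}' ]
console.log(state.rest === '') // true
```

Because the last element after `split('\n')` is always either an empty string or an incomplete line, popping it off guarantees that only complete lines are ever handed to `JSON.parse`.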