// Retrieved from the UNPKG file viewer (831 B, plain text).

import { Readable, Writable } from 'stream'
import { WritableTyped } from '../stream.model'

/**
 * Allows to stop the Readable stream after the pipeline has processed X number of rows.
 * It counts OUTPUT rows (not input), because this Writable is meant to sit at the end of the Pipeline.
 * It ensures that everything has been processed before issuing a STOP on the readable.
 *
 * @param readable the source stream to destroy once the limit is reached
 * @param limit number of written chunks after which `readable` is destroyed;
 *   `0` means "no limit" (every chunk is simply acknowledged)
 * @returns an object-mode Writable that discards chunks while counting them
 */
export function writableLimit<T>(readable: Readable, limit: number): WritableTyped<T> {
  let i = 0

  return new Writable({
    objectMode: true,
    write(_chunk, _encoding, cb) {
      if (limit === 0) return cb() // no limit, just passthrough

      i++

      // Strict equality: the readable is destroyed exactly once, on the chunk
      // that hits the limit. Chunks arriving afterwards are still acknowledged.
      if (i === limit) {
        console.log(`writableLimit of ${limit} reached`)
        readable.destroy()
      }

      // The callback must always be invoked, otherwise the Writable never
      // accepts the next chunk and upstream stalls.
      cb()
    },
  })
}