UNPKG

806 BPlain TextView Raw
1import { Transform } from 'stream'
2import { TransformOptions, TransformTyped } from '../stream.model'
3
export interface TransformBufferOptions extends TransformOptions {
  /**
   * Number of items to accumulate before a batch is emitted downstream.
   * Values below 1 behave like 1, because a batch is checked right after
   * an item has been added to it.
   *
   * @default 10
   */
  batchSize?: number
}

/**
 * Groups incoming items into arrays of `batchSize` items and emits each array
 * as a single chunk. Similar to RxJS bufferCount().
 *
 * When the input ends, any items that have not yet filled a complete batch
 * are emitted as one final, shorter array. Nothing extra is emitted if the
 * last batch was exactly full or if there was no input at all.
 *
 * @param opt stream options plus `batchSize` (defaults to 10 when omitted)
 * @returns an object-mode Transform that consumes `IN` and produces `IN[]`
 */
export function transformBuffer<IN = Record<string, any>>(
  opt: TransformBufferOptions,
): TransformTyped<IN, IN[]> {
  // Apply the documented default. Without it an omitted batchSize makes
  // `buf.length >= undefined` always false, so the whole input would be
  // buffered in memory and only emitted at the end of the stream.
  // batchSize is separated out so it is not passed on to the Transform constructor.
  const { batchSize = 10, ...streamOpt } = opt

  let buf: IN[] = []

  return new Transform({
    objectMode: true,
    ...streamOpt,
    transform(chunk, _encoding, cb) {
      buf.push(chunk)

      if (buf.length >= batchSize) {
        // Hand the full batch downstream and start a fresh array, so the
        // emitted batch is never mutated afterwards.
        cb(null, buf)
        buf = []
      } else {
        cb()
      }
    },
    final(this: Transform, cb) {
      // Emit the incomplete remainder, if any, before the stream finishes.
      if (buf.length) {
        this.push(buf)
        buf = []
      }

      cb()
    },
  })
}