1 | import { Transform } from 'stream'
|
2 | import { TransformOptions, TransformTyped } from '../stream.model'
|
3 |
|
4 | export interface TransformBufferOptions extends TransformOptions {
|
5 | batchSize: number
|
6 | }
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 | export function transformBuffer<IN = Record<string, any>>(
|
14 | opt: TransformBufferOptions,
|
15 | ): TransformTyped<IN, IN[]> {
|
16 | const { batchSize } = opt
|
17 |
|
18 | let buf: IN[] = []
|
19 |
|
20 | return new Transform({
|
21 | objectMode: true,
|
22 | ...opt,
|
23 | transform(chunk, _encoding, cb) {
|
24 | buf.push(chunk)
|
25 |
|
26 | if (buf.length >= batchSize) {
|
27 | cb(null, buf)
|
28 | buf = []
|
29 | } else {
|
30 | cb()
|
31 | }
|
32 | },
|
33 | final(this: Transform, cb) {
|
34 | if (buf.length) {
|
35 | this.push(buf)
|
36 | buf = []
|
37 | }
|
38 |
|
39 | cb()
|
40 | },
|
41 | })
|
42 | }
|