1 | import { Transform } from 'stream'
|
2 | import { AsyncMapper, CommonLogger } from '@naturalcycles/js-lib'
|
3 | import { TransformOptions, TransformTyped } from '../stream.model'
|
4 |
|
/**
 * Options accepted by `transformTap`.
 *
 * Extends the shared `TransformOptions`; the whole object is also spread into
 * the underlying `Transform` constructor options by `transformTap`.
 */
export interface TransformTapOptions extends TransformOptions {
  /**
   * Logger whose `error` method receives anything thrown (or rejected) by the
   * tap function. `transformTap` falls back to `console` when this is omitted.
   */
  logger?: CommonLogger
}
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
/**
 * Creates an object-mode `Transform` that runs `fn` for every item flowing
 * through the stream and then forwards the item itself, unchanged.
 *
 * - `fn` is called with the item and a zero-based running index, and is
 *   awaited before the item is pushed downstream.
 * - Whatever `fn` returns is ignored.
 * - If `fn` throws or rejects, the error is reported via `logger.error` and
 *   the item is still passed through; the stream itself is not errored.
 *
 * @param fn function invoked for each item, as `fn(item, index)`.
 * @param opt extra stream options; `opt.logger` (default: `console`) receives
 *   errors raised by `fn`. Entries in `opt` are spread after
 *   `objectMode: true`, so they can override it.
 * @returns a transform stream emitting exactly the items written to it.
 */
export function transformTap<IN>(
  fn: AsyncMapper<IN, any>,
  opt: TransformTapOptions = {},
): TransformTyped<IN, IN> {
  // Only an absent (undefined) logger falls back to console.
  const configuredLogger = opt.logger
  const errorLogger = configuredLogger === undefined ? console : configuredLogger

  // Index handed to `fn` for the next item; the first item gets 0.
  let nextIndex = 0

  return new Transform({
    objectMode: true,
    ...opt,
    async transform(item: IN, _encoding, done) {
      try {
        await fn(item, nextIndex++)
      } catch (tapError) {
        // Deliberately best-effort: report the failure and keep the stream going.
        errorLogger.error(tapError)
      }

      // Forward the original item regardless of how the tap call went.
      done(null, item)
    },
  })
}
|