// Retrieved from the UNPKG raw file view (845 B, plain text).

import { Transform } from 'stream'
import { AsyncMapper } from '@naturalcycles/js-lib'
import { TransformOptions, TransformTyped } from '../stream.model'
4
/**
 * Similar to RxJS `tap`: runs `fn` for every stream item without affecting the result.
 * Each item is passed through to the output unchanged.
 *
 * Can also act as a counter, since a zero-based `index` is passed to `fn` as the
 * second argument and is incremented once per item.
 *
 * `fn` is awaited before the item is handed to the stream callback, so the item is
 * only emitted after `fn` has settled.
 *
 * Errors thrown (or rejected) by `fn` are logged with `console.error` and then
 * suppressed: the item is still passed through and the stream is not errored.
 *
 * @param fn function invoked as `fn(item, index)` for each item; its return value is ignored.
 * @param opt options spread into the `Transform` constructor after `objectMode: true`,
 *   so an `objectMode` given in `opt` overrides that default.
 * @returns a `Transform` stream that emits exactly the items it receives.
 */
export function transformTap<IN>(
  fn: AsyncMapper<IN, any>,
  opt: TransformOptions = {},
): TransformTyped<IN, IN> {
  // Number of items seen so far; doubles as the index passed to `fn`.
  let index = 0

  return new Transform({
    objectMode: true,
    ...opt,
    async transform(chunk: IN, _encoding, cb) {
      try {
        await fn(chunk, index++)
      } catch (err) {
        // Deliberately suppressed: a failing tap must not break the pipeline.
        console.error(err)
      }

      cb(null, chunk) // pass through the item
    },
  })
}