UNPKG

980 BPlain TextView Raw
1import { Transform } from 'stream'
2import { AsyncMapper, CommonLogger } from '@naturalcycles/js-lib'
3import { TransformOptions, TransformTyped } from '../stream.model'
4
/**
 * Options accepted by `transformTap`: everything in `TransformOptions`, plus an optional logger.
 */
export interface TransformTapOptions extends TransformOptions {
  /**
   * Receives (via `logger.error`) any error thrown by the tap function.
   * `transformTap` falls back to `console` when this is left undefined.
   */
  logger?: CommonLogger
}
8
/**
 * Side-effect-only stream stage, comparable to the `tap` operator in RxJS:
 * `fn` is invoked (and awaited) for every item, after which the very same item is
 * pushed downstream untouched.
 *
 * `fn` also receives the 0-based index of the item, so this can double as a counter.
 *
 * Errors thrown (or rejected) by `fn` are suppressed: they are reported through
 * `logger.error` and the item is still passed through.
 *
 * @param fn function to run for each item; its result is ignored
 * @param opt stream options; `objectMode` defaults to `true` unless overridden here,
 *   `logger` defaults to `console`
 * @returns a Transform stream that emits exactly the items it receives
 */
export function transformTap<IN>(
  fn: AsyncMapper<IN, any>,
  opt: TransformTapOptions = {},
): TransformTyped<IN, IN> {
  // Only an `undefined` logger is replaced by the default.
  const logger = opt.logger === undefined ? console : opt.logger
  // How many items were handed to `fn` so far, i.e. the index of the next item.
  let seen = 0

  return new Transform({
    objectMode: true,
    ...opt,
    async transform(item: IN, _encoding, done) {
      // Claim the index up-front, so it advances even when `fn` fails.
      const position = seen++

      try {
        await fn(item, position)
      } catch (err) {
        // A failing tap must not break the pipeline: report it and carry on.
        logger.error(err)
      }

      // Forward the item as-is.
      done(null, item)
    },
  })
}