1 | import { Transform } from 'stream'
|
2 | import { AbortableAsyncMapper, SKIP } from '@naturalcycles/js-lib'
|
3 | import { ReadableTyped } from '../stream.model'
|
4 |
|
/**
 * Pipes `readable` through an object-mode Transform that applies the async
 * `mapper` to every chunk, and returns that Transform as the mapped stream.
 *
 * - The mapper is called with the chunk and a 0-based index that is
 *   incremented for every incoming chunk (including skipped ones).
 * - When the mapper resolves to `SKIP`, the chunk produces no output.
 * - Any other resolved value is handed to the Transform callback as output.
 * - When the mapper throws/rejects, the error is logged via `console.error`
 *   and passed to the Transform callback, which errors the returned stream.
 * - When the source `readable` emits `'error'`, the returned stream is
 *   destroyed with the same error, so consumers of the returned stream
 *   observe the failure.
 *
 * @param readable source stream to read chunks from
 * @param mapper async function invoked as `mapper(chunk, index)`
 * @returns the Transform stream that emits the mapped values
 */
export function readableMap<IN, OUT>(
  readable: ReadableTyped<IN>,
  mapper: AbortableAsyncMapper<IN, OUT>,
): ReadableTyped<OUT> {
  // Starts at -1 so that the pre-increment below yields 0 for the first chunk.
  let i = -1

  const transform = new Transform({
    objectMode: true,
    async transform(chunk, _enc, cb) {
      try {
        const r = await mapper(chunk, ++i)
        if (r === SKIP) {
          // Acknowledge the chunk without emitting anything downstream.
          cb()
        } else {
          cb(null, r)
        }
      } catch (err) {
        console.error(err)
        cb(err as Error)
      }
    },
  })

  // `pipe()` does not forward source errors to the destination: without this,
  // a failing source would leave the returned stream open forever.
  readable.once('error', (err: Error) => transform.destroy(err))

  return readable.pipe(transform)
}
|