UNPKG

813 BPlain TextView Raw
1import { Transform } from 'stream'
2import { AbortableAsyncMapper, SKIP } from '@naturalcycles/js-lib'
3import { ReadableTyped } from '../stream.model'
4
5export function readableMap<IN, OUT>(
6 readable: ReadableTyped<IN>,
7 mapper: AbortableAsyncMapper<IN, OUT>,
8): ReadableTyped<OUT> {
9 let i = -1
10
11 // todo: check if we need to handle errors somehow specifically
12 return readable.pipe(
13 new Transform({
14 objectMode: true,
15 async transform(chunk, _enc, cb) {
16 try {
17 const r = await mapper(chunk, ++i)
18 if (r === SKIP) {
19 cb()
20 } else {
21 // _assert(r !== END, `readableMap END not supported`)
22 cb(null, r)
23 }
24 } catch (err) {
25 console.error(err)
26 cb(err as Error)
27 }
28 },
29 }),
30 )
31}