1 | import { Readable, ReadableOptions } from 'stream'
|
2 | import { AsyncMapper, pMap, _passthroughMapper } from '@naturalcycles/js-lib'
|
3 | import { ReadableTyped } from '../stream.model'
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 | export function readableFromArray<IN, OUT>(
|
12 | items: IN[],
|
13 | mapper: AsyncMapper<IN, OUT> = _passthroughMapper,
|
14 | opt?: ReadableOptions,
|
15 | ): ReadableTyped<OUT> {
|
16 | const readable = new Readable({
|
17 | objectMode: true,
|
18 | ...opt,
|
19 | read() {},
|
20 | })
|
21 |
|
22 | void pMap(
|
23 | items,
|
24 | async (item, index) => {
|
25 | readable.push(await mapper(item, index))
|
26 | },
|
27 | { concurrency: 1 },
|
28 | )
|
29 | .then(() => {
|
30 | readable.push(null)
|
31 | })
|
32 | .catch(err => {
|
33 | console.error(err)
|
34 | readable.push(err)
|
35 | })
|
36 |
|
37 | return readable
|
38 | }
|