UNPKG

813 BPlain TextView Raw
1import { AsyncMapper, _passthroughPredicate } from '@naturalcycles/js-lib'
2import { ReadableTyped, transformLogProgress, transformMap, writableVoid } from '../index'
3import { _pipeline } from './pipeline/pipeline'
4import { StreamForEachOptions } from './stream.model'
5
6/**
7 * Wrapper around stream.pipeline() that will run Mapper for each of the items, respecting backpressure.
8 */
9export async function streamForEach<IN>(
10 streams: ReadableTyped<IN> | (NodeJS.ReadableStream | NodeJS.WritableStream)[],
11 mapper: AsyncMapper<IN, void>,
12 opt?: StreamForEachOptions<IN>,
13): Promise<void> {
14 await _pipeline([
15 ...(Array.isArray(streams) ? streams : [streams]),
16 transformMap(mapper, {
17 ...opt,
18 predicate: _passthroughPredicate,
19 }),
20 transformLogProgress(opt),
21 writableVoid(),
22 ])
23}