UNPKG

1.64 kBPlain TextView Raw
1import { Writable } from 'stream'
2import { readableCreate, ReadableTyped, _pipeline } from '../..'
3import { TransformOptions, WritableTyped } from '../stream.model'
4
5/**
6 * Allows "forking" a stream inside pipeline into a number of pipeline chains (2 or more).
7 * Currently does NOT (!) maintain backpressure.
8 * Error in the forked pipeline will propagate up to the main pipeline (and log error, to be sure).
9 * Will wait until all forked pipelines are completed before completing the stream.
10 *
11 * @experimental
12 */
13export function writableFork<T>(
14 chains: NodeJS.WritableStream[][],
15 opt?: TransformOptions,
16): WritableTyped<T> {
17 const readables: ReadableTyped<T>[] = []
18
19 const allChainsDone = Promise.all(
20 chains.map(async chain => {
21 const readable = readableCreate<T>()
22 readables.push(readable)
23
24 return await _pipeline([readable, ...chain])
25 }),
26 ).catch(err => {
27 console.error(err) // ensure the error is logged
28 throw err
29 })
30
31 return new Writable({
32 objectMode: true,
33 ...opt,
34 write(chunk: T, _, cb) {
35 // Push/fork to all sub-streams
36 // No backpressure is ensured here, it'll push regardless of the
37 readables.forEach(readable => readable.push(chunk))
38
39 cb()
40 },
41 async final(cb) {
42 try {
43 // Push null (complete) to all sub-streams
44 readables.forEach(readable => readable.push(null))
45
46 console.log(`writableFork.final is waiting for all chains to be done`)
47 await allChainsDone
48 console.log(`writableFork.final all chains done`)
49 cb()
50 } catch (err) {
51 cb(err as Error)
52 }
53 },
54 })
55}