UNPKG

1.76 kB · JavaScript · View Raw
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.writableFork = void 0;
const stream_1 = require("stream");
const __1 = require("../..");
/**
 * Allows "forking" a stream inside pipeline into a number of pipeline chains (2 or more).
 * Currently does NOT (!) maintain backpressure.
 * Error in the forked pipeline will propagate up to the main pipeline (and log error, to be sure).
 * Will wait until all forked pipelines are completed before completing the stream.
 *
 * @param chains - array of pipeline chains; each chain is an array of streams to pipe together
 * @param opt - extra options merged into the returned Writable (may override objectMode)
 * @returns a Writable that replicates every chunk into each forked chain
 *
 * @experimental
 */
function writableFork(chains, opt) {
    // One "input" readable per chain; every incoming chunk is replicated into each.
    // Populated synchronously by the map() below, before the Writable receives data.
    const readables = [];
    const allChainsDone = Promise.all(chains.map(async (chain) => {
        const readable = (0, __1.readableCreate)();
        readables.push(readable);
        // Run the forked chain to completion; resolves/rejects with the pipeline outcome.
        return await (0, __1._pipeline)([readable, ...chain]);
    })).catch(err => {
        console.error(err); // ensure the error is logged
        throw err; // re-throw so `final` below rejects and surfaces it to the main pipeline
    });
    return new stream_1.Writable({
        objectMode: true,
        ...opt,
        write(chunk, _, cb) {
            // Push/fork to all sub-streams.
            // No backpressure is ensured here, it'll push regardless of the
            // sub-streams' readiness (see the NOT-maintained-backpressure note above).
            readables.forEach(readable => readable.push(chunk));
            cb();
        },
        async final(cb) {
            try {
                // Push null (complete / EOF) to all sub-streams
                readables.forEach(readable => readable.push(null));
                console.log(`writableFork.final is waiting for all chains to be done`);
                await allChainsDone;
                console.log(`writableFork.final all chains done`);
                cb();
            }
            catch (err) {
                // Propagate the forked-pipeline failure to the main pipeline.
                cb(err);
            }
        },
    });
}
exports.writableFork = writableFork;