1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.writableFork = void 0;
4const stream_1 = require("stream");
5const __1 = require("../..");
/**
 * Allows "forking" a stream inside a pipeline into a number of pipeline chains (2 or more).
 * Currently does NOT (!) maintain backpressure.
 * An error in a forked pipeline will propagate up to the main pipeline (and is logged, to be sure).
 * Will wait until all forked pipelines are completed before completing the stream.
 */
function writableFork(chains, opt) {
    const readables = [];
    const allChainsDone = Promise.all(chains.map(async (chain) => {
        const readable = __1.readableCreate();
        readables.push(readable);
        return await __1._pipeline([readable, ...chain]);
    })).catch(err => {
        console.error(err); // ensure the error is logged
        throw err;
    });
    return new stream_1.Writable({
        objectMode: true,
        ...opt,
        write(chunk, _encoding, cb) {
            // Push/fork to all sub-streams
            // No backpressure is ensured here, it'll push regardless of downstream readiness
            readables.forEach(readable => readable.push(chunk));
            cb();
        },
        async final(cb) {
            try {
                // Push null (complete) to all sub-streams
                readables.forEach(readable => readable.push(null));
                console.log(`writableFork.final is waiting for all chains to be done`);
                await allChainsDone;
                console.log(`writableFork.final all chains done`);
                cb();
            }
            catch (err) {
                cb(err);
            }
        },
    });
}
exports.writableFork = writableFork;
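/*
 * Usage sketch (illustrative only, not part of the published file). It assumes `_pipeline`
 * accepts an array of streams, as it is called above, and builds the forked chains from
 * plain Node.js Writable streams; `echo` is a hypothetical helper defined just for this example.
 *
 * const { Readable, Writable } = require('stream')
 *
 * // Create a simple object-mode Writable that logs each chunk it receives
 * const echo = name =>
 *   new Writable({
 *     objectMode: true,
 *     write(chunk, _encoding, cb) {
 *       console.log(name, chunk)
 *       cb()
 *     },
 *   })
 *
 * // Every chunk from the source readable is pushed into both forked chains;
 * // the pipeline completes only after both chains have finished.
 * await __1._pipeline([
 *   Readable.from([1, 2, 3]),
 *   writableFork([[echo('chainA')], [echo('chainB')]]),
 * ])
 */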