1 | ;
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.AbortableTransform = exports._pipelineToArray = exports._pipeline = void 0;
|
4 | const node_stream_1 = require("node:stream");
|
5 | const js_lib_1 = require("@naturalcycles/js-lib");
|
6 | const index_1 = require("../../index");
|
7 | /**
|
8 | * Promisified `stream.pipeline`.
|
9 | *
|
10 | * Supports opt.allowClose, which allows transformLimit to work (to actually stop source Readable)
|
11 | * without throwing an error (ERR_STREAM_PREMATURE_CLOSE).
|
12 | */
|
13 | async function _pipeline(streams, opt = {}) {
|
14 | const first = streams[0];
|
15 | const rest = streams.slice(1);
|
16 | if (opt.allowClose) {
|
17 | // Do the magic of making the pipeline "abortable"
|
18 | //
|
19 | // How does it work:
|
20 | // It finds `sourceReadable` (basically, it's just first item in the passed array of streams)
|
21 | // Finds last "writable" (last item), patches the `_final` method of it to detect when the whole pipeline is "done",
|
22 | // sets the `streamDone` DeferredPromise that resolves when the pipeline is done.
|
23 | // Scans through all passed items, finds those that are capable of "closing" the stream
|
24 | // (currently its `transformLimit` or `transformMap`)
|
25 | // Patches them by attaching `sourceReadable` and `streamDone`.
|
26 | // These items (transformLimit and transformMap), when they need to "close the stream" - call `pipelineClose`.
|
27 | // `pipelineClose` is the result of 2 sleepless nights of googling and experimentation:)
|
28 | // It does:
|
29 | // 1. Stops the "downstream" by doing `this.push(null)`.
|
30 | // 2. Pauses the `sourceReadable` by calling sourceReadable.unpipe()
|
31 | // 3. Waits for `streamDone` to ensure that downstream chunks are fully processed (e.g written to disk).
|
32 | // 4. Calls `sourceReadable.destroy()`, which emits ERR_STREAM_PREMATURE_CLOSE
|
33 | // 5. _pipeline (this function) catches that specific error and suppresses it (because it's expected and
|
34 | // inevitable in this flow). Know a better way to close the stream? Tell me!
|
35 | const streamDone = (0, js_lib_1.pDefer)();
|
36 | const sourceReadable = first;
|
37 | const last = (0, js_lib_1._last)(streams);
|
38 | const lastFinal = last._final?.bind(last) || ((cb) => cb());
|
39 | last._final = cb => {
|
40 | lastFinal(() => {
|
41 | cb();
|
42 | streamDone.resolve();
|
43 | });
|
44 | };
|
45 | rest.forEach(s => {
|
46 | // console.log(s)
|
47 | if (s instanceof AbortableTransform || s.constructor.name === 'DestroyableTransform') {
|
48 | // console.log(`found ${s.constructor.name}, setting props`)
|
49 | ;
|
50 | s.sourceReadable = sourceReadable;
|
51 | s.streamDone = streamDone;
|
52 | }
|
53 | });
|
54 | }
|
55 | return new Promise((resolve, reject) => {
|
56 | (0, node_stream_1.pipeline)(first, ...rest, (err) => {
|
57 | if (err) {
|
58 | if (opt.allowClose && err?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
|
59 | console.log('_pipeline closed (as expected)');
|
60 | return resolve();
|
61 | }
|
62 | // console.log(`_pipeline error`, err)
|
63 | return reject(err);
|
64 | }
|
65 | resolve();
|
66 | });
|
67 | });
|
68 | }
|
69 | exports._pipeline = _pipeline;
|
70 | /**
|
71 | * Convenience function to make _pipeline collect all items at the end of the stream (should be Transform, not Writeable!)
|
72 | * and return.
|
73 | */
|
74 | async function _pipelineToArray(streams, opt = {}) {
|
75 | const a = [];
|
76 | await _pipeline([...streams, (0, index_1.writablePushToArray)(a)], opt);
|
77 | return a;
|
78 | }
|
79 | exports._pipelineToArray = _pipelineToArray;
|
80 | class AbortableTransform extends node_stream_1.Transform {
|
81 | }
|
82 | exports.AbortableTransform = AbortableTransform;
|