UNPKG

3.76 kB · JavaScript · View Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.AbortableTransform = exports._pipelineToArray = exports._pipeline = void 0;
4const stream_1 = require("stream");
5const js_lib_1 = require("@naturalcycles/js-lib");
6const index_1 = require("../../index");
/**
 * Promisified `stream.pipeline`.
 *
 * Supports opt.allowClose, which allows transformLimit to work (to actually stop source Readable)
 * without throwing an error (ERR_STREAM_PREMATURE_CLOSE).
 *
 * @param streams - array of streams to pipe together; first item is the source
 *   Readable, last item is the final Writable (or Transform). Assumed non-empty —
 *   TODO confirm callers never pass an empty array.
 * @param opt - options; `opt.allowClose` enables the graceful-close machinery
 *   described below.
 * @returns Promise that resolves when the pipeline finishes (or is closed
 *   "as expected" under allowClose), and rejects on any other pipeline error.
 */
async function _pipeline(streams, opt = {}) {
    const first = streams[0];
    const rest = streams.slice(1);
    if (opt.allowClose) {
        // Do the magic of making the pipeline "abortable"
        //
        // How does it work:
        // It finds `sourceReadable` (basically, it's just first item in the passed array of streams)
        // Finds last "writable" (last item), patches the `_final` method of it to detect when the whole pipeline is "done",
        // sets the `streamDone` DeferredPromise that resolves when the pipeline is done.
        // Scans through all passed items, finds those that are capable of "closing" the stream
        // (currently its `transformLimit` or `transformMap`)
        // Patches them by attaching `sourceReadable` and `streamDone`.
        // These items (transformLimit and transformMap), when they need to "close the stream" - call `pipelineClose`.
        // `pipelineClose` is the result of 2 sleepless nights of googling and experimentation:)
        // It does:
        // 1. Stops the "downstream" by doing `this.push(null)`.
        // 2. Pauses the `sourceReadable` by calling sourceReadable.unpipe()
        // 3. Waits for `streamDone` to ensure that downstream chunks are fully processed (e.g written to disk).
        // 4. Calls `sourceReadable.destroy()`, which emits ERR_STREAM_PREMATURE_CLOSE
        // 5. _pipeline (this function) catches that specific error and suppresses it (because it's expected and
        // inevitable in this flow). Know a better way to close the stream? Tell me!
        const streamDone = (0, js_lib_1.pDefer)();
        const sourceReadable = first;
        const last = (0, js_lib_1._last)(streams);
        // Preserve the stream's own _final (bound, so `this` stays correct);
        // fall back to a pass-through when the stream defines none.
        const lastFinal = last._final?.bind(last) || ((cb) => cb());
        // Patch _final so we learn when the last stream has fully flushed:
        // the original _final runs first, then the pipeline callback (cb),
        // then streamDone resolves — in that order.
        last._final = cb => {
            lastFinal(() => {
                cb();
                streamDone.resolve();
            });
        };
        rest.forEach(s => {
            // Attach the close-machinery props only to streams that know how to
            // use them (AbortableTransform subclasses, or through2's
            // DestroyableTransform, detected by constructor name since it is
            // not exported).
            if (s instanceof AbortableTransform || s.constructor.name === 'DestroyableTransform') {
                ;
                s.sourceReadable = sourceReadable;
                s.streamDone = streamDone;
            }
        });
    }
    // Adapt the callback-style stream.pipeline to a Promise.
    return new Promise((resolve, reject) => {
        (0, stream_1.pipeline)(first, ...rest, (err) => {
            if (err) {
                // Under allowClose, premature close is the expected way the
                // pipeline stops (step 4/5 above) — suppress it.
                if (opt.allowClose && err?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
                    console.log('_pipeline closed (as expected)');
                    return resolve();
                }
                return reject(err);
            }
            resolve();
        });
    });
}
exports._pipeline = _pipeline;
/**
 * Convenience wrapper around `_pipeline` that collects every item reaching the
 * end of the stream into an array and resolves with it.
 *
 * The last stream in `streams` should therefore be a Transform (not a
 * Writable), so its output can flow into the appended collecting sink.
 *
 * @param streams - streams to pipe together; a `writablePushToArray` sink is
 *   appended as the final destination.
 * @param opt - options forwarded unchanged to `_pipeline`.
 * @returns Promise of the collected items, in arrival order.
 */
async function _pipelineToArray(streams, opt = {}) {
    const collected = [];
    const sink = (0, index_1.writablePushToArray)(collected);
    await _pipeline([...streams, sink], opt);
    return collected;
}
exports._pipelineToArray = _pipelineToArray;
/**
 * Marker Transform subclass recognized by `_pipeline` (via `instanceof`):
 * when `opt.allowClose` is set, `_pipeline` attaches `sourceReadable` and
 * `streamDone` properties to instances of this class, which they can use to
 * close the whole pipeline gracefully (see the allowClose notes in `_pipeline`).
 */
class AbortableTransform extends stream_1.Transform {
}
exports.AbortableTransform = AbortableTransform;