1 | import { pipeline, Readable, Transform, Writable } from 'stream'
|
2 | import { _last, AnyFunction, DeferredPromise, pDefer } from '@naturalcycles/js-lib'
|
3 | import { writablePushToArray } from '../../index'
|
4 |
|
5 | type AnyStream = NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream
|
6 |
|
7 | // /**
|
8 | // * Promisified stream.pipeline()
|
9 | // */
|
10 | // export let _pipeline = promisify(pipeline)
|
11 | //
|
12 | // // Workaround https://github.com/nodejs/node/issues/40191
|
13 | // // todo: remove it when fix is released in 16.x and in AppEngine 16.x
|
14 | // if (process.version >= 'v16.10') {
|
15 | // const { pipeline } = require('stream/promises')
|
16 | // _pipeline = ((streams: AnyStream[]) => pipeline(...streams)) as any
|
17 | // }
|
18 |
|
19 | export interface PipelineOptions {
|
20 | /**
|
21 | * Set to true to allow ERR_STREAM_PREMATURE_CLOSE.
|
22 | * Required to support graceful close when using transformLimit
|
23 | */
|
24 | allowClose?: boolean
|
25 | }
|
26 |
|
27 | /**
|
28 | * Promisified `stream.pipeline`.
|
29 | *
|
30 | * Supports opt.allowClose, which allows transformLimit to work (to actually stop source Readable)
|
31 | * without throwing an error (ERR_STREAM_PREMATURE_CLOSE).
|
32 | */
|
33 | export async function _pipeline(streams: AnyStream[], opt: PipelineOptions = {}): Promise<void> {
|
34 | const first = streams[0] as any
|
35 | const rest = streams.slice(1)
|
36 |
|
37 | if (opt.allowClose) {
|
38 | // Do the magic of making the pipeline "abortable"
|
39 | //
|
40 | // How does it work:
|
41 | // It finds `sourceReadable` (basically, it's just first item in the passed array of streams)
|
42 | // Finds last "writable" (last item), patches the `_final` method of it to detect when the whole pipeline is "done",
|
43 | // sets the `streamDone` DeferredPromise that resolves when the pipeline is done.
|
44 | // Scans through all passed items, finds those that are capable of "closing" the stream
|
45 | // (currently its `transformLimit` or `transformMap`)
|
46 | // Patches them by attaching `sourceReadable` and `streamDone`.
|
47 | // These items (transformLimit and transformMap), when they need to "close the stream" - call `pipelineClose`.
|
48 | // `pipelineClose` is the result of 2 sleepless nights of googling and experimentation:)
|
49 | // It does:
|
50 | // 1. Stops the "downstream" by doing `this.push(null)`.
|
51 | // 2. Pauses the `sourceReadable` by calling sourceReadable.unpipe()
|
52 | // 3. Waits for `streamDone` to ensure that downstream chunks are fully processed (e.g written to disk).
|
53 | // 4. Calls `sourceReadable.destroy()`, which emits ERR_STREAM_PREMATURE_CLOSE
|
54 | // 5. _pipeline (this function) catches that specific error and suppresses it (because it's expected and
|
55 | // inevitable in this flow). Know a better way to close the stream? Tell me!
|
56 | const streamDone = pDefer()
|
57 | const sourceReadable = first as Readable
|
58 | const last = _last(streams) as Writable
|
59 | const lastFinal = last._final?.bind(last) || ((cb: AnyFunction) => cb())
|
60 | last._final = cb => {
|
61 | lastFinal(() => {
|
62 | cb()
|
63 | streamDone.resolve()
|
64 | })
|
65 | }
|
66 |
|
67 | rest.forEach(s => {
|
68 | // console.log(s)
|
69 | if (s instanceof AbortableTransform || s.constructor.name === 'DestroyableTransform') {
|
70 | // console.log(`found ${s.constructor.name}, setting props`)
|
71 | ;(s as AbortableTransform).sourceReadable = sourceReadable
|
72 | ;(s as AbortableTransform).streamDone = streamDone
|
73 | }
|
74 | })
|
75 | }
|
76 |
|
77 | return new Promise<void>((resolve, reject) => {
|
78 | pipeline(first, ...(rest as any[]), (err: Error) => {
|
79 | if (err) {
|
80 | if (opt.allowClose && (err as any)?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
|
81 | console.log('_pipeline closed (as expected)')
|
82 | return resolve()
|
83 | }
|
84 | // console.log(`_pipeline error`, err)
|
85 | return reject(err)
|
86 | }
|
87 |
|
88 | resolve()
|
89 | })
|
90 | })
|
91 | }
|
92 |
|
93 | /**
|
94 | * Convenience function to make _pipeline collect all items at the end of the stream (should be Transform, not Writeable!)
|
95 | * and return.
|
96 | */
|
97 | export async function _pipelineToArray<T>(
|
98 | streams: AnyStream[],
|
99 | opt: PipelineOptions = {},
|
100 | ): Promise<T[]> {
|
101 | const a: T[] = []
|
102 | await _pipeline([...streams, writablePushToArray(a)], opt)
|
103 | return a
|
104 | }
|
105 |
|
106 | export class AbortableTransform extends Transform {
|
107 | sourceReadable?: Readable
|
108 | streamDone?: DeferredPromise
|
109 | }
|
110 |
|
\ | No newline at end of file |