UNPKG

4.17 kBPlain TextView Raw
1import { pipeline, Readable, Transform, Writable } from 'stream'
2import { _last, AnyFunction, DeferredPromise, pDefer } from '@naturalcycles/js-lib'
3import { writablePushToArray } from '../../index'
4
5type AnyStream = NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream
6
7// /**
8// * Promisified stream.pipeline()
9// */
10// export let _pipeline = promisify(pipeline)
11//
12// // Workaround https://github.com/nodejs/node/issues/40191
13// // todo: remove it when fix is released in 16.x and in AppEngine 16.x
14// if (process.version >= 'v16.10') {
15// const { pipeline } = require('stream/promises')
16// _pipeline = ((streams: AnyStream[]) => pipeline(...streams)) as any
17// }
18
/**
 * Options accepted by `_pipeline` and `_pipelineToArray`.
 */
export interface PipelineOptions {
  /**
   * When true, ERR_STREAM_PREMATURE_CLOSE is tolerated instead of being reported as a failure.
   * Needed for a graceful close when transformLimit is part of the pipeline.
   */
  allowClose?: boolean
}
26
/**
 * Promisified `stream.pipeline`.
 *
 * Pipes `streams[0]` through the remaining streams. Resolves when the pipeline callback
 * fires without an error, rejects with that error otherwise.
 *
 * Supports `opt.allowClose`, which allows transformLimit to work (to actually stop the source Readable)
 * without throwing an error (ERR_STREAM_PREMATURE_CLOSE).
 *
 * @param streams Ordered streams: the first one is the source, the last one is the final destination.
 * @param opt See `PipelineOptions`.
 */
export async function _pipeline(streams: AnyStream[], opt: PipelineOptions = {}): Promise<void> {
  const first = streams[0] as any
  const rest = streams.slice(1)

  if (opt.allowClose) {
    // Do the magic of making the pipeline "abortable".
    //
    // How it works:
    // It takes `sourceReadable` (simply the first item in the passed array of streams).
    // It takes the last "writable" (the last item) and patches its `_final` method to detect when
    // the whole pipeline is "done", resolving the `streamDone` DeferredPromise at that point.
    // It scans through the remaining items and finds those that are capable of "closing" the stream
    // (currently it's `transformLimit` or `transformMap`), then patches them by attaching
    // `sourceReadable` and `streamDone`.
    // These items (transformLimit and transformMap), when they need to "close the stream", call `pipelineClose`.
    // `pipelineClose` is the result of 2 sleepless nights of googling and experimentation :)
    // It does:
    // 1. Stops the "downstream" by doing `this.push(null)`.
    // 2. Pauses the `sourceReadable` by calling sourceReadable.unpipe()
    // 3. Waits for `streamDone` to ensure that downstream chunks are fully processed (e.g. written to disk).
    // 4. Calls `sourceReadable.destroy()`, which emits ERR_STREAM_PREMATURE_CLOSE
    // 5. _pipeline (this function) catches that specific error and suppresses it (because it's expected and
    //    inevitable in this flow). Know a better way to close the stream? Tell me!
    const streamDone = pDefer()
    const sourceReadable = first as Readable
    const last = _last(streams) as Writable
    // Fall back to a no-op finalizer when the last stream does not define `_final`
    const lastFinal = last._final?.bind(last) || ((cb: AnyFunction) => cb())
    last._final = cb => {
      lastFinal((err?: Error | null) => {
        // Forward the error (if any) from the original `_final`, so a failed finalization
        // is reported by the pipeline instead of being silently swallowed.
        cb(err)
        // Resolve regardless of the outcome, so that whoever awaits `streamDone` never hangs
        streamDone.resolve()
      })
    }

    for (const s of rest) {
      // 'DestroyableTransform' is matched by constructor name (not by `instanceof`)
      if (s instanceof AbortableTransform || s.constructor.name === 'DestroyableTransform') {
        ;(s as AbortableTransform).sourceReadable = sourceReadable
        ;(s as AbortableTransform).streamDone = streamDone
      }
    }
  }

  return new Promise<void>((resolve, reject) => {
    pipeline(first, ...(rest as any[]), (err: NodeJS.ErrnoException | null) => {
      if (!err) return resolve()

      // With allowClose the premature close is the expected way for the pipeline to end
      if (opt.allowClose && err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
        console.log('_pipeline closed (as expected)')
        return resolve()
      }

      reject(err)
    })
  })
}
92
/**
 * Runs `_pipeline` with an extra array-collecting writable appended after the given streams,
 * and returns everything that writable received.
 *
 * Since a writable is appended here, the last of the passed `streams` must be readable
 * (e.g. a Transform, not a Writable).
 */
export async function _pipelineToArray<T>(
  streams: AnyStream[],
  opt: PipelineOptions = {},
): Promise<T[]> {
  const collected: T[] = []
  const sink = writablePushToArray(collected)
  await _pipeline([...streams, sink], opt)
  return collected
}
105
/**
 * Transform that `_pipeline` recognizes (via `instanceof`) when `opt.allowClose` is set.
 * In that case `_pipeline` assigns both properties below before the pipeline starts.
 */
export class AbortableTransform extends Transform {
  /**
   * Deferred that `_pipeline` resolves once the last stream's `_final` has completed.
   */
  streamDone?: DeferredPromise

  /**
   * First stream of the pipeline (the source).
   */
  sourceReadable?: Readable
}
110
\No newline at end of file