1 | import { Readable, ReadableOptions } from 'stream'
|
2 | import { ReadableTyped } from '../stream.model'
|
3 |
|
/**
 * Creates an object-mode Readable that stays open so callers can keep pushing
 * values into it after creation (comparable to an RxJS Subject).
 * Push `null` to signal completion (comparable to RxJS `.complete()`).
 *
 * Unlike Readable.from(), the returned stream is not "finished" once the
 * initial `items` have been pushed.
 *
 * Caution!
 * This Readable is not fully compliant: its read() implementation is a no-op
 * and never supplies data on its own. The original authors warn that this can
 * hang the Node process (or cause it to process.exit(0)) depending on when
 * read() is invoked relative to pushing `null`.
 * Beware of it in unit tests: Jest reportedly hangs rather than exiting with 0.
 *
 * @param items initial values, pushed synchronously in iteration order. Defaults to none.
 * @param opt extra Readable options. They are spread after `objectMode: true`
 *   (so they may override it), but any `read` given here is always replaced
 *   by the no-op implementation.
 * @returns the still-open Readable, pre-filled with `items`.
 *
 * @deprecated because of the caution above
 */
export function readableCreate<T>(
  items: Iterable<T> = [],
  opt?: ReadableOptions,
): ReadableTyped<T> {
  const streamOptions: ReadableOptions = {
    objectMode: true,
    ...opt,
    // Deliberate no-op: data only ever arrives through explicit push() calls.
    // Listed last so it wins over any `read` passed in `opt`.
    read: () => {},
  }

  const stream = new Readable(streamOptions)

  // Pre-fill the internal buffer with the initial values.
  for (const entry of items) {
    stream.push(entry)
  }

  return stream
}
|
32 |
|
/**
 * Type-safe convenience wrapper around Readable.from(): the item type `T` is
 * inferred from the input and carried on the returned stream's type.
 *
 * @param items sync or async iterable that is handed to Readable.from().
 * @param opt optional Readable options, forwarded to Readable.from() unchanged.
 * @returns the Readable produced by Readable.from(), typed as ReadableTyped<T>.
 */
export function readableFrom<T>(
  items: Iterable<T> | AsyncIterable<T>,
  opt?: ReadableOptions,
): ReadableTyped<T> {
  const stream: ReadableTyped<T> = Readable.from(items, opt)
  return stream
}
|