UNPKG

1.45 kBPlain TextView Raw
1import { Readable, ReadableOptions } from 'stream'
2import { ReadableTyped } from '../stream.model'
3
4/**
5 * Convenience function to create a Readable that can be pushed into (similar to RxJS Subject).
6 * Push `null` to it to complete (similar to RxJS `.complete()`).
7 *
8 * Difference from Readable.from() is that this readable is not "finished" yet and allows pushing more to it.
9 *
10 * Caution!
11 * The implementation of this Readable is not fully compliant,
12 * e.g the read() method doesn't return anything, so, it will hand the Node process (or cause it to process.exit(0))
13 * if read() will be called AFTER everything was pushed and Readable is closed (by pushing `null`).
14 * Beware of it when e.g doing unit testing! Jest prefers to hang (not exit-0).
15 *
16 * @deprecated because of the caution above
17 */
18export function readableCreate<T>(
19 items: Iterable<T> = [],
20 opt?: ReadableOptions,
21): ReadableTyped<T> {
22 const readable = new Readable({
23 objectMode: true,
24 ...opt,
25 read() {}, // Caution, if this is called and Readable has not finished yet (null wasn't pushed) - it'll hang the process!
26 })
27 for (const item of items) {
28 readable.push(item)
29 }
30 return readable
31}
32
33/**
34 * Convenience type-safe wrapper around Readable.from() that infers the Type of input.
35 */
36export function readableFrom<T>(
37 items: Iterable<T> | AsyncIterable<T>,
38 opt?: ReadableOptions,
39): ReadableTyped<T> {
40 return Readable.from(items, opt)
41}