# stream-chain > Chain functions, generators, and streams into a single pipeline with proper per-item backpressure. Zero dependencies. 4.x is ESM-only and ships three substrate variants: `stream-chain` (default = Node Streams), `stream-chain/web` (native Web Streams), `stream-chain/core` (substrate-free async iterables). ## Install npm i stream-chain Requires Node 22, 24, or 26. ## Quick start ```js import chain from 'stream-chain'; const pipeline = chain([ x => x * x, x => x % 2 ? x : null, async x => await process(x) ]); dataSource.pipe(pipeline).pipe(destination); ``` ## API ### chain(fns[, options]) Creates a Duplex stream from an array of functions, streams, or arrays (flattened). - `fns` (array) — functions, streams, or nested arrays. Falsy values are ignored. - `options` (object, optional) — Duplex options plus: - `noGrouping` (boolean) — disable function grouping optimization (default: false). - `skipEvents` (boolean) — disable error event forwarding (default: false). - Default: `{writableObjectMode: true, readableObjectMode: true}`. - Returns: `Duplex` stream with `.streams`, `.input`, `.output` properties. Supported function types: regular, async, generator, async generator. ### chainUnchecked(fns[, options]) Same as `chain()` but bypasses TypeScript type checking on the `fns` parameter. ```js import {chainUnchecked} from 'stream-chain'; const pipeline = chainUnchecked([x => x * x]); ``` ### Special return values - `none` — skip, produce no value (same as returning `null`/`undefined`). - `stop` — skip and terminate the generator pipeline. - `many(values)` — emit multiple values from a single input. - `finalValue(value)` — skip remaining chain steps, emit value directly (gen/fun only). - `flushable(fn, final?)` — mark function to be called at stream end. **Convention:** Generators (sync `function*` and async `async function*`) must yield plain values only — not `none`/`stop`/`many(...)`/`finalValue(...)`. Use language constructs instead: skip with `continue`, stop with `return`, emit multiple via separate `yield`s. The markers are for regular-function returns only. ```js import chain from 'stream-chain'; import {none, stop, many, finalValue, flushable} from 'stream-chain/defs.js'; chain([ x => x % 2 ? x : none, x => many([x, x * 10]), ]); ``` ### gen(...fns) Creates an async generator pipeline from functions. Used internally by `chain()` for grouping. ```js import gen from 'stream-chain/gen.js'; const g = gen(x => x + 1, x => x * x); for await (const v of g(2)) console.log(v); // 9 ``` ### fun(...fns) Like `gen()` but returns an async function. Generator results are collected into `many()`. **Memory caveat:** `fun()` collects all outputs for a single input into one `Many` before returning. Memory scales with output-per-input. Unsafe for unbounded expansions; `gen()` is the safe default. Intentionally NOT exported from the default `stream-chain` / `stream-chain/node` entry — requires an explicit import. Available from `stream-chain/fun.js` directly and re-exported by `/web` and `/core`. ```js import fun from 'stream-chain/fun.js'; const f = fun(x => x + 1, x => x * x); console.log(await f(2)); // 9 ``` ### asStream(fn[, options]) Wraps any function as a Node Duplex stream with per-item backpressure. ```js import asStream from 'stream-chain/asStream.js'; const stream = asStream(x => x * x); ``` ### asWebStream(fn[, options]) Wraps any function as a Web Streams `{readable, writable}` duplex pair with per-item backpressure. NOT a TransformStream — `transform()` can't suspend mid-call for per-item drain. ```js import asWebStream from 'stream-chain/asWebStream.js'; const {readable, writable} = asWebStream(x => x * x); ``` `options` accepts `{strategy, readableStrategy, writableStrategy}` — Web Streams' standard `QueuingStrategy` shape. ### Subpaths ```js import chain from 'stream-chain'; // default — same as /node import chain from 'stream-chain/node'; // canonical Node Streams chain import chain from 'stream-chain/web'; // native Web Streams chain (browser-safe) import chain from 'stream-chain/core'; // async-iterable chain (no streams at all) ``` The `/node` chain returns a `Duplex`. The `/web` chain returns `{readable, writable}`. The `/core` chain returns a callable: `(input?) => AsyncGenerator`. ### Stream type guards ```js import { isReadableWebStream, isWritableWebStream, isDuplexWebStream, isReadableNodeStream, isWritableNodeStream, isDuplexNodeStream } from 'stream-chain/defs.js'; ``` All shape-based — no `node:stream` import. Also accessible as `chain.isReadableWebStream` etc. on the `/node` and `/web` entries. ### dataSource(fn) Takes a function or iterable and returns the underlying iterator function. Substrate-agnostic — exported from `stream-chain`, `stream-chain/web`, and `stream-chain/core` (and as `chain.dataSource` on all three). ```js import {dataSource} from 'stream-chain'; const iter = dataSource([1, 2, 3]); ``` ## Utilities All utilities return functions for use in `chain()`. ### Slicing - `take(n, finalValue?)` — take N items then stop. - `takeWhile(fn, finalValue?)` — take while predicate is true. - `takeWithSkip(n, skip?, finalValue?)` — skip then take. - `skip(n)` — skip N items. - `skipWhile(fn)` — skip while predicate is true. ### Folding - `fold(fn, initial)` — reduce stream to single value at end. - `reduce(fn, initial)` — alias for fold. - `scan(fn, initial)` — emit running accumulator after each item. - `reduceStream(fn, initial)` — reduce as Node `Writable` with `.accumulator`. - `reduceWebStream(fn, initial)` — reduce as Web `WritableStream`; returns `{writable, result, accumulator}`. ### Stream helpers - `batch(size)` — group items into arrays of `size`. - `lines()` — split byte stream into lines. - `fixUtf8Stream()` — repartition chunks for valid UTF-8. - `readableFrom({iterable})` — convert iterable to Node `Readable`. - `readableWebStreamFrom({iterable})` — convert iterable to Web `ReadableStream`. ### Async-iterator wrappers (4.x) - `makeStreamPuller(readable)` — wrap a Node `Readable` as a non-destructive async iterator (`stream-chain/utils/streamPuller.js`). - `makeWebStreamPuller(readable)` — wrap a Web `ReadableStream` as a non-destructive async iterator with `cancel(reason)` extension (`stream-chain/utils/webStreamPuller.js`). Both implement `for await` directly. Used by stream-join / stream-sorting for downstream merge operations. ```js import makeStreamPuller from 'stream-chain/utils/streamPuller.js'; for await (const v of makeStreamPuller(readable)) { if (shouldStop(v)) break; // source remains alive — non-destructive } ``` ```js import take from 'stream-chain/utils/take.js'; import fold from 'stream-chain/utils/fold.js'; import batch from 'stream-chain/utils/batch.js'; chain([ take(10, stop), batch(3), fold((acc, x) => acc + x.length, 0) ]); ``` ## JSONL support - `parser(reviver?)` — JSONL parser function (returns gen() pipeline; substrate-free). Emits `{key, value}`; drops empty lines. - `parserStream(options?)` — JSONL parser as a Node `Duplex` stream. - `parserWebStream(options?)` — JSONL parser as a Web Streams `{readable, writable}` pair. - `stringer(options?)` — function-pipeline JSONL stringer (flushable). Canonical building block. - `stringerStream(options?)` — JSONL stringer as a Node Transform. - `stringerWebStream(options?)` — JSONL stringer as a Web Streams `TransformStream`. - Raw export from `stream-chain/jsonl/parser.js`: `jsonlParser(options?)` (per-line factory, no `fixUtf8Stream`/`lines` input front). - Factory-bundled entries carrying `.asStream` / `.asWebStream` methods: `stream-chain/node/jsonl/parser.js` (Node, both adapters), `stream-chain/web/jsonl/parser.js` (Web, browser-safe — `.asWebStream` only), and the matching `.../jsonl/stringer.js`. The `stream-chain/node/jsonl` and `stream-chain/web/jsonl` barrels export `{jsonlParser, jsonlStringer}`. Option type `JsonlParserOptions` / `JsonlStringerOptions`, item type `JsonlItem`; `checkErrors?` is an accepted no-op. Primary purpose: migrating stream-json's deprecated JSONL imports to stream-chain with unchanged call sites. - File-edge composites in `stream-chain/jsonl/file/` (Node-only): `parseFile(options)` and `stringerToFile(path, options)`. Recommended for local file workloads — composes the parser/stringer with `fs/promises`-backed block I/O into a single fused gen pipeline. Drive with `pipe(...)` + `drain(...)` from `stream-chain/utils/`. Error handling: `ignoreErrors: true` drops failed lines silently (counter still bumps; gappy keys; back-compat). `errorIndicator` (presence-checked) is the alternative — `errorIndicator: undefined` drops without bumping the counter, `errorIndicator: null` emits `{key:N, value:null}`, function form `(error, input, reviver) => unknown` returns the replacement (`undefined` drops). `errorIndicator` wins when both are set. ```js import chain from 'stream-chain'; import parser from 'stream-chain/jsonl/parser.js'; import fs from 'node:fs'; chain([ fs.createReadStream('data.jsonl'), parser(), obj => console.log(obj) ]); // File-edge: substantially faster for round-trip workloads import {pipe} from 'stream-chain/utils/pipe.js'; import {drain} from 'stream-chain/utils/drain.js'; import parseFile from 'stream-chain/jsonl/file/parser.js'; import stringerToFile from 'stream-chain/jsonl/file/stringer.js'; const c = pipe(parseFile(), r => r.value, stringerToFile('out.jsonl')); await drain(c('in.jsonl')); ``` ## Common patterns ### Object processing pipeline ```js import chain from 'stream-chain'; const pipeline = chain([ x => x * x, x => chain.many([x - 1, x, x + 1]), x => x % 2 ? x : null, ]); dataSource.pipe(pipeline); pipeline.on('data', x => console.log(x)); ``` ### Async pipeline with filtering ```js chain([ async x => await fetchData(x), x => x.status === 200 ? x.body : null, x => JSON.parse(x), ]); ``` ### Generator producing multiple values ```js chain([ function* (x) { for (let i = 0; i < x; ++i) yield i; }, x => x * x, ]); ``` ### Accumulate and emit at end ```js import {none, flushable} from 'stream-chain/defs.js'; let sum = 0; chain([ flushable(x => { if (x === none) return sum; sum += x; return none; }) ]); ``` ### Web streams ```js const readable = new ReadableStream({ /* ... */ }); const writable = new WritableStream({ /* ... */ }); chain([readable, x => x * 2, writable]); ``` ## TypeScript ```ts import chain from 'stream-chain'; import {TypedTransform} from 'stream-chain/typed-streams.js'; const transform = new TypedTransform({ objectMode: true, transform(x, _, cb) { cb(null, String(x)); } }); const pipeline = chain([transform] as const); ``` ## Links - Docs: https://github.com/uhop/stream-chain/wiki - npm: https://www.npmjs.com/package/stream-chain - Full LLM reference: https://github.com/uhop/stream-chain/blob/master/llms-full.txt