# streaming-iterables 🏄‍♂️

[![Node CI](https://github.com/reconbot/streaming-iterables/workflows/Node%20CI/badge.svg?branch=master)](https://github.com/reconbot/streaming-iterables/actions?query=workflow%3A%22Node+CI%22) [![Try streaming-iterables on RunKit](https://badge.runkitcdn.com/streaming-iterables.svg)](https://npm.runkit.com/streaming-iterables) [![install size](https://packagephobia.now.sh/badge?p=streaming-iterables)](https://packagephobia.now.sh/result?p=streaming-iterables)

A Swiss army knife for [async iterables](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of). Designed to help replace your streams. These utilities have comparable speed, friendlier error handling, and are easier to understand than most stream-based workloads.

Streams were our last best hope for processing unbounded amounts of data. Since Node 10 they have become something greater: they've become async iterable. With async iterators you can write less code and do more work, faster.

If you still need streams with async functions, check out sister project [`bluestream`🏄‍♀️](https://www.npmjs.com/package/bluestream)!

We support and test against LTS Node releases, but the library may also work with older versions of Node.

## Install

There are no dependencies.

```bash
npm install streaming-iterables
```

We ship ESM, UMD, and TypeScript types.

## Overview

Every function is curryable. You can call it with all of its arguments, or leave off the trailing ones to get back a function that accepts the rest. For example:

```ts
import { map } from 'streaming-iterables'

for await (const str of map(String, [1, 2, 3])) {
  console.log(str)
}
// "1", "2", "3"

const stringable = map(String)
for await (const str of stringable([1, 2, 3])) {
  console.log(str)
}
// "1", "2", "3"
```
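
The following is a minimal, sync-only sketch that makes "curryable" concrete. `curriedMap` is a hypothetical name used only for illustration. It is not the library's source, and the real `map` is async and also accepts async functions. When the iterable is omitted, the function returns a closure that waits for it.

```ts
// Hypothetical `curriedMap`: a sync-only illustration of the currying pattern,
// not the library's implementation.
function curriedMap<T, B>(func: (data: T) => B): (iterable: Iterable<T>) => Generator<B>
function curriedMap<T, B>(func: (data: T) => B, iterable: Iterable<T>): Generator<B>
function curriedMap<T, B>(
  func: (data: T) => B,
  iterable?: Iterable<T>,
): ((iterable: Iterable<T>) => Generator<B>) | Generator<B> {
  if (iterable === undefined) {
    // Only `func` was given: return a function that waits for the iterable
    return (later: Iterable<T>) => curriedMap(func, later)
  }
  const source = iterable
  // Both arguments were given: lazily map over the iterable
  return (function* () {
    for (const value of source) {
      yield func(value)
    }
  })()
}

const stringify = curriedMap(String)
console.log(JSON.stringify([...stringify([1, 2, 3])])) // ["1","2","3"]
console.log(JSON.stringify([...curriedMap(String, [1, 2, 3])])) // ["1","2","3"]
```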

Since this library works with async iterators, it requires Node 10 or higher.

## API

- [`batch()`](#batch)
- [`batchWithTimeout()`](#batchwithtimeout)
- [`buffer()`](#buffer)
- [`collect()`](#collect)
- [`concat()`](#concat)
- [`consume()`](#consume)
- [`drop()`](#drop)
- [`flatMap()`](#flatmap)
- [`flatten()`](#flatten)
- [`flatTransform()`](#flattransform)
- [`fromStream()`](#fromstream)
- [`filter()`](#filter)
- [`getIterator()`](#getiterator)
- [`map()`](#map)
- [`merge()`](#merge)
- [`parallelMap()`](#parallelmap)
- [`parallelMerge()`](#parallelmerge)
- [`pipeline()`](#pipeline)
- [`reduce()`](#reduce)
- [`take()`](#take)
- [`tap()`](#tap)
- [`throttle()`](#throttle)
- [`time()`](#time)
- [`transform()`](#transform)
- [`writeToStream()`](#writetostream)

### batch

```ts
function batch<T>(size: number, iterable: AsyncIterable<T>): AsyncGenerator<T[]>
function batch<T>(size: number, iterable: Iterable<T>): Generator<T[]>
```

Batch objects from `iterable` into arrays of `size` length. The final array may be shorter than `size` if there are not enough items. Returns a sync iterator if the `iterable` is sync, otherwise an async iterator. Errors from the source `iterable` are immediately raised.

`size` can be between 1 and `Infinity`.

```ts
import { batch } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

// batch 10 pokemon while we process them
for await (const pokemons of batch(10, getPokemon())) {
  console.log(pokemons) // 10 pokemon at a time!
}
```
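
The following minimal sketch shows the batching rule for a sync iterable, including the shorter final batch. `batchSketch` is a hypothetical name, and the code illustrates the behavior described above rather than reproducing the library's source. With `size` set to `Infinity` the length check never matches, so everything lands in one final batch.

```ts
// Hypothetical `batchSketch`: batching a sync iterable, not the library's source.
function* batchSketch<T>(size: number, iterable: Iterable<T>): Generator<T[]> {
  let current: T[] = []
  for (const item of iterable) {
    current.push(item)
    // Emit a full batch as soon as it reaches `size` items
    if (current.length === size) {
      yield current
      current = []
    }
  }
  // The final batch may be shorter than `size`
  if (current.length > 0) {
    yield current
  }
}

console.log(JSON.stringify([...batchSketch(2, [1, 2, 3, 4, 5])])) // [[1,2],[3,4],[5]]
```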

### batchWithTimeout

```ts
function batchWithTimeout<T>(size: number, timeout: number, iterable: AsyncIterable<T>): AsyncGenerator<T[]>
function batchWithTimeout<T>(size: number, timeout: number, iterable: Iterable<T>): Generator<T[]>
```

Like [`batch`](#batch), but flushes early if the `timeout` is reached. The batches may be shorter than `size` if there are not enough items. Returns a sync iterator if the `iterable` is sync, otherwise an async iterator. Errors from the source `iterable` are immediately raised.

`size` can be between 1 and `Infinity`.
`timeout` can be between 0 and `Infinity`.

```ts
import { batchWithTimeout } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

// batch up to 10 pokemon, flushing early if the timeout is reached
for await (const pokemons of batchWithTimeout(10, 100, getPokemon())) {
  console.log(pokemons) // Up to 10 pokemon at a time!
}
```

### buffer

```ts
function buffer<T>(size: number, iterable: AsyncIterable<T>): AsyncIterable<T>
function buffer<T>(size: number, iterable: Iterable<T>): AsyncIterable<T>
```

`buffer` keeps a number of objects in reserve, available for immediate reading. This is helpful with async iterators because it pre-fetches results, so you don't have to wait for them to load. For sync iterables it will pre-compute up to `size` values and keep them in reserve. The internal buffer starts filling once `.next()` is called for the first time and keeps filling until the source `iterable` is exhausted or the buffer is full. Errors from the source `iterable` are raised after all buffered values are yielded.

`size` can be between 0 and `Infinity`.

```ts
import { buffer } from 'streaming-iterables'
import { getPokemon, trainMonster } from 'iterable-pokedex'

// load 10 monsters in the background while we process them one by one
for await (const monster of buffer(10, getPokemon())) {
  await trainMonster(monster) // got to do some pokéwork
}
```

### collect

```ts
function collect<T>(iterable: Iterable<T>): T[]
function collect<T>(iterable: AsyncIterable<T>): Promise<T[]>
```

Collect all the values from an iterable into an array. Returns an array if you pass it a sync iterable, and a promise for an array if you pass it an async iterable. Errors from the source `iterable` are raised immediately.

```ts
import { collect } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

console.log(await collect(getPokemon()))
// [bulbasaur, ivysaur, venusaur, charmander, ...]
```

### concat

```ts
function concat(...iterables: Array<Iterable<any>>): Iterable<any>
function concat(...iterables: Array<AnyIterable<any>>): AsyncIterable<any>
```

Combine multiple iterators into a single iterable. Reads each iterable completely, one at a time. Returns a sync iterator if all `iterables` are sync, otherwise it returns an async iterable. Errors from the source `iterables` are raised immediately.

```ts
import { concat } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'
import { getTransformers } from './util'

for await (const hero of concat(getPokemon(2), getTransformers(2))) {
  console.log(hero)
}
// charmander
// bulbasaur <- end of pokemon
// megatron
// bumblebee <- end of transformers
```
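
The following minimal sketch shows the sequential reading order for sync iterables. `concatSketch` is a hypothetical name, and the code is an illustration, not the library's source. Each iterable is drained before the next one starts. Compare this with [`merge()`](#merge), which alternates between iterables.

```ts
// Hypothetical `concatSketch`: sequential concatenation of sync iterables.
function* concatSketch<T>(...iterables: Array<Iterable<T>>): Generator<T> {
  for (const iterable of iterables) {
    // Read each iterable to completion before starting the next one
    yield* iterable
  }
}

console.log(JSON.stringify([...concatSketch([1, 2], new Set([3, 4]))])) // [1,2,3,4]
```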

### consume

```ts
function consume<T>(iterable: Iterable<T>): void
function consume<T>(iterable: AsyncIterable<T>): Promise<void>
```

Drains `iterable` of all data. For an async iterable it returns a promise that resolves once the iterable is exhausted. For a sync iterable it drains it immediately and returns nothing. Useful for processing a pipeline of data. Errors from the source `iterable` are raised immediately.

```ts
import { consume, map } from 'streaming-iterables'
import { getPokemon, trainMonster } from 'iterable-pokedex'

const train = map(trainMonster)
await consume(train(getPokemon())) // load all the pokemon and train them!
```

### drop

```ts
function drop<T>(count: number, iterable: AsyncIterable<T>): AsyncIterableIterator<T>
function drop<T>(count: number, iterable: Iterable<T>): IterableIterator<T>
```

Returns a new iterator that skips a specific number of items from `iterable`. When used with generators, it advances the generator `count` items. When used with arrays, it gets a new iterator and skips `count` items.

```ts
import { drop, collect } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

const allButFirstFive = await collect(drop(5, getPokemon()))
// every pokemon except the first five
```
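
The following minimal sketch shows the skipping logic for a sync iterable. `dropSketch` is a hypothetical name, and the code is an illustration, not the library's source. Because an array hands out a fresh iterator on every call, dropping from the same array twice starts from the beginning each time.

```ts
// Hypothetical `dropSketch`: skip the first `count` items of a sync iterable.
function* dropSketch<T>(count: number, iterable: Iterable<T>): Generator<T> {
  let skipped = 0
  for (const item of iterable) {
    // Discard items until `count` have been skipped
    if (skipped < count) {
      skipped++
      continue
    }
    yield item
  }
}

const numbers = [1, 2, 3, 4, 5]
console.log(JSON.stringify([...dropSketch(2, numbers)])) // [3,4,5]
console.log(JSON.stringify([...dropSketch(4, numbers)])) // [5]
```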

### flatMap

```ts
function flatMap<T, B>(func: (data: T) => FlatMapValue<B>, iterable: AnyIterable<T>): AsyncGenerator<B>
```

Map `func` over the `iterable`, flatten the result, and then ignore all `null` or `undefined` values. It's the transform function we've always needed. It's equivalent to:

```ts
(func, iterable) => filter(i => i !== undefined && i !== null, flatten(map(func, iterable)))
```

*note*: The return value for `func` is `FlatMapValue<B>`. That type only describes one level of nesting, but you can nest iterables as deep as you like.

The ordering of the results is guaranteed. Errors from the source `iterable` are raised after all mapped values are yielded. Errors from `func` are raised after all previously mapped values are yielded.

```ts
import { flatMap } from 'streaming-iterables'
import { getPokemon, lookupStats } from 'iterable-pokedex'

async function getDefeatedGyms(pokemon) {
  if (pokemon.gymBattlesWon > 0) {
    const stats = await lookupStats(pokemon)
    return stats.gyms
  }
}

for await (const gym of flatMap(getDefeatedGyms, getPokemon())) {
  console.log(gym.name)
}
// "Pewter Gym"
// "Cerulean Gym"
// "Vermilion Gym"
```
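
The following simplified sketch shows the map, flatten, and filter steps applied to each value in turn. `flatMapSketch` is a hypothetical name, and the code is not the library's source. For brevity it assumes `func` returns an iterable, `null`/`undefined`, or a promise for one of those. The real `flatMap` also accepts single values and flattens nested iterables to any depth.

```ts
// Hypothetical `flatMapSketch`: a simplified illustration, not the library's source.
// Assumption: `func` returns an iterable, null/undefined, or a promise of one of those.
async function* flatMapSketch<T, B>(
  func: (data: T) => Iterable<B> | null | undefined | Promise<Iterable<B> | null | undefined>,
  iterable: Iterable<T> | AsyncIterable<T>,
): AsyncGenerator<B> {
  for await (const value of iterable) {
    // Map: wait for `func` one value at a time, so order is preserved
    const result = await func(value)
    // Filter: skip empty results entirely
    if (result === null || result === undefined) continue
    // Flatten: yield each item of the returned iterable, dropping null/undefined items
    for (const item of result) {
      if (item !== null && item !== undefined) yield item
    }
  }
}

// Usage: keep only the even numbers, each one twice
async function main() {
  const doubledEvens = flatMapSketch(async (n: number) => (n % 2 === 0 ? [n, n] : undefined), [1, 2, 3, 4])
  for await (const n of doubledEvens) {
    console.log(n) // 2, 2, 4, 4
  }
}
main()
```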

### flatten

```ts
function flatten<B>(iterable: AnyIterable<B | AnyIterable<B>>): AsyncIterableIterator<B>
```

Returns a new iterator by pulling every item out of `iterable` (and all its sub-iterables) and yielding them depth-first. Each value is checked for the iterable interfaces and, if it has one, iterated. Strings are not iterated, because iterating a one-character string yields that same string, so recursing into strings would never end. Errors from the source `iterable` are raised immediately.

*note*: The signature only describes one level of nesting, but you can nest iterables as deep as you like.

```ts
import { flatten } from 'streaming-iterables'

for await (const item of flatten([1, 2, [3, [4, 5], 6]])) {
  console.log(item)
}
// 1
// 2
// 3
// 4
// 5
// 6
```
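
The following minimal sketch shows depth-first flattening and the string exception for sync values only. `flattenSketch` and `isIterable` are hypothetical names, and the code is not the library's source. The real `flatten` also handles async iterables. Restricting the check to objects leaves primitive strings untouched, which avoids the endless recursion described above.

```ts
// Hypothetical helper: true for non-null objects that implement Symbol.iterator.
// Primitive strings fail the `typeof === 'object'` check, so they are never recursed into.
function isIterable(value: unknown): value is Iterable<unknown> {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as Partial<Iterable<unknown>>)[Symbol.iterator] === 'function'
  )
}

// Hypothetical `flattenSketch`: depth-first flattening of sync iterables.
function* flattenSketch(iterable: Iterable<unknown>): Generator<unknown> {
  for (const item of iterable) {
    if (isIterable(item)) {
      // Descend into the nested iterable before moving on (depth-first)
      yield* flattenSketch(item)
    } else {
      yield item
    }
  }
}

console.log(JSON.stringify([...flattenSketch([1, 2, [3, [4, 5], 6], 'seven'])])) // [1,2,3,4,5,6,"seven"]
```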

### flatTransform

```ts
function flatTransform<T, R>(concurrency: number, func: (data: T) => FlatMapValue<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>
```

Map `func` over the `iterable`, flatten the result, and then ignore all `null` or `undefined` values. Returned async iterables are flattened concurrently too. It's the transform function we've always wanted.

It's similar to:

```ts
const filterEmpty = filter(i => i !== undefined && i !== null)
const flatTransformEquivalent = (concurrency, func, iterable) => filterEmpty(flatten(transform(concurrency, func, iterable)))
```

*note*: The return value for `func` is `FlatMapValue<R>`. That type only describes one level of nesting, but you can nest iterables as deep as you like. However, only directly returned async iterables are processed concurrently. (For example, if you use an async generator function as `func`, its output will be processed concurrently, but if it's nested inside other iterables it will be processed sequentially.)

Order is determined by when async operations resolve. It runs up to `concurrency` async operations at once, including promises and async iterables returned from `func`. Errors from the source `iterable` are raised after all transformed values are yielded. Errors from `func` are raised after all previously transformed values are yielded.

`concurrency` can be between 1 and `Infinity`.

Promise example:

```ts
import { flatTransform } from 'streaming-iterables'
import { getPokemon, lookupStats } from 'iterable-pokedex'

async function getDefeatedGyms(pokemon) {
  if (pokemon.gymBattlesWon > 0) {
    const stats = await lookupStats(pokemon)
    return stats.gyms
  }
}

// lookup 10 stats at a time
for await (const gym of flatTransform(10, getDefeatedGyms, getPokemon())) {
  console.log(gym.name)
}
// "Pewter Gym"
// "Cerulean Gym"
// "Vermilion Gym"
```

Async generator example:

```ts
import { flatTransform } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'
import { findFriendsFB, findFriendsMySpace } from './util'

async function* findFriends (pokemon) {
  yield await findFriendsFB(pokemon.name)
  yield await findFriendsMySpace(pokemon.name)
}

for await (const friend of flatTransform(10, findFriends, getPokemon())) {
  console.log(friend.name)
}
// Pikachu
// Meowth
// Ash - FB
// Jessie - FB
// Misty - MySpace
// James - MySpace
```

### fromStream

```ts
function fromStream<T>(stream: Readable): AsyncIterable<T>
```

Wraps the stream in an async iterator, or returns the stream if it already is one.

*note*: Since Node 10, streams are already async iterators. This function may be used to ensure compatibility with older versions of Node.

*note*: This method is deprecated, since Node 10 is out of LTS. It may be removed in an upcoming major release.

```ts
import { fromStream } from 'streaming-iterables'
import { createReadStream } from 'fs'

const pokeLog = fromStream(createReadStream('./pokedex-operating-system.log'))

for await (const pokeData of pokeLog) {
  console.log(pokeData) // Buffer(...)
}
```

### filter

```ts
function filter<T>(filterFunc: (data: T) => boolean | Promise<boolean>, iterable: AnyIterable<T>): AsyncIterableIterator<T>
```

Takes a `filterFunc` and an `iterable`, and returns a new async iterator containing the members of the given iterable for which `filterFunc` returns (or resolves to) `true`.

```ts
import { filter } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

const filterWater = filter(pokemon => pokemon.types.includes('Water'))

for await (const pokemon of filterWater(getPokemon())) {
  console.log(pokemon)
}
// squirtle
// vaporeon
// magikarp
```

### getIterator

```ts
function getIterator<T>(values: Iterableish<T>): Iterator<T> | AsyncIterator<T>
```

Gets the iterator from any iterable, or returns the value itself if it is already an iterator.
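
The following minimal sketch shows how such a lookup can work. `getIteratorSketch` is a hypothetical name, and the code is not the library's source. The order of checks is an assumption: an existing `next()` method first, then `Symbol.iterator`, then `Symbol.asyncIterator`.

```ts
type Iterableish<T> = Iterable<T> | Iterator<T> | AsyncIterable<T> | AsyncIterator<T>

// Hypothetical `getIteratorSketch`; the check order below is an assumption.
function getIteratorSketch<T>(values: Iterableish<T>): Iterator<T> | AsyncIterator<T> {
  const candidate = values as {
    next?: unknown
    [Symbol.iterator]?: unknown
    [Symbol.asyncIterator]?: unknown
  }
  // Already an iterator (it has a next() method): return it unchanged
  if (typeof candidate.next === 'function') {
    return values as Iterator<T> | AsyncIterator<T>
  }
  // Otherwise ask the iterable for a fresh iterator
  if (typeof candidate[Symbol.iterator] === 'function') {
    return (values as Iterable<T>)[Symbol.iterator]()
  }
  if (typeof candidate[Symbol.asyncIterator] === 'function') {
    return (values as AsyncIterable<T>)[Symbol.asyncIterator]()
  }
  throw new TypeError('value is neither an iterable nor an iterator')
}

const letters = ['a', 'b', 'c']
const iterator = getIteratorSketch(letters) as Iterator<string>
console.log(iterator.next().value) // a
// Passing an iterator back in returns the very same object
console.log(getIteratorSketch(iterator) === iterator) // true
```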

### map

```ts
function map<T, B>(func: (data: T) => B | Promise<B>, iterable: AnyIterable<T>): AsyncIterableIterator<B>
```

Map a function or async function over all the values of an iterable. Errors from the source `iterable` and `func` are raised immediately.

```ts
import { map } from 'streaming-iterables'
import got from 'got'

const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = map(got)

// download one at a time
for await (const page of download(urls)) {
  console.log(page)
}
```

### merge

```ts
function merge(...iterables: Array<AnyIterable<any>>): AsyncIterableIterator<any>
```

Combine multiple iterators into a single iterable. Reads one item off each iterable in turn, repeating until they are all exhausted. If you care less about order and want items as soon as they are available, see [`parallelMerge()`](#parallelmerge).
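
The following minimal sketch shows the round-robin reading order for sync iterables. `mergeSketch` is a hypothetical name, and the code is an illustration, not the library's source, which also accepts async iterables. Exhausted iterators drop out, and the rest keep taking turns.

```ts
// Hypothetical `mergeSketch`: round-robin merging of sync iterables.
function* mergeSketch<T>(...iterables: Array<Iterable<T>>): Generator<T> {
  let active = iterables.map(iterable => iterable[Symbol.iterator]())
  while (active.length > 0) {
    const stillActive: Array<Iterator<T>> = []
    // Take one item from each iterator, in the order the iterables were given
    for (const iterator of active) {
      const result = iterator.next()
      if (!result.done) {
        yield result.value
        stillActive.push(iterator)
      }
    }
    // Drop exhausted iterators and go around again
    active = stillActive
  }
}

console.log(JSON.stringify([...mergeSketch([1, 2, 3], [10], [100, 200])])) // [1,10,100,2,200,3]
```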

### parallelMap

```ts
function parallelMap<T, R>(concurrency: number, func: (data: T) => R | Promise<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>
```

Map a function or async function over all the values of an iterable, running up to `concurrency` calls to `func` at once. Like [`map()`](#map), results are yielded in the same order as the source values. Errors from the source `iterable` are raised after all mapped values are yielded. Errors from `func` are raised after all previously mapped values are yielded.

`concurrency` can be between 1 and `Infinity`.

If you don't care about order, see the faster [`transform()`](#transform) function.

```ts
import { parallelMap } from 'streaming-iterables'
import got from 'got'

const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = parallelMap(2, got)

// download two at a time
for await (const page of download(urls)) {
  console.log(page)
}
```

### parallelMerge

```ts
function parallelMerge<T>(...iterables: Array<AnyIterable<T>>): AsyncIterableIterator<T>
```

Combine multiple iterators into a single iterable. Reads one item off every iterable and yields them as they resolve. This is useful for pulling items out of a collection of iterables as soon as they're available. Errors from the source `iterables` are raised immediately.

```ts
import { parallelMerge } from 'streaming-iterables'
import { getPokemon, getTransformer } from 'iterable-pokedex'

// pokemon are much faster to load btw
const heroes = parallelMerge(getPokemon(), getTransformer())
for await (const hero of heroes) {
  console.log(hero)
}
// charmander
// bulbasaur
// megatron
// pikachu
// eevee
// bumblebee
// jazz
```

### pipeline

```ts
function pipeline(firstFn: Function, ...fns: Function[]): any;
```

Calls `firstFn`, then calls each function in `fns` in turn with the result of the previous function. Returns the result of the last function in `fns`.

```ts
import { pipeline, map, collect } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'
const getName = map(pokemon => pokemon.name)

// equivalent to `await collect(getName(getPokemon()))`
const names = await pipeline(getPokemon, getName, collect)
// ['charmander', 'bulbasaur', ..., 'MissingNo.']
```
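
The following minimal sketch shows the composition order with plain synchronous functions. `pipelineSketch` is a hypothetical name, and the code is not the library's source. It assumes `firstFn` takes no arguments, as in the examples above. Async functions compose the same way, because each promise is simply passed to the next function.

```ts
// Hypothetical `pipelineSketch`: call `firstFn` with no arguments, then feed each
// result into the next function and return whatever the last one returns.
function pipelineSketch(firstFn: () => unknown, ...fns: Array<(input: any) => unknown>): unknown {
  let result = firstFn()
  for (const fn of fns) {
    result = fn(result)
  }
  return result
}

const total = pipelineSketch(
  () => [1, 2, 3],
  (numbers: number[]) => numbers.map(n => n * 10),
  (numbers: number[]) => numbers.reduce((sum, n) => sum + n, 0),
)
console.log(total) // 60
```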

### reduce

```ts
function reduce<T, B>(func: (acc: B, value: T) => B, start: B, iterable: AnyIterable<T>): Promise<B>
```

An async function that takes a reducer function, an initial value, and an iterable.

Reduces an iterable to a value, which is the accumulated result of running each value from the iterable through `func`, where each successive invocation is supplied the return value of the previous one. Errors are raised immediately.
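
The following minimal sketch shows the accumulation step over a sync or async iterable. `reduceSketch` and `countdown` are hypothetical names, and the code is not the library's source. Following the signature above, `func` is assumed to return its result synchronously.

```ts
// Hypothetical `reduceSketch`: an async reduce over sync or async iterables.
async function reduceSketch<T, B>(
  func: (acc: B, value: T) => B,
  start: B,
  iterable: Iterable<T> | AsyncIterable<T>,
): Promise<B> {
  let acc = start
  for await (const value of iterable) {
    // Each call receives the previous call's result
    acc = func(acc, value)
  }
  return acc
}

// Hypothetical async source for the usage example
async function* countdown() {
  yield 3
  yield 2
  yield 1
}

reduceSketch((sum, n) => sum + n, 0, countdown()).then(total => console.log(total)) // 6
```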

### take

```ts
function take<T>(count: number, iterable: AsyncIterable<T>): AsyncIterableIterator<T>
function take<T>(count: number, iterable: Iterable<T>): IterableIterator<T>
```

Returns a new iterator that reads a specific number of items from `iterable`. When used with generators, it advances the generator. When used with arrays, it gets a new iterator and starts from the beginning.

```ts
import { take, collect } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'

const topFive = await collect(take(5, getPokemon()))
// first five pokemon
```
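
The following minimal sketch shows the sync case, including an infinite source. `takeSketch` and `naturals` are hypothetical names, and the code is not the library's source. The count check runs after yielding, so no extra item is pulled from the source once `count` is reached.

```ts
// Hypothetical `takeSketch`: read at most `count` items from a sync iterable.
function* takeSketch<T>(count: number, iterable: Iterable<T>): Generator<T> {
  if (count <= 0) return
  let taken = 0
  for (const item of iterable) {
    yield item
    taken++
    // Stop pulling from the source once `count` items have been yielded
    if (taken >= count) return
  }
}

// Hypothetical infinite source: 1, 2, 3, ...
function* naturals() {
  let n = 1
  while (true) yield n++
}

console.log(JSON.stringify([...takeSketch(5, naturals())])) // [1,2,3,4,5]
```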

### tap

```ts
function tap<T>(func: (data: T) => any, iterable: AnyIterable<T>): AsyncIterableIterator<T>
```

Returns a new iterator that yields the data it consumes, passing the data through to a function. If you provide an async function, the iterator will wait for the promise to resolve before yielding the value. This is useful for logging, or processing information and passing it along.
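
The following minimal sketch shows the pass-through behavior: the side effect runs and is awaited before the original value is yielded. `tapSketch` is a hypothetical name, and the code is not the library's source.

```ts
// Hypothetical `tapSketch`: run `func` for its side effect, wait for it if it
// returns a promise, then yield the original value unchanged.
async function* tapSketch<T>(
  func: (data: T) => unknown,
  iterable: Iterable<T> | AsyncIterable<T>,
): AsyncGenerator<T> {
  for await (const value of iterable) {
    // Run the side effect first; awaiting a non-promise result is harmless
    await func(value)
    // Pass the original value through unchanged
    yield value
  }
}

async function main() {
  const seen: number[] = []
  for await (const n of tapSketch((value: number) => { seen.push(value) }, [1, 2, 3])) {
    // `func` has already run for `n` by the time it is yielded
    console.log(n, seen.length) // logs "1 1", then "2 2", then "3 3"
  }
}
main()
```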

### throttle

```ts
function throttle<T>(limit: number, interval: number, iterable: AnyIterable<T>): AsyncGenerator<T>
```

Throttles `iterable` at a rate of `limit` per `interval` without discarding data. Useful for throttling rate-limited APIs.

`limit` can be greater than 0 but less than `Infinity`.
`interval` can be greater than or equal to 0 but less than `Infinity`.

```ts
import { throttle } from 'streaming-iterables'
import { getPokemon, trainMonster } from 'iterable-pokedex'

// load monsters at a maximum rate of 1 per second
for await (const monster of throttle(1, 1000, getPokemon())) {
  await trainMonster(monster)
}
```

### time

```ts
function time<T>(config: ITimeConfig, iterable: AsyncIterable<T>): AsyncIterableIterator<T>
function time<T>(config: ITimeConfig, iterable: Iterable<T>): IterableIterator<T>

interface ITimeConfig {
  progress?: (delta: [number, number], total: [number, number]) => any;
  total?: (time: [number, number]) => any;
}
```

Returns a new iterator that yields the data it consumes and calls the `progress` and `total` callbacks with the [`hrtime`](https://nodejs.org/api/process.html#process_process_hrtime_time) it took for `iterable` to provide a value when `.next()` was called on it. That is to say, the time reported is the time this iterator spent waiting for data, not the time it took to finish being read. The `hrtime` tuple looks like `[seconds, nanoseconds]`.

```ts
import { transform, time } from 'streaming-iterables'
import got from 'got'

const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = transform(1000, got)
const timer = time({
  total: total => console.log(`Spent ${total[0]} seconds and ${total[1]}ns downloading cats`),
})
// download all of these at the same time
for await (const page of timer(download(urls))) {
  console.log(page)
}
```

### transform

```ts
function transform<T, R>(concurrency: number, func: (data: T) => R | Promise<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>
```

Map a function or async function over all the values of an iterable. Order is determined by when `func` resolves, and it runs up to `concurrency` async `func` operations at once. If you care about order, see [`parallelMap()`](#parallelmap). Errors from the source `iterable` are raised after all transformed values are yielded. Errors from `func` are raised after all previously transformed values are yielded.

`concurrency` can be between 1 and `Infinity`.

```ts
import { transform } from 'streaming-iterables'
import got from 'got'

const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = transform(1000, got)

// download all of these at the same time
for await (const page of download(urls)) {
  console.log(page)
}
```

### writeToStream

```ts
function writeToStream(stream: Writable, iterable: AnyIterable<any>): Promise<void>
```

Writes the `iterable` to the stream, respecting the stream's backpressure. Resolves when the iterable is exhausted. Rejects if the stream errors during calls to `write()` or if there are `error` events during the write.

As always when working with streams, there are a few caveats:

- It is possible for the stream to error after `writeToStream()` has finished writing, due to internal buffering and other concerns, so always handle errors on the stream as well.
- `writeToStream()` doesn't close the stream like `stream.pipe()` might. This is done so you can write to the stream multiple times. Call `stream.end()` or any stream-specific end function when you are done with the stream.

```ts
import { pipeline, map, writeToStream } from 'streaming-iterables'
import { getPokemon } from 'iterable-pokedex'
import { createWriteStream } from 'fs'

const file = createWriteStream('pokemon.ndjson')
const serialize = map(pokemon => `${JSON.stringify(pokemon)}\n`)
await pipeline(getPokemon, serialize, writeToStream(file))
file.end() // close the stream
// now all the pokemon are written to the file!
```

## Types

### Iterableish

```ts
type Iterableish<T> = Iterable<T> | Iterator<T> | AsyncIterable<T> | AsyncIterator<T>
```

Any iterable or iterator.

### AnyIterable

```ts
type AnyIterable<T> = Iterable<T> | AsyncIterable<T>
```

Literally any `Iterable` (async or regular).

### FlatMapValue

```ts
type FlatMapValue<B> = B | AnyIterable<B> | undefined | null | Promise<B | AnyIterable<B> | undefined | null>
```

A value, an iterable (sync or async) of that value, `undefined`, `null`, or a promise for any of these. Used in the `flatMap` and `flatTransform` functions as the possible return values of the mapping function.

## Contributors wanted

Writing docs and code is a lot of work! Thank you in advance for helping out.