UNPKG

1.47 kBPlain TextView Raw
1import { Transform } from 'stream'
2import { AsyncPredicate, Predicate } from '@naturalcycles/js-lib'
3import { TransformOptions, TransformTyped } from '../stream.model'
4
5/**
6 * Note, that currently it's NOT concurrent! (concurrency = 1)
7 * So, it's recommended to use transformMap instead, that is both concurrent and has
8 * filtering feature by default.
9 */
10export function transformFilter<IN = any>(
11 predicate: AsyncPredicate<IN>,
12 opt: TransformOptions = {},
13): TransformTyped<IN, IN> {
14 let index = 0
15
16 return new Transform({
17 objectMode: true,
18 ...opt,
19 async transform(chunk: IN, _encoding, cb) {
20 try {
21 if (await predicate(chunk, index++)) {
22 cb(null, chunk) // pass through
23 } else {
24 cb() // signal that we've finished processing, but emit no output here
25 }
26 } catch (err) {
27 cb(err as Error)
28 }
29 },
30 })
31}
32
33/**
34 * Sync version of `transformFilter`
35 */
36export function transformFilterSync<IN = any>(
37 predicate: Predicate<IN>,
38 opt: TransformOptions = {},
39): TransformTyped<IN, IN> {
40 let index = 0
41
42 return new Transform({
43 objectMode: true,
44 ...opt,
45 async transform(chunk: IN, _encoding, cb) {
46 try {
47 if (predicate(chunk, index++)) {
48 cb(null, chunk) // pass through
49 } else {
50 cb() // signal that we've finished processing, but emit no output here
51 }
52 } catch (err) {
53 cb(err as Error)
54 }
55 },
56 })
57}