1 | import { Transform } from 'stream'
|
2 | import { AsyncPredicate, Predicate } from '@naturalcycles/js-lib'
|
3 | import { TransformOptions, TransformTyped } from '../stream.model'
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
/**
 * Creates a Transform stream that forwards only the chunks accepted by an
 * asynchronous predicate.
 *
 * The predicate is awaited for every chunk and receives the chunk together
 * with a zero-based running index. A truthy result passes the chunk through
 * unchanged; a falsy result drops it. Anything thrown (or rejected) while
 * handling a chunk is handed to the stream callback as the error.
 *
 * @param predicate - async test applied to each `(chunk, index)` pair
 * @param opt - extra stream options; spread after `objectMode: true`, so they
 *   may override that default
 * @returns a Transform whose output items are the accepted input items
 */
export function transformFilter<IN = any>(
  predicate: AsyncPredicate<IN>,
  opt: TransformOptions = {},
): TransformTyped<IN, IN> {
  // Running position of the next chunk, handed to the predicate
  let position = 0

  return new Transform({
    objectMode: true,
    ...opt,
    async transform(item: IN, _enc, done) {
      try {
        const accepted = await predicate(item, position++)

        if (!accepted) {
          // Chunk consumed, nothing emitted
          done()
        } else {
          // Pass the chunk through untouched
          done(null, item)
        }
      } catch (err) {
        done(err as Error)
      }
    },
  })
}
|
32 |
|
33 |
|
34 |
|
35 |
|
/**
 * Creates a Transform stream that forwards only the chunks accepted by a
 * synchronous predicate.
 *
 * The predicate is called for every chunk with the chunk and a zero-based
 * running index. A truthy result passes the chunk through unchanged; a falsy
 * result drops it. Anything thrown while handling a chunk is handed to the
 * stream callback as the error.
 *
 * The transform hook is intentionally a plain (non-async) function: nothing
 * is awaited, so no Promise is created per chunk and an exception that
 * escapes the handler propagates synchronously instead of turning into an
 * unobserved promise rejection.
 *
 * @param predicate - sync test applied to each `(chunk, index)` pair
 * @param opt - extra stream options; spread after `objectMode: true`, so they
 *   may override that default
 * @returns a Transform whose output items are the accepted input items
 */
export function transformFilterSync<IN = any>(
  predicate: Predicate<IN>,
  opt: TransformOptions = {},
): TransformTyped<IN, IN> {
  // Running position of the next chunk, handed to the predicate
  let index = 0

  return new Transform({
    objectMode: true,
    ...opt,
    transform(chunk: IN, _encoding, cb) {
      try {
        if (predicate(chunk, index++)) {
          // Pass the chunk through untouched
          cb(null, chunk)
        } else {
          // Chunk consumed, nothing emitted
          cb()
        }
      } catch (err) {
        cb(err as Error)
      }
    },
  })
}
|