1 | import { Transform } from 'stream'
|
2 | import { AggregatedError, ErrorMode, Mapper, Predicate } from '@naturalcycles/js-lib'
|
3 | import { yellow } from '../../colors'
|
4 | import { TransformTyped } from '../stream.model'
|
5 | import { notNullishPredicate } from './transformMap'
|
6 |
|
7 | export interface TransformMapSyncOptions<IN = any, OUT = IN> {
|
8 | |
9 |
|
10 |
|
11 | objectMode?: boolean
|
12 |
|
13 | |
14 |
|
15 |
|
16 |
|
17 | flattenArrayOutput?: boolean
|
18 |
|
19 | |
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 | predicate?: Predicate<OUT>
|
28 |
|
29 | |
30 |
|
31 |
|
32 | errorMode?: ErrorMode
|
33 |
|
34 | |
35 |
|
36 |
|
37 |
|
38 | onError?: (err: unknown, input: IN) => any
|
39 |
|
40 | |
41 |
|
42 |
|
43 |
|
44 |
|
45 | metric?: string
|
46 | }
|
47 |
|
48 |
|
49 |
|
50 |
|
51 |
|
52 | export function transformMapSync<IN = any, OUT = IN>(
|
53 | mapper: Mapper<IN, OUT>,
|
54 | opt: TransformMapSyncOptions = {},
|
55 | ): TransformTyped<IN, OUT> {
|
56 | let index = -1
|
57 |
|
58 | const {
|
59 | predicate = notNullishPredicate,
|
60 | errorMode = ErrorMode.THROW_IMMEDIATELY,
|
61 | flattenArrayOutput = false,
|
62 | onError,
|
63 | metric = 'stream',
|
64 | objectMode = true,
|
65 | } = opt
|
66 | let isRejected = false
|
67 | let errors = 0
|
68 | const collectedErrors: Error[] = []
|
69 |
|
70 | return new Transform({
|
71 | objectMode,
|
72 | ...opt,
|
73 | transform(this: Transform, chunk: IN, _encoding, cb) {
|
74 |
|
75 | if (isRejected && errorMode === ErrorMode.THROW_IMMEDIATELY) {
|
76 | return cb()
|
77 | }
|
78 |
|
79 | try {
|
80 | if (!predicate(chunk, ++index)) {
|
81 | cb()
|
82 | return
|
83 | }
|
84 |
|
85 |
|
86 | const v = mapper(chunk, index)
|
87 |
|
88 | if (flattenArrayOutput && Array.isArray(v)) {
|
89 |
|
90 | v.forEach(item => this.push(item))
|
91 | } else {
|
92 | cb(null, v)
|
93 | }
|
94 | } catch (err) {
|
95 | console.error(err)
|
96 | errors++
|
97 |
|
98 | logErrorStats()
|
99 |
|
100 | if (onError) {
|
101 | try {
|
102 | onError(err, chunk)
|
103 | } catch {}
|
104 | }
|
105 |
|
106 | if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
|
107 | isRejected = true
|
108 |
|
109 | return cb(err as Error)
|
110 | }
|
111 |
|
112 | if (errorMode === ErrorMode.THROW_AGGREGATED) {
|
113 | collectedErrors.push(err as Error)
|
114 | }
|
115 |
|
116 | cb()
|
117 | }
|
118 | },
|
119 | final(cb) {
|
120 |
|
121 |
|
122 | logErrorStats(true)
|
123 |
|
124 | if (collectedErrors.length) {
|
125 |
|
126 | cb(new AggregatedError(collectedErrors))
|
127 | } else {
|
128 |
|
129 | cb()
|
130 | }
|
131 | },
|
132 | })
|
133 |
|
134 | function logErrorStats(final = false): void {
|
135 | if (!errors) return
|
136 |
|
137 | console.log(`${metric} ${final ? 'final ' : ''}errors: ${yellow(errors)}`)
|
138 | }
|
139 | }
|