1 | import {
|
2 | _anyToError,
|
3 | AggregatedError,
|
4 | CommonLogger,
|
5 | END,
|
6 | ErrorMode,
|
7 | Mapper,
|
8 | Predicate,
|
9 | SKIP,
|
10 | } from '@naturalcycles/js-lib'
|
11 | import { yellow } from '../../colors'
|
12 | import { AbortableTransform } from '../pipeline/pipeline'
|
13 | import { TransformTyped } from '../stream.model'
|
14 | import { pipelineClose } from '../stream.util'
|
15 |
|
/**
 * Options accepted by `transformMapSync`.
 *
 * `IN` is the type of the chunks entering the transform,
 * `OUT` is the type of the values produced by the mapper.
 */
export interface TransformMapSyncOptions<IN = any, OUT = IN> {
  /**
   * Passed to the underlying Transform stream.
   *
   * `transformMapSync` uses `true` when this is not provided.
   */
  objectMode?: boolean

  /**
   * When true and the mapper returns an array, every array item is pushed
   * downstream as a separate chunk instead of pushing the array itself.
   *
   * `transformMapSync` uses `false` when this is not provided.
   */
  flattenArrayOutput?: boolean

  /**
   * Filter applied to each mapper result (after flattening, if enabled).
   * It is called with the result and the index of the input chunk;
   * results for which it returns a falsy value are not pushed downstream.
   *
   * When omitted, every result passes.
   */
  predicate?: Predicate<OUT>

  /**
   * Decides what happens when mapping a chunk throws.
   *
   * `transformMapSync` uses `ErrorMode.THROW_IMMEDIATELY` when this is not provided.
   */
  errorMode?: ErrorMode

  /**
   * Called for every chunk whose processing threw, regardless of `errorMode`,
   * with the error and the input chunk that caused it.
   *
   * Anything thrown from this callback is ignored by `transformMapSync`.
   */
  onError?: (err: Error, input: IN) => any

  /**
   * Label used as the prefix of the error-count log lines.
   *
   * `transformMapSync` uses `'stream'` when this is not provided.
   */
  metric?: string

  /**
   * Logger that receives errors and informational messages.
   *
   * `transformMapSync` uses `console` when this is not provided.
   */
  logger?: CommonLogger
}
|
57 |
|
/**
 * Transform class instantiated by `transformMapSync`.
 *
 * It adds no members of its own: everything is inherited from `AbortableTransform`.
 */
export class TransformMapSync
  extends AbortableTransform {}
|
59 |
|
60 |
|
61 |
|
62 |
|
63 |
|
/**
 * Creates a Transform stream that synchronously maps every incoming chunk with `mapper`.
 *
 * For each chunk the mapper is called with the chunk and a zero-based index
 * (only chunks that actually reach the mapper are counted). The result is handled as follows:
 *
 * - `SKIP` results are dropped.
 * - `END` marks the transform as settled: nothing after it is emitted, later chunks are
 *   ignored, and `pipelineClose` is invoked for this stream.
 * - If `opt.flattenArrayOutput` is true and the result is an array, each item is treated
 *   as an individual result.
 * - If `opt.predicate` is given, only results for which it returns a truthy value are pushed.
 *
 * Error handling (anything thrown by `mapper` or `predicate`):
 *
 * - The error is logged, counted, and passed to `opt.onError` (if any) together with the chunk.
 * - `ErrorMode.THROW_IMMEDIATELY` (default): the error is passed to the stream callback
 *   and the transform stops processing further chunks.
 * - `ErrorMode.THROW_AGGREGATED`: the error is collected; when the stream is finalized,
 *   a single `AggregatedError` holding all collected errors is passed to the final callback.
 * - Any other mode: the failing chunk is dropped and processing continues.
 *
 * NOTE(review): `opt` is typed as `TransformMapSyncOptions` without `<IN, OUT>`, so
 * `predicate` and `onError` are effectively untyped here. Left as is to avoid breaking callers.
 *
 * @param mapper synchronous mapping function, called with `(chunk, index)`
 * @param opt see `TransformMapSyncOptions`
 * @returns the configured `TransformMapSync` instance
 */
export function transformMapSync<IN = any, OUT = IN>(
  mapper: Mapper<IN, OUT>,
  opt: TransformMapSyncOptions = {},
): TransformTyped<IN, OUT> {
  const {
    predicate,
    errorMode = ErrorMode.THROW_IMMEDIATELY,
    flattenArrayOutput = false,
    onError,
    metric = 'stream',
    objectMode = true,
    logger = console,
  } = opt

  let index = -1
  let isSettled = false
  let errors = 0
  // Only populated when errorMode is THROW_AGGREGATED
  const collectedErrors: Error[] = []

  return new TransformMapSync({
    ...opt,
    // Must come after the spread: otherwise an explicit `objectMode: undefined` in `opt`
    // would override the resolved default and silently disable object mode.
    objectMode,
    transform(this: AbortableTransform, chunk: IN, _, cb) {
      // Once settled (END received or fatal error) all further input is discarded
      if (isSettled) return cb()

      const currentIndex = ++index

      try {
        const v = mapper(chunk, currentIndex)
        const results = flattenArrayOutput && Array.isArray(v) ? v : [v]

        // Collect first and push afterwards, so that a throwing predicate
        // results in nothing being emitted for this chunk.
        const passedResults: OUT[] = []
        for (const r of results) {
          if (r === END) {
            // Nothing after END may be evaluated or emitted
            isSettled = true
            break
          }

          if (r !== SKIP && (!predicate || predicate(r, currentIndex))) {
            passedResults.push(r)
          }
        }

        passedResults.forEach(r => this.push(r))

        if (isSettled) {
          logger.log(`transformMapSync END received at index ${currentIndex}`)
          pipelineClose('transformMapSync', this, this.sourceReadable, this.streamDone, logger)
        }

        cb()
      } catch (err) {
        logger.error(err)
        errors++

        logErrorStats()

        // Normalize once, so that non-Error throwables (strings, plain objects)
        // never reach the stream callback or AggregatedError as-is.
        const error = _anyToError(err)

        if (onError) {
          try {
            onError(error, chunk)
          } catch {
            // Deliberately ignored: a failing error handler must not affect the stream
          }
        }

        if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
          isSettled = true
          // Emits the error on the stream
          return cb(error)
        }

        if (errorMode === ErrorMode.THROW_AGGREGATED) {
          collectedErrors.push(error)
        }

        // Error is suppressed (or deferred until `final`): continue with the next chunk
        cb()
      }
    },
    final(cb) {
      logErrorStats(true)

      if (collectedErrors.length) {
        // Report everything gathered in THROW_AGGREGATED mode as a single error
        cb(new AggregatedError(collectedErrors))
      } else {
        cb()
      }
    },
  })

  /**
   * Logs the number of errors seen so far; does nothing while the count is zero.
   */
  function logErrorStats(final = false): void {
    if (!errors) return

    logger.log(`${metric} ${final ? 'final ' : ''}errors: ${yellow(errors)}`)
  }
}
|