UNPKG

3.82 kBPlain TextView Raw
1import {
2 _anyToError,
3 AggregatedError,
4 CommonLogger,
5 END,
6 ErrorMode,
7 Mapper,
8 Predicate,
9 SKIP,
10} from '@naturalcycles/js-lib'
11import { yellow } from '../../colors'
12import { AbortableTransform } from '../pipeline/pipeline'
13import { TransformTyped } from '../stream.model'
14import { pipelineClose } from '../stream.util'
15
16export interface TransformMapSyncOptions<IN = any, OUT = IN> {
17 /**
18 * @default true
19 */
20 objectMode?: boolean
21
22 /**
23 * @default false
24 * Set true to support "multiMap" - possibility to return [] and emit 1 result for each item in the array.
25 */
26 flattenArrayOutput?: boolean
27
28 /**
29 * Predicate to filter outgoing results (after mapper).
30 * Allows to not emit all results.
31 *
32 * Defaults to "pass everything".
33 * Simpler way to skip individual entries is to return SKIP symbol.
34 */
35 predicate?: Predicate<OUT>
36
37 /**
38 * @default THROW_IMMEDIATELY
39 */
40 errorMode?: ErrorMode
41
42 /**
43 * If defined - will be called on every error happening in the stream.
44 * Called BEFORE observable will emit error (unless skipErrors is set to true).
45 */
46 onError?: (err: Error, input: IN) => any
47
48 /**
49 * Progress metric
50 *
51 * @default `stream`
52 */
53 metric?: string
54
55 logger?: CommonLogger
56}
57
58export class TransformMapSync extends AbortableTransform {}
59
60/**
61 * Sync (not async) version of transformMap.
62 * Supposedly faster, for cases when async is not needed.
63 */
64export function transformMapSync<IN = any, OUT = IN>(
65 mapper: Mapper<IN, OUT>,
66 opt: TransformMapSyncOptions = {},
67): TransformTyped<IN, OUT> {
68 let index = -1
69
70 const {
71 predicate, // defaults to "no predicate" (pass everything)
72 errorMode = ErrorMode.THROW_IMMEDIATELY,
73 flattenArrayOutput = false,
74 onError,
75 metric = 'stream',
76 objectMode = true,
77 logger = console,
78 } = opt
79 let isSettled = false
80 let errors = 0
81 const collectedErrors: Error[] = [] // only used if errorMode == THROW_AGGREGATED
82
83 return new TransformMapSync({
84 objectMode,
85 ...opt,
86 transform(this: AbortableTransform, chunk: IN, _, cb) {
87 // Stop processing if isSettled
88 if (isSettled) return cb()
89
90 const currentIndex = ++index
91
92 try {
93 // map and pass through
94 const v = mapper(chunk, currentIndex)
95
96 const passedResults = (flattenArrayOutput && Array.isArray(v) ? v : [v]).filter(r => {
97 if (r === END) {
98 isSettled = true // will be checked later
99 return false
100 }
101 return r !== SKIP && (!predicate || predicate(r, currentIndex))
102 })
103
104 passedResults.forEach(r => this.push(r))
105
106 if (isSettled) {
107 logger.log(`transformMapSync END received at index ${currentIndex}`)
108 pipelineClose('transformMapSync', this, this.sourceReadable, this.streamDone, logger)
109 }
110
111 cb() // done processing
112 } catch (err) {
113 logger.error(err)
114 errors++
115
116 logErrorStats()
117
118 if (onError) {
119 try {
120 onError(_anyToError(err), chunk)
121 } catch {}
122 }
123
124 if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
125 isSettled = true
126 // Emit error immediately
127 return cb(err as Error)
128 }
129
130 if (errorMode === ErrorMode.THROW_AGGREGATED) {
131 collectedErrors.push(err as Error)
132 }
133
134 cb()
135 }
136 },
137 final(cb) {
138 // console.log('transformMap final')
139
140 logErrorStats(true)
141
142 if (collectedErrors.length) {
143 // emit Aggregated error
144 cb(new AggregatedError(collectedErrors))
145 } else {
146 // emit no error
147 cb()
148 }
149 },
150 })
151
152 function logErrorStats(final = false): void {
153 if (!errors) return
154
155 logger.log(`${metric} ${final ? 'final ' : ''}errors: ${yellow(errors)}`)
156 }
157}