UNPKG

3.46 kBPlain TextView Raw
1import { Transform } from 'stream'
2import { AggregatedError, ErrorMode, Mapper, Predicate } from '@naturalcycles/js-lib'
3import { yellow } from '../../colors'
4import { TransformTyped } from '../stream.model'
5import { notNullishPredicate } from './transformMap'
6
7export interface TransformMapSyncOptions<IN = any, OUT = IN> {
8 /**
9 * @default true
10 */
11 objectMode?: boolean
12
13 /**
14 * @default false
15 * Set true to support "multiMap" - possibility to return [] and emit 1 result for each item in the array.
16 */
17 flattenArrayOutput?: boolean
18
19 /**
20 * Predicate to filter outgoing results (after mapper).
21 * Allows to not emit all results.
22 *
23 * @default to filter out undefined/null values, but pass anything else
24 *
25 * Set to `r => r` (passthrough predicate) to pass ANY value (including undefined/null)
26 */
27 predicate?: Predicate<OUT>
28
29 /**
30 * @default THROW_IMMEDIATELY
31 */
32 errorMode?: ErrorMode
33
34 /**
35 * If defined - will be called on every error happening in the stream.
36 * Called BEFORE observable will emit error (unless skipErrors is set to true).
37 */
38 onError?: (err: unknown, input: IN) => any
39
40 /**
41 * Progress metric
42 *
43 * @default `stream`
44 */
45 metric?: string
46}
47
48/**
49 * Sync (not async) version of transformMap.
50 * Supposedly faster, for cases when async is not needed.
51 */
52export function transformMapSync<IN = any, OUT = IN>(
53 mapper: Mapper<IN, OUT>,
54 opt: TransformMapSyncOptions = {},
55): TransformTyped<IN, OUT> {
56 let index = -1
57
58 const {
59 predicate = notNullishPredicate,
60 errorMode = ErrorMode.THROW_IMMEDIATELY,
61 flattenArrayOutput = false,
62 onError,
63 metric = 'stream',
64 objectMode = true,
65 } = opt
66 let isRejected = false
67 let errors = 0
68 const collectedErrors: Error[] = [] // only used if errorMode == THROW_AGGREGATED
69
70 return new Transform({
71 objectMode,
72 ...opt,
73 transform(this: Transform, chunk: IN, _encoding, cb) {
74 // Stop processing if THROW_IMMEDIATELY mode is used
75 if (isRejected && errorMode === ErrorMode.THROW_IMMEDIATELY) {
76 return cb()
77 }
78
79 try {
80 if (!predicate(chunk, ++index)) {
81 cb() // signal that we've finished processing, but emit no output here
82 return
83 }
84
85 // map and pass through
86 const v = mapper(chunk, index)
87
88 if (flattenArrayOutput && Array.isArray(v)) {
89 // Pass each item individually
90 v.forEach(item => this.push(item))
91 } else {
92 cb(null, v)
93 }
94 } catch (err) {
95 console.error(err)
96 errors++
97
98 logErrorStats()
99
100 if (onError) {
101 try {
102 onError(err, chunk)
103 } catch {}
104 }
105
106 if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
107 isRejected = true
108 // Emit error immediately
109 return cb(err as Error)
110 }
111
112 if (errorMode === ErrorMode.THROW_AGGREGATED) {
113 collectedErrors.push(err as Error)
114 }
115
116 cb()
117 }
118 },
119 final(cb) {
120 // console.log('transformMap final')
121
122 logErrorStats(true)
123
124 if (collectedErrors.length) {
125 // emit Aggregated error
126 cb(new AggregatedError(collectedErrors))
127 } else {
128 // emit no error
129 cb()
130 }
131 },
132 })
133
134 function logErrorStats(final = false): void {
135 if (!errors) return
136
137 console.log(`${metric} ${final ? 'final ' : ''}errors: ${yellow(errors)}`)
138 }
139}