1 | let Stream = require('stream'); const { Transform } = Stream; if (Stream && Stream.__esModule) Stream = Stream.default;
|
2 | let cleanStack = require('@artdeco/clean-stack'); if (cleanStack && cleanStack.__esModule) cleanStack = cleanStack.default;
|
3 | const { collect } = require('catchment');
|
4 | const { checkRule, hideStack } = require('./lib');
|
5 |
|
6 | class Replaceable extends Transform {
|
7 | /**
|
8 | * Replaceable class that extends Transform and pushes data when it's done replacing each incoming chunk. If the replacement is passed as a function, it will work in the same way as the replacer for `string.replace` method (https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String/replace), taking the `match` as the first argument, and matched `p1`, `p2`, _etc_ parameters as following arguments. The replacer can also be an async function.
|
9 | * @param {!(_restream.Rule|Array<!_restream.Rule>)} rules A single replacement rule, or multiple rules.
|
10 | * @param {!stream.TransformOptions} [options] The options for the transform stream.
|
11 | * @example
|
12 | *
|
13 | ```
|
14 | // markdown __ to html <em> implementation
|
15 | const stream = replaceStream({
|
16 | re: /__(\S+)__/g,
|
17 | replacement(match, p1) {
|
18 | return `<em>${p1}</em>`
|
19 | },
|
20 | })
|
21 | ```
|
22 | */
|
23 | constructor(rules, options) {
|
24 | super(options)
|
25 | const re = Array.isArray(rules) ? rules : [rules]
|
26 | const fr = re.filter(checkRule)
|
27 | this.rules = fr
|
28 | /**
|
29 | * Whether the _Replaceable_ will not apply any more rules.
|
30 | * @type {boolean}
|
31 | */
|
32 | this._broke = false
|
33 | this._options = options
|
34 | }
|
35 |
|
36 | /**
|
37 | * Creates a new replaceable to replace the given string, buffer or stream using the rules of the current stream. Calling `brake` will also set `_broke` on the parent stream.
|
38 | * @param {string|!Buffer|!Stream} input The input to replace.
|
39 | * @param {!Object} [context] The context to assign to the new replaceable.
|
40 | */
|
41 | async replace(input, context) {
|
42 | const replaceable = new Replaceable(this.rules, this._options)
|
43 | if (context) Object.assign(replaceable, context)
|
44 | const res = await Replaceable.replace(replaceable, input)
|
45 | if (replaceable._broke) this.brake()
|
46 | if (context) Object.keys(context).forEach(key => {
|
47 | context[key] = replaceable[key]
|
48 | })
|
49 | return res
|
50 | }
|
51 | /**
|
52 | * The method to end the stream with given data, and collect the result.
|
53 | * @param {!Replaceable} replaceable
|
54 | * @param {string|!Buffer|!Stream} input The data or stream to end the _Replaceable_ with.
|
55 | */
|
56 | static async replace(replaceable, input) {
|
57 | if (input instanceof Stream) {
|
58 | input.pipe(replaceable)
|
59 | } else {
|
60 | replaceable.end(input)
|
61 | }
|
62 | /** @type {string} */
|
63 | const data = await collect(replaceable)
|
64 | return data
|
65 | }
|
66 |
|
67 | /**
|
68 | * Stop executing further after the current rule.
|
69 | */
|
70 | brake() {
|
71 | this._broke = true
|
72 | }
|
73 |
|
74 | /**
|
75 | * @param {string|!Buffer} chunk The incoming chunk, or an object if the stream was started in object mode.
|
76 | */
|
77 | async reduce(chunk) {
|
78 | /** @type {string} */
|
79 | const s = await this.rules.reduce(async (acc, { re, replacement }) => {
|
80 | /** @type {string} */
|
81 | let string = await acc
|
82 | if (this._broke) return string
|
83 |
|
84 | if (typeof replacement == 'string') {
|
85 | string = string.replace(re, replacement)
|
86 | } else {
|
87 | const promises = []
|
88 | let commonError
|
89 | const t = string.replace(re, (match, ...args) => {
|
90 | commonError = new Error()
|
91 | try {
|
92 | if (this._broke) {
|
93 | if (promises.length)
|
94 | return promises.push(Promise.resolve(match))
|
95 | return match
|
96 | }
|
97 | const p = replacement.call(this, match, ...args)
|
98 | if (p instanceof Promise) {
|
99 | promises.push(p)
|
100 | }
|
101 | return p
|
102 | } catch (e) { // hide stack for sync stack traces
|
103 | hideStack(commonError, e)
|
104 | }
|
105 | })
|
106 | if (promises.length) {
|
107 | try { // hide stack only for when throw happens before awaits
|
108 | const data = await Promise.all(promises)
|
109 | string = string.replace(re, () => data.shift())
|
110 | } catch (e) {
|
111 | hideStack(commonError, e)
|
112 | }
|
113 | } else {
|
114 | string = t
|
115 | }
|
116 | }
|
117 | return string
|
118 | }, `${chunk}`)
|
119 |
|
120 | return s
|
121 | }
|
122 | /**
|
123 | * @suppress {checkTypes}
|
124 | * @returns {!Promise}
|
125 | */
|
126 | async _transform(chunk, _, next) {
|
127 | try {
|
128 | const s = await this.reduce(chunk)
|
129 | this.push(s)
|
130 | next()
|
131 | } catch (e) {
|
132 | const s = cleanStack(e.stack)
|
133 | e.stack = s
|
134 | next(e)
|
135 | }
|
136 | }
|
137 | }
|
138 |
|
139 | /**
|
140 | * The class for when serial execution of asynchronous replacements withing the same rule are needed.
|
141 | */
|
142 | class SerialAsyncReplaceable extends Replaceable {
|
143 | /**
|
144 | * @param {!(_restream.Rule|Array<!_restream.Rule>)} rules
|
145 | */
|
146 | constructor(rules) {
|
147 | super(rules)
|
148 | this.promise = Promise.resolve()
|
149 | }
|
150 | addItem(fn) {
|
151 | const pp = this.promise.then(fn)
|
152 | this.promise = pp
|
153 | return pp
|
154 | }
|
155 | }
|
156 |
|
157 | /**
|
158 | * @suppress {nonStandardJsDocs}
|
159 | * @typedef {import('.').Rule} _restream.Rule
|
160 | */
|
161 | /**
|
162 | * @suppress {nonStandardJsDocs}
|
163 | * @typedef {import('.').AsyncReplacer} _restream.AsyncReplacer
|
164 | */
|
165 | /**
|
166 | * @suppress {nonStandardJsDocs}
|
167 | * @typedef {import('.').Replacer} _restream.Replacer
|
168 | */
|
169 | /**
|
170 | * @suppress {nonStandardJsDocs}
|
171 | * @typedef {import('stream').TransformOptions} stream.TransformOptions
|
172 | */
|
173 |
|
174 | module.exports = Replaceable
|
175 | module.exports.SerialAsyncReplaceable = SerialAsyncReplaceable |
\ | No newline at end of file |