1 | const { Transform } = require('stream');
|
2 | let cleanStack = require('@artdeco/clean-stack'); if (cleanStack && cleanStack.__esModule) cleanStack = cleanStack.default;
|
3 | const { checkRule, hideStack } = require('./lib');
|
4 |
|
5 | class Replaceable extends Transform {
|
6 | /**
|
7 | * Replaceable class that extends Transform and pushes data when it's done replacing each incoming chunk. If the replacement is passed as a function, it will work in the same way as the replacer for `string.replace` method (https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String/replace), taking the `match` as the first argument, and matched `p1`, `p2`, _etc_ parameters as following arguments. The replacer can also be an async function.
|
8 | * @param {!(_restream.Rule|Array<!_restream.Rule>)} rules A single replacement rule, or multiple rules.
|
9 | * @param {!stream.TransformOptions} [options] The options for the transform stream.
|
10 | * @example
|
11 | *
|
12 | ```
|
13 | // markdown __ to html <em> implementation
|
14 | const stream = replaceStream({
|
15 | re: /__(\S+)__/g,
|
16 | replacement(match, p1) {
|
17 | return `<em>${p1}</em>`
|
18 | },
|
19 | })
|
20 | ```
|
21 | */
|
22 | constructor(rules, options) {
|
23 | super(options)
|
24 | const re = Array.isArray(rules) ? rules : [rules]
|
25 | const fr = re.filter(checkRule)
|
26 | this.rules = fr
|
27 | /**
|
28 | * Whether the _Replaceable_ will not apply any more rules.
|
29 | * @type {boolean}
|
30 | */
|
31 | this._broke = false
|
32 | }
|
33 |
|
34 | /**
|
35 | * Stop executing further after the current rule.
|
36 | */
|
37 | brake() {
|
38 | this._broke = true
|
39 | }
|
40 |
|
41 | async reduce(chunk) {
|
42 | /** @type {string} */
|
43 | const s = await this.rules.reduce(async (acc, { re, replacement }) => {
|
44 | /** @type {string} */
|
45 | let string = await acc
|
46 | if (this._broke) return string
|
47 |
|
48 | if (typeof replacement == 'string') {
|
49 | string = string.replace(re, replacement)
|
50 | } else {
|
51 | const promises = []
|
52 | let commonError
|
53 | const t = string.replace(re, (match, ...args) => {
|
54 | commonError = new Error()
|
55 | try {
|
56 | if (this._broke) return match
|
57 | const p = replacement.call(this, match, ...args)
|
58 | if (p instanceof Promise) {
|
59 | promises.push(p)
|
60 | }
|
61 | return p
|
62 | } catch (e) { // hide stack for sync stack traces
|
63 | hideStack(commonError, e)
|
64 | }
|
65 | })
|
66 | if (promises.length) {
|
67 | try { // hide stack only for when throw happens before awaits
|
68 | const data = await Promise.all(promises)
|
69 | string = string.replace(re, () => data.shift())
|
70 | } catch (e) {
|
71 | hideStack(commonError, e)
|
72 | }
|
73 | } else {
|
74 | string = t
|
75 | }
|
76 | }
|
77 | return string
|
78 | }, `${chunk}`)
|
79 |
|
80 | return s
|
81 | }
|
82 | /**
|
83 | * @suppress {checkTypes}
|
84 | * @returns {!Promise}
|
85 | */
|
86 | async _transform(chunk, _, next) {
|
87 | try {
|
88 | const s = await this.reduce(chunk)
|
89 | this.push(s)
|
90 | next()
|
91 | } catch (e) {
|
92 | const s = cleanStack(e.stack)
|
93 | e.stack = s
|
94 | next(e)
|
95 | }
|
96 | }
|
97 | }
|
98 |
|
99 | /**
|
100 | * The class for when serial execution of asynchronous replacements withing the same rule are needed.
|
101 | */
|
102 | class SerialAsyncReplaceable extends Replaceable {
|
103 | /**
|
104 | * @param {!(_restream.Rule|Array<_restream.Rule>)} rules
|
105 | */
|
106 | constructor(rules) {
|
107 | super(rules)
|
108 | this.promise = Promise.resolve()
|
109 | }
|
110 | addItem(fn) {
|
111 | const pp = this.promise.then(fn)
|
112 | this.promise = pp
|
113 | return pp
|
114 | }
|
115 | }
|
116 |
|
117 | /**
|
118 | * @suppress {nonStandardJsDocs}
|
119 | * @typedef {import('..').Rule} _restream.Rule
|
120 | */
|
121 | /**
|
122 | * @suppress {nonStandardJsDocs}
|
123 | * @typedef {import('..').AsyncReplacer} _restream.AsyncReplacer
|
124 | */
|
125 | /**
|
126 | * @suppress {nonStandardJsDocs}
|
127 | * @typedef {import('..').Replacer} _restream.Replacer
|
128 | */
|
129 | /**
|
130 | * @suppress {nonStandardJsDocs}
|
131 | * @typedef {import('stream').TransformOptions} stream.TransformOptions
|
132 | */
|
133 |
|
134 | module.exports = Replaceable
|
135 | module.exports.SerialAsyncReplaceable = SerialAsyncReplaceable |
\ | No newline at end of file |