UNPKG

3.77 kBJavaScriptView Raw
1import { Transform } from 'stream'
2import cleanStack from '@artdeco/clean-stack'
3import { checkRule, hideStack } from './lib'
4
export default class Replaceable extends Transform {
  /**
   * Replaceable class that extends Transform and pushes data when it's done replacing each incoming chunk. If the replacement is passed as a function, it will work in the same way as the replacer for `string.replace` method (https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String/replace), taking the `match` as the first argument, and matched `p1`, `p2`, _etc_ parameters as following arguments. The replacer can also be an async function. Function replacers are called with the stream as `this`, so they can call `this.brake()`.
   * @param {!(_restream.Rule|Array<!_restream.Rule>)} rules A single replacement rule, or multiple rules. Entries for which `checkRule` returns a falsy value are dropped.
   * @param {!stream.TransformOptions} [options] The options for the transform stream.
   * @example
   * ```
   * // markdown __ to html <em> implementation
   * const stream = new Replaceable({
   *   re: /__(\S+)__/g,
   *   replacement(match, p1) {
   *     return `<em>${p1}</em>`
   *   },
   * })
   * ```
   */
  constructor(rules, options) {
    super(options)
    const list = Array.isArray(rules) ? rules : [rules]
    this.rules = list.filter(checkRule)
    /**
     * Whether the _Replaceable_ will not apply any more rules.
     * @type {boolean}
     */
    this._broke = false
  }

  /**
   * Stop executing further after the current rule. The flag is never reset
   * by this class, so it also affects all subsequent chunks.
   */
  brake() {
    this._broke = true
  }

  /**
   * Applies every rule, in order, to the chunk and resolves with the result.
   * Rules run one after another; within one rule, all replacer calls are made
   * synchronously and any returned promises are awaited together.
   * @param {string|!Buffer} chunk The incoming data; it is coerced to a string.
   * @returns {!Promise<string>} The chunk with all replacements applied.
   */
  async reduce(chunk) {
    /** @type {string} */
    let string = `${chunk}`

    for (const { re, replacement } of this.rules) {
      if (this._broke) break

      if (typeof replacement == 'string') {
        string = string.replace(re, replacement)
        continue
      }

      // One entry per match, in match order. Sync values and untouched matches
      // (after `brake`) are recorded too, so that the second pass below can
      // never get out of step with the matches.
      const results = []
      let hasAsync = false
      let commonError
      const replaced = string.replace(re, (match, ...args) => {
        commonError = new Error()
        try {
          const result = this._broke
            ? match
            : replacement.call(this, match, ...args)
          if (result instanceof Promise) hasAsync = true
          results.push(result)
          return result
        } catch (e) { // hide stack for sync stack traces
          // NOTE(review): hideStack is relied upon to rethrow `e`; confirm in ./lib.
          hideStack(commonError, e)
        }
      })

      if (!hasAsync) {
        string = replaced
        continue
      }

      try { // hide stack only for when throw happens before awaits
        const data = await Promise.all(results)
        let i = 0
        // Same string and same regex as the first pass, hence the same matches.
        string = string.replace(re, () => data[i++])
      } catch (e) {
        hideStack(commonError, e)
      }
    }

    return string
  }

  /**
   * Transform implementation: replaces the chunk and pushes the result, or
   * passes the error to the callback.
   * @suppress {checkTypes}
   * @returns {!Promise}
   */
  async _transform(chunk, _, next) {
    try {
      const s = await this.reduce(chunk)
      this.push(s)
      next()
    } catch (e) {
      // Replacers may throw anything. Only touch the stack of real errors:
      // assigning a property on a thrown primitive would throw in strict mode
      // and `next` would then never be called.
      if (e instanceof Error && typeof e.stack == 'string') {
        e.stack = cleanStack(e.stack)
      }
      next(e)
    }
  }
}
98
/**
 * The class for when serial execution of asynchronous replacements within the same rule is needed.
 * Replacers can call `this.addItem(fn)` to queue work on a single promise chain.
 */
export class SerialAsyncReplaceable extends Replaceable {
  /**
   * @param {!(_restream.Rule|Array<_restream.Rule>)} rules A single replacement rule, or multiple rules.
   * @param {!stream.TransformOptions} [options] The options for the transform stream, forwarded to the parent constructor.
   */
  constructor(rules, options) {
    super(rules, options)
    /**
     * The tail of the chain of queued items.
     * @type {!Promise}
     */
    this.promise = Promise.resolve()
  }
  /**
   * Queues `fn` to run after all previously added items have resolved.
   * `fn` receives the resolved value of the previous item. If an earlier item
   * rejected, `fn` is not invoked and the returned promise rejects with the
   * same reason.
   * @param {function(*): *} fn The function to run; it may return a promise.
   * @returns {!Promise} A promise for the result of `fn`.
   */
  addItem(fn) {
    const next = this.promise.then(fn)
    this.promise = next
    return next
  }
}
116
117/**
118 * @suppress {nonStandardJsDocs}
119 * @typedef {import('..').Rule} _restream.Rule
120 */
121/**
122 * @suppress {nonStandardJsDocs}
123 * @typedef {import('..').AsyncReplacer} _restream.AsyncReplacer
124 */
125/**
126 * @suppress {nonStandardJsDocs}
127 * @typedef {import('..').Replacer} _restream.Replacer
128 */
129/**
130 * @suppress {nonStandardJsDocs}
131 * @typedef {import('stream').TransformOptions} stream.TransformOptions
132 */
\No newline at end of file