UNPKG

5.26 kBJavaScriptView Raw
1import Stream, { Transform } from 'stream'
2import cleanStack from '@artdeco/clean-stack'
3import { collect } from 'catchment'
4import { checkRule, hideStack } from './lib'
5
export default class Replaceable extends Transform {
  /**
   * A _Transform_ stream that applies replacement rules to every incoming chunk and pushes the result once all rules have been applied to it. If the replacement is passed as a function, it works in the same way as the replacer for the `string.replace` method (https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String/replace), taking the `match` as the first argument, and matched `p1`, `p2`, _etc_ parameters as following arguments. The replacer can also be an async function, and is invoked with the stream as `this`.
   * @param {!(_restream.Rule|Array<!_restream.Rule>)} rules A single replacement rule, or multiple rules.
   * @param {!stream.TransformOptions} [options] The options for the transform stream.
   * @example
   * ```js
   * // markdown __ to html <em> implementation
   * const stream = new Replaceable({
   *   re: /__(\S+)__/g,
   *   replacement(match, p1) {
   *     return `<em>${p1}</em>`
   *   },
   * })
   * ```
   */
  constructor(rules, options) {
    super(options)
    const list = Array.isArray(rules) ? rules : [rules]
    /**
     * The rules kept after filtering the input with `checkRule`, in the order they are applied.
     */
    this.rules = list.filter(checkRule)
    /**
     * Whether the _Replaceable_ will not apply any more rules.
     * @type {boolean}
     */
    this._broke = false
    // Remembered so that `replace` can create child streams with the same options.
    this._options = options
  }

  /**
   * Creates a new replaceable to replace the given string, buffer or stream using the rules of the current stream. Calling `brake` on the new replaceable will also set `_broke` on the parent stream.
   * @param {string|!Buffer|!Stream} input The input to replace.
   * @param {!Object} [context] The context to assign to the new replaceable. After the replacement, each of its keys is updated with the value found on the new replaceable.
   * @returns {!Promise<string>} The collected output of the new replaceable.
   */
  async replace(input, context) {
    const replaceable = new Replaceable(this.rules, this._options)
    if (context) Object.assign(replaceable, context)
    const res = await Replaceable.replace(replaceable, input)
    // Propagate the brake upwards so that the parent stops applying rules too.
    if (replaceable._broke) this.brake()
    if (context) {
      // Copy back whatever the replacers wrote onto the child via `this`.
      for (const key of Object.keys(context)) {
        context[key] = replaceable[key]
      }
    }
    return res
  }

  /**
   * The method to end the stream with given data, and collect the result.
   * @param {!Replaceable} replaceable The stream to feed the input into.
   * @param {string|!Buffer|!Stream} input The data or stream to end the _Replaceable_ with.
   * @returns {!Promise<string>} The data collected from the replaceable.
   */
  static async replace(replaceable, input) {
    if (input instanceof Stream) {
      input.pipe(replaceable)
    } else {
      replaceable.end(input)
    }
    /** @type {string} */
    const data = await collect(replaceable)
    return data
  }

  /**
   * Stop executing further after the current rule.
   */
  brake() {
    this._broke = true
  }

  /**
   * Applies the rules one after another to the chunk converted to a string. Rules are skipped as soon as `_broke` is set.
   * @param {string|!Buffer} chunk The incoming chunk, or an object if the stream was started in object mode.
   * @returns {!Promise<string>} The chunk with all replacements made.
   */
  async reduce(chunk) {
    let string = `${chunk}`
    for (const { re, replacement } of this.rules) {
      if (this._broke) break

      if (typeof replacement === 'string') {
        string = string.replace(re, replacement)
        continue
      }

      // One entry per match, in match order, so that the second pass below
      // can hand every match its own value even when the replacer returned
      // plain values for some matches and promises for others.
      const results = []
      let hasAsync = false
      let commonError
      const firstPass = string.replace(re, (match, ...args) => {
        // Created inside the callback: it is passed to `hideStack` together
        // with the thrown error.
        commonError = new Error()
        try {
          if (this._broke) {
            // Keep the match untouched, but still record it to stay aligned.
            results.push(match)
            return match
          }
          const p = replacement.call(this, match, ...args)
          if (p instanceof Promise) hasAsync = true
          results.push(p)
          return p
        } catch (e) { // hide stack for sync stack traces
          // NOTE(review): `hideStack` is expected to rethrow — confirm in ./lib.
          hideStack(commonError, e)
        }
      })

      if (!hasAsync) {
        string = firstPass
        continue
      }
      try { // hide stack only for when throw happens before awaits
        const data = await Promise.all(results)
        string = string.replace(re, () => data.shift())
      } catch (e) {
        hideStack(commonError, e)
      }
    }
    return string
  }

  /**
   * Replaces the chunk using the rules and pushes the result. Any error is passed to the callback after its stack has been processed with `cleanStack`.
   * @suppress {checkTypes}
   * @returns {!Promise}
   */
  async _transform(chunk, _, next) {
    try {
      const s = await this.reduce(chunk)
      this.push(s)
      next()
    } catch (e) {
      // A replacer may throw anything. Only objects carrying a string stack
      // can be cleaned: assigning a property to a primitive throws in strict
      // mode, which would leave `next` uncalled.
      if (e !== null && typeof e === 'object' && typeof e.stack === 'string') {
        e.stack = cleanStack(e.stack)
      }
      next(e)
    }
  }
}
138
/**
 * The class for when serial execution of asynchronous replacements within the same rule is needed.
 * Replacers can queue their work with `addItem`, which chains each task after the previously added one.
 */
export class SerialAsyncReplaceable extends Replaceable {
  /**
   * @param {!(_restream.Rule|Array<!_restream.Rule>)} rules A single replacement rule, or multiple rules.
   * @param {!stream.TransformOptions} [options] The options for the transform stream, forwarded to the parent constructor.
   */
  constructor(rules, options) {
    super(rules, options)
    // Tail of the task chain; every added item runs after this settles.
    this.promise = Promise.resolve()
  }
  /**
   * Schedules the function to run after all previously added items have fulfilled.
   * If an item rejects, the chain stays rejected: items added afterwards are not run and reject with the same error.
   * @param {function(): *} fn The task to run; may return a promise.
   * @returns {!Promise} A promise settled with the outcome of `fn`.
   */
  addItem(fn) {
    const queued = this.promise.then(fn)
    this.promise = queued
    return queued
  }
}
156
157/**
158 * @suppress {nonStandardJsDocs}
159 * @typedef {import('.').Rule} _restream.Rule
160 */
161/**
162 * @suppress {nonStandardJsDocs}
163 * @typedef {import('.').AsyncReplacer} _restream.AsyncReplacer
164 */
165/**
166 * @suppress {nonStandardJsDocs}
167 * @typedef {import('.').Replacer} _restream.Replacer
168 */
169/**
170 * @suppress {nonStandardJsDocs}
171 * @typedef {import('stream').TransformOptions} stream.TransformOptions
172 */
\No newline at end of file