1 | Stream = require('stream')
|
2 | Zlib = require('zlib')
|
3 |
|
4 | Request = require('request')
|
5 | SocketIoStream = require('socket.io-stream')
|
6 |
|
7 | Base = require('../base')
|
8 | Logger = require('./logger')
|
9 |
|
10 |
|
11 | debug = Base.logger('http_proxy')
|
12 |
|
13 | PROXY_HTTP_HOST_USE_PROXY = 'USE_PROXY'
|
14 |
|
15 | upstreamUrl = null
|
16 | isUpstreamHttps = null
|
17 |
|
18 |
|
19 | isRewriteHostOn = null
|
20 | isRewritePostBodyOn = null
|
21 | upstreamHost = null
|
22 | upstreamProto = null
|
23 | upstreamHostReStr = null
|
24 | upstreamHostRe = null
|
25 | upstreamDomainCookieRe = null
|
26 | rewriteProtoDelimReStr = ":\\/\\/|:\\\\\\/\\\\\\/|%3A%2F%2F"
|
27 | rewriteCustomReplacers = null
|
28 | rewriteHeadersCustomReplacers = null
|
29 | rewriteUseRealDeflate = null
|
30 | rewriteResponseDomainsReStr = null
|
31 | rewriteResponseDomainsRe = null
|
32 | rewriteResponseDomainsBase = null
|
33 | rewriteResponseDomainsCookieRe = null
|
34 | proxyableDomainsRe = null
|
35 | rewriteCustomGenerator = null
|
36 |
|
37 |
|
38 | exports.setupHttpProxy = (upstream) ->
|
39 | upstreamUrl = upstream || Base.env.IX_HTTP_PROXY
|
40 | upstreamUrl = upstreamUrl?.trim().replace(/\/$/, '')
|
41 |
|
42 | rewriteResponseDomainsReStr = Base.env.IX_PROXY_REWRITE_RESPONSE_DOMAINS
|
43 | if rewriteResponseDomainsReStr
|
44 | upstreamUrl ||= ''
|
45 | rewriteResponseDomainsReStr = "[-0-9A-Za-z]+(?:\\.[-0-9A-Za-z]+)+" if rewriteResponseDomainsReStr.toLowerCase() == 'on'
|
46 | console.log("Rewriting response domains matching /#{rewriteResponseDomainsReStr}/")
|
47 | rewriteResponseDomainsRe = new RegExp("((?:https?(?::|%3A))?(?://|\\/\\/|%2F%2F))(#{rewriteResponseDomainsReStr})((?:[^-.\\w]|$)[-\\w.~:/?#\\[\\]@!$&'()*+,;=%]*)", 'gi')
|
48 | rewriteResponseDomainsCookieRe = new RegExp("(;\\s+domain=\\s*\\.?)(#{rewriteResponseDomainsReStr})(\\s|;|$)", "ig")
|
49 | proxyableDomainsRe = new RegExp("^#{rewriteResponseDomainsReStr}$")
|
50 | rewriteResponseDomainsBase = (Base.env.IX_PROXY_REWRITE_RESPONSE_DOMAINS_BASE || "#{Base.env.FAAS_API_KEY}.http-proxy-cert.faas.io").toLowerCase()
|
51 | throw new Error("No upstream server defined.") unless upstreamUrl || rewriteResponseDomainsRe
|
52 | debug('setupHttpProxy upstreamUrl', upstreamUrl)
|
53 | isUpstreamHttps = upstreamUrl.slice(0,6) == 'https:'
|
54 | isRewriteHostOn = Base.env.IX_PROXY_REWRITE_HOST?.toLowerCase() == 'on'
|
55 | isRewritePostBodyOn = Base.env.IX_PROXY_REWRITE_POST_BODY?.toLowerCase() == 'on'
|
56 | isRewriteHostOn ||= rewriteResponseDomainsRe?
|
57 | if isRewriteHostOn || isRewritePostBodyOn
|
58 | upstreamHost = upstreamUrl.replace(/^https?:\/\//i,'')
|
59 | upstreamProto = if isUpstreamHttps then 'https' else 'http'
|
60 | rewriteUseRealDeflate = Base.env.IX_PROXY_REWRITE_REAL_DEFLATE?.toLowerCase() == 'on'
|
61 | upstreamHostReStr = regexEscape(upstreamHost)
|
62 | upstreamHostRe = new RegExp("#{upstreamHostReStr}\\b", "ig")
|
63 | upstreamDomainCookieRe = new RegExp("(;\\s+domain=\\s*\\.?)#{upstreamHostReStr}(\\s|;|$)", "ig")
|
64 | if Base.env.IX_PROXY_REWRITE_PROTO_DELIM_REGEX
|
65 | rewriteProtoDelimReStr += "|"+Base.env.IX_PROXY_REWRITE_PROTO_DELIM_REGEX
|
66 | customRes = Base.env.IX_PROXY_REWRITE_CUSTOM_REGEXES
|
67 | if customRes
|
68 | console.log("Parsing IX_PROXY_REWRITE_CUSTOM_REGEXES: #{customRes}")
|
69 | replacers = JSON.parse(customRes)
|
70 | rewriteCustomReplacers = []
|
71 | for replacer in replacers
|
72 | flags = "g"
|
73 | flags += replacer[2] if replacer.length > 2
|
74 | rewriteCustomReplacers.push([new RegExp(replacer[0], flags), replacer[1]])
|
75 | debug('rewriteCustomReplacers', rewriteCustomReplacers)
|
76 | customHeaderRes = Base.env.IX_PROXY_REWRITE_HEADERS_REGEXES
|
77 | if customHeaderRes
|
78 | console.log("Parsing IX_PROXY_REWRITE_HEADERS_REGEXES: #{customHeaderRes}")
|
79 | replacers = JSON.parse(customHeaderRes)
|
80 | rewriteHeadersCustomReplacers = []
|
81 | for replacer in replacers
|
82 |
|
83 | flags = "g"
|
84 | flags += replacer[3] if replacer.length > 3
|
85 | rewriteHeadersCustomReplacers.push([new RegExp(replacer[0]), new RegExp(replacer[1], flags), replacer[2]])
|
86 | debug('rewriteHeadersCustomReplacers', rewriteHeadersCustomReplacers)
|
87 |
|
88 | rewriteCustomModuleFile = Base.env.IX_PROXY_REWRITE_MODULE
|
89 | if rewriteCustomModuleFile
|
90 | rewriteCustomModuleFile = process.cwd()+'/'+rewriteCustomModuleFile
|
91 | console.log("Loading IX_PROXY_REWRITE_MODULE: '#{rewriteCustomModuleFile}'")
|
92 | try
|
93 | rewriteCustomModule = require(rewriteCustomModuleFile)
|
94 | catch e
|
95 | console.error(e)
|
96 | throw new Error("Error encountered while trying to load IX_PROXY_REWRITE_MODULE: "+("#{e.stack}".split("\n").slice(0,2).join("\n")||"#{e}"))
|
97 | rewriteCustomGenerator = rewriteCustomModule.newRewriter
|
98 | if typeof rewriteCustomGenerator == 'function'
|
99 | console.log("IX_PROXY_REWRITE_MODULE newRewriter() function loaded.")
|
100 | else
|
101 | throw new Error("IX_PROXY_REWRITE_MODULE did not export newRewriter() function")
|
102 |
|
103 |
|
104 | exports.onHttpProxy = (faasReq, raw_cb_proxyResponse) ->
|
105 | try
|
106 | proxyResponse = {}
|
107 | debug('onHttpProxy.faasReq.headers', faasReq?.headers)
|
108 | faasReq.origHost = faasReq.headers.host
|
109 | faasReq.origProto = faasReq.headers['x-forwarded-proto']
|
110 | Logger.logHttpRequest(faasReq)
|
111 | cb_proxyResponse = (pr) ->
|
112 | debug('cb_proxyResponse', { headers: pr?.headers, lag: pr?.lag, statusCode: pr?.statusCode, statusMessage: pr?.statusMessage, error: pr?.error })
|
113 |
|
114 | raw_cb_proxyResponse(pr)
|
115 | upOrigin = upstreamUrl
|
116 | if rewriteResponseDomainsRe
|
117 | hostCut = faasReq.origHost.indexOf("--#{rewriteResponseDomainsBase}")
|
118 | if hostCut > 0
|
119 | newHost = faasReq.origHost.slice(0,hostCut)
|
120 | newProto = 'https'
|
121 | if newHost.slice(0,6) == 'http--'
|
122 | newHost = newHost.slice(6)
|
123 | newProto = 'http'
|
124 | newHost = newHost.replace(/--/g,'*').replace(/-/g,'.').replace(/\*/g,'-')
|
125 | throw new Error("Host is not proxyable: #{newHost} does not match #{proxyableDomainsRe}") unless proxyableDomainsRe.test(newHost)
|
126 | upOrigin = "#{newProto}://#{newHost}"
|
127 | throw new Error("No upstream server defined (CASB).") unless upOrigin
|
128 | url = upOrigin + faasReq.url
|
129 | fwdOptions =
|
130 | url: url
|
131 | headers: faasReq.headers
|
132 | method: faasReq.method
|
133 | followRedirect: false
|
134 | httpHost = Base.env.IX_PROXY_HTTP_HOST
|
135 | if isUpstreamHttps || httpHost == PROXY_HTTP_HOST_USE_PROXY
|
136 | delete fwdOptions.headers.host
|
137 | if httpHost && httpHost != PROXY_HTTP_HOST_USE_PROXY
|
138 | fwdOptions.headers.host = httpHost
|
139 | if isRewriteHostOn
|
140 | delete fwdOptions.headers.host
|
141 | delete fwdOptions.headers['x-forwarded-host']
|
142 | delete fwdOptions.headers['x-forwarded-server']
|
143 | fwdReq = null
|
144 | reqStream = faasReq.requestStream
|
145 | reqCounter = new Stream.Transform()
|
146 | reqCounter.numBytes = 0
|
147 | reqCounter._transform = (chunk, encoding, callback) ->
|
148 | debug('request chunk', encoding, chunk)
|
149 |
|
150 | reqCounter.numBytes += chunk.length
|
151 | callback(null, chunk)
|
152 | reqStream = safePipe(reqStream, reqCounter, 'reqCounter')
|
153 | if isRewritePostBodyOn && (fwdOptions.headers['content-length'] || fwdOptions.headers['content-type'] || fwdOptions.headers['transfer-encoding']) && !rewriteResponseDomainsRe?
|
154 | ct = fwdOptions.headers['content-type']?.toLowerCase() || ''
|
155 | if ct.match(/\bapplication\/x-www-form-urlencoded\b|\bmultipart\/form-data\b|\bjson\b|\btext\/|\bxml\b/i)
|
156 | delete fwdOptions.headers['content-length']
|
157 | fwdOptions.headers['transfer-encoding'] = 'chunked'
|
158 | reqStream = safePipe(reqStream, newRequestRewriterStream(faasReq.origHost, faasReq.origProto), 'RequestRewriter')
|
159 | reqCustomSetup = { url: fwdOptions.url, method: fwdOptions.method, followRedirect: fwdOptions.followRedirect }
|
160 | rewriteCustomInstance = rewriteCustomGenerator?(fwdOptions.headers, reqCustomSetup)
|
161 | if rewriteCustomInstance
|
162 | if rewriteCustomInstance.rewriteRequestHeaders
|
163 | rewriteCustomInstance.rewriteRequestHeaders(fwdOptions.headers, reqCustomSetup)
|
164 | fwdOptions.url = reqCustomSetup.url
|
165 | fwdOptions.method = reqCustomSetup.method
|
166 | fwdOptions.followRedirect = reqCustomSetup.followRedirect
|
167 | customReqTransform = safeTransform(rewriteCustomInstance.newRequestStreamTransform?(), 'IX_PROXY_REWRITE_MODULE#newRequestStreamTransform')
|
168 | reqStream = safePipe(reqStream, customReqTransform, 'rewriteCustomInstance.request') if customReqTransform
|
169 | debug('request', fwdOptions)
|
170 | fwdReq = Request(fwdOptions)
|
171 | catch e
|
172 |
|
173 | proxyResponse.statusCode = 500
|
174 | proxyResponse.error = e.toString()+"\n"
|
175 | Logger.logHttpResponseError(faasReq, proxyResponse)
|
176 | cb_proxyResponse(proxyResponse)
|
177 | faasReq.requestStream.on('data', (data) -> debug('fwdReq.error data', data) )
|
178 |
|
179 | return
|
180 | responseStarted = false
|
181 | responseStream = SocketIoStream.createStream()
|
182 | proxyResponse.responseStream = responseStream
|
183 | faasReq.responseBytes = 0
|
184 | respCounter = new Stream.Transform()
|
185 | respCounter._transform = (chunk, encoding, callback) ->
|
186 | debug('response chunk', encoding, chunk)
|
187 | faasReq.responseBytes += chunk.length
|
188 | callback(null, chunk)
|
189 | fwdReq.on 'error', (err) ->
|
190 | debug('fwdReq.error', err, proxyResponse)
|
191 | if responseStarted
|
192 | console.error('duplicate ERROR response attempted for '+url)
|
193 | responseStream.emit('error', err)
|
194 | else
|
195 | responseStarted = true
|
196 | proxyResponse.error = err.toString()+"\n"
|
197 | cb_proxyResponse(proxyResponse)
|
198 | try
|
199 | fwdReq.abort()
|
200 | fwdReq.destroy()
|
201 | faasReq.requestStream.unpipe()
|
202 | faasReq.requestStream.on('data', (data) -> debug('fwdReq.error data', data) )
|
203 | responseStream.end()
|
204 | catch e
|
205 | console.error(e)
|
206 | Logger.logHttpResponseError(faasReq, proxyResponse)
|
207 |
|
208 | fwdReq.on 'response', (response) ->
|
209 | Logger.logHttpResponseStart(faasReq, response)
|
210 | debug('fwdReq.response', response.headers, response.statusCode, response.statusMessage)
|
211 |
|
212 | if responseStarted
|
213 | console.error('duplicate SUCCESS response attempted for '+url)
|
214 | return
|
215 | responseStarted = true
|
216 | proxyResponse.lag = faasReq.ts.responseStart - faasReq.ts.requestStart
|
217 | proxyResponse.headers = response.headers
|
218 | proxyResponse.statusCode = response.statusCode
|
219 | proxyResponse.statusMessage = response.statusMessage
|
220 | stream = fwdReq
|
221 | try
|
222 | if isRewriteHostOn || rewriteCustomInstance?.rewriteResponseHeaders || rewriteCustomInstance?.newResponseStreamTransform
|
223 |
|
224 | rewriteHeaders(proxyResponse.headers, faasReq.origHost, faasReq.origProto) if isRewriteHostOn
|
225 | if rewriteCustomInstance?.rewriteResponseHeaders
|
226 | respCustomStatus = { statusCode: proxyResponse.statusCode, statusMessage: proxyResponse.statusMessage }
|
227 | rewriteCustomInstance.rewriteResponseHeaders(proxyResponse.headers, respCustomStatus)
|
228 | proxyResponse.statusCode = respCustomStatus.statusCode
|
229 | proxyResponse.statusMessage = respCustomStatus.statusMessage
|
230 | ce = proxyResponse.headers['content-encoding']?.toLowerCase()
|
231 | isGzip = (ce == 'gzip' || ce == 'x-gzip')
|
232 | isDeflate = (ce == 'deflate')
|
233 | ct = proxyResponse.headers['content-type']?.toLowerCase() || ''
|
234 | canRewrite = (ct.indexOf('text/') > -1 || ct.indexOf('javascript') > -1 || ct.indexOf('js') > -1)
|
235 | canRewrite &&= (!ce || ce == 'identity' || isGzip || isDeflate)
|
236 | customRespTransform = safeTransform(rewriteCustomInstance.newResponseStreamTransform(canRewrite), 'IX_PROXY_REWRITE_MODULE#newResponseStreamTransform') if rewriteCustomInstance?.newResponseStreamTransform
|
237 | if canRewrite && (isRewriteHostOn || customRespTransform)
|
238 |
|
239 |
|
240 |
|
241 |
|
242 | delete proxyResponse.headers['content-length']
|
243 | proxyResponse.headers['transfer-encoding'] = 'chunked'
|
244 | delete proxyResponse.headers['accept-ranges']
|
245 | if isGzip
|
246 | stream = safePipe(stream, Zlib.createGunzip(), 'Gunzip')
|
247 | else if isDeflate
|
248 |
|
249 |
|
250 |
|
251 |
|
252 | if rewriteUseRealDeflate
|
253 | stream = safePipe(stream, Zlib.createInflate(), 'Inflate')
|
254 | else
|
255 | stream = safePipe(stream, Zlib.createInflateRaw(), 'InflateRaw')
|
256 | stream = safePipe(stream, newRewriterStream(faasReq.origHost, faasReq.origProto), 'Rewriter') if isRewriteHostOn
|
257 | stream = safePipe(stream, customRespTransform, 'rewriteCustomInstance.response') if customRespTransform
|
258 | if isGzip
|
259 | stream = safePipe(stream, Zlib.createGzip(), 'Gzip')
|
260 | else if isDeflate
|
261 | if rewriteUseRealDeflate
|
262 | stream = safePipe(stream, Zlib.createDeflate(), 'Deflate')
|
263 | else
|
264 | stream = safePipe(stream, Zlib.createDeflateRaw(), 'DeflateRaw')
|
265 | else
|
266 | stream = safePipe(stream, customRespTransform, 'rewriteCustomInstance.response') if customRespTransform
|
267 | catch e
|
268 |
|
269 | console.error(e)
|
270 | proxyResponse.error = e.toString()+"\n"
|
271 | proxyResponse.statusCode = 500
|
272 | proxyResponse.statusMessage = 'Internal Server Error'
|
273 | cb_proxyResponse(proxyResponse)
|
274 | |
275 |
|
276 |
|
277 |
|
278 |
|
279 | if stream == fwdReq
|
280 | stream.pipe(respCounter).pipe(responseStream)
|
281 | else
|
282 | safePipe(safePipe(stream, respCounter, 'respCounter'), responseStream, 'responseStream')
|
283 |
|
284 | fwdReq.on 'end', ->
|
285 | Logger.logHttpResponseEnd(faasReq)
|
286 |
|
287 | reqStream.pipe(fwdReq)
|
288 |
|
289 | newRewriterStream = (host, proto) ->
|
290 | queue = ''
|
291 | changeProtoOriginsRe = null
|
292 | upProto = upstreamProto
|
293 | if rewriteResponseDomainsRe && host.indexOf("--#{rewriteResponseDomainsBase}") > 0
|
294 | upProto = if host.slice(0,6) == 'http--' then 'http' else 'https'
|
295 | if upProto != proto
|
296 | hostsReStr = regexEscape(host)
|
297 | if Base.env.IX_PROXY_REWRITE_PROTO_HOSTS_REGEX
|
298 | hostsReStr += "|"+Base.env.IX_PROXY_REWRITE_PROTO_HOSTS_REGEX
|
299 | changeProtoOriginsRe = new RegExp("#{upProto}(#{rewriteProtoDelimReStr})(#{hostsReStr})\\b", "ig")
|
300 | debug('changeProtoOriginsRe', changeProtoOriginsRe)
|
301 | rewriter = new Stream.Transform()
|
302 | rewriter._transform = (chunk, encoding, callback) ->
|
303 | debug('rewriter chunk', encoding, chunk)
|
304 | queue += chunk.toString('utf8')
|
305 | queue = queue.replace(upstreamHostRe, host) if upstreamUrl
|
306 | queue = queue.replace(changeProtoOriginsRe, "#{proto}$1$2") if changeProtoOriginsRe
|
307 | if rewriteCustomReplacers
|
308 | for replacer in rewriteCustomReplacers
|
309 | queue = queue.replace(replacer[0], replacer[1])
|
310 | queue = rewriteResponseDomains(queue) if rewriteResponseDomainsRe
|
311 | if queue.length > 100
|
312 | splitPoint = queue.length - 100
|
313 | oldChunk = queue.slice(0, splitPoint)
|
314 | queue = queue.slice(splitPoint)
|
315 | rewriter.push(new Buffer(oldChunk, 'utf8'))
|
316 | callback()
|
317 | rewriter._flush = (callback) ->
|
318 | debug('rewriter flush')
|
319 | rewriter.push(new Buffer(queue, 'utf8'))
|
320 | callback()
|
321 | rewriter
|
322 |
|
323 | newRequestRewriterStream = (host, proto) ->
|
324 | queue = ''
|
325 | isBinary = false
|
326 | downstreamHostReStr = regexEscape(host)
|
327 | if upstreamProto != proto
|
328 | changeProtoOriginsRe = new RegExp("#{proto}(#{rewriteProtoDelimReStr})(#{downstreamHostReStr})\\b", "ig")
|
329 | debug('changeProtoOriginsRe', changeProtoOriginsRe)
|
330 | downstreamHostRe = new RegExp("#{downstreamHostReStr}\\b", "ig")
|
331 | debug('downstreamHostRe', downstreamHostRe)
|
332 | rewriter = new Stream.Transform()
|
333 | rewriter._transform = (chunk, encoding, callback) ->
|
334 | debug('requestRewriter chunk', encoding, chunk)
|
335 | if isBinary
|
336 | rewriter.push(chunk)
|
337 | callback()
|
338 | return
|
339 | chunkStr = chunk.toString()
|
340 | if chunk.equals(new Buffer(chunkStr))
|
341 | queue += chunkStr
|
342 |
|
343 | if upstreamProto != proto
|
344 | queue = queue.replace(changeProtoOriginsRe, "#{upstreamProto}$1$2")
|
345 | queue = queue.replace(downstreamHostRe, upstreamHost)
|
346 | if queue.length > 100
|
347 | splitPoint = queue.length - 100
|
348 | oldChunk = queue.slice(0, splitPoint)
|
349 | queue = queue.slice(splitPoint)
|
350 | rewriter.push(new Buffer(oldChunk))
|
351 | callback()
|
352 | else
|
353 |
|
354 | debug('requestRewriter switch to binary mode')
|
355 | isBinary = true
|
356 | rewriter.push(new Buffer(queue)) if queue.length > 0
|
357 | rewriter._transform(chunk, encoding, callback)
|
358 | rewriter._flush = (callback) ->
|
359 | debug('requestRewriter flush')
|
360 | rewriter.push(new Buffer(queue)) unless isBinary
|
361 | callback()
|
362 | rewriter
|
363 |
|
364 | regexEscape = (str) ->
|
365 |
|
366 | str.replace(/([.*+?^=!:${}()|\[\]\/\\])/g, "\\$1")
|
367 |
|
368 | rewriteHeaders = (headers, host, proto) ->
|
369 |
|
370 | if headers.location
|
371 | if upstreamUrl
|
372 | headers.location = headers.location.replace(upstreamHostRe, host)
|
373 | if upstreamProto != proto
|
374 | hostReStr = regexEscape(host)
|
375 | headers.location = headers.location.replace(new RegExp("#{upstreamProto}(#{rewriteProtoDelimReStr})(#{hostReStr})\\b", "ig"), "#{proto}$1$2")
|
376 | headers.location = rewriteResponseDomains(headers.location)
|
377 |
|
378 | if headers['set-cookie']
|
379 | for val, i in headers['set-cookie']
|
380 | val = val.replace(upstreamDomainCookieRe, "$1#{host}$2") if upstreamUrl
|
381 | headers['set-cookie'][i] = rewriteResponseDomains(val, true)
|
382 |
|
383 | if rewriteHeadersCustomReplacers
|
384 | for replacer in rewriteHeadersCustomReplacers
|
385 | nameRe = replacer[0]
|
386 | search = replacer[1]
|
387 | replace = replacer[2]
|
388 | for own name, value of headers
|
389 | if name.match(nameRe)
|
390 | if Array.isArray(value)
|
391 | for val, i in value
|
392 | value[i] = rewriteResponseDomains(val.replace(search, replace))
|
393 | else
|
394 | headers[name] = rewriteResponseDomains(value.replace(search, replace))
|
395 |
|
396 | rewriteResponseDomains = (str, forCookie) ->
|
397 | return str unless rewriteResponseDomainsRe
|
398 | re = if forCookie then rewriteResponseDomainsCookieRe else rewriteResponseDomainsRe
|
399 | str.replace re, (replaceCbArgs...) ->
|
400 |
|
401 | match = replaceCbArgs[0]
|
402 | before = replaceCbArgs[1]
|
403 | domain = replaceCbArgs[2]
|
404 | after = replaceCbArgs[replaceCbArgs.length-3]
|
405 | debug("rewriteResponseDomains match", match)
|
406 | replacement = match
|
407 | unless domain == rewriteResponseDomainsBase || domain.indexOf("--#{rewriteResponseDomainsBase}") > 0
|
408 | newDomain = domain.replace(/-/g,'--').replace(/\./g,'-')+'--'+rewriteResponseDomainsBase
|
409 | if !forCookie && before.match(/^http\b/i)
|
410 | newDomain = "http--#{newDomain}"
|
411 | replacement = before+newDomain
|
412 | replacement += after
|
413 | replacement
|
414 |
|
415 | safePipe = (inpipe, outpipe, lbl) ->
|
416 | outpipe.label = lbl
|
417 | label = "stream.#{inpipe.label}"
|
418 | inpipe.pipe(outpipe)
|
419 | inpipe.on 'error', (err) ->
|
420 | console.error(["#{label}.error", err])
|
421 | |
422 |
|
423 |
|
424 |
|
425 |
|
426 | try outpipe.end() catch e then debug("#{label}.end.error", e)
|
427 | try inpipe.unpipe(outpipe) catch e then debug("#{label}.unpipe.error", e)
|
428 | try inpipe.on('data', -> debug("#{label}.data", data)) catch e then debug("#{label}.data.error", e)
|
429 | inpipe.on('end', -> debug("#{label}.end"))
|
430 | outpipe
|
431 |
|
432 | safeTransform = (transform, lbl) ->
|
433 | return transform unless transform
|
434 | handleError = (e, callback) ->
|
435 | errMsg = "Error during transform #{lbl}: "+("#{e.stack}".split("\n").slice(0,2).join(" ")||"#{e}")
|
436 | console.error(errMsg)
|
437 | transform.push("[#{errMsg}]")
|
438 | callback()
|
439 | origTransform = transform._transform
|
440 | transform._transform = (chunk, encoding, callback) ->
|
441 | try
|
442 | origTransform.call(transform, chunk, encoding, callback)
|
443 | catch e
|
444 | handleError(e, callback)
|
445 | origFlush = transform._flush
|
446 | transform._flush = (callback) ->
|
447 | try
|
448 | origFlush.call(transform, callback)
|
449 | catch e
|
450 | handleError(e, callback)
|
451 | transform
|