1 | 'use strict'
|
2 |
|
3 | const zlib = require('zlib')
|
4 | const { pipeline } = require('stream')
|
5 |
|
6 | const decoderFactories = {
|
7 | gzip: zlib.createGunzip,
|
8 | deflate: zlib.createInflate,
|
9 | default: encoding => { throw new Error(`Unsupported encoding: ${encoding}`) }
|
10 | }
|
11 |
|
12 |
|
13 | if (zlib.createBrotliDecompress) {
|
14 | decoderFactories.br = zlib.createBrotliDecompress
|
15 | }
|
16 |
|
17 | function defer () {
|
18 | let done
|
19 | let fail
|
20 | const promise = new Promise((resolve, reject) => {
|
21 | done = resolve
|
22 | fail = reject
|
23 | })
|
24 | return { done, fail, promise }
|
25 | }
|
26 |
|
27 | function selectDecoder (headers) {
|
28 | const encoding = headers['content-encoding']
|
29 | if (encoding && encoding !== 'identity') {
|
30 | const factory = decoderFactories[encoding] || decoderFactories.default
|
31 | return factory(encoding)
|
32 | }
|
33 | }
|
34 |
|
35 | function _getParameters (forEnd, args) {
|
36 | let [data, encoding, callback = () => {}] = args
|
37 | if (forEnd && typeof data === 'function') {
|
38 | callback = data
|
39 | data = undefined
|
40 | encoding = undefined
|
41 | } else if (typeof encoding === 'function') {
|
42 | callback = encoding
|
43 | encoding = undefined
|
44 | }
|
45 | return { data, encoding, callback }
|
46 | }
|
47 |
|
48 | const writeParameters = _getParameters.bind(null, false)
|
49 | const endParameters = _getParameters.bind(null, true)
|
50 |
|
51 | function capture (response, headers, writableStream) {
|
52 | const { done, fail, promise } = defer()
|
53 | const { emit, end, write } = response
|
54 |
|
55 | function release () {
|
56 | response.write = write
|
57 | response.end = end
|
58 | }
|
59 |
|
60 | function onError (error) {
|
61 | fail(error)
|
62 | release()
|
63 | }
|
64 |
|
65 | try {
|
66 | const out = selectDecoder(headers) || writableStream
|
67 |
|
68 | out.on('error', onError)
|
69 |
|
70 | if (out !== writableStream) {
|
71 | writableStream.on('error', onError)
|
72 | pipeline(out, writableStream, () => {
|
73 | writableStream.end()
|
74 | })
|
75 | }
|
76 |
|
77 | response.on('error', onError)
|
78 |
|
79 | writableStream.on('finish', () => done())
|
80 |
|
81 | response.write = function () {
|
82 | const { data, encoding, callback } = writeParameters(arguments)
|
83 | let waitForDrain = 0
|
84 | function drained () {
|
85 | if (--waitForDrain === 0) {
|
86 | response.emit = emit
|
87 | response.emit('drain')
|
88 | }
|
89 | }
|
90 | if (!out.write(data, encoding)) {
|
91 | ++waitForDrain
|
92 | out.once('drain', drained)
|
93 | }
|
94 | if (!write.call(response, data, encoding, callback)) {
|
95 | ++waitForDrain
|
96 | }
|
97 | if (waitForDrain) {
|
98 | response.emit = function (eventName) {
|
99 | if (eventName !== 'drain') {
|
100 | return emit.apply(response, arguments)
|
101 | }
|
102 | drained()
|
103 | }
|
104 | return false
|
105 | }
|
106 | return true
|
107 | }
|
108 |
|
109 | response.end = function () {
|
110 | const { data, encoding, callback } = endParameters(arguments)
|
111 | function endWritableStream () {
|
112 | if (out !== writableStream) {
|
113 | out.end()
|
114 | } else {
|
115 | writableStream.end()
|
116 | }
|
117 | }
|
118 | end.call(response, data, encoding, function () {
|
119 | endWritableStream()
|
120 | callback.apply(this, arguments)
|
121 | })
|
122 | return this
|
123 | }
|
124 | } catch (e) {
|
125 | fail(e)
|
126 | }
|
127 | return promise
|
128 | }
|
129 |
|
130 | module.exports = (response, writableStream) => {
|
131 | const { done, fail, promise } = defer()
|
132 | const { writeHead } = response
|
133 | response.writeHead = function (status, headers = {}) {
|
134 | if (status === 200) {
|
135 | done(capture(response, headers, writableStream))
|
136 | } else {
|
137 | fail(new Error('Invalid status'))
|
138 | }
|
139 | writeHead.apply(response, arguments)
|
140 | }
|
141 | return promise
|
142 | }
|