1 | 'use strict'
|
2 |
|
3 | const zlib = require('zlib')
|
4 | const { pipeline } = require('stream')
|
5 |
|
// Lookup table: content-encoding token -> function creating the matching zlib
// decompression stream. `default` is the fallback entry used for tokens that
// are not in the table; it throws instead of returning a stream.
// The `br` entry is only added when this zlib module exposes
// createBrotliDecompress.
const decoderFactories = Object.assign(
  {
    gzip: zlib.createGunzip,
    deflate: zlib.createInflate,
    default: encoding => {
      throw new Error(`Unsupported encoding: ${encoding}`)
    }
  },
  zlib.createBrotliDecompress ? { br: zlib.createBrotliDecompress } : {}
)
|
16 |
|
/**
 * Creates a promise together with the functions that settle it.
 *
 * @returns {{done: Function, fail: Function, promise: Promise}} `done`
 * resolves `promise`, `fail` rejects it
 */
function defer () {
  const deferred = {}
  // The executor runs synchronously, so done and fail are set before promise.
  deferred.promise = new Promise((resolve, reject) => {
    deferred.done = resolve
    deferred.fail = reject
  })
  return deferred
}
|
26 |
|
/**
 * Picks the decompression stream matching the response's content-encoding.
 *
 * The token is matched case-insensitively and ignoring surrounding
 * whitespace. Only own entries of decoderFactories are considered, so a token
 * such as 'toString' or 'constructor' is reported as unsupported instead of
 * resolving to a member inherited from Object.prototype.
 *
 * @param {object} headers Response headers; only 'content-encoding' is read
 * @returns {object|undefined} Whatever the selected factory returns, or
 * undefined when the header is absent, empty or 'identity'
 * @throws {Error} 'Unsupported encoding: <value>' (thrown by
 * decoderFactories.default) when no factory is registered for the token
 */
function selectDecoder (headers) {
  const encoding = headers['content-encoding']
  if (!encoding) {
    return undefined
  }
  const coding = String(encoding).trim().toLowerCase()
  if (coding === 'identity') {
    return undefined
  }
  const isKnown = Object.prototype.hasOwnProperty.call(decoderFactories, coding)
  const factory = isKnown ? decoderFactories[coding] : decoderFactories.default
  // The original header value is forwarded so the error message quotes it verbatim.
  return factory(encoding)
}
|
34 |
|
/**
 * Normalizes the optional-argument forms (data, encoding, callback) into a
 * fixed { data, encoding, callback } record.
 *
 * @param {boolean} forEnd When true, a function in first position is treated
 * as the callback
 * @param {Iterable} args The raw arguments of the call being normalized
 * @returns {{data: *, encoding: *, callback: Function}} callback defaults to
 * a no-op when the third argument is undefined and no function was found
 * earlier in the list
 */
function _getParameters (forEnd, args) {
  const [first, second, callback = () => {}] = args
  // (callback) form, only honoured when forEnd is set
  if (forEnd && typeof first === 'function') {
    return { data: undefined, encoding: undefined, callback: first }
  }
  // (data, callback) form
  if (typeof second === 'function') {
    return { data: first, encoding: undefined, callback: second }
  }
  return { data: first, encoding: second, callback }
}
|
47 |
|
// Argument normalizers for the write() and end() wrappers. Both delegate to
// _getParameters; the end() variant additionally accepts a callback as the
// first argument.
const writeParameters = args => _getParameters(false, args)
const endParameters = args => _getParameters(true, args)
|
50 |
|
/**
 * Mirrors everything written to `response` into `writableStream`, passing the
 * copy through a decoder first when selectDecoder(headers) returns one.
 *
 * response.write and response.end are replaced by wrappers that feed both the
 * original method and the capture side. A wrapped write() returns false when
 * either side asks for a pause, and the response's 'drain' event is withheld
 * (by temporarily replacing response.emit) until every paused side drained.
 *
 * On the first error from the decoder, writableStream or response, the
 * original write/end/emit are put back so the response keeps working without
 * the capture, and a withheld 'drain' is delivered so the producer feeding the
 * response is not left waiting forever.
 *
 * @param {object} response Object exposing write, end, emit and on
 * @param {object} headers Headers handed to selectDecoder
 * @param {object} writableStream Destination of the (decoded) copy
 * @returns {Promise} Resolves when writableStream emits 'finish'; rejects with
 * the first reported error, or with any exception thrown while the wrappers
 * are being installed (e.g. by selectDecoder)
 */
function capture (response, headers, writableStream) {
  const { done, fail, promise } = defer()
  const { emit, end, write } = response
  // True while a wrapped write() has replaced response.emit to withhold 'drain'.
  let emitPatched = false

  // Puts the original methods back. Safe to call more than once.
  function release () {
    response.write = write
    response.end = end
    if (emitPatched) {
      emitPatched = false
      response.emit = emit
      // The producer was told to wait for 'drain'. The capture side may never
      // drain now, so unblock the producer here (pointless on a destroyed response).
      if (!response.destroyed) {
        response.emit('drain')
      }
    }
  }

  function onError (error) {
    fail(error)
    release()
  }

  try {
    const out = selectDecoder(headers) || writableStream

    out.on('error', onError)

    if (out !== writableStream) {
      writableStream.on('error', onError)
      // The completion callback ignores its error argument: failures are
      // reported through the 'error' listeners registered above.
      pipeline(out, writableStream, () => {
        writableStream.end()
      })
    }

    response.on('error', onError)

    writableStream.on('finish', () => done())

    // Switched off by the end() wrapper so later write() calls are no longer mirrored.
    let writeToOut = true

    response.write = function () {
      const { data, encoding, callback } = writeParameters(arguments)
      // Number of sides (capture stream, response) that asked this call to pause.
      let waitForDrain = 0
      function drained () {
        if (--waitForDrain === 0) {
          emitPatched = false
          response.emit = emit
          response.emit('drain')
        }
      }

      if (writeToOut) {
        if (!out.write(data, encoding)) {
          ++waitForDrain
          out.once('drain', drained)
        }
      }
      if (!write.call(response, data, encoding, callback)) {
        ++waitForDrain
      }
      if (waitForDrain) {
        emitPatched = true
        // Intercept the response's own 'drain' and count it instead of
        // forwarding it; every other event goes through untouched.
        response.emit = function (eventName) {
          if (eventName !== 'drain') {
            return emit.apply(response, arguments)
          }
          drained()
        }
        return false
      }
      return true
    }

    response.end = function () {
      const { data, encoding, callback } = endParameters(arguments)
      if (out !== writableStream) {
        out.end(data, encoding)
      } else {
        writableStream.end(data, encoding)
      }
      // NOTE(review): presumably guards against the original end() re-entering
      // response.write and mirroring the final chunk twice. Confirm.
      writeToOut = false
      end.call(response, data, encoding, function () {
        callback.apply(this, arguments)
      })
      return this
    }
  } catch (e) {
    fail(e)
  }
  return promise
}
|
132 |
|
133 | module.exports = (response, writableStream) => {
|
134 | const { done, fail, promise } = defer()
|
135 | const { writeHead } = response
|
136 | response.writeHead = function (status, headers = {}) {
|
137 | if (status === 200) {
|
138 | done(capture(response, headers, writableStream))
|
139 | } else {
|
140 | fail(new Error('Invalid status'))
|
141 | }
|
142 | writeHead.apply(response, arguments)
|
143 | }
|
144 | return promise
|
145 | }
|