1 | 'use strict'
|
2 |
|
3 | const zlib = require('zlib')
|
4 | const { pipeline } = require('stream')
|
5 | const defer = require('./defer')
|
6 |
|
/**
 * Lookup table: content-encoding token -> factory returning a decoding stream.
 *
 * `default` is the fallback used for unknown encodings; it always throws.
 * The brotli entry (`br`) is only registered when the running Node.js version
 * exposes `zlib.createBrotliDecompress`.
 */
const decoderFactories = Object.assign(
  {
    gzip: zlib.createGunzip,
    deflate: zlib.createInflate,
    default: (encoding) => {
      throw new Error(`Unsupported encoding: ${encoding}`)
    }
  },
  zlib.createBrotliDecompress ? { br: zlib.createBrotliDecompress } : {}
)
|
17 |
|
/**
 * Picks the stream able to decode a response body, based on its
 * `content-encoding` header.
 *
 * HTTP header names and content-coding tokens are case-insensitive, so both
 * are normalised before the lookup. Only own entries of `decoderFactories`
 * are honoured: values such as `constructor` or `toString` must not resolve
 * to members inherited from `Object.prototype`.
 *
 * @param {object} headers - header map (name -> value) of the response
 * @returns {object|undefined} a new decoding stream, or `undefined` when the
 *   header is absent, empty or `identity` (no decoding needed)
 * @throws {Error} `Unsupported encoding: <value>` (raised by
 *   `decoderFactories.default`) when no decoder is registered
 */
function selectDecoder (headers) {
  const headerName = Object.keys(headers)
    .find(name => name.toLowerCase() === 'content-encoding')
  const rawEncoding = headerName === undefined ? undefined : headers[headerName]
  if (!rawEncoding) {
    return undefined
  }

  const encoding = String(rawEncoding).trim().toLowerCase()
  if (encoding === '' || encoding === 'identity') {
    return undefined
  }

  // 'default' is the fallback slot of the table, not an actual encoding
  const isRegistered = encoding !== 'default' &&
    Object.prototype.hasOwnProperty.call(decoderFactories, encoding)
  if (!isRegistered) {
    return decoderFactories.default(rawEncoding)
  }
  // zlib factories take an options object; do not hand them the encoding name
  return decoderFactories[encoding]()
}
|
25 |
|
/**
 * Normalises the arguments of a stream `write` / `end` call into
 * `{ data, encoding, callback }`.
 *
 * Supported shapes: `(data, encoding, callback)`, `(data, callback)` and,
 * only when `forEnd` is truthy, `(callback)`. A missing callback is replaced
 * by a no-op function.
 *
 * @param {boolean} forEnd - true when parsing the arguments of `end`
 * @param {Iterable} args - the raw call arguments
 * @returns {{data: *, encoding: *, callback: *}}
 */
function _getParameters (forEnd, args) {
  const [first, second, third = () => {}] = args

  // end(callback)
  if (forEnd && typeof first === 'function') {
    return { data: undefined, encoding: undefined, callback: first }
  }
  // write(data, callback) / end(data, callback)
  if (typeof second === 'function') {
    return { data: first, encoding: undefined, callback: second }
  }
  return { data: first, encoding: second, callback: third }
}
|
38 |
|
// Argument normalisers for the intercepted calls: `write` never treats a lone
// function as the callback, `end` does.
const writeParameters = args => _getParameters(false, args)
const endParameters = args => _getParameters(true, args)
|
41 |
|
/**
 * Mirrors everything written to `response` into `writableStream`, decoding
 * the body first when `headers` announce a supported content encoding.
 *
 * `response.write` and `response.end` are replaced by wrappers that forward
 * each chunk both to the capture side and to the original method. While
 * either side pushes back, `response.emit` is patched so that a single
 * 'drain' is published only once both sides are ready again.
 *
 * On any error (decoder, `writableStream`, `response` or the pipeline between
 * decoder and `writableStream`) the capture is abandoned: `fail` is called
 * and `write`, `end` and `emit` are restored so the response keeps working.
 *
 * @param {object} response - writable response being observed (event emitter
 *   exposing `write`, `end` and `emit`)
 * @param {object} headers - response headers, handed to `selectDecoder`
 * @param {object} writableStream - destination of the (decoded) body
 * @returns {*} the promise produced by `defer()`; `done` is invoked when
 *   `writableStream` emits 'finish', `fail` on the errors listed above
 */
function capture (response, headers, writableStream) {
  const [promise, done, fail] = defer()
  const { emit, end, write } = response

  // Back-pressure state shared by all intercepted writes
  let awaitingOutDrain = false
  let awaitingResponseDrain = false
  let emitPatched = false

  function restoreEmit () {
    if (emitPatched) {
      emitPatched = false
      response.emit = emit
    }
  }

  function release () {
    response.write = write
    response.end = end
    // A producer may be paused solely because the capture side pushed back:
    // nothing else would ever wake it up, so publish the withheld 'drain'.
    const stalledOnCaptureOnly = awaitingOutDrain && !awaitingResponseDrain
    awaitingOutDrain = false
    awaitingResponseDrain = false
    restoreEmit()
    if (stalledOnCaptureOnly) {
      response.emit('drain')
    }
  }

  function onError (error) {
    fail(error)
    release()
  }

  // Publishes 'drain' on the response once nobody pushes back any more
  function settleDrain () {
    if (awaitingOutDrain || awaitingResponseDrain) {
      return false
    }
    restoreEmit()
    return response.emit('drain')
  }

  function onOutDrain () {
    if (!awaitingOutDrain) {
      return // capture was released in the meantime
    }
    awaitingOutDrain = false
    settleDrain()
  }

  // Intercepts the response's own 'drain' until the capture side is ready too
  function holdDrain () {
    if (emitPatched) {
      return
    }
    emitPatched = true
    response.emit = function (eventName) {
      if (eventName !== 'drain') {
        return emit.apply(response, arguments)
      }
      awaitingResponseDrain = false
      return settleDrain()
    }
  }

  try {
    // `out` receives the raw chunks: the decoder when one is needed,
    // otherwise writableStream itself
    const out = selectDecoder(headers) || writableStream

    out.on('error', onError)

    if (out !== writableStream) {
      writableStream.on('error', onError)
      pipeline(out, writableStream, (error) => {
        // e.g. a premature close is reported here only, not as an 'error' event
        if (error) {
          onError(error)
        }
        writableStream.end()
      })
    }

    response.on('error', onError)

    writableStream.on('finish', () => done())

    let writeToOut = true

    response.write = function () {
      const { data, encoding, callback } = writeParameters(arguments)

      if (writeToOut && !out.write(data, encoding) && !awaitingOutDrain) {
        awaitingOutDrain = true
        out.once('drain', onOutDrain)
      }
      if (!write.call(response, data, encoding, callback)) {
        awaitingResponseDrain = true
      }
      if (awaitingOutDrain || awaitingResponseDrain) {
        holdDrain()
        return false
      }
      return true
    }

    response.end = function () {
      const { data, encoding, callback } = endParameters(arguments)
      out.end(data, encoding)
      writeToOut = false
      end.call(response, data, encoding, callback)
      return this
    }
  } catch (e) {
    fail(e)
  }
  return promise
}
|
123 |
|
124 | module.exports = (response, writableStream) => {
|
125 | const [promise, done, fail] = defer()
|
126 | const { writeHead } = response
|
127 | response.writeHead = function (status, headers = {}) {
|
128 | if (status === 200) {
|
129 | done(capture(response, headers, writableStream))
|
130 | } else {
|
131 | fail(new Error('Invalid status'))
|
132 | }
|
133 | writeHead.apply(response, arguments)
|
134 | }
|
135 | return promise
|
136 | }
|