1 | 'use strict';
|
2 | const {PassThrough} = require('stream');
|
3 | const duplexer3 = require('duplexer3');
|
4 | const is = require('@sindresorhus/is');
|
5 | const requestAsEventEmitter = require('./request-as-event-emitter');
|
6 | const {HTTPError, ReadError} = require('./errors');
|
7 |
|
8 | module.exports = options => {
|
9 | const input = new PassThrough();
|
10 | const output = new PassThrough();
|
11 | const proxy = duplexer3(input, output);
|
12 | const piped = new Set();
|
13 | let isFinished = false;
|
14 |
|
15 | options.gotRetry.retries = () => 0;
|
16 |
|
17 | if (options.body) {
|
18 | proxy.write = () => {
|
19 | throw new Error('Got\'s stream is not writable when the `body` option is used');
|
20 | };
|
21 | }
|
22 |
|
23 | const emitter = requestAsEventEmitter(options);
|
24 |
|
25 | emitter.on('request', request => {
|
26 | proxy.emit('request', request);
|
27 | const uploadComplete = () => {
|
28 | request.emit('upload-complete');
|
29 | };
|
30 |
|
31 | if (is.nodeStream(options.body)) {
|
32 | options.body.once('end', uploadComplete);
|
33 | options.body.pipe(request);
|
34 | return;
|
35 | }
|
36 |
|
37 | if (options.body) {
|
38 | request.end(options.body, uploadComplete);
|
39 | return;
|
40 | }
|
41 |
|
42 | if (options.method === 'POST' || options.method === 'PUT' || options.method === 'PATCH') {
|
43 | input.once('end', uploadComplete);
|
44 | input.pipe(request);
|
45 | return;
|
46 | }
|
47 |
|
48 | request.end(uploadComplete);
|
49 | });
|
50 |
|
51 | emitter.on('response', response => {
|
52 | const {statusCode} = response;
|
53 |
|
54 | response.on('error', error => {
|
55 | proxy.emit('error', new ReadError(error, options));
|
56 | });
|
57 |
|
58 | if (options.throwHttpErrors && statusCode !== 304 && (statusCode < 200 || statusCode > 299)) {
|
59 | proxy.emit('error', new HTTPError(statusCode, response.statusMessage, response.headers, options), null, response);
|
60 | return;
|
61 | }
|
62 |
|
63 | isFinished = true;
|
64 |
|
65 | response.pipe(output);
|
66 |
|
67 | for (const destination of piped) {
|
68 | if (destination.headersSent) {
|
69 | continue;
|
70 | }
|
71 |
|
72 | for (const [key, value] of Object.entries(response.headers)) {
|
73 |
|
74 |
|
75 | const allowed = options.decompress ? key !== 'content-encoding' : true;
|
76 | if (allowed) {
|
77 | destination.setHeader(key, value);
|
78 | }
|
79 | }
|
80 |
|
81 | destination.statusCode = response.statusCode;
|
82 | }
|
83 |
|
84 | proxy.emit('response', response);
|
85 | });
|
86 |
|
87 | [
|
88 | 'error',
|
89 | 'redirect',
|
90 | 'uploadProgress',
|
91 | 'downloadProgress'
|
92 | ].forEach(event => emitter.on(event, (...args) => proxy.emit(event, ...args)));
|
93 |
|
94 | const pipe = proxy.pipe.bind(proxy);
|
95 | const unpipe = proxy.unpipe.bind(proxy);
|
96 | proxy.pipe = (destination, options) => {
|
97 | if (isFinished) {
|
98 | throw new Error('Failed to pipe. The response has been emitted already.');
|
99 | }
|
100 |
|
101 | const result = pipe(destination, options);
|
102 |
|
103 | if (Reflect.has(destination, 'setHeader')) {
|
104 | piped.add(destination);
|
105 | }
|
106 |
|
107 | return result;
|
108 | };
|
109 | proxy.unpipe = stream => {
|
110 | piped.delete(stream);
|
111 | return unpipe(stream);
|
112 | };
|
113 |
|
114 | return proxy;
|
115 | };
|