1 | 'use strict';
|
2 | const {PassThrough} = require('stream');
|
3 | const duplexer3 = require('duplexer3');
|
4 | const requestAsEventEmitter = require('./request-as-event-emitter');
|
5 | const {HTTPError, ReadError} = require('./errors');
|
6 |
|
7 | module.exports = options => {
|
8 | const input = new PassThrough();
|
9 | const output = new PassThrough();
|
10 | const proxy = duplexer3(input, output);
|
11 | const piped = new Set();
|
12 | let isFinished = false;
|
13 |
|
14 | options.retry.retries = () => 0;
|
15 |
|
16 | if (options.body) {
|
17 | proxy.write = () => {
|
18 | throw new Error('Got\'s stream is not writable when the `body` option is used');
|
19 | };
|
20 | }
|
21 |
|
22 | const emitter = requestAsEventEmitter(options, input);
|
23 |
|
24 |
|
25 | proxy._destroy = emitter.abort;
|
26 |
|
27 | emitter.on('response', response => {
|
28 | const {statusCode} = response;
|
29 |
|
30 | response.on('error', error => {
|
31 | proxy.emit('error', new ReadError(error, options));
|
32 | });
|
33 |
|
34 | if (options.throwHttpErrors && statusCode !== 304 && (statusCode < 200 || statusCode > 299)) {
|
35 | proxy.emit('error', new HTTPError(response, options), null, response);
|
36 | return;
|
37 | }
|
38 |
|
39 | isFinished = true;
|
40 |
|
41 | response.pipe(output);
|
42 |
|
43 | for (const destination of piped) {
|
44 | if (destination.headersSent) {
|
45 | continue;
|
46 | }
|
47 |
|
48 | for (const [key, value] of Object.entries(response.headers)) {
|
49 |
|
50 |
|
51 | const allowed = options.decompress ? key !== 'content-encoding' : true;
|
52 | if (allowed) {
|
53 | destination.setHeader(key, value);
|
54 | }
|
55 | }
|
56 |
|
57 | destination.statusCode = response.statusCode;
|
58 | }
|
59 |
|
60 | proxy.emit('response', response);
|
61 | });
|
62 |
|
63 | [
|
64 | 'error',
|
65 | 'request',
|
66 | 'redirect',
|
67 | 'uploadProgress',
|
68 | 'downloadProgress'
|
69 | ].forEach(event => emitter.on(event, (...args) => proxy.emit(event, ...args)));
|
70 |
|
71 | const pipe = proxy.pipe.bind(proxy);
|
72 | const unpipe = proxy.unpipe.bind(proxy);
|
73 | proxy.pipe = (destination, options) => {
|
74 | if (isFinished) {
|
75 | throw new Error('Failed to pipe. The response has been emitted already.');
|
76 | }
|
77 |
|
78 | const result = pipe(destination, options);
|
79 |
|
80 | if (Reflect.has(destination, 'setHeader')) {
|
81 | piped.add(destination);
|
82 | }
|
83 |
|
84 | return result;
|
85 | };
|
86 | proxy.unpipe = stream => {
|
87 | piped.delete(stream);
|
88 | return unpipe(stream);
|
89 | };
|
90 |
|
91 | return proxy;
|
92 | };
|