UNPKG

2.88 kBJavaScriptView Raw
1'use strict';
2const {PassThrough} = require('stream');
3const duplexer3 = require('duplexer3');
4const is = require('@sindresorhus/is');
5const requestAsEventEmitter = require('./request-as-event-emitter');
6const {HTTPError, ReadError} = require('./errors');
7
8module.exports = options => {
9 const input = new PassThrough();
10 const output = new PassThrough();
11 const proxy = duplexer3(input, output);
12 const piped = new Set();
13 let isFinished = false;
14
15 options.gotRetry.retries = () => 0;
16
17 if (options.body) {
18 proxy.write = () => {
19 throw new Error('Got\'s stream is not writable when the `body` option is used');
20 };
21 }
22
23 const emitter = requestAsEventEmitter(options);
24
25 emitter.on('request', request => {
26 proxy.emit('request', request);
27 const uploadComplete = () => {
28 request.emit('upload-complete');
29 };
30
31 if (is.nodeStream(options.body)) {
32 options.body.once('end', uploadComplete);
33 options.body.pipe(request);
34 return;
35 }
36
37 if (options.body) {
38 request.end(options.body, uploadComplete);
39 return;
40 }
41
42 if (options.method === 'POST' || options.method === 'PUT' || options.method === 'PATCH') {
43 input.once('end', uploadComplete);
44 input.pipe(request);
45 return;
46 }
47
48 request.end(uploadComplete);
49 });
50
51 emitter.on('response', response => {
52 const {statusCode} = response;
53
54 response.on('error', error => {
55 proxy.emit('error', new ReadError(error, options));
56 });
57
58 if (options.throwHttpErrors && statusCode !== 304 && (statusCode < 200 || statusCode > 299)) {
59 proxy.emit('error', new HTTPError(statusCode, response.statusMessage, response.headers, options), null, response);
60 return;
61 }
62
63 isFinished = true;
64
65 response.pipe(output);
66
67 for (const destination of piped) {
68 if (destination.headersSent) {
69 continue;
70 }
71
72 for (const [key, value] of Object.entries(response.headers)) {
73 // Got gives *uncompressed* data. Overriding `content-encoding` header would result in an error.
74 // It's not possible to decompress uncompressed data, is it?
75 const allowed = options.decompress ? key !== 'content-encoding' : true;
76 if (allowed) {
77 destination.setHeader(key, value);
78 }
79 }
80
81 destination.statusCode = response.statusCode;
82 }
83
84 proxy.emit('response', response);
85 });
86
87 [
88 'error',
89 'redirect',
90 'uploadProgress',
91 'downloadProgress'
92 ].forEach(event => emitter.on(event, (...args) => proxy.emit(event, ...args)));
93
94 const pipe = proxy.pipe.bind(proxy);
95 const unpipe = proxy.unpipe.bind(proxy);
96 proxy.pipe = (destination, options) => {
97 if (isFinished) {
98 throw new Error('Failed to pipe. The response has been emitted already.');
99 }
100
101 const result = pipe(destination, options);
102
103 if (Reflect.has(destination, 'setHeader')) {
104 piped.add(destination);
105 }
106
107 return result;
108 };
109 proxy.unpipe = stream => {
110 piped.delete(stream);
111 return unpipe(stream);
112 };
113
114 return proxy;
115};