1 | const { Minipass } = require('minipass')
|
2 | const fetch = require('minipass-fetch')
|
3 | const promiseRetry = require('promise-retry')
|
4 | const ssri = require('ssri')
|
5 | const { log } = require('proc-log')
|
6 |
|
7 | const CachingMinipassPipeline = require('./pipeline.js')
|
8 | const { getAgent } = require('@npmcli/agent')
|
9 | const pkg = require('../package.json')
|
10 |
|
// Value used for the `user-agent` request header when the caller has not set
// one: "<name>/<version> (+https://npm.im/<name>)", with name and version read
// from this package's package.json.
const USER_AGENT = [
  `${pkg.name}/${pkg.version}`,
  `(+https://npm.im/${pkg.name})`,
].join(' ')
|
12 |
|
// Error codes that remoteFetch compares a failed attempt's `code` against
// when classifying the failure.
const RETRY_ERRORS = [
  // socket-level failures reported by the operating system
  'ECONNRESET', // connection reset by the peer
  'ECONNREFUSED', // connection refused by the remote host
  'EADDRINUSE', // local address already in use
  'ETIMEDOUT', // operation timed out

  // finer-grained timeout codes
  // NOTE(review): presumably raised by @npmcli/agent -- confirm
  'ECONNECTIONTIMEOUT',
  'EIDLETIMEOUT',
  'ERESPONSETIMEOUT',
  'ETRANSFERTIMEOUT',
]
|
28 |
|
// Error `type` values that remoteFetch compares a failed attempt's `type`
// against when classifying the failure.
const RETRY_TYPES = ['request-timeout']
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
/**
 * Make a request directly to the remote source, retrying failed attempts
 * through `promise-retry` and, when an expected integrity value is supplied,
 * wrapping the response body so it is checked while being read.
 *
 * Redirects are not followed here: every attempt is issued with
 * `redirect: 'manual'`.
 *
 * Retry rules:
 * - POST requests are never retried.
 * - A response is retried when its status is 408, 420, 429 or >= 500 and the
 *   request body is not a stream.
 * - A thrown error is retried only when its `code` is listed in RETRY_ERRORS
 *   or its `type` is listed in RETRY_TYPES; any other error is rethrown
 *   immediately.
 *
 * @param {object} request - request to send (passed to `new fetch.Request`).
 *   Its `connection` and `user-agent` headers are set in place when absent.
 * @param {object} options - fetch options. Besides being forwarded to
 *   `getAgent`, `fetch.Request` and `fetch`, this function reads `integrity`,
 *   `algorithms`, `size`, `retry` (handed to promise-retry) and `onRetry`
 *   (called with the response or error that triggers each retry).
 * @returns {Promise<object>} resolves with the response, which carries an
 *   `x-fetch-attempts` header. A rejection value with `status >= 400` and a
 *   `type` other than 'system' is returned instead of thrown, so HTTP error
 *   responses resolve rather than reject. Everything else rejects.
 */
const remoteFetch = (request, options) => {
  const agent = getAgent(request.url, options)
  if (!request.headers.has('connection')) {
    request.headers.set('connection', agent ? 'keep-alive' : 'close')
  }

  if (!request.headers.has('user-agent')) {
    request.headers.set('user-agent', USER_AGENT)
  }

  // keep our own copy of the options, since the agent and the redirect mode
  // are overridden here
  const _opts = {
    ...options,
    agent,
    redirect: 'manual',
  }

  return promiseRetry(async (retryHandler, attemptNum) => {
    const req = new fetch.Request(request, _opts)
    try {
      let res = await fetch(req, _opts)
      if (_opts.integrity && res.status === 200) {
        // a 200 response with an expected integrity value: route the body
        // through an ssri integrity stream
        const integrityStream = ssri.integrityStream({
          algorithms: _opts.algorithms,
          integrity: _opts.integrity,
          size: _opts.size,
        })
        const pipeline = new CachingMinipassPipeline({
          events: ['integrity', 'size'],
        }, res.body, integrityStream)

        // forward the integrity and size events to the pipeline, which
        // becomes the new response body
        integrityStream.on('integrity', i => pipeline.emit('integrity', i))
        integrityStream.on('size', s => pipeline.emit('size', s))
        res = new fetch.Response(pipeline, res)

        // explicit flag: this body emits 'integrity' and 'size' events
        res.body.hasIntegrityEmitter = true
      }

      res.headers.set('x-fetch-attempts', attemptNum)

      // never retry POST requests or requests with a streaming body;
      // otherwise retry 5xx responses and the listed status codes
      const isStream = Minipass.isStream(req.body)
      const isRetriable = req.method !== 'POST' &&
        !isStream &&
        ([408, 420, 429].includes(res.status) || res.status >= 500)

      if (isRetriable) {
        if (typeof options.onRetry === 'function') {
          options.onRetry(res)
        }

        const failure = `${req.method} ${req.url} attempt ${attemptNum} failed with ${res.status}`
        log.http('fetch', failure)
        return retryHandler(res)
      }

      return res
    } catch (err) {
      const code = (err.code === 'EPROMISERETRY')
        ? err.retried.code
        : err.code

      // err.retried holds whatever the try block handed to retryHandler. A
      // Response there means a retry was already requested for a bad status,
      // so rethrow untouched and let promise-retry act on it.
      const isRetryRequest = err.retried instanceof fetch.Response

      // only errors named in the retry lists are worth another attempt
      const isRetriableError = RETRY_ERRORS.includes(code) ||
        RETRY_TYPES.includes(err.type)

      if (req.method === 'POST' || isRetryRequest || !isRetriableError) {
        throw err
      }

      if (typeof options.onRetry === 'function') {
        options.onRetry(err)
      }

      log.http('fetch', `${req.method} ${req.url} attempt ${attemptNum} failed with ${err.code}`)
      return retryHandler(err)
    }
  }, options.retry).catch((err) => {
    // don't reject for http errors, just return them
    if (err.status >= 400 && err.type !== 'system') {
      return err
    }

    throw err
  })
}
|
130 |
|
131 | module.exports = remoteFetch
|