'use strict'

const BB = require('bluebird')

const cacheFile = require('npm-cache-filename')
const chownr = BB.promisify(require('chownr'))
const correctMkdir = BB.promisify(require('../utils/correct-mkdir.js'))
const figgyPudding = require('figgy-pudding')
const fs = require('graceful-fs')
const JSONStream = require('JSONStream')
const log = require('npmlog')
const mkdir = BB.promisify(require('gentle-fs').mkdir)
const ms = require('mississippi')
const npmFetch = require('libnpm/fetch')
const path = require('path')
const sortedUnionStream = require('sorted-union-stream')
const url = require('url')
const writeStreamAtomic = require('fs-write-stream-atomic')

const statAsync = BB.promisify(fs.stat)

const APMOpts = figgyPudding({
  cache: {},
  registry: {}
})
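
// `APMOpts(opts)` narrows an options object down to the keys declared
// above; that's what figgy-pudding provides. An illustrative sketch, with
// hypothetical values:
//
//   const opts = APMOpts({cache: '/tmp/c/f', registry: 'https://registry.npmjs.org/', x: 1})
//   opts.cache    // => '/tmp/c/f'
//   opts.registry // => 'https://registry.npmjs.org/'
//   opts.x        // rejected: `x` isn't a declared key
//
// Note that `allPackageMetadata` below reads `opts.staleness` *before*
// narrowing, since `staleness` isn't declared here.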
// Returns a sorted stream of all package metadata. Internally, takes care of
// maintaining its metadata cache and making partial or full remote requests,
// according to staleness, validity, etc.
//
// The local cache must hold certain invariants:
// 1. It must be a proper JSON object
// 2. It must have its keys lexically sorted
// 3. The first entry must be `_updated`, whose value is a millisecond
//    timestamp
// 4. It must include all entries that exist in the metadata endpoint as of
//    the value in `_updated`
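//
// For illustration, a minimal cache file satisfying these invariants
// (package entries abbreviated, values hypothetical):
//
//   {
//     "_updated": 1546468800000,
//     "a-pkg": {"name": "a-pkg", "version": "1.0.0"},
//     "z-pkg": {"name": "z-pkg", "version": "2.3.1"}
//   }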
module.exports = allPackageMetadata
function allPackageMetadata (opts) {
  const staleness = opts.staleness
  const stream = ms.through.obj()

  opts = APMOpts(opts)
  const cacheBase = cacheFile(path.resolve(path.dirname(opts.cache)))(url.resolve(opts.registry, '/-/all'))
  const cachePath = path.join(cacheBase, '.cache.json')
  createEntryStream(
    cachePath, staleness, opts
  ).then(({entryStream, latest, newEntries}) => {
    log.silly('all-package-metadata', 'entry stream created')
    if (entryStream && newEntries) {
      return createCacheWriteStream(cachePath, latest, opts).then(writer => {
        log.silly('all-package-metadata', 'output stream created')
        ms.pipeline.obj(entryStream, writer, stream)
      })
    } else if (entryStream) {
      ms.pipeline.obj(entryStream, stream)
    } else {
      stream.emit('error', new Error('No search sources available'))
    }
  }).catch(err => stream.emit('error', err))
  return stream
}
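
// An illustrative call (all option values hypothetical); the result is a
// plain object stream of package entries:
//
//   allPackageMetadata({
//     staleness: 600,
//     cache: '/path/to/cache/dir/file',
//     registry: 'https://registry.npmjs.org/'
//   })
//     .on('data', pkg => console.log(pkg.name))
//     .on('error', err => console.error(err))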

// Creates a stream of the latest available package metadata.
// Metadata will come from a combination of the local cache and remote data.
module.exports._createEntryStream = createEntryStream
function createEntryStream (cachePath, staleness, opts) {
  return createCacheEntryStream(
    cachePath, opts
  ).catch(err => {
    log.warn('', 'Failed to read search cache. Rebuilding')
    log.silly('all-package-metadata', 'cache read error: ', err)
    return {}
  }).then(({
    updateStream: cacheStream,
    updatedLatest: cacheLatest
  }) => {
    cacheLatest = cacheLatest || 0
    return createEntryUpdateStream(staleness, cacheLatest, opts).catch(err => {
      log.warn('', 'Search data request failed, search might be stale')
      log.silly('all-package-metadata', 'update request error: ', err)
      return {}
    }).then(({updateStream, updatedLatest}) => {
      updatedLatest = updatedLatest || 0
      const latest = updatedLatest || cacheLatest
      if (!cacheStream && !updateStream) {
        throw new Error('No search sources available')
      }
      if (cacheStream && updateStream) {
        // Deduped, unioned, sorted stream from the combination of both.
        return {
          entryStream: createMergedStream(cacheStream, updateStream),
          latest,
          newEntries: !!updatedLatest
        }
      } else {
        // Either one works if one or the other failed
        return {
          entryStream: cacheStream || updateStream,
          latest,
          newEntries: !!updatedLatest
        }
      }
    })
  })
}

// Merges `a` and `b` into one stream, dropping duplicates in favor of entries
// in `b`. Both input streams should already be individually sorted, and the
// returned output stream will have semantics resembling the merge step of a
// plain old merge sort.
module.exports._createMergedStream = createMergedStream
function createMergedStream (a, b) {
  linkStreams(a, b)
  return sortedUnionStream(b, a, ({name}) => name)
}
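
// For example, given sorted inputs (entries abbreviated, hypothetical):
//
//   a: {name: 'bar', version: '1.0.0'}, {name: 'foo', version: '1.0.0'}
//   b: {name: 'foo', version: '2.0.0'}
//
// the merged stream yields bar@1.0.0 followed by foo@2.0.0; the entry
// from `b` wins the collision on `foo`.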

// Reads the local index and returns a stream that spits out package data.
module.exports._createCacheEntryStream = createCacheEntryStream
function createCacheEntryStream (cacheFile, opts) {
  log.verbose('all-package-metadata', 'creating entry stream from local cache')
  log.verbose('all-package-metadata', cacheFile)
  return statAsync(cacheFile).then(stat => {
    // TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
    const entryStream = ms.pipeline.obj(
      fs.createReadStream(cacheFile),
      JSONStream.parse('*'),
      // This passthrough appears necessary because `jsonstream` returns
      // nonstandard custom streams that can misbehave downstream.
      ms.through.obj()
    )
    return extractUpdated(entryStream, 'cached-entry-stream', opts)
  })
}

// Stream of entry updates from the server. If `latest` is `0`, streams the
// entire metadata object from the registry.
module.exports._createEntryUpdateStream = createEntryUpdateStream
function createEntryUpdateStream (staleness, latest, opts) {
  log.verbose('all-package-metadata', 'creating remote entry stream')
  let partialUpdate = false
  let uri = '/-/all'
  if (latest && (Date.now() - latest < (staleness * 1000))) {
    // Skip the request altogether if our `latest` isn't stale.
    log.verbose('all-package-metadata', 'Local data up to date, skipping update')
    return BB.resolve({})
  } else if (latest === 0) {
    log.warn('', 'Building the local index for the first time, please be patient')
    log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
  } else {
    log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
    uri += '/since?stale=update_after&startkey=' + latest
    partialUpdate = true
  }
  return npmFetch(uri, opts).then(res => {
    // Fetch-style responses expose `status`, not `statusCode`.
    log.silly('all-package-metadata', 'request stream opened, code:', res.status)
    let entryStream = ms.pipeline.obj(
      res.body,
      JSONStream.parse('*', (pkg, key) => {
        if (key[0] === '_updated' || key[0][0] !== '_') {
          return pkg
        }
      })
    )
    if (partialUpdate) {
      // The `/all/since` endpoint doesn't return `_updated`, so we
      // just use the request's own timestamp.
      return {
        updateStream: entryStream,
        updatedLatest: Date.parse(res.headers.get('date'))
      }
    } else {
      return extractUpdated(entryStream, 'entry-update-stream', opts)
    }
  })
}
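
// Concretely: with a cached timestamp of, say, 1546468800000, the partial
// update above requests
//
//   /-/all/since?stale=update_after&startkey=1546468800000
//
// relative to the registry, while a cold cache requests the full /-/all
// index instead.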

// Both the (full) remote requests and the local index have `_updated` as
// their first returned entry. This is the "latest" millisecond timestamp for
// the metadata in question. This code juggles the data streams a bit so that
// consumers can pretend the field doesn't exist, while still extracting
// `latest`.
function extractUpdated (entryStream, label, opts) {
  log.silly('all-package-metadata', 'extracting latest')
  return new BB((resolve, reject) => {
    function nope (msg) {
      return function () {
        log.warn('all-package-metadata', label, msg)
        entryStream.removeAllListeners()
        entryStream.destroy()
        reject(new Error(msg))
      }
    }
    const onErr = nope('Failed to read stream')
    const onEnd = nope('Empty or invalid stream')
    entryStream.on('error', onErr)
    entryStream.on('end', onEnd)
    entryStream.once('data', latest => {
      log.silly('all-package-metadata', 'got first stream entry for', label, latest)
      entryStream.removeListener('error', onErr)
      entryStream.removeListener('end', onEnd)
      if (typeof latest === 'number') {
        // The extra pipe returns a stream that will implicitly unpause once a
        // `data` listener is attached, since consuming the first `data` event
        // here has already disturbed the stream's initial paused state.
        resolve({
          updateStream: entryStream.pipe(ms.through.obj()),
          updatedLatest: latest
        })
      } else {
        reject(new Error('expected first entry to be _updated'))
      }
    })
  })
}
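
// For instance, if the raw stream emits (values hypothetical)
//
//   1546468800000, {name: 'a-pkg', ...}, {name: 'z-pkg', ...}
//
// then `extractUpdated` resolves to `{updateStream, updatedLatest:
// 1546468800000}`, where `updateStream` emits only the package entries.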

// Creates a stream that writes input metadata to the current cache.
// Cache updates are atomic, and the stream closes when *everything* is done.
// The stream is also passthrough, so entries going through it will also
// be output from it.
module.exports._createCacheWriteStream = createCacheWriteStream
function createCacheWriteStream (cacheFile, latest, opts) {
  return _ensureCacheDirExists(cacheFile, opts).then(({uid, gid}) => {
    log.silly('all-package-metadata', 'creating output stream')
    const outStream = _createCacheOutStream()
    const cacheFileStream = writeStreamAtomic(cacheFile)
    const inputStream = _createCacheInStream(
      cacheFileStream, outStream, latest
    )

    // Glue together the various streams so they fail together.
    // `cacheFileStream` errors are already handled by the `inputStream`
    // pipeline
    let errEmitted = false
    linkStreams(inputStream, outStream, () => { errEmitted = true })

    cacheFileStream.on('close', () => {
      if (!errEmitted) {
        if (typeof uid === 'number' &&
          typeof gid === 'number' &&
          process.getuid &&
          process.getgid &&
          (process.getuid() !== uid || process.getgid() !== gid)) {
          chownr.sync(cacheFile, uid, gid)
        }
        outStream.end()
      }
    })

    return ms.duplex.obj(inputStream, outStream)
  })
}
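
// Because the result is a duplex passthrough, callers can splice the cache
// write into a pipeline, which is exactly how `allPackageMetadata` uses it
// above:
//
//   createCacheWriteStream(cachePath, latest, opts).then(writer => {
//     ms.pipeline.obj(entryStream, writer, stream)
//   })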

// Ensures the cache directory exists and returns the {uid, gid} the cache
// should have.
function _ensureCacheDirExists (cacheFile, opts) {
  var cacheBase = path.dirname(cacheFile)
  log.silly('all-package-metadata', 'making sure cache dir exists at', cacheBase)
  return correctMkdir(opts.cache).then(st => {
    return mkdir(cacheBase).then(made => {
      return chownr(made || cacheBase, st.uid, st.gid)
    }).then(() => ({ uid: st.uid, gid: st.gid }))
  })
}

function _createCacheOutStream () {
  // NOTE: this looks goofy, but it's necessary in order to get
  // JSONStream to play nice with the rest of everything.
  return ms.pipeline.obj(
    ms.through(),
    JSONStream.parse('*', (obj, key) => {
      // This stream happens to get `_updated` passed through it, for
      // implementation reasons. We make sure to filter it out here:
      // only actual package entries (objects) should reach consumers.
      if (typeof obj === 'object') {
        return obj
      }
    }),
    ms.through.obj()
  )
}

function _createCacheInStream (writer, outStream, latest) {
  let updatedWritten = false
  const inStream = ms.pipeline.obj(
    ms.through.obj((pkg, enc, cb) => {
      if (!updatedWritten && typeof pkg === 'number') {
        // This is the `_updated` value getting sent through.
        updatedWritten = true
        return cb(null, ['_updated', pkg])
      } else if (typeof pkg !== 'object') {
        // Report the failure through the callback; an arrow function has no
        // stream `this` to emit on.
        cb(new Error('invalid value written to input stream'))
      } else {
        // The [key, val] format is expected by `jsonstream` for object writing
        cb(null, [pkg.name, pkg])
      }
    }),
    JSONStream.stringifyObject('{', ',', '}'),
    ms.through((chunk, enc, cb) => {
      // This tees off the buffer data to `outStream`, and then continues
      // the pipeline as usual
      outStream.write(chunk, enc, () => cb(null, chunk))
    }),
    // And finally, we write to the cache file.
    writer
  )
  inStream.write(latest)
  return inStream
}
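
// The first transform above rewrites each value into the [key, val] pairs
// `JSONStream.stringifyObject` expects, e.g. (entries hypothetical):
//
//   1546468800000        -> ['_updated', 1546468800000]
//   {name: 'a-pkg', ...} -> ['a-pkg', {name: 'a-pkg', ...}]
//
// so the cache file serializes as a single JSON object keyed by name.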

// Links errors between `a` and `b`, preventing cycles, and calls `cb` if
// an error happens, once per error.
function linkStreams (a, b, cb) {
  var lastError = null
  a.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      b.emit('error', err)
      cb && cb(err)
    }
  })
  b.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      a.emit('error', err)
      cb && cb(err)
    }
  })
}
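
// So if `a` fails mid-pipeline, the same error surfaces on `b` (and vice
// versa) exactly once; the `lastError` check is what breaks the re-emit
// cycle between the two listeners.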