// Source: UNPKG file view (8.44 kB, JavaScript)
'use strict';

const assert = require('assert');

const fetch = require('node-fetch');
const { Collection } = require('mongodb');
const { cloneDeep, once } = require('lodash');

const AppError = require('./appError');

/**
 * Fetches a JSON document, optionally revalidating a cached copy by its ETag.
 *
 * @param {string} jsonUrl - URL of the JSON resource
 * @param {?string} [etag] - ETag of the cached copy; sent as `If-None-Match` when truthy
 * @param {Object} [options] - extra options handed to `fetch`; they may override the
 *     default `timeout` of 20000, and `options.headers` win over the generated header
 * @returns {Promise<{ notModified: boolean, newEtag: ?string, newContent: * }>}
 *     `newContent` is `null` when the cached copy is still valid (`notModified`)
 * @throws the error built by `AppError.internal` when the response is neither OK
 *     nor a 304 answering our own `If-None-Match`
 */
const doFetch = async (jsonUrl, etag, options = {}) => {

    const customHeaders = (options && options.headers) || {};

    // caller supplied headers are spread last, so they take precedence
    const headers = etag
        ? { 'If-None-Match': etag, ...customHeaders }
        : { ...customHeaders };

    const fetchOptions = {
        timeout: 20000,
        ...options,
        headers
    };

    const response = await fetch(jsonUrl, fetchOptions);

    // a 304 is meaningful only as an answer to the etag we sent
    const notModified = Boolean(etag) && response.status === 304;

    // an unsolicited 304 carries no body to parse, so it is reported as a failure too
    if (!response.ok && !notModified) {
        throw AppError.internal(`URL fetch fail: ${jsonUrl}`, {
            response: {
                status: response.status,
                body: await response.text()
            }
        });
    }

    return {
        notModified,
        newEtag: response.headers.get('etag') || null,
        newContent: notModified ? null : await response.json()
    };
};
42
// projection used when only the bookkeeping fields (`fetchedAt`, `etag`) of a record are needed
const metaProjection = { fetchedAt: true, etag: true };
44
/**
 * Looks up the record stored under the given key.
 *
 * @param {Collection} collection
 * @param {string} key - matched against the record's `_id`
 * @param {Object} [projection] - passed to `findOne` as the `projection` option
 * @returns {Promise<?Object>} resolves with the result of `collection.findOne`
 */
const findRecord = async (collection, key, projection) => (
    collection.findOne({ _id: key }, { projection })
);
53
/**
 * Same lookup as `findRecord`, but a missing record is treated as an error.
 *
 * @param {Collection} collection
 * @param {string} key - matched against the record's `_id`
 * @param {Object} [projection] - passed on to `findRecord`
 * @returns {Promise<Object>} the found record
 * @throws the error built by `AppError.internal` when the lookup yields nothing
 */
const findRecordOrThrow = async (collection, key, projection) => {
    const record = await findRecord(collection, key, projection);
    if (!record) {
        throw AppError.internal('Missing record for key.');
    }
    return record;
};
66
/**
 * Tells whether a record fetched at `fetchedAt` is not older than the freshness limit.
 *
 * @param {Date} fetchedAt
 * @param {Date} freshnessLimitDate
 * @returns {boolean}
 */
const isCacheFresh = (fetchedAt, freshnessLimitDate) => fetchedAt >= freshnessLimitDate;
68
69
/**
 * Builds a resolver bound to one cache key.
 *
 * The returned function refreshes the record at most once (the refresh is memoized
 * with `once`), so every call made on the same resolver shares one fetch/DB round.
 *
 * @param {CacheConfig} cacheConfig
 * @param {string} key - `_id` of the cache record
 * @param {string} url - URL the content is fetched from
 * @returns {function({ ifNoneMatch: ?string, metaOnly: boolean }): Promise<{
 *     isCacheFresh: boolean, etag: ?string, etagMatch: boolean, content: *
 * }>}
 */
const getKeyResolver = (cacheConfig, key, url) => {

    const refreshRecord = once(async () => {

        // records fetched before this moment are considered stale
        const freshnessLimitDate = new Date(Date.now() - cacheConfig.cacheLifetime);

        const refreshPromise = (async () => {

            // find meta only, we don't want to hold potentially large data in memory during the fetch
            const meta = await findRecord(cacheConfig.collection, key, metaProjection);

            if (meta && isCacheFresh(meta.fetchedAt, freshnessLimitDate)) {
                return meta;
            }

            try {
                const etag = meta && meta.etag;
                const { notModified, newContent, newEtag } = await doFetch(url, etag, cacheConfig.fetchOptions);

                if (notModified) {
                    // the stored content is still valid, only bump `fetchedAt`;
                    // matching on the previous `fetchedAt` detects a concurrent update
                    // NOTE(review): `returnOriginal` and the `{ value, lastErrorObject }` result
                    // shape look specific to older mongodb drivers - confirm against the installed version
                    const {
                        value: newMeta,
                        lastErrorObject: { updatedExisting }
                    } = await cacheConfig.collection.findOneAndUpdate(
                        { _id: key, fetchedAt: meta.fetchedAt },
                        { $set: { fetchedAt: new Date() } },
                        { returnOriginal: false, projection: metaProjection }
                    );

                    if (!updatedExisting) {
                        // concurrency occurred - another fetcher/process updated the record in the meanwhile
                        return findRecordOrThrow(cacheConfig.collection, key, metaProjection);
                    }

                    return newMeta;
                }

                const finalContent = await cacheConfig.transform(newContent, key);

                const record = { content: finalContent, etag: newEtag, fetchedAt: new Date() };

                await cacheConfig.collection.replaceOne({ _id: key }, record, { upsert: true });

                return record;

            } catch (e) {
                cacheConfig.logError(e);
                // without any cached version there is nothing to fall back to
                if (!meta) {
                    throw e;
                }
                // otherwise serve the stale record
                return meta;
            }

        })();

        // whichever settles first wins: the refresh itself, or the timeout fallback
        // which serves the currently stored record
        const refreshResult = await new Promise((resolve, reject) => {

            const timeoutId = setTimeout(() => {
                cacheConfig.logError(new Error('Loading of the content timed out.'));
                findRecord(cacheConfig.collection, key)
                    .then((record) => {
                        if (!record) {
                            throw new Error('Cache refresh timed-out and there is no cached version to serve yet.');
                        }
                        return record;
                    })
                    .then(resolve, reject);
            }, cacheConfig.timeout);

            refreshPromise
                .then(resolve, reject)
                .finally(() => clearTimeout(timeoutId));
        });

        return {
            meta: {
                isCacheFresh: isCacheFresh(refreshResult.fetchedAt, freshnessLimitDate),
                etag: refreshResult.etag
            },
            // the content is loaded lazily (and once) only when the refresh result lacks it
            getFull: once(async () => {
                const fullRecord = 'content' in refreshResult
                    ? refreshResult
                    : await findRecordOrThrow(cacheConfig.collection, key);
                return {
                    isCacheFresh: isCacheFresh(fullRecord.fetchedAt, freshnessLimitDate),
                    etag: fullRecord.etag,
                    content: fullRecord.content
                };
            })
        };
    });

    return async ({ ifNoneMatch, metaOnly }) => {

        const result = await refreshRecord();

        let etagMatch = !!(ifNoneMatch && ifNoneMatch === result.meta.etag);

        if (metaOnly || etagMatch) {
            // no need to load content from the DB
            return {
                isCacheFresh: result.meta.isCacheFresh,
                etag: result.meta.etag,
                etagMatch,
                content: null
            };
        }

        const full = await result.getFull();
        // the full record may carry a different etag than the meta, so compare again
        etagMatch = !!(ifNoneMatch && ifNoneMatch === full.etag);

        return {
            isCacheFresh: full.isCacheFresh,
            etag: full.etag,
            etagMatch,
            content: etagMatch ? null : full.content
        };
    };
};
196
/**
 * @typedef {{
 *   collection: Collection,
 *   fetchOptions: Object,
 *   cacheLifetime: number,
 *   transform: Function,
 *   logError: Function,
 *   timeout: number
 * }} CacheConfig
 */
207
208/**
209 * @param {Collection} collection
210 * @param {string} [url]
211 * @param {number} [cacheLifetime]
212 * @param {Object} [fetchOptions]
213 * @param {Function} [transform]
214 * @param {boolean} [ensureIndexes=true]
215 * @param {function} [logError]
216 * @throws Error
217 */
218module.exports = async (collection, {
219 url: defaultUrl = null,
220 fetchOptions = {},
221 cacheLifetime = -1, // every call will cause fetch fresh data by default, the cache is only fallback
222 transform = content => content,
223 ensureIndexes = true,
224 logError = () => {},
225 timeout = 2000
226} = {}) => {
227
228 assert(collection instanceof Collection, 'The collection has to be instance of mongodb.Collection.');
229
230 if (ensureIndexes) {
231 // ensure index to enable covered queries
232 // the collection.createIndex will perform the index creation only if the index is missing
233 await collection.createIndex({ _id: 1, fetchedAt: 1, etag: 1 });
234 }
235
236 /** @type {CacheConfig} */
237 const cacheConfig = {
238 collection, fetchOptions, cacheLifetime, transform, logError, timeout
239 };
240
241 // the map helps to handle concurrent cache access in optimal way,
242 // there's no need to process the same key (url) multiple times
243 const runningKeyResolvers = new Map();
244
245 return async ({
246 url = defaultUrl,
247 key = url,
248 ifNoneMatch = null,
249 metaOnly = false
250 } = {}) => {
251
252 let runningResolver = runningKeyResolvers.get(key);
253
254 if (!runningResolver) {
255 runningResolver = {
256 resolve: getKeyResolver(cacheConfig, key, url),
257 concurrentRuns: 1
258 };
259 runningKeyResolvers.set(key, runningResolver);
260 } else {
261 runningResolver.concurrentRuns++;
262 }
263
264 try {
265 const resolved = await runningResolver.resolve({ ifNoneMatch, metaOnly });
266
267 return runningResolver.concurrentRuns === 1
268 ? resolved
269 : cloneDeep(resolved);
270
271 } finally {
272 // let's
273 if (--runningResolver.concurrentRuns === 0) {
274 runningKeyResolvers.delete(key);
275 }
276 }
277 };
278};