1 | 'use strict';
|
2 |
|
3 | const fetch = require('node-fetch');
|
4 | const { Collection } = require('mongodb');
|
5 | const assert = require('assert');
|
6 | const { cloneDeep, once } = require('lodash');
|
7 | const AppError = require('./appError');
|
8 |
|
9 |
|
/**
 * Downloads a JSON document, optionally revalidating it against a known ETag.
 *
 * When `etag` is given it is sent as `If-None-Match`; headers supplied by the
 * caller in `options.headers` are spread afterwards and therefore win on a
 * name clash. A default `timeout` of 20000 is used unless `options` overrides it.
 *
 * @param {string} jsonUrl - URL of the JSON resource
 * @param {?string} etag - ETag of the locally stored version, if any
 * @param {Object} [options] - extra options passed through to `fetch`
 * @returns {Promise<{notModified: boolean, newEtag: ?string, newContent: *}>}
 *   `newContent` is `null` when `notModified` is `true`
 * @throws {AppError} when the response is not successful and is not an
 *   expected 304 (a 304 without a known `etag` has no usable body)
 */
const doFetch = async (jsonUrl, etag, options = {}) => {
    // Tolerate an explicit `null` as well as an omitted argument.
    const passedOptions = options || {};
    const callerHeaders = passedOptions.headers || {};

    const headers = etag
        ? { 'If-None-Match': etag, ...callerHeaders }
        : callerHeaders;

    const fetchOptions = {
        timeout: 20000,
        ...passedOptions,
        headers
    };

    const response = await fetch(jsonUrl, fetchOptions);

    // Strict boolean: a 304 only counts when we actually hold a version to reuse.
    const notModified = Boolean(etag) && response.status === 304;

    // A 304 is never `ok`, so this also rejects a 304 nobody asked for instead
    // of trying to parse its empty body as JSON.
    if (!response.ok && !notModified) {
        throw AppError.internal(`URL fetch fail: ${jsonUrl}`, {
            response: {
                status: response.status,
                body: await response.text()
            }
        });
    }

    return {
        notModified,
        newEtag: response.headers.get('etag') || null,
        newContent: notModified ? null : await response.json()
    };
};
|
42 |
|
/**
 * Projection used when only the bookkeeping fields of a cached record are
 * needed (when it was fetched and which ETag it carries).
 */
const metaProjection = {
    fetchedAt: true,
    etag: true
};
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
/**
 * Looks up the cached record stored under `key`.
 *
 * @param {Object} collection - object exposing `findOne(query, options)`
 * @param {*} key - value matched against the record's `_id`
 * @param {Object} [projection] - passed to `findOne` as `options.projection`
 * @returns {Promise<*>} whatever `collection.findOne` resolves to
 */
const findRecord = async (collection, key, projection) => {
    const query = { _id: key };
    const queryOptions = { projection };
    return collection.findOne(query, queryOptions);
};
|
53 |
|
54 |
|
55 |
|
56 |
|
57 |
|
58 |
|
/**
 * Same lookup as `findRecord`, but a missing record is treated as an error.
 *
 * @param {Object} collection - collection handed on to `findRecord`
 * @param {*} key - record key
 * @param {Object} [projection] - optional projection handed on to `findRecord`
 * @returns {Promise<Object>} the record that was found
 * @throws {AppError} when the lookup yields a falsy result
 */
const findRecordOrThrow = async (collection, key, projection) => {
    const found = await findRecord(collection, key, projection);

    if (found) {
        return found;
    }

    throw AppError.internal('Missing record for key.');
};
|
66 |
|
/**
 * Tells whether a record fetched at `fetchedAt` is still within the freshness
 * window, i.e. it was fetched at or after `freshnessLimitDate`.
 *
 * @param {Date} fetchedAt - when the record was last fetched
 * @param {Date} freshnessLimitDate - oldest fetch time still considered fresh
 * @returns {boolean}
 */
const isCacheFresh = (fetchedAt, freshnessLimitDate) => {
    return fetchedAt >= freshnessLimitDate;
};
|
68 |
|
69 |
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
/**
 * Builds a resolver for a single cache key.
 *
 * The refresh (stored-metadata lookup, conditional download, store update) is
 * wrapped in `once`, so it runs at most one time per resolver no matter how
 * often the returned function is called.
 *
 * @param {Object} cacheConfig - `{ collection, fetchOptions, cacheLifetime,
 *   transform, logError, timeout }`
 * @param {*} key - `_id` of the cached record
 * @param {string} url - source URL handed to `doFetch`
 * @returns {function({ifNoneMatch: ?string, metaOnly: boolean}): Promise<{
 *   isCacheFresh: boolean, etag: ?string, etagMatch: boolean, content: *}>}
 */
const getKeyResolver = (cacheConfig, key, url) => {

    // Used on the timeout path: a stored copy is mandatory there.
    const requireStoredCopy = (stored) => {
        if (!stored) {
            throw new Error('Cache refresh timed-out and there is no cached version to serve yet.');
        }
        return stored;
    };

    const refreshRecord = once(async () => {

        const freshnessLimitDate = new Date(Date.now() - cacheConfig.cacheLifetime);

        // Resolves with either the stored metadata or a freshly written record.
        const bringUpToDate = async () => {
            const storedMeta = await findRecord(cacheConfig.collection, key, metaProjection);

            if (storedMeta && isCacheFresh(storedMeta.fetchedAt, freshnessLimitDate)) {
                return storedMeta;
            }

            try {
                const knownEtag = storedMeta && storedMeta.etag;
                const fetched = await doFetch(url, knownEtag, cacheConfig.fetchOptions);

                if (!fetched.notModified) {
                    const content = await cacheConfig.transform(fetched.newContent, key);
                    const freshRecord = { content, etag: fetched.newEtag, fetchedAt: new Date() };

                    await cacheConfig.collection.replaceOne({ _id: key }, freshRecord, { upsert: true });

                    return freshRecord;
                }

                // Content unchanged: only bump `fetchedAt`, and only when the
                // stored record still has the `fetchedAt` we read above.
                const {
                    value: touchedMeta,
                    lastErrorObject: { updatedExisting }
                } = await cacheConfig.collection.findOneAndUpdate(
                    { _id: key, fetchedAt: storedMeta.fetchedAt },
                    { $set: { fetchedAt: new Date() } },
                    { returnOriginal: false, projection: metaProjection }
                );

                if (updatedExisting) {
                    return touchedMeta;
                }

                // Nothing matched the filter, so re-read the current metadata.
                // NOTE: returned without `await`, so a rejection here is not
                // handled by the catch below.
                return findRecordOrThrow(cacheConfig.collection, key, metaProjection);

            } catch (e) {
                cacheConfig.logError(e);
                if (storedMeta) {
                    // Fall back to the stale metadata rather than failing.
                    return storedMeta;
                }
                throw e;
            }
        };

        // Start the refresh before arming the timer.
        const pendingRefresh = bringUpToDate();

        // Whichever settles first decides: the refresh itself, or after
        // `cacheConfig.timeout` ms a plain read of the stored record.
        const refreshResult = await new Promise((resolve, reject) => {

            const onTimeout = () => {
                cacheConfig.logError(new Error('Loading of the content timed out.'));
                findRecord(cacheConfig.collection, key)
                    .then(requireStoredCopy)
                    .then(resolve, reject);
            };

            const timer = setTimeout(onTimeout, cacheConfig.timeout);

            pendingRefresh
                .then(resolve, reject)
                .finally(() => clearTimeout(timer));
        });

        const meta = {
            isCacheFresh: isCacheFresh(refreshResult.fetchedAt, freshnessLimitDate),
            etag: refreshResult.etag
        };

        // Loads the content lazily and at most once; skipped entirely when the
        // refresh result already carries `content`.
        const getFull = once(async () => {
            let fullRecord = refreshResult;
            if (!('content' in refreshResult)) {
                fullRecord = await findRecordOrThrow(cacheConfig.collection, key);
            }
            return {
                isCacheFresh: isCacheFresh(fullRecord.fetchedAt, freshnessLimitDate),
                etag: fullRecord.etag,
                content: fullRecord.content
            };
        });

        return { meta, getFull };
    });

    return async ({ ifNoneMatch, metaOnly }) => {
        const matchesClientEtag = (candidate) => Boolean(ifNoneMatch && ifNoneMatch === candidate);

        const refreshed = await refreshRecord();
        const metaMatch = matchesClientEtag(refreshed.meta.etag);

        if (metaOnly || metaMatch) {
            // No content needed: answer from the metadata alone.
            return {
                isCacheFresh: refreshed.meta.isCacheFresh,
                etag: refreshed.meta.etag,
                etagMatch: metaMatch,
                content: null
            };
        }

        const full = await refreshed.getFull();
        // Compare again: the full record's etag may differ from the metadata.
        const fullMatch = matchesClientEtag(full.etag);

        return {
            isCacheFresh: full.isCacheFresh,
            etag: full.etag,
            etagMatch: fullMatch,
            content: fullMatch ? null : full.content
        };
    };
};
|
196 |
|
197 |
|
198 |
|
199 |
|
200 |
|
201 |
|
202 |
|
203 |
|
204 |
|
205 |
|
206 |
|
207 |
|
208 |
|
209 |
|
210 |
|
211 |
|
212 |
|
213 |
|
214 |
|
215 |
|
216 |
|
217 |
|
218 | module.exports = async (collection, {
|
219 | url: defaultUrl = null,
|
220 | fetchOptions = {},
|
221 | cacheLifetime = -1,
|
222 | transform = content => content,
|
223 | ensureIndexes = true,
|
224 | logError = () => {},
|
225 | timeout = 2000
|
226 | } = {}) => {
|
227 |
|
228 | assert(collection instanceof Collection, 'The collection has to be instance of mongodb.Collection.');
|
229 |
|
230 | if (ensureIndexes) {
|
231 |
|
232 |
|
233 | await collection.createIndex({ _id: 1, fetchedAt: 1, etag: 1 });
|
234 | }
|
235 |
|
236 |
|
237 | const cacheConfig = {
|
238 | collection, fetchOptions, cacheLifetime, transform, logError, timeout
|
239 | };
|
240 |
|
241 |
|
242 |
|
243 | const runningKeyResolvers = new Map();
|
244 |
|
245 | return async ({
|
246 | url = defaultUrl,
|
247 | key = url,
|
248 | ifNoneMatch = null,
|
249 | metaOnly = false
|
250 | } = {}) => {
|
251 |
|
252 | let runningResolver = runningKeyResolvers.get(key);
|
253 |
|
254 | if (!runningResolver) {
|
255 | runningResolver = {
|
256 | resolve: getKeyResolver(cacheConfig, key, url),
|
257 | concurrentRuns: 1
|
258 | };
|
259 | runningKeyResolvers.set(key, runningResolver);
|
260 | } else {
|
261 | runningResolver.concurrentRuns++;
|
262 | }
|
263 |
|
264 | try {
|
265 | const resolved = await runningResolver.resolve({ ifNoneMatch, metaOnly });
|
266 |
|
267 | return runningResolver.concurrentRuns === 1
|
268 | ? resolved
|
269 | : cloneDeep(resolved);
|
270 |
|
271 | } finally {
|
272 |
|
273 | if (--runningResolver.concurrentRuns === 0) {
|
274 | runningKeyResolvers.delete(key);
|
275 | }
|
276 | }
|
277 | };
|
278 | };
|