UNPKG

15.7 kBJavaScriptView Raw
1'use strict';
2
3// Simple wrapper around the popular Node request module, adding some logging
4// and Express-like route URI templates
5
6const { extend, filter, find, flatten, groupBy, map, omit, pick, sortBy } = require('underscore');
7
8const rrequest = require('request');
9const lock = require('abacus-lock').locker('request');
10const lru = require('abacus-lrucache');
11const moment = require('abacus-moment');
12const transform = require('abacus-transform');
13
14const url = require('url');
15
16/* jshint undef: false */
17/* jshint unused: false */
18
19// Setup debug log
20const debug = require('abacus-debug')('abacus-request');
21const edebug = require('abacus-debug')('e-abacus-request');
22
// Return the list of parameter names found in an Express-like URI template,
// for example params('http://.../:x/:y') returns ['x', 'y']. A template
// without any parameter yields an empty list.
const params = (template) => {
  // String.prototype.match returns null, not an empty array, when nothing
  // matches
  const names = template.match(/:[a-z_][a-z0-9_]*/gi) || [];

  // Strip the leading ':' from each match
  return names.map((name) => name.slice(1));
};
27
// Generates a URI from an Express-like URI template, use like this:
// request.route('http://.../:x/:y', { x: 1, y: 2 });
// returns http://.../1/2
//
// A placeholder whose value is undefined in parms is left untouched in the
// returned URI. parms is optional: when it is omitted (or null) the template
// is returned unchanged instead of throwing.
const route = (template, parms) => {
  const values = parms || {};

  return template.replace(/:[a-z_][a-z0-9_]*/gi, (name) => {
    // name includes the leading ':', the lookup key does not
    const value = values[name.slice(1)];
    return value === undefined ? name : value;
  });
};
37
// Setup request default options: JSON request / response bodies, no
// keep-alive agent, and a socket pool sized from the MAX_SOCKETS environment
// variable (1000 when it is unset, not a number, or 0)
const drequest = rrequest.defaults({
  json: true,
  // Any non-empty SKIP_SSL_VALIDATION value (including the string 'false')
  // turns certificate validation off
  rejectUnauthorized: !process.env.SKIP_SSL_VALIDATION,
  forever: false,
  pool: {
    // Always parse the environment value as a decimal number
    maxSockets: Number.parseInt(process.env.MAX_SOCKETS, 10) || 1000
  }
});
47
// Convert request params to a request target configuration. uri, opt and cb
// are all optional, but the URI is expected to be given either as the uri
// parameter or as a route / uri field of an options object.
//
// Returns { uri, options, callback } where options carries the original
// route template, the resolved uri and, when m is given, the HTTP method.
const target = (m, uri, opt, cb) => {
  // The callback may have been passed in the opt position
  let callback = cb;
  if (typeof opt === 'function' && !cb) callback = opt;

  // Compute the base options, recording the URI template as their route
  let base;
  if (typeof opt === 'object') base = extend({}, opt, { route: uri });
  else if (typeof uri === 'string') base = { route: uri };
  else base = extend({}, uri, { route: uri.route || uri.uri });

  // Resolve the route template against the options, then apply the method
  const resolved = extend({}, base, { uri: route(base.route, base) });
  const opts = extend({}, resolved, m ? { method: m } : {});

  // The target URI is the resolved URI prefixed with the base URL, if any
  const full = opts.baseUrl ? opts.baseUrl + opts.uri : opts.uri;

  return {
    uri: full,
    options: opts,
    callback: callback
  };
};
82
// Return a truthy value if the given request target is cacheable: caching
// was requested in its options, it has a callback, and its method is either
// unspecified or GET
const cacheableTarget = (t) => {
  if (!t.options.cache) return t.options.cache;
  if (!t.callback) return t.callback;

  const verb = t.options.method;
  return verb === undefined || verb === 'GET';
};
88
// Return true if the given request target and response are cacheable, i.e.
// the target is cacheable and there is a response whose status code is
// either missing or 200
const cacheable = (t, res) => {
  if (!cacheableTarget(t)) return false;
  if (!res) return false;

  return res.statusCode === undefined || res.statusCode === 200;
};
98
// Maintain a LRU cache of REST resources, configured with max 1000 and a
// maxAge of 20 minutes expressed in milliseconds
const resources = lru({ max: 1000, maxAge: 20 * 60 * 1000 });
104
// Cache a response under the given URI. Only the statusCode, cookies,
// headers and body fields are stored; the response itself is returned.
const cache = (uri, res) => {
  const entry = pick(res, 'statusCode', 'cookies', 'headers', 'body');
  resources.set(uri, entry);

  debug('Cached %s resource %d %o', uri, res.statusCode || 200, res.body || '');
  return res;
};
111
// Return the resource cached under the given URI, or whatever the cache
// returns for a missing entry
const cached = (uri) => {
  const hit = resources.get(uri);

  if (hit) debug('Found resource %s in cache %d %o', uri, hit.statusCode || 200, hit.body || '');
  else debug('Didn\'t find resource %s in cache', uri);

  return hit;
};
121
// Pick an error message out of a response body: its message field, else its
// error field, else the given default message
const extractMessageFromBody = (body, defaultMessage) => {
  if (!body) return defaultMessage;

  return body.message || body.error || defaultMessage;
};
123
// Convert an HTTP result with an error status code to an exception. The
// Error carries the fields of the response body plus code, statusCode and
// headers taken from the response.
const httpexc = (res) => {
  const status = res.statusCode;
  const message = extractMessageFromBody(res.body, `HTTP response status code ${status}`);

  // Decorating a plain Error is simpler than introducing an Error subclass;
  // the response fields are applied last so they win over body fields
  return extend(new Error(message), res.body, {
    code: status,
    statusCode: status,
    headers: res.headers
  });
};
138
// Simple wrapper around the request module function, use like this:
// request('http://localhost/whatever/:x/:y', { x: 10, y: 20 }, (err, res) => {
//   do something with res, res is a Javascript object parsed from the
//   JSON response
// });
//
// Responses with a 5xx status code are passed to the callback as errors.
// Cacheable targets are looked up in, and stored to, the resource cache
// while holding the lock obtained for the target URI.
const createRequest = () => (uri, opts, cb) => {
  // Convert the input parameters to a request target configuration
  const t = target(undefined, uri, opts, cb);

  // Send the request
  /* eslint complexity: [1, 7] */
  const send = (tgt, scb) => {
    const requestMsg = `${tgt.options.method} ${tgt.uri}`;
    debug(`Sending ${requestMsg}, body %o`, tgt.options.body || '');

    // Without a callback there is no response to process
    if (!scb) return drequest(tgt.options, undefined);

    return drequest(tgt.options, (err, res) => {
      if (err) {
        debug(`Error on ${requestMsg}, body: %o, error: %o`, tgt.options.body || '', err);
        edebug(`Error on ${requestMsg}, error: %o`, err);
        return scb(err, undefined);
      }

      debug('Received %s response %d %o', tgt.options.method, res.statusCode, res.body || '');

      // Report server errors as exceptions
      const serverError = res.statusCode >= 500 && res.statusCode <= 599;
      if (serverError) return scb(httpexc(res), undefined);

      // Optionally cache REST resources
      if (cacheable(tgt, res)) cache(tgt.uri, res);

      // Call back with the response
      return scb(undefined, res);
    });
  };

  // If not cacheable then just send the request
  if (!cacheableTarget(t)) return send(t, t.callback);

  // Otherwise look for a cached REST resource first
  return lock(t.uri, (lockErr, unlock) => {
    if (lockErr) {
      t.callback(lockErr);
      unlock();
      return;
    }

    const hit = cached(t.uri);
    if (hit) {
      t.callback(undefined, hit);
      unlock();
      return;
    }

    // Nothing cached, send the request and release the lock once done
    send(t, (err, res) => {
      t.callback(err, res);
      unlock();
    });
  });
};
203
// Return a function that will send a request with the given HTTP method.
// The options in opt are applied on top of the options computed for each
// request.
const singleOp = (method, opt) => (uri, opts, cb) => {
  const { options, callback } = target(method, uri, opts, cb);

  return createRequest()(extend(options, opt), callback);
};
213
// Return a function that will send a batch of requests with the given HTTP
// method.
//
// The returned function takes a list of requests, each one an array of the
// arguments normally given to a single request (uri, options), and a
// callback. Cached resources are served from the cache; the remaining
// requests are grouped and sent as one HTTP POST per group to the batch
// endpoint (opt.batch_endpoint, '/batch' by default). The callback receives
// a list of [err, res] pairs ordered like the given requests.
const batchOp = (method, opt) => {
  // Batches are sent using an HTTP POST
  const post = singleOp('POST', opt);

  return (reqs, cb) => {
    debug('Sending a batch of %d %s requests', reqs.length, method);

    // Return each request with its index in the list and the corresponding
    // target request options. A no-op callback is appended so that each
    // target has a callback.
    const targets = map(reqs, (args, i) => {
      const cbargs = args.concat(() => undefined);
      return {
        i: i,
        target: target(method, cbargs[0], cbargs[1], cbargs[2])
      };
    });

    // Optionally lookup cached REST resources
    transform.map(
      targets,
      (t, i, targets, ccb) => {
        if (cacheableTarget(t.target)) {
          const res = cached(t.target.uri);
          if (res) return ccb(undefined, extend({}, t, { cached: [undefined, res] }));
        }
        return ccb(undefined, t);
      },
      (err, res) => {
        // Stop here on error, otherwise the code below would run on an
        // undefined result and could call back a second time
        if (err) {
          cb(err);
          return;
        }

        // Collect the cached resources
        const rcached = map(filter(res, (t) => t.cached), (r) => ({ i: r.i, res: r.cached }));

        // Collect the non-cached resources
        const pending = filter(res, (t) => !t.cached);

        // Send the remaining requests, group the calls by target
        // protocol, auth, host, batch id and OAuth bearer access token
        const groups = map(
          groupBy(pending, (t) =>
            [
              url.resolve(t.target.uri, '/'),
              t.target.options.headers && t.target.options.headers['x-batch-id']
                ? t.target.options.headers['x-batch-id']
                : '',
              t.target.options.headers && t.target.options.headers.authorization
                ? t.target.options.headers.authorization
                : ''
            ].join('-')
          )
        );

        // Send a single HTTP request per group
        transform.map(
          groups,
          (group, g, groups, gcb) => {
            // Build the request body
            const greq = map(group, (t) =>
              extend({ uri: url.parse(t.target.uri).path }, pick(t.target.options, 'method', 'headers', 'json', 'body'))
            );

            // Use the group's OAuth bearer access token for POST request
            const o =
              greq[0].headers && greq[0].headers.authorization
                ? { headers: pick(greq[0].headers, 'authorization') }
                : {};

            const batchEndpoint = (opt && opt.batch_endpoint) || '/batch';
            debug('Using batch endpoint %s', batchEndpoint);

            // Send the POST request to the target's host /batch path
            post(url.resolve(group[0].target.uri, batchEndpoint), extend(o, { body: greq }), (err, bres) => {
              // A failed POST is reported as the error of every request of
              // the group rather than as a failure of the whole batch
              if (err) {
                gcb(
                  undefined,
                  map(group, (g) => {
                    return { i: g.i, res: [err, undefined] };
                  })
                );
                return;
              }

              // Return the list of results from the response body
              if (bres) {
                // Forward non-200 status codes from /batch to requests
                const httpres = () => {
                  edebug(
                    'Batch response from %s %d %o %o',
                    group[0].target.uri,
                    bres.statusCode,
                    pick(bres.headers, 'www-authenticate') || '',
                    bres.body || ''
                  );

                  gcb(
                    undefined,
                    map(group, (g) => ({
                      i: g.i,
                      res: [undefined, extend({}, bres)]
                    }))
                  );
                };

                // Handle non-200 status codes from /batch
                if (bres.statusCode !== 200) {
                  httpres();
                  return;
                }

                // Map each entry of the response body back to the request
                // found at the same position in the group
                /* eslint complexity: [1, 7] */
                gcb(
                  undefined,
                  map(bres.body, (r, i) => {
                    if (r.statusCode >= 500 && r.statusCode <= 599)
                      return { i: group[i].i, res: [httpexc(r), undefined] };

                    return {
                      i: group[i].i,
                      res: [
                        undefined,
                        {
                          statusCode: r.statusCode || 200,
                          headers: extend(
                            r.headers || omit(bres.headers, ['content-type', 'content-length']) || {},
                            r.header && r.header.Location ? { location: r.header.Location } : {}
                          ),
                          body: r.body
                        }
                      ]
                    };
                  })
                );
              } else
                gcb(
                  undefined,
                  map(group, (g) => {
                    return { i: g.i, res: [undefined, undefined] };
                  })
                );
            });
          },
          (err, gres) => {
            if (err) {
              cb(err);
              return;
            }

            // Optionally cache GET responses
            const fgres = flatten(gres, true);
            transform.map(
              fgres,
              (r, i, fgres, rcb) => {
                const t = find(pending, (t) => t.i === r.i);

                if (cacheable(t.target, r.res[1])) cache(t.target.uri, r.res[1]);

                return rcb();
              },
              () => {
                // Return assembled list of results from the cached list and
                // the list of groups, sorted like the corresponding requests
                const res = map(sortBy(rcached.concat(fgres), (r) => r.i), (r) => r.res);

                cb(undefined, res);
              }
            );
          }
        );
      }
    );
  };
};
388
// Return a function that pings a URL, useful to wait for the availability of
// an application in test cases for example. Ping can be invoked repeatedly,
// from a Jest waitsFor condition for example, without flooding the target
// URL with requests, as it will only ping that URL every 250 msec.
393
// Warning: pings is mutable state, used to record ping times and the
// corresponding responses. It is a dictionary keyed by target URI, so it is
// a prototype-less object rather than an array.
const pings = Object.create(null);

// Number of successful pings to wait for; the per-URI ping counter wraps
// around to 0 once it reaches counterBarrier
const maxSuccessfulPings = 5;
const counterBarrier = maxSuccessfulPings + 1;
400
// Ping the given URI with an OPTIONS request sent through reqFn, at most
// once every 250 msec, and return the current count of successful pings
// recorded for that URI (0 when there is none)
const ping = (uri, opt, reqFn) => {
  // Record the outcome of a ping, t is assigned below before any response
  // can come back
  const onResponse = function(err, val) {
    // Warning: mutating variable pings
    const entry = pings[t.uri];
    if (err) {
      entry.count = 0;
      return;
    }

    entry.val = val;
    entry.count = ((entry.count || 0) + 1) % counterBarrier;
  };

  const t = opt ? target(undefined, uri, opt, onResponse) : target(undefined, uri, onResponse, undefined);

  // Warning: mutating variable pings
  if (!pings[t.uri]) pings[t.uri] = { t: moment.now() };
  const entry = pings[t.uri];

  const elapsed = moment.now() - entry.t;
  if (elapsed >= 250) {
    // Warning: mutating variable pings
    entry.t = moment.now();
    reqFn.options(t.uri, t.options, t.callback);
  }

  return entry.count || 0;
};
426
// Ping the target URI every 250 msec and wait for it to become available.
//
// time is the optional timeout in msec and defaults to 10 seconds; the
// callback can be given in place of time, or in place of opt. The callback
// is called with (undefined, uri) after maxSuccessfulPings successful pings,
// or with a 'timeout' Error once the timeout expires.
const waitFor = (uri, opt, time, reqFn, cb) => {
  const callback = typeof time === 'function' && !cb ? time : cb;

  // Apply the default whenever no timeout value was given, passing undefined
  // to setTimeout would make the timeout fire almost immediately
  const noTimeout = time === undefined || time === null || typeof time === 'function';
  const timeout = noTimeout ? 10000 : time;

  const t = target(undefined, uri, opt, callback);

  // The callback is optional, don't throw from a timer when there is none
  const done = (err, val) => {
    if (t.callback) t.callback(err, val);
  };

  debug('Pinging %s', t.uri);
  const i = setInterval(() => {
    const n = reqFn.ping(t.uri, t.options);
    if (n !== 0) debug('%d successful pings', n);
    if (n === maxSuccessfulPings) {
      // Call back after max successful pings
      clearInterval(i);
      clearTimeout(to);
      done(undefined, t.uri);
    }
  }, 250);

  // Time out (by default after 10 seconds)
  const to = setTimeout(() => {
    debug('Timed out after %d ms', timeout);
    clearInterval(i);
    done(new Error('timeout'));
  }, timeout);
};
453
// Build a request function decorated with shorthand functions for the
// various HTTP methods, their batch versions and a few helpers, all of them
// using the given default options
const extendRequest = (defaultOptions) => {
  const single = (verb) => singleOp(verb, defaultOptions);
  const batch = (verb) => batchOp(verb, defaultOptions);

  const requestFn = extend(createRequest(), {
    get: single('GET'),
    head: single('HEAD'),
    patch: single('PATCH'),
    options: single('OPTIONS'),
    post: single('POST'),
    put: single('PUT'),
    del: single('DELETE'),
    delete: single('DELETE'),

    // Batch versions of the HTTP methods, for use with the batch module
    batch_get: batch('GET'),
    batch_head: batch('HEAD'),
    batch_patch: batch('PATCH'),
    batch_post: batch('POST'),
    batch_put: batch('PUT'),
    batch_del: batch('DELETE'),
    batch_delete: batch('DELETE'),

    // Add helpers
    params: params,
    route: route
  });

  // Add ping & wait functions, they send their requests through requestFn
  requestFn.ping = (uri, opt) => ping(uri, opt, requestFn);
  requestFn.waitFor = (uri, opt, time, cb) => waitFor(uri, opt, time, requestFn, cb);

  return requestFn;
};
486
// Request function without default options, and a factory returning a
// request function that uses the given default options
const noDefaults = extendRequest();
const defaults = (options) => extendRequest(options);

// Export our public functions
module.exports = extend(noDefaults, { defaults: defaults });