UNPKG

13.6 kBJavaScriptView Raw
/**
 * Class that manages making requests; it is called by all of the API methods.
 * @type {Function}
 */
5module.exports = Transport;
6
7var _ = require('lodash');
8var utils = require('./utils');
9var errors = require('./errors');
10var Host = require('./host');
11var patchSniffOnConnectionFault = require('./transport/sniff_on_connection_fault');
12var findCommonProtocol = require('./transport/find_common_protocol');
13
/**
 * Create a Transport from a client configuration object.
 *
 * Options are read from `config` only when they are own properties; otherwise
 * the default shown here is used:
 *   - maxRetries (3), sniffEndpoint ('/_nodes/_all/http'),
 *     requestTimeout (30000), pingTimeout (3000), randomizeHosts (true)
 *   - defer: replaces the prototype `defer` implementation on this instance
 *   - sniffedNodesProtocol: falls back to the protocol shared by the
 *     configured hosts (as reported by findCommonProtocol), or null
 *   - sniffedNodesFilterPath: falls back to a comma-joined list of node fields
 *   - sniffOnStart / sniffInterval / sniffOnConnectionFault: enable sniffing
 *
 * Note that the passed object is modified: `config.log` is replaced with the
 * created logger instance and a truthy `config.host` is copied to
 * `config.hosts`.
 *
 * @param {Object} [config] - client configuration, kept on `this._config`
 * @throws {TypeError} when `config.hosts` cannot be turned into a host list
 */
function Transport(config) {
  var transport = this;
  var settings = (transport._config = config || {});

  // read an own property of the settings, or fall back to the given default
  function option(name, fallback) {
    return settings.hasOwnProperty(name) ? settings[name] : fallback;
  }

  // logger: a function in `log` is used as the class, otherwise the bundled one
  var LogClass;
  if (typeof settings.log === 'function') {
    LogClass = settings.log;
  } else {
    LogClass = require('./log');
  }
  transport.log = new LogClass(settings);
  settings.log = transport.log;

  // connection pool
  var PoolClass = utils.funcEnum(
    settings,
    'connectionPool',
    Transport.connectionPools,
    'main'
  );
  transport.connectionPool = new PoolClass(settings);

  // serializer
  var SerializerClass = utils.funcEnum(
    settings,
    'serializer',
    Transport.serializers,
    'json'
  );
  transport.serializer = new SerializerClass(settings);

  // function used to turn a sniffed node list into host configs
  transport.nodesToHostCallback = utils.funcEnum(
    settings,
    'nodesToHostCallback',
    Transport.nodesToHostCallbacks,
    'main'
  );

  // simple options with defaults
  transport.maxRetries = option('maxRetries', 3);
  transport.sniffEndpoint = option('sniffEndpoint', '/_nodes/_all/http');
  transport.requestTimeout = option('requestTimeout', 30000);
  transport.pingTimeout = option('pingTimeout', 3000);

  if (settings.hasOwnProperty('defer')) {
    transport.defer = settings.defer;
  }

  var shouldShuffle = Boolean(option('randomizeHosts', true));

  // `host` is accepted as an alias of `hosts`
  if (settings.host) {
    settings.hosts = settings.host;
  }

  if (settings.hosts) {
    var hostList = utils.createArray(settings.hosts, function(entry) {
      var acceptable =
        _.isPlainObject(entry) || _.isString(entry) || entry instanceof Host;
      return acceptable ? entry : undefined;
    });

    if (!hostList) {
      throw new TypeError(
        'Invalid hosts config. Expected a URL, an array of urls, a host config object, ' +
          'or an array of host config objects.'
      );
    }

    transport.setHosts(shouldShuffle ? _.shuffle(hostList) : hostList);
  }

  // protocol forced onto sniffed nodes: explicit option, else the common one
  var protocol = settings.hasOwnProperty('sniffedNodesProtocol')
    ? settings.sniffedNodesProtocol
    : findCommonProtocol(transport.connectionPool.getAllHosts());
  transport.sniffedNodesProtocol = protocol || null;

  transport.sniffedNodesFilterPath = option(
    'sniffedNodesFilterPath',
    [
      'nodes.*.http.publish_address',
      'nodes.*.name',
      'nodes.*.hostname',
      'nodes.*.host',
      'nodes.*.version',
    ].join(',')
  );

  if (settings.sniffOnStart) {
    transport.sniff();
  }

  if (settings.sniffInterval) {
    // sniff, then re-arm the timer, every `sniffInterval` milliseconds
    var scheduleSniff = function() {
      transport._timeout(function() {
        transport.sniff();
        scheduleSniff();
      }, settings.sniffInterval);
    };
    scheduleSniff();
  }

  if (settings.sniffOnConnectionFault) {
    patchSniffOnConnectionFault(transport);
  }
}
132
// Registries handed to utils.funcEnum by the constructor; each maps an
// option name to an implementation.

// connection pool implementations ('main' is the constructor's default)
Transport.connectionPools = {};
Transport.connectionPools.main = require('./connection_pool');

// serializer implementations
Transport.serializers = require('./serializers');

// node-list-to-host-config converters ('main' is the constructor's default)
Transport.nodesToHostCallbacks = {};
Transport.nodesToHostCallbacks.main = require('./nodes_to_host');
142
/**
 * Create a deferred: a promise bundled with the functions that settle it.
 * Uses the global Promise constructor.
 *
 * @return {Object} an object with `resolve`, `reject` and `promise` keys
 * @throws {Error} when no global Promise implementation exists
 */
Transport.prototype.defer = function() {
  if (typeof Promise === 'undefined') {
    throw new Error(
      'No Promise implementation found. In order for elasticsearch-js to create promises ' +
        'either specify the `defer` configuration or include a global Promise shim'
    );
  }

  var settle;
  var fail;
  // the executor runs synchronously, so both functions are captured before
  // the constructor returns
  var promise = new Promise(function(onResolve, onReject) {
    settle = onResolve;
    fail = onReject;
  });

  return {
    resolve: settle,
    reject: fail,
    promise: promise,
  };
};
158
/**
 * Perform a request with the client's transport
 *
 * A connection is selected from the connection pool and the request is sent
 * through it. When the connection reports an error it is marked dead and the
 * request is retried on another connection until the retries are used up.
 *
 * @method request
 * @todo async body writing
 * @todo abort
 * @todo access to custom headers, modifying of request in general
 * @param {object} params
 * @param {Number} params.requestTimeout - timeout for the entire request (including all retries)
 * @param {Number} params.maxRetries - number of times to re-run request if the
 * original node chosen can not be connected to.
 * @param {string} [params.path="/"] - URL pathname. Do not include query string.
 * @param {string|object} [params.query] - Query string.
 * @param {String} params.method - The HTTP method for the request
 * @param {String} params.body - The body of the HTTP request
 * @param {Object} [params.headers] - request headers; names are lower-cased
 * @param {Boolean} [params.bulkBody] - serialize the body with the serializer's
 * `bulkBody` method instead of `serialize`
 * @param {Array} [params.ignore] - status codes that should not produce an error
 * @param {Boolean} [params.castExists] - respond with a boolean: `false` for a
 * NotFound error, otherwise whether the request completed without error
 * @param {Function} cb - A function to call back with (error, responseBody, responseStatus)
 * @return {Object|Promise} with a callback, an object exposing `abort()`;
 * without one, the promise from `this.defer()` with `abort()` attached
 */
Transport.prototype.request = function(params, cb) {
  var self = this;
  var remainingRetries = this.maxRetries;
  var requestTimeout = this.requestTimeout;

  var connection; // set in sendReqWithConnection
  var aborted = false; // several connectors will respond with an error when the request is aborted
  var requestAborter; // an abort function, returned by connection#request()
  var requestTimeoutId; // the id of the request timeout timer
  var ret; // the object returned to the user, might be a promise
  var defer; // the defer object, will be set when we are using promises.

  var body = params.body;
  var headers = !params.headers
    ? {}
    : _.transform(params.headers, function(headers, val, name) {
        headers[String(name).toLowerCase()] = val;
      });

  self.log.debug('starting request', params);

  // determine the response based on the presence of a callback
  if (typeof cb === 'function') {
    // handle callbacks within a domain; `process` is checked with typeof
    // because referencing it where it is not a global would throw
    if (typeof process !== 'undefined' && process.domain) {
      cb = process.domain.bind(cb);
    }
    ret = {
      abort: abortRequest,
    };
  } else {
    defer = this.defer();
    ret = defer.promise;
    ret.abort = abortRequest;
  }

  if (body && params.method === 'GET') {
    utils.nextTick(
      respond,
      new TypeError('Body can not be sent with method "GET"')
    );
    return ret;
  }

  // serialize the body
  if (body) {
    var serializer = self.serializer;
    var serializeFn = serializer[params.bulkBody ? 'bulkBody' : 'serialize'];

    body = serializeFn.call(serializer, body);
    if (!headers['content-type']) {
      headers['content-type'] = serializeFn.contentType;
    }
  }

  if (params.hasOwnProperty('maxRetries')) {
    remainingRetries = params.maxRetries;
  }

  if (params.hasOwnProperty('requestTimeout')) {
    requestTimeout = params.requestTimeout;
  }

  // HEAD / is treated as a ping and uses pingTimeout unless the caller
  // supplied a truthy requestTimeout
  const pingRequest = params.path === '/' && params.method === 'HEAD';
  if (pingRequest) {
    const requestParam =
      params.hasOwnProperty('requestTimeout') && params.requestTimeout;
    requestTimeout = requestParam || this.pingTimeout;
  }

  params.req = {
    method: params.method,
    path: params.path || '/',
    query: params.query,
    body: body,
    headers: headers,
  };

  // connection pool callback: send the request through the selected connection
  function sendReqWithConnection(err, _connection) {
    if (aborted) {
      return;
    }

    if (err) {
      respond(err);
    } else if (_connection) {
      connection = _connection;
      requestAborter = connection.request(params.req, checkRespForFailure);
    } else {
      self.log.warning('No living connections');
      respond(new errors.NoConnections());
    }
  }

  // connection callback: retry on connection errors, otherwise respond
  function checkRespForFailure(err, body, status, headers) {
    if (aborted) {
      return;
    }

    requestAborter = void 0;

    if (err instanceof errors.RequestTypeError) {
      self.log.error('Connection refused to execute the request', err);
      respond(err, body, status, headers);
      return;
    }

    if (err) {
      connection.setStatus('dead');

      var errMsg = err.message || '';

      errMsg =
        '\n' +
        params.req.method +
        ' ' +
        connection.host.makeUrl(params.req) +
        (errMsg.length ? ' => ' : '') +
        errMsg;

      if (remainingRetries) {
        remainingRetries--;
        self.log.error('Request error, retrying' + errMsg);
        self.connectionPool.select(sendReqWithConnection);
      } else {
        self.log.error('Request complete with error' + errMsg);
        respond(new errors.ConnectionFault(err));
      }
    } else {
      self.log.debug('Request complete');
      respond(void 0, body, status, headers);
    }
  }

  // deliver the final outcome to the callback or the promise
  function respond(err, body, status, headers) {
    if (aborted) {
      return;
    }

    // cancel the request timeout timer
    self._timeout(requestTimeoutId);
    var parsedBody;
    // without response headers the body is assumed to be JSON
    var isJson =
      !headers ||
      (headers['content-type'] &&
        ~headers['content-type'].indexOf('application/json'));

    if (!err && body) {
      if (isJson) {
        parsedBody = self.serializer.deserialize(body);
        if (parsedBody == null) {
          err = new errors.Serialization();
          parsedBody = body;
        }
      } else {
        parsedBody = body;
      }
    }

    // does the response represent an error?
    if (
      (!err || err instanceof errors.Serialization) &&
      (status < 200 || status >= 300) &&
      (!params.ignore || !_.includes(params.ignore, status))
    ) {
      var errorMetadata = _.pick(params.req, ['path', 'query', 'body']);
      errorMetadata.statusCode = status;
      errorMetadata.response = body;

      if (status === 401 && headers && headers['www-authenticate']) {
        errorMetadata.wwwAuthenticateDirective = headers['www-authenticate'];
      }

      if (errors[status]) {
        err = new errors[status](parsedBody && parsedBody.error, errorMetadata);
      } else {
        err = new errors.Generic('unknown error', errorMetadata);
      }
    }

    // can we cast notfound to false?
    if (params.castExists) {
      if (err && err instanceof errors.NotFound) {
        parsedBody = false;
        err = void 0;
      } else {
        parsedBody = !err;
      }
    }

    // how do we send the response?
    if (typeof cb === 'function') {
      if (err) {
        cb(err, parsedBody, status);
      } else {
        cb(void 0, parsedBody, status);
      }
    } else if (err) {
      err.body = parsedBody;
      err.status = status;
      defer.reject(err);
    } else {
      defer.resolve(parsedBody);
    }
  }

  // exposed as `abort` on the returned object; later responses are ignored
  function abortRequest() {
    if (aborted) {
      return;
    }

    aborted = true;
    remainingRetries = 0;
    self._timeout(requestTimeoutId);
    if (typeof requestAborter === 'function') {
      requestAborter();
    }
  }

  if (requestTimeout && requestTimeout !== Infinity) {
    requestTimeoutId = this._timeout(function() {
      respond(
        new errors.RequestTimeout(
          'Request Timeout after ' + requestTimeout + 'ms'
        )
      );
      abortRequest();
    }, requestTimeout);
  }

  // `connection` is only ever assigned inside sendReqWithConnection, so at
  // this point one always has to be requested from the pool
  self.connectionPool.select(sendReqWithConnection);

  return ret;
};
413
/**
 * Schedule or cancel a timer tracked in `this._timers`.
 *
 * Called with a function, it schedules that function with setTimeout and
 * returns the timer id. Called with anything else, a truthy value is treated
 * as a timer id to clear and forget. Does nothing once `this.closed` is set.
 *
 * @param {Function|*} cb - function to schedule, or the id of a timer to cancel
 * @param {Number} [delay] - setTimeout delay, used only when scheduling
 * @return {*} the new timer id when scheduling, otherwise undefined
 */
Transport.prototype._timeout = function(cb, delay) {
  if (this.closed) {
    return;
  }

  var pending = this._timers || (this._timers = []);

  if (typeof cb === 'function') {
    // schedule: the timer drops itself from the list just before running
    var timerId = setTimeout(function() {
      _.pull(pending, timerId);
      cb();
    }, delay);

    pending.push(timerId);
    return timerId;
  }

  // cancel: a falsy id means there is nothing to clear
  if (!cb) {
    return;
  }

  clearTimeout(cb);

  var position = this._timers.indexOf(cb);
  if (position !== -1) {
    this._timers.splice(position, 1);
  }
};
445
/**
 * Ask an ES node for a list of all the nodes, add/remove nodes from the connection
 * pool as appropriate
 *
 * The node list is requested from `this.sniffEndpoint`, converted to host
 * configs with `this.nodesToHostCallback` and handed to `this.setHosts`. When
 * `this.sniffedNodesProtocol` is set it is written to every host config.
 *
 * If the conversion throws, the failure is logged and passed to `cb` as the
 * error, so the callback is always invoked exactly once.
 *
 * @param {Function} cb - Function to call back once complete, with
 * (error, response, status)
 */
Transport.prototype.sniff = function(cb) {
  var self = this;
  var nodesToHostCallback = this.nodesToHostCallback;
  var log = this.log;
  var sniffedNodesProtocol = this.sniffedNodesProtocol;
  var sniffedNodesFilterPath = this.sniffedNodesFilterPath;

  // make cb a function if it isn't
  cb = typeof cb === 'function' ? cb : _.noop;

  this.request(
    {
      path: this.sniffEndpoint,
      query: { filter_path: sniffedNodesFilterPath },
      method: 'GET',
    },
    function(err, resp, status) {
      if (!err && resp && resp.nodes) {
        var hostsConfigs;

        try {
          hostsConfigs = nodesToHostCallback(resp.nodes);
        } catch (e) {
          var conversionError = new Error(
            'Unable to convert node list from ' +
              self.sniffEndpoint +
              ' to hosts durring sniff. Encountered error:\n' +
              (e.stack || e.message)
          );
          log.error(conversionError);
          // report the failure instead of leaving the caller waiting forever
          cb(conversionError, resp, status);
          return;
        }

        // the protocol does not change per host, so test it once
        if (sniffedNodesProtocol) {
          _.forEach(hostsConfigs, function(hostConfig) {
            hostConfig.protocol = sniffedNodesProtocol;
          });
        }

        self.setHosts(hostsConfigs);
      }
      cb(err, resp, status);
    }
  );
};
496
/**
 * Replace the connection pool's host list.
 *
 * Entries that are already Host instances are passed through; every other
 * entry is used, together with the transport's config, to construct a Host.
 *
 * @param {Array<HostConfig>} hostsConfigs - Hosts, or configuration objects
 * from which Host objects are created.
 */
Transport.prototype.setHosts = function(hostsConfigs) {
  var sharedConfig = this._config;

  var hosts = _.map(hostsConfigs, function(entry) {
    if (entry instanceof Host) {
      return entry;
    }
    return new Host(entry, sharedConfig);
  });

  this.connectionPool.setHosts(hosts);
};
511
/**
 * Close the Transport: closes the log, marks the transport as closed,
 * clears every timer still tracked in `this._timers` and closes the
 * connection pool.
 */
Transport.prototype.close = function() {
  var transport = this;

  transport.log.close();

  // `_timeout` refuses to do anything once this flag is set
  transport.closed = true;

  _.each(transport._timers, clearTimeout);
  transport._timers = null;

  transport.connectionPool.close();
};