1 |
|
2 |
|
3 |
|
4 |
|
5 | module.exports = Transport;
|
6 |
|
7 | var _ = require('lodash');
|
8 | var utils = require('./utils');
|
9 | var errors = require('./errors');
|
10 | var Host = require('./host');
|
11 | var patchSniffOnConnectionFault = require('./transport/sniff_on_connection_fault');
|
12 | var findCommonProtocol = require('./transport/find_common_protocol');
|
13 |
|
/**
 * Coordinates a logger, a connection pool and a serializer, and exposes
 * `request()` / `sniff()` on top of them.
 *
 * Configuration keys read directly by this constructor:
 *  - log: constructor used as the logger class; when it is not a function
 *    the module `./log` is required instead.
 *  - connectionPool, serializer, nodesToHostCallback: resolved through
 *    `utils.funcEnum` against the matching static registry on `Transport`
 *    (the last argument, 'main' / 'json', is presumably the default key —
 *    confirm against `utils.funcEnum`).
 *  - maxRetries (default 3), sniffEndpoint (default '/_nodes/_all/http'),
 *    requestTimeout (default 30000), pingTimeout (default 3000).
 *  - defer: overrides `Transport.prototype.defer` on this instance.
 *  - randomizeHosts (default true): shuffle the host list before use.
 *  - host / hosts: a string, plain object or `Host`, or a list of those.
 *  - sniffedNodesProtocol, sniffedNodesFilterPath: settings used by `sniff()`.
 *  - sniffOnStart, sniffInterval, sniffOnConnectionFault: sniffing triggers.
 *
 * @param {Object} [config] - configuration object; it is stored on
 *   `this._config` and mutated (`config.log`, and `config.hosts` when
 *   `config.host` is set).
 * @throws {TypeError} when `config.hosts` cannot be turned into a host list.
 */
function Transport(config) {
  var self = this;
  config = self._config = config || {};

  // Own-property test that does not rely on `config` inheriting from
  // Object.prototype, so prototype-less configs (Object.create(null)) and
  // configs that shadow `hasOwnProperty` are handled as well.
  function hasOwnConfig(name) {
    return Object.prototype.hasOwnProperty.call(config, name);
  }

  // Value of an own config key, or `fallback` when the key is absent.
  function option(name, fallback) {
    return hasOwnConfig(name) ? config[name] : fallback;
  }

  // The logger instance replaces the class in the config so that objects
  // constructed below from the same config can see it.
  var LogClass =
    typeof config.log === 'function' ? config.log : require('./log');
  config.log = self.log = new LogClass(config);

  var ConnectionPool = utils.funcEnum(
    config,
    'connectionPool',
    Transport.connectionPools,
    'main'
  );
  self.connectionPool = new ConnectionPool(config);

  var Serializer = utils.funcEnum(
    config,
    'serializer',
    Transport.serializers,
    'json'
  );
  self.serializer = new Serializer(config);

  self.nodesToHostCallback = utils.funcEnum(
    config,
    'nodesToHostCallback',
    Transport.nodesToHostCallbacks,
    'main'
  );

  self.maxRetries = option('maxRetries', 3);
  self.sniffEndpoint = option('sniffEndpoint', '/_nodes/_all/http');
  self.requestTimeout = option('requestTimeout', 30000);
  self.pingTimeout = option('pingTimeout', 3000);

  if (hasOwnConfig('defer')) {
    self.defer = config.defer;
  }

  var randomizeHosts = hasOwnConfig('randomizeHosts')
    ? !!config.randomizeHosts
    : true;

  // `host` is accepted as an alias of `hosts`.
  if (config.host) {
    config.hosts = config.host;
  }

  if (config.hosts) {
    var hostsConfig = utils.createArray(config.hosts, function(val) {
      if (_.isPlainObject(val) || _.isString(val) || val instanceof Host) {
        return val;
      }
    });

    if (!hostsConfig) {
      throw new TypeError(
        'Invalid hosts config. Expected a URL, an array of urls, a host config object, ' +
          'or an array of host config objects.'
      );
    }

    if (randomizeHosts) {
      hostsConfig = _.shuffle(hostsConfig);
    }

    self.setHosts(hostsConfig);
  }

  // Must run after setHosts(): without an explicit setting the protocol is
  // derived from the hosts currently held by the connection pool.
  if (hasOwnConfig('sniffedNodesProtocol')) {
    self.sniffedNodesProtocol = config.sniffedNodesProtocol || null;
  } else {
    self.sniffedNodesProtocol =
      findCommonProtocol(self.connectionPool.getAllHosts()) || null;
  }

  if (hasOwnConfig('sniffedNodesFilterPath')) {
    self.sniffedNodesFilterPath = config.sniffedNodesFilterPath;
  } else {
    self.sniffedNodesFilterPath = [
      'nodes.*.http.publish_address',
      'nodes.*.name',
      'nodes.*.hostname',
      'nodes.*.host',
      'nodes.*.version',
    ].join(',');
  }

  if (config.sniffOnStart) {
    self.sniff();
  }

  if (config.sniffInterval) {
    // Self-rescheduling timer: every run sniffs and then queues the next run.
    self._timeout(function doSniff() {
      self.sniff();
      self._timeout(doSniff, config.sniffInterval);
    }, config.sniffInterval);
  }

  if (config.sniffOnConnectionFault) {
    patchSniffOnConnectionFault(self);
  }
}
|
132 |
|
// Static registries consulted by the Transport constructor (through
// utils.funcEnum) when it resolves the `connectionPool`, `serializer` and
// `nodesToHostCallback` configuration keys.
Object.assign(Transport, {
  connectionPools: {
    main: require('./connection_pool'),
  },
  serializers: require('./serializers'),
  nodesToHostCallbacks: {
    main: require('./nodes_to_host'),
  },
});
|
142 |
|
/**
 * Build a "deferred": a promise bundled with the functions that settle it.
 * Transport#request calls this when it is invoked without a callback.
 *
 * @returns {{resolve: Function, reject: Function, promise: Promise}}
 * @throws {Error} when no global `Promise` is available.
 */
Transport.prototype.defer = function() {
  if (typeof Promise === 'undefined') {
    throw new Error(
      'No Promise implementation found. In order for elasticsearch-js to create promises ' +
        'either specify the `defer` configuration or include a global Promise shim'
    );
  }

  var deferred = {};

  // The executor runs synchronously inside the Promise constructor, so both
  // settle functions are attached before `promise` itself is.
  var captureSettlers = function(resolve, reject) {
    deferred.resolve = resolve;
    deferred.reject = reject;
  };

  deferred.promise = new Promise(captureSettlers);
  return deferred;
};
|
158 |
|
159 |
|
160 |
|
161 |
|
162 |
|
163 |
|
164 |
|
165 |
|
166 |
|
167 |
|
168 |
|
169 |
|
170 |
|
171 |
|
172 |
|
173 |
|
174 |
|
175 |
|
/**
 * Send one request through a connection chosen by the connection pool,
 * retrying on connection errors, and hand the (deserialized) response to a
 * callback or a promise.
 *
 * Fields of `params` read here:
 *  - method, path (defaults to '/' in the outgoing request), query
 *  - body: serialized with the transport's serializer; `bulkBody` selects the
 *    serializer's `bulkBody` method instead of `serialize`
 *  - headers: keys are lower-cased; `content-type` defaults to the
 *    `contentType` property of the serializer function that was used
 *  - maxRetries, requestTimeout: per-request overrides of the instance values
 *  - ignore: status codes that must not be converted into errors
 *  - castExists: turn the outcome into a boolean (a NotFound error -> false)
 * The outgoing request description is stored on `params.req`.
 *
 * A request with path '/' and method 'HEAD' is treated as a ping: its timeout
 * is `params.requestTimeout` when truthy, otherwise `this.pingTimeout`.
 *
 * @param {Object} params - request description (see above); mutated.
 * @param {Function} [cb] - called as `cb(err, body, status)`.
 * @returns {Object} with a callback: `{ abort }`; without one: the promise
 *   produced by `this.defer()`, with an `abort` method added. After `abort()`
 *   neither the callback nor the promise is settled by this function.
 */
Transport.prototype.request = function(params, cb) {
  var self = this;
  var remainingRetries = this.maxRetries;
  var requestTimeout = this.requestTimeout;

  var connection;
  var aborted = false;
  var requestAborter;
  var requestTimeoutId;
  var ret;
  var defer;

  // Own-property test that also works when `params` has no prototype or
  // shadows `hasOwnProperty`.
  function hasOwnParam(name) {
    return Object.prototype.hasOwnProperty.call(params, name);
  }

  var body = params.body;
  var headers = !params.headers
    ? {}
    : _.transform(params.headers, function(headers, val, name) {
        headers[String(name).toLowerCase()] = val;
      });

  self.log.debug('starting request', params);

  if (typeof cb === 'function') {
    // Bind the callback to the active domain, if any. `process` is probed
    // with typeof so environments without that global do not throw.
    if (typeof process !== 'undefined' && process.domain) {
      cb = process.domain.bind(cb);
    }
    ret = {
      abort: abortRequest,
    };
  } else {
    defer = this.defer();
    ret = defer.promise;
    ret.abort = abortRequest;
  }

  if (body && params.method === 'GET') {
    // Reported asynchronously so the caller always receives `ret` first.
    utils.nextTick(
      respond,
      new TypeError('Body can not be sent with method "GET"')
    );
    return ret;
  }

  if (body) {
    var serializer = self.serializer;
    var serializeFn = serializer[params.bulkBody ? 'bulkBody' : 'serialize'];

    body = serializeFn.call(serializer, body);
    if (!headers['content-type']) {
      headers['content-type'] = serializeFn.contentType;
    }
  }

  if (hasOwnParam('maxRetries')) {
    remainingRetries = params.maxRetries;
  }

  if (hasOwnParam('requestTimeout')) {
    requestTimeout = params.requestTimeout;
  }

  const pingRequest = params.path === '/' && params.method === 'HEAD';
  if (pingRequest) {
    const requestParam =
      hasOwnParam('requestTimeout') && params.requestTimeout;
    requestTimeout = requestParam || this.pingTimeout;
  }

  params.req = {
    method: params.method,
    path: params.path || '/',
    query: params.query,
    body: body,
    headers: headers,
  };

  // Connection-pool callback: dispatch on the selected connection, or fail
  // when selection errored or produced no connection.
  function sendReqWithConnection(err, _connection) {
    if (aborted) {
      return;
    }

    if (err) {
      respond(err);
    } else if (_connection) {
      connection = _connection;
      requestAborter = connection.request(params.req, checkRespForFailure);
    } else {
      self.log.warning('No living connections');
      respond(new errors.NoConnections());
    }
  }

  // Connection callback: decide between responding and retrying.
  function checkRespForFailure(err, body, status, headers) {
    if (aborted) {
      return;
    }

    requestAborter = void 0;

    // RequestTypeError responds immediately: no retry, and the connection is
    // not marked dead.
    if (err instanceof errors.RequestTypeError) {
      self.log.error('Connection refused to execute the request', err);
      respond(err, body, status, headers);
      return;
    }

    if (err) {
      connection.setStatus('dead');

      var errMsg = err.message || '';

      errMsg =
        '\n' +
        params.req.method +
        ' ' +
        connection.host.makeUrl(params.req) +
        (errMsg.length ? ' => ' : '') +
        errMsg;

      if (remainingRetries) {
        remainingRetries--;
        self.log.error('Request error, retrying' + errMsg);
        self.connectionPool.select(sendReqWithConnection);
      } else {
        self.log.error('Request complete with error' + errMsg);
        respond(new errors.ConnectionFault(err));
      }
    } else {
      self.log.debug('Request complete');
      respond(void 0, body, status, headers);
    }
  }

  // Final step: deserialize, map status codes to errors, settle cb/promise.
  function respond(err, body, status, headers) {
    if (aborted) {
      return;
    }

    self._timeout(requestTimeoutId);
    var parsedBody;
    // Treated as JSON when there are no headers at all, or when the
    // content-type mentions application/json.
    var isJson =
      !headers ||
      (headers['content-type'] &&
        ~headers['content-type'].indexOf('application/json'));

    if (!err && body) {
      if (isJson) {
        parsedBody = self.serializer.deserialize(body);
        if (parsedBody == null) {
          err = new errors.Serialization();
          parsedBody = body;
        }
      } else {
        parsedBody = body;
      }
    }

    // A non-2xx status becomes an error unless listed in params.ignore; this
    // also replaces a Serialization error produced just above.
    if (
      (!err || err instanceof errors.Serialization) &&
      (status < 200 || status >= 300) &&
      (!params.ignore || !_.includes(params.ignore, status))
    ) {
      var errorMetadata = _.pick(params.req, ['path', 'query', 'body']);
      errorMetadata.statusCode = status;
      errorMetadata.response = body;

      if (status === 401 && headers && headers['www-authenticate']) {
        errorMetadata.wwwAuthenticateDirective = headers['www-authenticate'];
      }

      if (errors[status]) {
        err = new errors[status](parsedBody && parsedBody.error, errorMetadata);
      } else {
        err = new errors.Generic('unknown error', errorMetadata);
      }
    }

    if (params.castExists) {
      if (err && err instanceof errors.NotFound) {
        parsedBody = false;
        err = void 0;
      } else {
        parsedBody = !err;
      }
    }

    if (typeof cb === 'function') {
      if (err) {
        cb(err, parsedBody, status);
      } else {
        cb(void 0, parsedBody, status);
      }
    } else if (err) {
      err.body = parsedBody;
      err.status = status;
      defer.reject(err);
    } else {
      defer.resolve(parsedBody);
    }
  }

  // Stop the request: no more retries, clear the timeout timer, and cancel
  // the in-flight connection request when the connection returned an aborter.
  function abortRequest() {
    if (aborted) {
      return;
    }

    aborted = true;
    remainingRetries = 0;
    self._timeout(requestTimeoutId);
    if (typeof requestAborter === 'function') {
      requestAborter();
    }
  }

  if (requestTimeout && requestTimeout !== Infinity) {
    requestTimeoutId = this._timeout(function() {
      // Respond first: once aborted, respond() is a no-op.
      respond(
        new errors.RequestTimeout(
          'Request Timeout after ' + requestTimeout + 'ms'
        )
      );
      abortRequest();
    }, requestTimeout);
  }

  // `connection` is only ever assigned inside sendReqWithConnection, so the
  // first attempt always starts by asking the pool for a connection.
  self.connectionPool.select(sendReqWithConnection);

  return ret;
};
|
413 |
|
/**
 * Set or clear a timer tracked in `this._timers`, so that close() can cancel
 * everything still pending.
 *
 *  - `_timeout(fn, delay)` schedules `fn` and returns the timer id.
 *  - `_timeout(id)` clears the timer `id` and stops tracking it.
 *
 * Does nothing (and returns undefined) once the transport is closed.
 *
 * @param {Function|*} cb - function to schedule, or the id of a timer to clear
 * @param {number} [delay] - delay in milliseconds when scheduling
 * @returns {*} the timer id when scheduling, otherwise undefined
 */
Transport.prototype._timeout = function(cb, delay) {
  if (this.closed) return;

  if (!this._timers) {
    this._timers = [];
  }
  var tracked = this._timers;

  // Scheduling mode.
  if (typeof cb === 'function') {
    var timerId = setTimeout(function() {
      // Forget the timer before running the callback.
      _.pull(tracked, timerId);
      cb();
    }, delay);

    tracked.push(timerId);
    return timerId;
  }

  // Clearing mode: the first argument is a timer id (or nothing at all).
  var staleId = cb;
  if (!staleId) return;

  clearTimeout(staleId);

  var position = tracked.indexOf(staleId);
  if (position !== -1) {
    tracked.splice(position, 1);
  }
};
|
445 |
|
446 |
|
447 |
|
448 |
|
449 |
|
450 |
|
451 |
|
/**
 * Ask the cluster for its node list (GET `this.sniffEndpoint`, filtered with
 * `this.sniffedNodesFilterPath`) and replace the transport's hosts with the
 * result of `this.nodesToHostCallback(resp.nodes)`.
 *
 * When `this.sniffedNodesProtocol` is set, it is written to the `protocol`
 * property of every host config before the hosts are applied.
 *
 * @param {Function} [cb] - called as `cb(err, resp, status)` once the request
 *   has completed. If converting the node list throws, the hosts are left
 *   untouched, the failure is logged, and `cb` receives the conversion error.
 */
Transport.prototype.sniff = function(cb) {
  var self = this;
  var nodesToHostCallback = this.nodesToHostCallback;
  var log = this.log;
  var sniffedNodesProtocol = this.sniffedNodesProtocol;
  var sniffedNodesFilterPath = this.sniffedNodesFilterPath;

  // Make the callback optional.
  cb = typeof cb === 'function' ? cb : _.noop;

  this.request(
    {
      path: this.sniffEndpoint,
      query: { filter_path: sniffedNodesFilterPath },
      method: 'GET',
    },
    function(err, resp, status) {
      if (!err && resp && resp.nodes) {
        var hostsConfigs;

        try {
          hostsConfigs = nodesToHostCallback(resp.nodes);
        } catch (e) {
          var conversionError = new Error(
            'Unable to convert node list from ' +
              self.sniffEndpoint +
              ' to hosts durring sniff. Encountered error:\n' +
              (e.stack || e.message)
          );
          log.error(conversionError);
          // Report the failure instead of leaving the caller waiting for a
          // callback that never fires.
          cb(conversionError, resp, status);
          return;
        }

        // The protocol override is the same for every host, so test it once.
        if (sniffedNodesProtocol) {
          _.forEach(hostsConfigs, function(hostConfig) {
            hostConfig.protocol = sniffedNodesProtocol;
          });
        }

        self.setHosts(hostsConfigs);
      }
      cb(err, resp, status);
    }
  );
};
|
496 |
|
497 |
|
498 |
|
499 |
|
500 |
|
501 |
|
502 |
|
/**
 * Replace the hosts of the connection pool.
 *
 * @param {Array} hostsConfigs - entries that are already `Host` instances are
 *   passed through untouched; every other entry is wrapped as
 *   `new Host(entry, this._config)`.
 */
Transport.prototype.setHosts = function(hostsConfigs) {
  var sharedConfig = this._config;

  function toHost(entry) {
    if (entry instanceof Host) {
      return entry;
    }
    return new Host(entry, sharedConfig);
  }

  var hosts = _.map(hostsConfigs, toHost);
  this.connectionPool.setHosts(hosts);
};
|
511 |
|
512 |
|
513 |
|
514 |
|
515 |
|
/**
 * Shut the transport down: close the logger, mark the transport as closed
 * (which turns `_timeout` into a no-op), cancel every tracked timer, and
 * close the connection pool.
 */
Transport.prototype.close = function() {
  var transport = this;

  transport.log.close();
  transport.closed = true;

  _.each(transport._timers, function(timerId) {
    clearTimeout(timerId);
  });
  transport._timers = null;

  transport.connectionPool.close();
};
|