UNPKG

4.09 kBJavaScriptView Raw
1const _ = require('lodash');
2const Promise = require('bluebird');
3const elasticsearch = require('elasticsearch');
4const sr = require('sync-request');
5const bunyan = require('bunyan');
6const PrettyStream = require('bunyan-prettystream');
7const semver = require('semver');
8const config = require('../config');
9
10const prettyStdOut = new PrettyStream({mode: 'dev'});
11prettyStdOut.pipe(process.stdout);
12
13const LogToBunyan = function () {
14 const self = this;
15 const bun = bunyan.createLogger({
16 name: `${config.FRAMEWORK_NAME}-es`,
17 type: 'raw',
18 level: config.elasticsearch.logLevel,
19 stream: prettyStdOut
20 });
21
22 self.error = bun.error.bind(bun);
23 self.warning = bun.warn.bind(bun);
24 self.info = bun.info.bind(bun);
25 self.debug = bun.debug.bind(bun);
26 self.trace = (method, requestUrl, body, responseBody, responseStatus) => {
27 bun.trace({
28 method: method,
29 requestUrl: requestUrl,
30 body: body,
31 responseBody: responseBody,
32 responseStatus: responseStatus
33 });
34 };
35 self.close = () => {
36 };
37};
38
39const DEFAULT_ELASTICSEARCH_PORT = 9200;
40const SUPPORTED_API_VERSION = _.keys(elasticsearch.Client.apis);
41
42const createEsClient = (hostConfig) => {
43 let host = hostConfig.host || 'localhost';
44 const port = hostConfig.port || DEFAULT_ELASTICSEARCH_PORT;
45
46 const protocol = (host.startsWith('https') || port === 443) ? 'https' : 'http';
47 host = host.replace('https://', '').replace('http://', '');
48
49 let path = hostConfig.path || '/';
50 if (!path.startsWith('/')) {
51 path = `/${path}`;
52 }
53
54 const headers = {};
55 if (process.env.AUTH_TOKEN) {
56 headers['Authorization'] = process.env.AUTH_TOKEN;
57 }
58
59 const uri = `${protocol}://${host}:${port}${path}`;
60 let apiVersion = null;
61
62 let currentLatestMajorVersion = 0;
63 try {
64 for (const v of SUPPORTED_API_VERSION) {
65 try {
66 const majorVersion = semver.major(`${v}.0`);
67 if (_.isNumber(majorVersion) && majorVersion > currentLatestMajorVersion) {
68 currentLatestMajorVersion = majorVersion;
69 }
70 } catch (e) {
71 config.log.trace(`${e}`);
72 }
73 }
74
75 /**
76 * NOTE: The below section should be removed and replaced with an async implementation.
77 * This is blocking the main thread while it waits for the result of the API check, meaning nothing else
78 * can run.
79 */
80 const results = sr('GET', uri, {
81 maxRetries: 5,
82 retry: true,
83 timeout: 5000,
84 headers
85 });
86
87 const version = JSON.parse(results.getBody('utf8')).version.number;
88 const majorVersion = semver.major(version);
89 let minorVersion = semver.minor(version);
90
91 while (true) { // eslint-disable-line no-constant-condition
92 apiVersion = `${majorVersion}.${minorVersion}`;
93 if (SUPPORTED_API_VERSION.includes(apiVersion)) {
94 config.log.info(`${apiVersion} supported`);
95 break;
96 } else {
97 if (majorVersion === currentLatestMajorVersion) {
98 minorVersion = 'x';
99 } else {
100 config.log.info(`${apiVersion} is not supported version for this ES client, incrementing minor version`);
101 minorVersion++;
102 }
103 }
104 }
105 } catch (e) {
106 config.log.error(e);
107 }
108
109 if (!apiVersion) {
110 throw new Error(`unable to connect to '${uri}' to get es version`);
111 }
112
113 return new elasticsearch.Client({
114 host: {host, port, protocol, path, headers},
115 apiVersion: apiVersion,
116 suggestCompression: true,
117 log: LogToBunyan,
118 defer: function () {
119 let resolve = null;
120 let reject = null;
121
122 const promise = new Promise((res, rej) => {
123 resolve = res;
124 reject = rej;
125 });
126 return {
127 resolve: resolve,
128 reject: reject,
129 promise: promise
130 };
131 },
132 maxRetries: 3,
133 requestTimeout: 240000,
134 pingTimeout: 240000,
135 deadTimeout: 240000,
136 keepAlive: false
137 });
138};
139
140module.exports = createEsClient;
\No newline at end of file