UNPKG

11.5 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.Pool = exports.RequestError = exports.ServiceNotAvailableError = void 0;
4const exponential_1 = require("./backoff/exponential");
5const host_1 = require("./host");
6const http = require("http");
7const https = require("https");
8const querystring = require("querystring");
9/**
10 * Status codes that will cause a host to be marked as 'failed' if we get
11 * them from a request to Influx.
12 * @type {Array}
13 */
14const resubmitErrorCodes = [
15 "ETIMEDOUT",
16 "ESOCKETTIMEDOUT",
17 "ECONNRESET",
18 "ECONNREFUSED",
19 "EHOSTUNREACH",
20];
21/**
22 * An ServiceNotAvailableError is returned as an error from requests that
23 * result in a > 500 error code.
24 */
25class ServiceNotAvailableError extends Error {
26 constructor(message) {
27 super();
28 this.message = message;
29 Object.setPrototypeOf(this, ServiceNotAvailableError.prototype);
30 }
31}
32exports.ServiceNotAvailableError = ServiceNotAvailableError;
33/**
34 * An RequestError is returned as an error from requests that
35 * result in a 300 <= error code <= 500.
36 */
37class RequestError extends Error {
38 constructor(req, res, body) {
39 super();
40 this.req = req;
41 this.res = res;
42 this.message = `A ${res.statusCode} ${res.statusMessage} error occurred: ${body}`;
43 Object.setPrototypeOf(this, RequestError.prototype);
44 }
45 static Create(req, res, callback) {
46 let body = "";
47 res.on("data", (str) => {
48 body += str.toString();
49 });
50 res.on("end", () => callback(new RequestError(req, res, body)));
51 }
52}
53exports.RequestError = RequestError;
54/**
55 * Creates a function generation that returns a wrapper which only allows
56 * through the first call of any function that it generated.
57 */
58function doOnce() {
59 let handled = false;
60 return (fn) => {
61 return (arg) => {
62 if (handled) {
63 return;
64 }
65 handled = true;
66 fn(arg);
67 };
68 };
69}
70function setToArray(itemSet) {
71 const output = [];
72 itemSet.forEach((value) => {
73 output.push(value);
74 });
75 return output;
76}
77const request = (options, callback) => {
78 if (options.protocol === "https:") {
79 return https.request(options, callback);
80 }
81 return http.request(options, callback);
82};
83/**
84 *
85 * The Pool maintains a list available Influx hosts and dispatches requests
86 * to them. If there are errors connecting to hosts, it will disable that
87 * host for a period of time.
88 */
89class Pool {
90 /**
91 * Creates a new Pool instance.
92 * @param {IPoolOptions} options
93 */
94 constructor(options) {
95 this._options = Object.assign({ backoff: new exponential_1.ExponentialBackoff({
96 initial: 300,
97 max: 10 * 1000,
98 random: 1,
99 }), maxRetries: 2, requestTimeout: 30 * 1000 }, options);
100 this._index = 0;
101 this._hostsAvailable = new Set();
102 this._hostsDisabled = new Set();
103 this._timeout = this._options.requestTimeout;
104 }
105 /**
106 * Returns a list of currently active hosts.
107 * @return {Host[]}
108 */
109 getHostsAvailable() {
110 return setToArray(this._hostsAvailable);
111 }
112 /**
113 * Returns a list of hosts that are currently disabled due to network
114 * errors.
115 * @return {Host[]}
116 */
117 getHostsDisabled() {
118 return setToArray(this._hostsDisabled);
119 }
120 /**
121 * Inserts a new host to the pool.
122 */
123 addHost(url, options = {}) {
124 const host = new host_1.Host(url, this._options.backoff.reset(), options);
125 this._hostsAvailable.add(host);
126 return host;
127 }
128 /**
129 * Returns true if there's any host available to by queried.
130 * @return {Boolean}
131 */
132 hostIsAvailable() {
133 return this._hostsAvailable.size > 0;
134 }
135 /**
136 * Makes a request and calls back with the response, parsed as JSON.
137 * An error is returned on a non-2xx status code or on a parsing exception.
138 */
139 json(options) {
140 return this.text(options).then((res) => JSON.parse(res));
141 }
142 /**
143 * Makes a request and resolves with the plain text response,
144 * if possible. An error is raised on a non-2xx status code.
145 */
146 text(options) {
147 return new Promise((resolve, reject) => {
148 this.stream(options, (err, res) => {
149 if (err) {
150 return reject(err);
151 }
152 let output = "";
153 res.on("data", (str) => {
154 output += str.toString();
155 });
156 res.on("end", () => resolve(output));
157 });
158 });
159 }
160 /**
161 * Makes a request and discards any response body it receives.
162 * An error is returned on a non-2xx status code.
163 */
164 discard(options) {
165 return new Promise((resolve, reject) => {
166 this.stream(options, (err, res) => {
167 if (err) {
168 return reject(err);
169 }
170 res.on("data", () => {
171 /* ignore */
172 });
173 res.on("end", () => resolve());
174 });
175 });
176 }
177 /**
178 * Ping sends out a request to all available Influx servers, reporting on
179 * their response time and version number.
180 */
181 ping(timeout, path = "/ping", auth = undefined) {
182 const todo = [];
183 setToArray(this._hostsAvailable)
184 .concat(setToArray(this._hostsDisabled))
185 .forEach((host) => {
186 const start = Date.now();
187 const url = host.url;
188 const once = doOnce();
189 return todo.push(new Promise((resolve) => {
190 const headers = {};
191 if (typeof auth !== "undefined") {
192 const encodedAuth = Buffer.from(auth).toString("base64");
193 headers["Authorization"] = `Basic ${encodedAuth}`;
194 }
195 const req = request(Object.assign({ hostname: url.hostname, method: "GET", path, port: Number(url.port), protocol: url.protocol, timeout, headers: headers }, host.options), once((res) => {
196 resolve({
197 url,
198 res: res.resume(),
199 online: res.statusCode < 300,
200 rtt: Date.now() - start,
201 version: res.headers["x-influxdb-version"],
202 });
203 }));
204 const fail = once(() => {
205 req.abort();
206 resolve({
207 online: false,
208 res: null,
209 rtt: Infinity,
210 url,
211 version: null,
212 });
213 });
214 // Support older Nodes and polyfills which don't allow .timeout() in
215 // the request options, wrapped in a conditional for even worse
216 // polyfills. See: https://github.com/node-influx/node-influx/issues/221
217 if (typeof req.setTimeout === "function") {
218 req.setTimeout(timeout, () => {
219 fail.call(fail, arguments);
220 }); // Tslint:disable-line
221 }
222 req.on("timeout", fail);
223 req.on("error", fail);
224 req.end();
225 }));
226 });
227 return Promise.all(todo);
228 }
229 /**
230 * Makes a request and calls back with the IncomingMessage stream,
231 * if possible. An error is returned on a non-2xx status code.
232 */
233 stream(options, callback) {
234 if (!this.hostIsAvailable()) {
235 return callback(new ServiceNotAvailableError("No host available"), null);
236 }
237 const once = doOnce();
238 const host = this._getHost();
239 let path = host.url.pathname === "/" ? "" : host.url.pathname;
240 path += options.path;
241 if (options.query) {
242 path += "?" + querystring.stringify(options.query);
243 }
244 const req = request(Object.assign({ headers: {
245 "content-length": options.body ? Buffer.from(options.body).length : 0,
246 }, hostname: host.url.hostname, method: options.method, path, port: Number(host.url.port), protocol: host.url.protocol, timeout: this._timeout }, host.options), once((res) => {
247 res.setEncoding("utf8");
248 if (res.statusCode >= 500) {
249 res.on("data", () => {
250 /* ignore */
251 });
252 res.on("end", () => {
253 return this._handleRequestError(new ServiceNotAvailableError(res.statusMessage), host, options, callback);
254 });
255 return;
256 }
257 if (res.statusCode >= 300) {
258 return RequestError.Create(req, res, (err) => callback(err, res));
259 }
260 host.success();
261 return callback(undefined, res);
262 }));
263 // Handle network or HTTP parsing errors:
264 req.on("error", once((err) => {
265 this._handleRequestError(err, host, options, callback);
266 }));
267 // Handle timeouts:
268 req.on("timeout", once(() => {
269 req.abort();
270 this._handleRequestError(new ServiceNotAvailableError("Request timed out"), host, options, callback);
271 }));
272 // Support older Nodes and polyfills which don't allow .timeout() in the
273 // request options, wrapped in a conditional for even worse polyfills. See:
274 // https://github.com/node-influx/node-influx/issues/221
275 if (typeof req.setTimeout === "function") {
276 req.setTimeout(host.options.timeout || this._timeout); // Tslint:disable-line
277 }
278 // Write out the body:
279 if (options.body) {
280 req.write(options.body);
281 }
282 req.end();
283 }
284 /**
285 * Returns the next available host for querying.
286 * @return {Host}
287 */
288 _getHost() {
289 const available = setToArray(this._hostsAvailable);
290 const host = available[this._index];
291 this._index = (this._index + 1) % available.length;
292 return host;
293 }
294 /**
295 * Re-enables the provided host, returning it to the pool to query.
296 * @param {Host} host
297 */
298 _enableHost(host) {
299 this._hostsDisabled.delete(host);
300 this._hostsAvailable.add(host);
301 }
302 /**
303 * Disables the provided host, removing it from the query pool. It will be
304 * re-enabled after a backoff interval
305 */
306 _disableHost(host) {
307 const delay = host.fail();
308 if (delay > 0) {
309 this._hostsAvailable.delete(host);
310 this._hostsDisabled.add(host);
311 this._index %= Math.max(1, this._hostsAvailable.size);
312 setTimeout(() => this._enableHost(host), delay);
313 }
314 }
315 _handleRequestError(err, host, options, callback) {
316 if (!(err instanceof ServiceNotAvailableError) &&
317 !resubmitErrorCodes.includes(err.code)) {
318 return callback(err, null);
319 }
320 this._disableHost(host);
321 const retries = options.retries || 0;
322 if (retries < this._options.maxRetries && this.hostIsAvailable()) {
323 options.retries = retries + 1;
324 return this.stream(options, callback);
325 }
326 callback(err, null);
327 }
328}
329exports.Pool = Pool;