UNPKG

15.3 kBJavaScriptView Raw
1/*
2 * Copyright DataStax, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17'use strict';
18
19const os = require('os');
20const path = require('path');
21const fs = require('fs');
22const utils = require('./utils');
23const promiseUtils = require('./promise-utils');
24const types = require('./types');
25const requests = require('./requests');
26const { ExecutionOptions } = require('./execution-options');
27const packageInfo = require('../package.json');
28const VersionNumber = require('./types/version-number');
29const { NoAuthProvider } = require('./auth');
30
31let kerberosModule;
32
33try {
34 // eslint-disable-next-line
35 kerberosModule = require('kerberos');
36}
37catch (err) {
38 // Kerberos is an optional dependency
39}
40
41const minDse6Version = new VersionNumber(6, 0, 5);
42const minDse51Version = new VersionNumber(5, 1, 13);
43const dse600Version = new VersionNumber(6, 0, 0);
44const rpc = "CALL InsightsRpc.reportInsight(?)";
45const maxStatusErrorLogs = 5;
46
47/**
48 * Contains methods and functionality to send events to DSE Insights.
49 */
50class InsightsClient {
51
52 /**
53 * Creates a new instance of the {@link InsightsClient} using the driver {@link Client}.
54 * @param {Client} client
55 * @param {Object} [options]
56 * @param {Number} [options.statusEventDelay]
57 * @param {Function} [options.errorCallback]
58 */
59 constructor(client, options) {
60 this._client = client;
61 this._sessionId = types.Uuid.random().toString();
62 this._enabled = false;
63 this._closed = false;
64 this._firstTimeout = null;
65 this._recurrentTimeout = null;
66 this._statusErrorLogs = 0;
67
68 options = options || {};
69
70 this._statusEventDelay = options.statusEventDelay || 300000;
71 this._errorCallback = options.errorCallback || utils.noop;
72 }
73
74 /**
75 * Initializes the insights client in the background by sending the startup event and scheduling status events at
76 * regular intervals.
77 * @returns {undefined}
78 */
79 init() {
80 this._enabled = this._client.options.monitorReporting.enabled && this._dseSupportsInsights();
81 if (!this._enabled) {
82 return;
83 }
84
85 promiseUtils.toBackground(this._init());
86 }
87
88 async _init() {
89 try {
90 await this._sendStartupEvent();
91
92 if (this._closed) {
93 // The client was shutdown
94 return;
95 }
96
97 // Send the status event the first time with a delay containing some random portion
98 // Initial delay should be statusEventDelay - (0 to 10%)
99 const firstDelay = Math.floor(this._statusEventDelay - 0.1 * this._statusEventDelay * Math.random());
100 // Schedule the first timer
101 this._firstTimeout = setTimeout(() => {
102 // Send the first status event, the promise will never be rejected
103 this._sendStatusEvent();
104 // The following status events are sent at regular intervals
105 this._recurrentTimeout = setInterval(() => this._sendStatusEvent(), this._statusEventDelay);
106 }, firstDelay);
107 } catch (err) {
108 if (this._closed) {
109 // Sending failed because the Client was shutdown
110 return;
111 }
112 // We shouldn't try to recover
113 this._client.log('verbose', `Insights startup message could not be sent (${err})`, err);
114 this._errorCallback(err);
115 }
116 }
117
118 /**
119 * Sends the startup event.
120 * @returns {Promise}
121 * @private
122 */
123 async _sendStartupEvent() {
124 const message = await this._getStartupMessage();
125 const request = new requests.QueryRequest(rpc, [message], ExecutionOptions.empty());
126 await this._client.controlConnection.query(request, false);
127 }
128
129 /**
130 * Sends the status event.
131 * @returns {Promise} A promise that is never rejected.
132 * @private
133 */
134 async _sendStatusEvent() {
135 const request = new requests.QueryRequest(rpc, [ this._getStatusEvent() ], ExecutionOptions.empty());
136
137 try {
138 await this._client.controlConnection.query(request, false);
139 } catch (err) {
140 if (this._closed) {
141 // Sending failed because the Client was shutdown
142 return;
143 }
144
145 if (this._statusErrorLogs < maxStatusErrorLogs) {
146 this._client.log('warning', `Insights status message could not be sent (${err})`, err);
147 this._statusErrorLogs++;
148 }
149
150 this._errorCallback(err);
151 }
152 }
153
154 /**
155 * Validates the minimum server version for all nodes in the cluster.
156 * @private
157 */
158 _dseSupportsInsights() {
159 if (this._client.hosts.length === 0) {
160 return false;
161 }
162
163 return this._client.hosts.values().reduce((acc, host) => {
164 if (!acc) {
165 return acc;
166 }
167
168 const versionArr = host.getDseVersion();
169
170 if (versionArr.length === 0) {
171 return false;
172 }
173
174 const version = new VersionNumber(...versionArr);
175
176 return version.compare(minDse6Version) >= 0 ||
177 (version.compare(dse600Version) < 0 && version.compare(minDse51Version) >= 0);
178
179 }, true);
180 }
181
182 /**
183 * @returns {Promise<String>} Returns a json string with the startup message.
184 * @private
185 */
186 async _getStartupMessage() {
187 const cc = this._client.controlConnection;
188 const options = this._client.options;
189
190
191 const appInfo = await this._getAppInfo(options);
192 const message = {
193 metadata: {
194 name: 'driver.startup',
195 insightMappingId: 'v1',
196 insightType: 'EVENT',
197 timestamp: Date.now(),
198 tags: { language: 'nodejs' }
199 },
200 data: {
201 driverName: packageInfo.description,
202 driverVersion: packageInfo.version,
203 clientId: options.id,
204 sessionId: this._sessionId,
205 applicationName: appInfo.applicationName,
206 applicationVersion: appInfo.applicationVersion,
207 applicationNameWasGenerated: appInfo.applicationNameWasGenerated,
208 contactPoints: mapToObject(cc.getResolvedContactPoints()),
209 dataCenters: this._getDataCenters(),
210 initialControlConnection: cc.host ? cc.host.address : undefined,
211 protocolVersion: cc.protocolVersion,
212 localAddress: cc.getLocalAddress(),
213 hostName: os.hostname(),
214 executionProfiles: getExecutionProfiles(this._client),
215 poolSizeByHostDistance: {
216 local: options.pooling.coreConnectionsPerHost[types.distance.local],
217 remote: options.pooling.coreConnectionsPerHost[types.distance.remote]
218 },
219 heartbeatInterval: options.pooling.heartBeatInterval,
220 compression: 'NONE',
221 reconnectionPolicy: getPolicyInfo(options.policies.reconnection),
222 ssl: {
223 enabled: !!options.sslOptions,
224 certValidation: options.sslOptions ? !!options.sslOptions.rejectUnauthorized : undefined
225 },
226 authProvider: {
227 type: !(options.authProvider instanceof NoAuthProvider) ? getConstructor(options.authProvider) : undefined,
228 },
229 otherOptions: {
230 coalescingThreshold: options.socketOptions.coalescingThreshold,
231 },
232 platformInfo: {
233 os: {
234 name: os.platform(),
235 version: os.release(),
236 arch: os.arch()
237 },
238 cpus: {
239 length: os.cpus().length,
240 model: os.cpus()[0].model
241 },
242 runtime: {
243 node: process.versions['node'],
244 v8: process.versions['v8'],
245 uv: process.versions['uv'],
246 openssl: process.versions['openssl'],
247 kerberos: kerberosModule ? kerberosModule.version : undefined
248 }
249 },
250 configAntiPatterns: this._getConfigAntiPatterns(),
251 periodicStatusInterval: Math.floor(this._statusEventDelay / 1000)
252 }
253 };
254
255 return JSON.stringify(message);
256 }
257
258 _getConfigAntiPatterns() {
259 const options = this._client.options;
260 const result = {};
261
262 if (options.sslOptions && !options.sslOptions.rejectUnauthorized) {
263 result.sslWithoutCertValidation =
264 'Client-to-node encryption is enabled but server certificate validation is disabled';
265 }
266
267 return result;
268 }
269
270 /**
271 * Gets an array of data centers the driver connects to.
272 * Whether the driver connects to a certain host is determined by the host distance (local and remote hosts)
273 * and the pooling options (whether connection length for remote hosts is greater than 0).
274 * @returns {Array}
275 * @private
276 */
277 _getDataCenters() {
278 const remoteConnectionsLength = this._client.options.pooling.coreConnectionsPerHost[types.distance.remote];
279 const dataCenters = new Set();
280
281 this._client.hosts.values().forEach(h => {
282 const distance = this._client.profileManager.getDistance(h);
283 if (distance === types.distance.local || (distance === types.distance.remote && remoteConnectionsLength > 0)) {
284 dataCenters.add(h.datacenter);
285 }
286 });
287
288 return Array.from(dataCenters);
289 }
290
291 /**
292 * Tries to obtain the application name and version from
293 * @param {DseClientOptions} options
294 * @returns {Promise}
295 * @private
296 */
297 async _getAppInfo(options) {
298 if (typeof options.applicationName === 'string') {
299 return Promise.resolve({
300 applicationName: options.applicationName,
301 applicationVersion: options.applicationVersion,
302 applicationNameWasGenerated: false
303 });
304 }
305
306 let readPromise = Promise.resolve();
307
308 if (require.main && require.main.filename) {
309 const packageInfoPath = path.dirname(require.main.filename);
310 readPromise = this._readPackageInfoFile(packageInfoPath);
311 }
312
313 const text = await readPromise;
314 let applicationName = 'Default Node.js Application';
315 let applicationVersion;
316
317 if (text) {
318 try {
319 const packageInfo = JSON.parse(text);
320 if (packageInfo.name) {
321 applicationName = packageInfo.name;
322 applicationVersion = packageInfo.version;
323 }
324 }
325 catch (err) {
326 // The package.json file could not be parsed
327 // Use the default name
328 }
329 }
330
331 return {
332 applicationName,
333 applicationVersion,
334 applicationNameWasGenerated: true
335 };
336 }
337
338 /**
339 * @private
340 * @returns {Promise<string>} A Promise that will never be rejected
341 */
342 _readPackageInfoFile(packageInfoPath) {
343 return new Promise(resolve => {
344 fs.readFile(path.join(packageInfoPath, 'package.json'), 'utf8', (err, data) => {
345 // Swallow error
346 resolve(data);
347 });
348 });
349 }
350
351 /**
352 * @returns {String} Returns a json string with the startup message.
353 * @private
354 */
355 _getStatusEvent() {
356 const cc = this._client.controlConnection;
357 const options = this._client.options;
358 const state = this._client.getState();
359 const connectedNodes = {};
360
361 state.getConnectedHosts().forEach(h => {
362 connectedNodes[h.address] = {
363 connections: state.getOpenConnections(h),
364 inFlightQueries: state.getInFlightQueries(h)
365 };
366 });
367
368 const message = {
369 metadata: {
370 name: 'driver.status',
371 insightMappingId: 'v1',
372 insightType: 'EVENT',
373 timestamp: Date.now(),
374 tags: { language: 'nodejs' }
375 },
376 data: {
377 clientId: options.id,
378 sessionId: this._sessionId,
379 controlConnection: cc.host ? cc.host.address : undefined,
380 connectedNodes
381 }
382 };
383
384 return JSON.stringify(message);
385 }
386
387 /**
388 * Cleans any timer used internally and sets the client as closed.
389 */
390 shutdown() {
391 if (!this._enabled) {
392 return;
393 }
394
395 this._closed = true;
396
397 if (this._firstTimeout !== null) {
398 clearTimeout(this._firstTimeout);
399 }
400
401 if (this._recurrentTimeout !== null) {
402 clearInterval(this._recurrentTimeout);
403 }
404 }
405}
406
407module.exports = InsightsClient;
408
409function mapToObject(map) {
410 const result = {};
411 map.forEach((value, key) => result[key] = value);
412 return result;
413}
414
415function getPolicyInfo(policy) {
416 if (!policy) {
417 return undefined;
418 }
419
420 const options = policy.getOptions && policy.getOptions();
421
422 return {
423 type: policy.constructor.name,
424 options: (options instanceof Map) ? mapToObject(options) : utils.emptyObject
425 };
426}
427
428function getConsistencyString(c) {
429 if (typeof c !== 'number') {
430 return undefined;
431 }
432
433 return types.consistencyToString[c];
434}
435
436function getConstructor(instance) {
437 return instance ? instance.constructor.name : undefined;
438}
439
440function getExecutionProfiles(client) {
441 const executionProfiles = {};
442
443 const defaultProfile = client.profileManager.getDefault();
444 setExecutionProfileProperties(client, executionProfiles, defaultProfile, defaultProfile);
445
446 client.profileManager.getAll()
447 .filter(p => p !== defaultProfile)
448 .forEach(profile => setExecutionProfileProperties(client, executionProfiles, profile, defaultProfile));
449
450 return executionProfiles;
451}
452
453function setExecutionProfileProperties(client, parent, profile, defaultProfile) {
454 const output = parent[profile.name] = {};
455 setExecutionProfileItem(output, profile, defaultProfile, 'readTimeout');
456 setExecutionProfileItem(output, profile, defaultProfile, 'loadBalancing', getPolicyInfo);
457 setExecutionProfileItem(output, profile, defaultProfile, 'retry', getPolicyInfo);
458 setExecutionProfileItem(output, profile, defaultProfile, 'consistency', getConsistencyString);
459 setExecutionProfileItem(output, profile, defaultProfile, 'serialConsistency', getConsistencyString);
460
461 if (profile === defaultProfile) {
462 // Speculative execution policy is included in the profiles as some drivers support
463 // different spec exec policy per profile, in this case is fixed for all profiles
464 output.speculativeExecution = getPolicyInfo(client.options.policies.speculativeExecution);
465 }
466
467 if (profile.graphOptions) {
468 output.graphOptions = {};
469 const defaultGraphOptions = defaultProfile.graphOptions || utils.emptyObject;
470 setExecutionProfileItem(output.graphOptions, profile.graphOptions, defaultGraphOptions, 'language');
471 setExecutionProfileItem(output.graphOptions, profile.graphOptions, defaultGraphOptions, 'name');
472 setExecutionProfileItem(output.graphOptions, profile.graphOptions, defaultGraphOptions, 'readConsistency',
473 getConsistencyString);
474 setExecutionProfileItem(output.graphOptions, profile.graphOptions, defaultGraphOptions, 'source');
475 setExecutionProfileItem(output.graphOptions, profile.graphOptions, defaultGraphOptions, 'writeConsistency',
476 getConsistencyString);
477
478 if (Object.keys(output.graphOptions).length === 0) {
479 // Properties that are undefined will not be included in the JSON
480 output.graphOptions = undefined;
481 }
482 }
483}
484
485function setExecutionProfileItem(output, profile, defaultProfile, prop, valueGetter) {
486 const value = profile[prop];
487 valueGetter = valueGetter || (x => x);
488
489 if ((profile === defaultProfile && value !== undefined) || value !== defaultProfile[prop]) {
490 output[prop] = valueGetter(value);
491 }
492}
\No newline at end of file