1 | const util = require('util'),
|
2 | helpers = require('./helpers'),
|
3 | applyStatsFns = require('./statsFunctions');
|
4 |
|
5 | const { PROTOCOL } = require('./constants');
|
6 | const createTransport = require('./transport');
|
7 |
|
// Fallback for options.udsGracefulRestartRateLimit: the minimum time, measured as a
// Date.now() delta (milliseconds), that must elapse before a failed UDS socket is re-created.
const UDS_DEFAULT_GRACEFUL_RESTART_LIMIT = 1000;

// Fallback for options.cacheDnsTtl, which is handed to the transport as `cacheDnsTtl`
// (presumably milliseconds — confirm against ./transport).
const CACHE_DNS_TTL_DEFAULT = 60000;
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
/**
 * StatsD client constructor.
 *
 * Accepts either a single options object or the legacy positional arguments listed in the
 * signature. The positional form is selected when more than one argument is passed or when
 * the first argument is a string.
 *
 * Options read by this constructor: host, port, prefix, suffix, globalize, cacheDns,
 * cacheDnsTtl, mock, globalTags (alias: global_tags), maxBufferSize, bufferFlushInterval,
 * telegraf, sampleRate, protocol, tagPrefix, tagSeparator, errorHandler,
 * udsGracefulErrorHandling, udsGracefulRestartRateLimit, useDefaultRoute, path, stream,
 * and the child-client internals isChild, socket, bufferHolder, dnsError.
 *
 * @param {string|object} [host] Host name, or the options object.
 * @param {number} [port] Falls back to DD_DOGSTATSD_PORT, then 8125.
 * @param {string} [prefix] Prepended to every stat name.
 * @param {string} [suffix] Appended to every stat name.
 * @param {boolean} [globalize] When truthy, the instance is stored on `global.statsd`.
 * @param {boolean} [cacheDns] Forwarded to the transport (only `true` enables it).
 * @param {boolean} [mock] When true, messages are collected in `mockBuffer` instead of sent.
 * @param {object|Array} [globalTags] Tags added to every message.
 * @param {number} [maxBufferSize] When > 0, messages are buffered and flushed periodically.
 * @param {number} [bufferFlushInterval] Flush period in ms (default 1000).
 * @param {boolean} [telegraf] Use the telegraf tag format.
 * @param {number} [sampleRate] Default sample rate (default 1).
 * @param {string} [protocol] Transport protocol, case-insensitive (default PROTOCOL.UDP).
 * @constructor
 */
const Client = function (host, port, prefix, suffix, globalize, cacheDns, mock,
    globalTags, maxBufferSize, bufferFlushInterval, telegraf, sampleRate, protocol) {
  let options = host || {};

  // Legacy positional call: fold the arguments into an options object.
  if (arguments.length > 1 || typeof(host) === 'string') {
    options = {
      host,
      port,
      prefix,
      suffix,
      globalize,
      cacheDns,
      mock: mock === true,
      globalTags,
      maxBufferSize,
      bufferFlushInterval,
      telegraf,
      sampleRate,
      protocol
    };
  }

  // Accept the snake_case alias without writing back into the caller's options object.
  const globalTagsOption = options.globalTags || options.global_tags;

  this.protocol = (options.protocol && options.protocol.toLowerCase());
  if (!this.protocol) {
    this.protocol = PROTOCOL.UDP;
  }
  this.cacheDns = options.cacheDns === true;
  this.cacheDnsTtl = options.cacheDnsTtl || CACHE_DNS_TTL_DEFAULT;
  this.host = options.host || process.env.DD_AGENT_HOST || 'localhost';
  this.port = options.port || parseInt(process.env.DD_DOGSTATSD_PORT, 10) || 8125;
  this.prefix = options.prefix || '';
  this.suffix = options.suffix || '';
  this.tagPrefix = options.tagPrefix || '#';
  this.tagSeparator = options.tagSeparator || ',';
  this.mock = options.mock;
  this.globalTags = typeof globalTagsOption === 'object' ?
    helpers.formatTags(globalTagsOption, options.telegraf) : [];
  if (process.env.DD_ENTITY_ID) {
    // The environment-provided entity id replaces any user-supplied entity id tag.
    this.globalTags = this.globalTags.filter((item) => {
      return item.indexOf('dd.internal.entity_id:') !== 0;
    });
    this.globalTags.push('dd.internal.entity_id:'.concat(helpers.sanitizeTags(process.env.DD_ENTITY_ID)));
  }
  this.telegraf = options.telegraf || false;
  this.maxBufferSize = options.maxBufferSize || 0;
  this.sampleRate = options.sampleRate || 1;
  this.bufferFlushInterval = options.bufferFlushInterval || 1000;
  // Child clients share the parent's buffer holder object so they write into one buffer.
  this.bufferHolder = options.isChild ? options.bufferHolder : { buffer: '' };
  this.errorHandler = options.errorHandler;
  this.udsGracefulErrorHandling = 'udsGracefulErrorHandling' in options ? options.udsGracefulErrorHandling : true;

  if (this.mock) {
    this.mockBuffer = [];
  }

  // Only the root client owns the periodic flush timer.
  if (!options.isChild && this.maxBufferSize > 0) {
    this.intervalHandle = setInterval(this.onBufferFlushInterval.bind(this), this.bufferFlushInterval);
    // Do not let the flush timer keep the process alive.
    this.intervalHandle.unref();
  }

  if (options.isChild) {
    if (options.dnsError) {
      this.dnsError = options.dnsError;
    }
    this.socket = options.socket;
  } else if (options.useDefaultRoute) {
    const defaultRoute = helpers.getDefaultRoute();
    if (defaultRoute) {
      console.log(`Got ${defaultRoute} for the system's default route`);
      this.host = defaultRoute;
    }
  }

  if (!this.socket) {
    this.socket = createTransport(this, {
      host: this.host,
      cacheDns: this.cacheDns,
      cacheDnsTtl: this.cacheDnsTtl,
      path: options.path,
      port: this.port,
      protocol: this.protocol,
      stream: options.stream
    });
  }

  if (!options.isChild && options.errorHandler) {
    this.socket.on('error', options.errorHandler);
  }

  if (options.globalize) {
    global.statsd = this;
  }

  // UDS only: on specific socket errors, close the socket and open a fresh one, at most once
  // per `socketCreateLimit` ms. Not available to child clients, which do not own their socket.
  // Uses the normalized this.protocol and the defaulted this.udsGracefulErrorHandling so that
  // the documented default (enabled) and upper-case protocol names both take effect.
  if (!options.isChild && this.protocol === PROTOCOL.UDS && this.udsGracefulErrorHandling) {
    const socketCreateLimit = options.udsGracefulRestartRateLimit || UDS_DEFAULT_GRACEFUL_RESTART_LIMIT;
    let lastSocketCreateTime = Date.now();

    const watchForUdsErrors = (socket) => {
      socket.on('error', (err) => {
        // NOTE(review): 107 / 111 look like the Linux errno values ENOTCONN / ECONNREFUSED
        // as reported by the UDS transport — confirm against ./transport.
        if (err.code !== 107 && err.code !== 111) {
          return;
        }
        // Ignore late errors from a socket that has already been replaced.
        if (socket !== this.socket) {
          return;
        }
        if (Date.now() - lastSocketCreateTime < socketCreateLimit) {
          return;
        }

        if (this.errorHandler) {
          this.socket.removeListener('error', this.errorHandler);
        }
        this.socket.close();
        this.socket = createTransport(this, {
          host: this.host,
          path: options.path,
          port: this.port,
          protocol: this.protocol
        });
        // Restart the rate-limit window from this re-creation.
        lastSocketCreateTime = Date.now();

        if (this.errorHandler) {
          this.socket.on('error', this.errorHandler);
        } else {
          this.socket.on('error', error => console.error(`hot-shots UDS error: ${error}`));
        }
        // Keep graceful handling active on the replacement socket as well.
        watchForUdsErrors(this.socket);
      });
    };

    watchForUdsErrors(this.socket);
  }

  // Number of sends handed to the socket whose callback has not fired yet; close() waits on it.
  this.messagesInFlight = 0;
  this.CHECKS = {
    OK: 0,
    WARNING: 1,
    CRITICAL: 2,
    UNKNOWN: 3,
  };
};
|
167 |
|
168 | applyStatsFns(Client);
|
169 |
|
170 |
|
171 |
|
172 |
|
173 |
|
174 |
|
175 |
|
176 |
|
177 |
|
178 |
|
/**
 * Sends one stat, or every stat in an array, with the same value and type.
 *
 * `sampleRate` and `tags` are optional: a non-numeric truthy `sampleRate` is treated as
 * `tags` (shifting `callback` left), and a non-object truthy `tags` is treated as `callback`.
 *
 * For an array, the callback fires once: with the first error, or with `(null, totalBytes)`
 * after every send has completed. An empty array completes immediately with `(null, 0)`.
 * If a send fails and no callback was given, `this.errorHandler` (when set) receives the
 * first error; with neither, the error is dropped.
 *
 * @param {string|string[]} stat Stat name or list of stat names.
 * @param {*} value Stat value.
 * @param {string} type Stat type marker placed after the value.
 * @param {number} [sampleRate] Sample rate.
 * @param {object|Array} [tags] Tags for this call.
 * @param {function} [callback] Called as `(error, bytes)`.
 */
Client.prototype.sendAll = function (stat, value, type, sampleRate, tags, callback) {
  let completed = 0;
  let calledback = false;
  let sentBytes = 0;
  const self = this;

  if (sampleRate && typeof sampleRate !== 'number') {
    callback = tags;
    tags = sampleRate;
    sampleRate = undefined;
  }

  if (tags && typeof tags !== 'object') {
    callback = tags;
    tags = undefined;
  }

  if (!Array.isArray(stat)) {
    this.sendStat(stat, value, type, sampleRate, tags, callback);
    return;
  }

  // Nothing to send: complete right away instead of leaving the caller waiting forever.
  if (stat.length === 0) {
    if (typeof callback === 'function') {
      callback(null, 0);
    }
    return;
  }

  /**
   * Per-stat completion handler that aggregates results across the whole array.
   */
  function onSend(error, bytes) {
    completed += 1;
    if (calledback) {
      return;
    }

    if (error) {
      if (typeof callback === 'function') {
        calledback = true;
        callback(error);
      } else if (self.errorHandler) {
        calledback = true;
        self.errorHandler(error);
      }
      return;
    }

    if (bytes) {
      sentBytes += bytes;
    }

    if (completed === stat.length && typeof callback === 'function') {
      callback(null, sentBytes);
    }
  }

  stat.forEach(item => {
    self.sendStat(item, value, type, sampleRate, tags, onSend);
  });
};
|
235 |
|
236 |
|
237 |
|
238 |
|
239 |
|
240 |
|
241 |
|
242 |
|
243 |
|
244 |
|
/**
 * Builds the `<prefix><stat><suffix>:<value>|<type>` message for one stat, applies
 * sampling, and passes the message on to `send`.
 *
 * When the effective sample rate (argument, else `this.sampleRate`) is below 1, the message
 * is either tagged with `|@<rate>` or skipped based on `Math.random()`. A skipped message
 * still invokes `callback` (with no arguments).
 *
 * @param {string} stat Stat name.
 * @param {*} value Stat value.
 * @param {string} type Stat type marker.
 * @param {number} [sampleRate] Sample rate for this call.
 * @param {object|Array} [tags] Tags for this call.
 * @param {function} [callback] Completion callback.
 */
Client.prototype.sendStat = function (stat, value, type, sampleRate, tags, callback) {
  const segments = [`${this.prefix + stat + this.suffix}:${value}|${type}`];
  const effectiveRate = sampleRate || this.sampleRate;

  if (effectiveRate && effectiveRate < 1) {
    if (Math.random() >= effectiveRate) {
      // Sampled out: report completion without sending anything.
      return callback ? callback() : undefined;
    }
    segments.push(`@${effectiveRate}`);
  }

  this.send(segments.join('|'), tags, callback);
};
|
258 |
|
259 |
|
260 |
|
261 |
|
262 |
|
263 |
|
264 |
|
/**
 * Appends tags to an already formatted message and hands it to `_send`.
 *
 * Per-call tags (when given as an object/array) are combined with the global tags through
 * `helpers.overrideTags`. In telegraf mode the tags are inserted after the stat name as
 * `name,k=v,...:rest`; otherwise they are appended as `|<tagPrefix>tag<tagSeparator>tag...`.
 *
 * @param {string} message Formatted stat message.
 * @param {object|Array} [tags] Tags for this call.
 * @param {function} [callback] Completion callback.
 */
Client.prototype.send = function (message, tags, callback) {
  const hasCallTags = tags && typeof tags === 'object';
  const allTags = hasCallTags
    ? helpers.overrideTags(this.globalTags, tags, this.telegraf)
    : this.globalTags;

  let payload = message;
  if (allTags.length > 0) {
    if (this.telegraf) {
      // Split only conceptually on the first ':' — later colons belong to the remainder.
      const [name, ...rest] = message.split(':');
      payload = `${name},${allTags.join(',').replace(/:/g, '=')}:${rest.join(':')}`;
    } else {
      payload = `${message}|${this.tagPrefix}${allTags.join(this.tagSeparator)}`;
    }
  }

  this._send(payload, callback);
};
|
281 |
|
282 |
|
283 |
|
284 |
|
285 |
|
286 |
|
/**
 * Routes a fully formatted message to its destination.
 *
 * - If a DNS error was recorded, it is reported to `callback`, else to `errorHandler`,
 *   else thrown.
 * - In mock mode the message is pushed onto `mockBuffer` and `callback(null, 0)` is called.
 * - Otherwise the message is sent immediately when `maxBufferSize` is 0, or enqueued.
 *
 * @param {string} message Formatted message.
 * @param {function} [callback] Completion callback.
 */
Client.prototype._send = function (message, callback) {
  const dnsFailure = this.dnsError;
  if (dnsFailure) {
    if (callback) {
      return callback(dnsFailure);
    }
    if (this.errorHandler) {
      return this.errorHandler(dnsFailure);
    }
    throw dnsFailure;
  }

  if (this.mock) {
    this.mockBuffer.push(message);
    if (typeof callback === 'function') {
      callback(null, 0);
    }
    return;
  }

  if (this.maxBufferSize === 0) {
    this.sendMessage(message, callback);
  } else {
    this.enqueue(message, callback);
  }
};
|
313 |
|
314 |
|
315 |
|
316 |
|
317 |
|
318 |
|
/**
 * Adds a newline-terminated message to the shared buffer.
 *
 * If appending would push the buffer past `maxBufferSize` (compared by string length), the
 * current buffer is flushed first and `callback` is handed to that flush; otherwise the
 * message is appended and `callback(null)` is called right away.
 *
 * @param {string} message Formatted message.
 * @param {function} [callback] Completion callback.
 */
Client.prototype.enqueue = function (message, callback) {
  const line = `${message}\n`;
  const wouldOverflow = this.bufferHolder.buffer.length + line.length > this.maxBufferSize;

  if (wouldOverflow) {
    // The callback travels with the flush of the previously buffered data.
    this.flushQueue(callback);
  }

  this.bufferHolder.buffer += line;

  if (!wouldOverflow && callback) {
    callback(null);
  }
};
|
333 |
|
334 |
|
335 |
|
336 |
|
/**
 * Sends everything currently buffered and empties the buffer.
 *
 * The buffer is snapshotted and cleared before the send is started. `sendMessage` can invoke
 * `callback` synchronously (empty buffer, missing socket, or a send that throws); clearing
 * afterwards would discard anything that callback enqueued and would let a re-entrant flush
 * send the same data twice.
 *
 * @param {function} [callback] Passed through to `sendMessage`.
 */
Client.prototype.flushQueue = function (callback) {
  const pending = this.bufferHolder.buffer;
  this.bufferHolder.buffer = '';
  this.sendMessage(pending, callback);
};
|
341 |
|
342 |
|
343 |
|
344 |
|
345 |
|
346 |
|
347 |
|
/**
 * Writes a message to the socket.
 *
 * - An empty message is a no-op that still calls `callback(null)`.
 * - Without a socket, the problem is reported to `callback` or logged with console.error.
 * - Send failures are wrapped in a new Error (keeping the original `code`) and delivered to
 *   `callback`; without a callback they go to `errorHandler`, and without that they are
 *   logged and emitted as an 'error' event on the socket.
 *
 * `messagesInFlight` is incremented before the send and decremented when it completes.
 *
 * @param {string} message Data to send.
 * @param {function} [callback] Called as `(error, bytes)`.
 */
Client.prototype.sendMessage = function (message, callback) {
  // Nothing to put on the wire.
  if (message === '') {
    if (callback) {
      callback(null);
    }
    return;
  }

  if (!this.socket) {
    const error = 'Socket not created properly. Check previous errors for details.';
    return callback ? callback(new Error(error)) : console.error(error);
  }

  const handleCallback = (err, bytes) => {
    this.messagesInFlight--;

    let errFormatted = null;
    if (err) {
      errFormatted = new Error(`Error sending hot-shots message: ${err}`);
      errFormatted.code = err.code;
    }

    if (callback) {
      callback(errFormatted, bytes);
      return;
    }
    if (!errFormatted) {
      return;
    }
    if (this.errorHandler) {
      this.errorHandler(errFormatted);
      return;
    }
    // Last resort: log, then surface the failure as a socket 'error' event.
    console.error(String(errFormatted));
    this.socket.emit('error', errFormatted);
  };

  try {
    this.messagesInFlight++;
    this.socket.send(Buffer.from(message), handleCallback);
  } catch (err) {
    handleCallback(err);
  }
};
|
392 |
|
393 |
|
394 |
|
395 |
|
/**
 * Flushes the buffered messages. The constructor registers this method (bound to the
 * client) as the setInterval callback when maxBufferSize > 0.
 */
Client.prototype.onBufferFlushInterval = function () {
  this.flushQueue();
};
|
399 |
|
400 |
|
401 |
|
402 |
|
/**
 * Shuts the client down: stops the periodic flush, flushes what is buffered, waits for
 * in-flight sends to finish, then closes the socket via `_close`.
 *
 * A flush error aborts the shutdown and is passed to `callback` (or logged). The wait polls
 * `messagesInFlight` every 50 ms and gives up after more than 10 polls, closing anyway.
 *
 * @param {function} [callback] Receives the flush error, or is forwarded to `_close`.
 */
Client.prototype.close = function (callback) {
  // Stop the periodic flush so nothing new is sent while closing.
  if (this.intervalHandle) {
    clearInterval(this.intervalHandle);
  }

  this.flushQueue((flushError) => {
    if (flushError) {
      return callback ? callback(flushError) : console.error(flushError);
    }

    let polls = 0;
    const poller = setInterval(() => {
      polls += 1;

      if (polls > 10) {
        console.log('hot-shots could not clear out messages in flight but closing anyways');
        this.messagesInFlight = 0;
      }

      if (this.messagesInFlight <= 0) {
        clearInterval(poller);
        this._close(callback);
      }
    }, 50);
  });
};
|
437 |
|
438 |
|
439 |
|
440 |
|
/**
 * Closes the socket and reports the outcome.
 *
 * The client's `errorHandler` is detached from the socket first. The first error raised
 * while closing (an 'error' event or an exception from `socket.close()`) goes to `callback`,
 * else to `errorHandler`, else to console.error; any later error is only logged. When a
 * callback is given and no error was handled, it is invoked from the socket's 'close' event.
 *
 * @param {function} [callback] Called with an Error on failure, or with the 'close' event
 *   argument on success.
 */
Client.prototype._close = function (callback) {
  let handledError = false;

  const handleErr = (err) => {
    const errMessage = `Error closing hot-shots socket: ${err}`;

    // Only the first error is reported to the caller; repeats are just logged.
    if (handledError) {
      console.error(errMessage);
      return;
    }
    handledError = true;

    if (callback) {
      callback(new Error(errMessage));
      return;
    }
    if (this.errorHandler) {
      this.errorHandler(new Error(errMessage));
      return;
    }
    console.error(errMessage);
  };

  if (this.errorHandler) {
    this.socket.removeListener('error', this.errorHandler);
  }

  // From here on, socket errors are treated as close failures.
  this.socket.on('error', handleErr);

  if (callback) {
    this.socket.on('close', (err) => {
      if (!handledError && callback) {
        callback(err);
      }
    });
  }

  try {
    this.socket.close();
  } catch (err) {
    handleErr(err);
  }
};
|
485 |
|
/**
 * Constructor for a client derived from an existing one.
 *
 * The child reuses the parent's socket, buffer holder, DNS error, host, port, tag
 * formatting, mock flag, buffering settings, telegraf flag and protocol. It can add its own
 * prefix (placed before the parent's), suffix (placed after the parent's), global tags
 * (combined with the parent's through `helpers.overrideTags`) and error handler.
 *
 * @param {object} parent The client to derive from.
 * @param {object} [options] Supports prefix, suffix, globalTags and errorHandler.
 * @constructor
 */
const ChildClient = function (parent, options) {
  const overrides = options || {};

  const combinedTags = typeof overrides.globalTags === 'object'
    ? helpers.overrideTags(parent.globalTags, overrides.globalTags, parent.telegraf)
    : parent.globalTags;

  const childOptions = {
    // Marks this as a child so the base constructor reuses the parent's resources.
    isChild: true,
    socket: parent.socket,
    bufferHolder: parent.bufferHolder,
    dnsError: parent.dnsError,

    // Connection and formatting settings are inherited unchanged.
    host: parent.host,
    port: parent.port,
    protocol: parent.protocol,
    telegraf: parent.telegraf,
    tagPrefix: parent.tagPrefix,
    tagSeparator: parent.tagSeparator,
    mock: parent.mock,
    maxBufferSize: parent.maxBufferSize,
    bufferFlushInterval: parent.bufferFlushInterval,
    globalize: false,

    // Child-specific additions layered on top of the parent's values.
    errorHandler: overrides.errorHandler || parent.errorHandler,
    prefix: (overrides.prefix || '') + parent.prefix,
    suffix: parent.suffix + (overrides.suffix || ''),
    globalTags: combinedTags
  };

  Client.call(this, childOptions);
};
|
512 | util.inherits(ChildClient, Client);
|
513 |
|
514 |
|
515 |
|
516 |
|
517 |
|
518 |
|
519 |
|
520 |
|
/**
 * Creates a ChildClient derived from this client.
 *
 * @param {object} [options] Passed to the ChildClient constructor, which reads prefix,
 *   suffix, globalTags and errorHandler from it.
 * @returns {ChildClient} The new child client.
 */
Client.prototype.childClient = function (options) {
  return new ChildClient(this, options);
};
|
524 |
|
// The module itself is the Client constructor; it is also exposed as the named
// property `StatsD` on that same export.
exports = module.exports = Client;
exports.StatsD = Client;
|