UNPKG

16.6 kBJavaScriptView Raw
1const util = require('util'),
2 helpers = require('./helpers'),
3 applyStatsFns = require('./statsFunctions');
4
5const { PROTOCOL } = require('./constants');
6const createTransport = require('./transport');
7
8const UDS_DEFAULT_GRACEFUL_RESTART_LIMIT = 1000;
9const CACHE_DNS_TTL_DEFAULT = 60000;
10
11/**
12 * The Client for StatsD. The main entry-point for hot-shots. Note adding new parameters
13 * to the constructor is deprecated- please use the constructor as one options object.
14 * @constructor
15 */
16const Client = function (host, port, prefix, suffix, globalize, cacheDns, mock,
17 globalTags, maxBufferSize, bufferFlushInterval, telegraf, sampleRate, protocol) {
18 let options = host || {};
19
20 // Adding options below is DEPRECATED. Use the options object instead.
21 if (arguments.length > 1 || typeof(host) === 'string') {
22 options = {
23 host : host,
24 port : port,
25 prefix : prefix,
26 suffix : suffix,
27 globalize : globalize,
28 cacheDns : cacheDns,
29 mock : mock === true,
30 globalTags : globalTags,
31 maxBufferSize : maxBufferSize,
32 bufferFlushInterval: bufferFlushInterval,
33 telegraf : telegraf,
34 sampleRate : sampleRate,
35 protocol : protocol
36 };
37 }
38
39
40 // hidden global_tags option for backwards compatibility
41 options.globalTags = options.globalTags || options.global_tags;
42
43 this.protocol = (options.protocol && options.protocol.toLowerCase());
44 if (! this.protocol) {
45 this.protocol = PROTOCOL.UDP;
46 }
47 this.cacheDns = options.cacheDns === true;
48 this.cacheDnsTtl = options.cacheDnsTtl || CACHE_DNS_TTL_DEFAULT;
49 this.host = options.host || process.env.DD_AGENT_HOST || 'localhost';
50 this.port = options.port || parseInt(process.env.DD_DOGSTATSD_PORT, 10) || 8125;
51 this.prefix = options.prefix || '';
52 this.suffix = options.suffix || '';
53 this.tagPrefix = options.tagPrefix || '#';
54 this.tagSeparator = options.tagSeparator || ',';
55 this.mock = options.mock;
56 this.globalTags = typeof options.globalTags === 'object' ?
57 helpers.formatTags(options.globalTags, options.telegraf) : [];
58 if (process.env.DD_ENTITY_ID) {
59 this.globalTags = this.globalTags.filter((item) => {
60 return item.indexOf('dd.internal.entity_id:') !== 0;
61 });
62 this.globalTags.push('dd.internal.entity_id:'.concat(helpers.sanitizeTags(process.env.DD_ENTITY_ID)));
63 }
64 this.telegraf = options.telegraf || false;
65 this.maxBufferSize = options.maxBufferSize || 0;
66 this.sampleRate = options.sampleRate || 1;
67 this.bufferFlushInterval = options.bufferFlushInterval || 1000;
68 this.bufferHolder = options.isChild ? options.bufferHolder : { buffer: '' };
69 this.errorHandler = options.errorHandler;
70 this.udsGracefulErrorHandling = 'udsGracefulErrorHandling' in options ? options.udsGracefulErrorHandling : true;
71
72 // If we're mocking the client, create a buffer to record the outgoing calls.
73 if (this.mock) {
74 this.mockBuffer = [];
75 }
76
77 // We only want a single flush event per parent and all its child clients
78 if (!options.isChild && this.maxBufferSize > 0) {
79 this.intervalHandle = setInterval(this.onBufferFlushInterval.bind(this), this.bufferFlushInterval);
80 // do not block node from shutting down
81 this.intervalHandle.unref();
82 }
83
84 if (options.isChild) {
85 if (options.dnsError) {
86 this.dnsError = options.dnsError;
87 }
88 this.socket = options.socket;
89 } else if (options.useDefaultRoute) {
90 const defaultRoute = helpers.getDefaultRoute();
91 if (defaultRoute) {
92 console.log(`Got ${defaultRoute} for the system's default route`);
93 this.host = defaultRoute;
94 }
95 }
96
97 if (! this.socket) {
98 this.socket = createTransport(this, {
99 host: this.host,
100 cacheDns: this.cacheDns,
101 cacheDnsTtl: this.cacheDnsTtl,
102 path: options.path,
103 port: this.port,
104 protocol: this.protocol,
105 stream: options.stream
106 });
107 }
108
109 if (!options.isChild && options.errorHandler) {
110 this.socket.on('error', options.errorHandler);
111 }
112
113 if (options.globalize) {
114 global.statsd = this;
115 }
116
117 // only for uds (options.protocol uds)
118 // enabled with the extra flag options.udsGracefulErrorHandling
119 // will gracefully (attempt) to re-open the socket with a small delay
120 // options.udsGracefulRestartRateLimit is the minimum time (ms) between creating sockets
121 // does not support options.isChild (how to re-create a socket you didn't create?)
122 if (!options.isChild && options.protocol === PROTOCOL.UDS && options.udsGracefulErrorHandling) {
123 const socketCreateLimit = options.udsGracefulRestartRateLimit || UDS_DEFAULT_GRACEFUL_RESTART_LIMIT; // only recreate once per second
124 const lastSocketCreateTime = Date.now();
125 this.socket.on('error', (err) => {
126 const code = err.code;
127 switch (code) {
128 case 107:
129 case 111: {
130 if (Date.now() - lastSocketCreateTime >= socketCreateLimit) {
131 // recreate the socket, but only once per 30 seconds
132 if (this.errorHandler) {
133 this.socket.removeListener('error', this.errorHandler);
134 }
135 this.socket.close();
136 this.socket = createTransport(this, {
137 host: this.host,
138 path: options.path,
139 port: this.port,
140 protocol: this.protocol
141 });
142
143 if (this.errorHandler) {
144 this.socket.on('error', this.errorHandler);
145 } else {
146 this.socket.on('error', error => console.error(`hot-shots UDS error: ${error}`));
147 }
148 }
149 break;
150 }
151 default: {
152 break;
153 }
154 }
155 });
156 }
157
158
159 this.messagesInFlight = 0;
160 this.CHECKS = {
161 OK: 0,
162 WARNING: 1,
163 CRITICAL: 2,
164 UNKNOWN: 3,
165 };
166};
167
168applyStatsFns(Client);
169
170/**
171 * Checks if stats is an array and sends all stats calling back once all have sent
172 * @param stat {String|Array} The stat(s) to send
173 * @param value The value to send
174 * @param type The type of the metric
175 * @param sampleRate {Number=} The Number of times to sample (0 to 1). Optional.
176 * @param tags {Array=} The Array of tags to add to metrics. Optional.
177 * @param callback {Function=} Callback when message is done being delivered. Optional.
178 */
179Client.prototype.sendAll = function (stat, value, type, sampleRate, tags, callback) {
180 let completed = 0;
181 let calledback = false;
182 let sentBytes = 0;
183 const self = this;
184
185 if (sampleRate && typeof sampleRate !== 'number') {
186 callback = tags;
187 tags = sampleRate;
188 sampleRate = undefined;
189 }
190
191 if (tags && typeof tags !== 'object') {
192 callback = tags;
193 tags = undefined;
194 }
195
196 /**
197 * Gets called once for each callback, when all callbacks return we will
198 * call back from the function
199 * @private
200 */
201 function onSend(error, bytes) {
202 completed += 1;
203 if (calledback) {
204 return;
205 }
206
207 if (error) {
208 if (typeof callback === 'function') {
209 calledback = true;
210 callback(error);
211 } else if (self.errorHandler) {
212 calledback = true;
213 self.errorHandler(error);
214 }
215 return;
216 }
217
218 if (bytes) {
219 sentBytes += bytes;
220 }
221
222 if (completed === stat.length && typeof callback === 'function') {
223 callback(null, sentBytes);
224 }
225 }
226
227 if (Array.isArray(stat)) {
228 stat.forEach(item => {
229 self.sendStat(item, value, type, sampleRate, tags, onSend);
230 });
231 } else {
232 this.sendStat(stat, value, type, sampleRate, tags, callback);
233 }
234};
235
236/**
237 * Sends a stat across the wire
238 * @param stat {String|Array} The stat(s) to send
239 * @param value The value to send
240 * @param type {String} The type of message to send to statsd
241 * @param sampleRate {Number} The Number of times to sample (0 to 1)
242 * @param tags {Array} The Array of tags to add to metrics
243 * @param callback {Function=} Callback when message is done being delivered. Optional.
244 */
245Client.prototype.sendStat = function (stat, value, type, sampleRate, tags, callback) {
246 let message = `${this.prefix + stat + this.suffix}:${value}|${type}`;
247 sampleRate = sampleRate || this.sampleRate;
248 if (sampleRate && sampleRate < 1) {
249 if (Math.random() < sampleRate) {
250 message += `|@${sampleRate}`;
251 } else {
252 // don't want to send if we don't meet the sample ratio
253 return callback ? callback() : undefined;
254 }
255 }
256 this.send(message, tags, callback);
257};
258
259/**
260 * Send a stat or event across the wire
261 * @param message {String} The constructed message without tags
262 * @param tags {Array} The tags to include (along with global tags). Optional.
263 * @param callback {Function=} Callback when message is done being delivered (only if maxBufferSize == 0). Optional.
264 */
265Client.prototype.send = function (message, tags, callback) {
266 let mergedTags = this.globalTags;
267 if (tags && typeof tags === 'object') {
268 mergedTags = helpers.overrideTags(mergedTags, tags, this.telegraf);
269 }
270 if (mergedTags.length > 0) {
271 if (this.telegraf) {
272 message = message.split(':');
273 message = `${message[0]},${mergedTags.join(',').replace(/:/g, '=')}:${message.slice(1).join(':')}`;
274 } else {
275 message += `|${this.tagPrefix}${mergedTags.join(this.tagSeparator)}`;
276 }
277 }
278
279 this._send(message, callback);
280};
281
282/**
283 * Send a stat or event across the wire
284 * @param message {String} The constructed message without tags
285 * @param callback {Function=} Callback when message is done being delivered (only if maxBufferSize == 0). Optional.
286 */
287Client.prototype._send = function (message, callback) {
288 // we may have a cached error rather than a cached lookup, so
289 // throw it on
290 if (this.dnsError) {
291 if (callback) {
292 return callback(this.dnsError);
293 } else if (this.errorHandler) {
294 return this.errorHandler(this.dnsError);
295 }
296 throw this.dnsError;
297 }
298
299 // Only send this stat if we're not a mock Client.
300 if (!this.mock) {
301 if (this.maxBufferSize === 0) {
302 this.sendMessage(message, callback);
303 } else {
304 this.enqueue(message, callback);
305 }
306 } else {
307 this.mockBuffer.push(message);
308 if (typeof callback === 'function') {
309 callback(null, 0);
310 }
311 }
312};
313
314/**
315 * Add the message to the buffer and flush the buffer if needed
316 *
317 * @param message {String} The constructed message without tags
318 */
319Client.prototype.enqueue = function (message, callback) {
320 message += '\n';
321
322 if (this.bufferHolder.buffer.length + message.length > this.maxBufferSize) {
323 this.flushQueue(callback);
324 this.bufferHolder.buffer += message;
325 }
326 else {
327 this.bufferHolder.buffer += message;
328 if (callback) {
329 callback(null);
330 }
331 }
332};
333
334/**
335 * Flush the buffer, sending on the messages
336 */
337Client.prototype.flushQueue = function (callback) {
338 this.sendMessage(this.bufferHolder.buffer, callback);
339 this.bufferHolder.buffer = '';
340};
341
342/**
343 * Send on the message through the socket
344 *
345 * @param message {String} The constructed message without tags
346 * @param callback {Function=} Callback when message is done being delivered. Optional.
347 */
348Client.prototype.sendMessage = function (message, callback) {
349 // don't waste the time if we aren't sending anything
350 if (message === '') {
351 if (callback) {
352 callback(null);
353 }
354 return;
355 }
356
357 if (!this.socket) {
358 const error = 'Socket not created properly. Check previous errors for details.';
359 if (callback) {
360 return callback(new Error(error));
361 } else {
362 return console.error(error);
363 }
364 }
365
366 const handleCallback = (err, bytes) => {
367 this.messagesInFlight--;
368 const errFormatted = err ? new Error(`Error sending hot-shots message: ${err}`) : null;
369 if (errFormatted) {
370 errFormatted.code = err.code;
371 }
372 if (callback) {
373 callback(errFormatted, bytes);
374 } else if (errFormatted) {
375 if (this.errorHandler) {
376 this.errorHandler(errFormatted);
377 } else {
378 console.error(String(errFormatted));
379 // emit error ourselves on the socket for backwards compatibility
380 this.socket.emit('error', errFormatted);
381 }
382 }
383 };
384
385 try {
386 this.messagesInFlight++;
387 this.socket.send(Buffer.from(message), handleCallback);
388 } catch (err) {
389 handleCallback(err);
390 }
391};
392
393/**
394 * Called every bufferFlushInterval to flush any buffer that is around
395 */
396Client.prototype.onBufferFlushInterval = function () {
397 this.flushQueue();
398};
399
400/**
401 * Close the underlying socket and stop listening for data on it.
402 */
403Client.prototype.close = function (callback) {
404 // stop trying to flush the queue on an interval
405 if (this.intervalHandle) {
406 clearInterval(this.intervalHandle);
407 }
408
409 // flush the queue one last time, if needed
410 this.flushQueue((err) => {
411 if (err) {
412 if (callback) {
413 return callback(err);
414 }
415 else {
416 return console.error(err);
417 }
418 }
419
420 // FIXME: we have entered callback hell, and this whole file is in need of an async rework
421
422 // wait until there are no more messages in flight before really closing the socket
423 let intervalAttempts = 0;
424 const waitForMessages = setInterval(() => {
425 intervalAttempts++;
426 if (intervalAttempts > 10) {
427 console.log('hot-shots could not clear out messages in flight but closing anyways');
428 this.messagesInFlight = 0;
429 }
430 if (this.messagesInFlight <= 0) {
431 clearInterval(waitForMessages);
432 this._close(callback);
433 }
434 }, 50);
435 });
436};
437
438/**
439 * Really close the socket and handle any errors related to it
440 */
441Client.prototype._close = function (callback) {
442 // error function to use in callback and catch below
443 let handledError = false;
444 const handleErr = (err) => {
445 const errMessage = `Error closing hot-shots socket: ${err}`;
446 if (handledError) {
447 console.error(errMessage);
448 }
449 else {
450 // The combination of catch and error can lead to some errors
451 // showing up twice. So we just show one of the errors that occur
452 // on close.
453 handledError = true;
454
455 if (callback) {
456 callback(new Error(errMessage));
457 } else if (this.errorHandler) {
458 this.errorHandler(new Error(errMessage));
459 } else {
460 console.error(errMessage);
461 }
462 }
463 };
464
465 if (this.errorHandler) {
466 this.socket.removeListener('error', this.errorHandler);
467 }
468
469 // handle error and close events
470 this.socket.on('error', handleErr);
471 if (callback) {
472 this.socket.on('close', err => {
473 if (! handledError && callback) {
474 callback(err);
475 }
476 });
477 }
478
479 try {
480 this.socket.close();
481 } catch (err) {
482 handleErr(err);
483 }
484};
485
486const ChildClient = function (parent, options) {
487 options = options || {};
488 Client.call(this, {
489 isChild : true,
490 socket : parent.socket, // Child inherits socket from parent. Parent itself can be a child.
491 // All children and parent share the same buffer via sharing an object (cannot mutate strings)
492 bufferHolder: parent.bufferHolder,
493 dnsError : parent.dnsError, // Child inherits an error from parent (if it is there)
494 errorHandler: options.errorHandler || parent.errorHandler, // Handler for callback errors
495 host : parent.host,
496 port : parent.port,
497 tagPrefix : parent.tagPrefix,
498 tagSeparator : parent.tagSeparator,
499 prefix : (options.prefix || '') + parent.prefix, // Child has its prefix prepended to parent's prefix
500 suffix : parent.suffix + (options.suffix || ''), // Child has its suffix appended to parent's suffix
501 globalize : false, // Only 'root' client can be global
502 mock : parent.mock,
503 // Append child's tags to parent's tags
504 globalTags : typeof options.globalTags === 'object' ?
505 helpers.overrideTags(parent.globalTags, options.globalTags, parent.telegraf) : parent.globalTags,
506 maxBufferSize : parent.maxBufferSize,
507 bufferFlushInterval: parent.bufferFlushInterval,
508 telegraf : parent.telegraf,
509 protocol : parent.protocol
510 });
511};
512util.inherits(ChildClient, Client);
513
514/**
515 * Creates a child client that adds prefix, suffix and/or tags to this client. Child client can itself have children.
516 * @param options
517 * @option prefix {String} An optional prefix to assign to each stat name sent
518 * @option suffix {String} An optional suffix to assign to each stat name sent
519 * @option globalTags {Array=} Optional tags that will be added to every metric
520 */
521Client.prototype.childClient = function (options) {
522 return new ChildClient(this, options);
523};
524
525exports = module.exports = Client;
526exports.StatsD = Client;