UNPKG

14.1 kBJavaScriptView Raw
1'use strict';
2
3var dgram = require('dgram'),
4 util = require('util'),
5 dns = require('dns'),
6 net = require('net'),
7 helpers = require('./helpers'),
8 applyStatsFns = require('./statsFunctions');
9
10/**
11 * The UDP Client for StatsD
12 * @param options
13 * @option host {String} The host to connect to default: localhost
14 * @option port {String|Integer} The port to connect to default: 8125
15 * @option prefix {String} An optional prefix to assign to each stat name sent
16 * @option suffix {String} An optional suffix to assign to each stat name sent
17 * @option globalize {boolean} An optional boolean to add 'statsd' as an object in the global namespace
18 * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once
19 * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent.
20 * @option globalTags {Array=} Optional tags that will be added to every metric
21 * @option errorHandler {Function=} Optional function to handle errors when callback is not provided
22 * @maxBufferSize {Number} An optional value for aggregating metrics to send, mainly for performance improvement
23 * @bufferFlushInterval {Number} the time out value to flush out buffer if not
24 * @option sampleRate {Float} Global Sampling rate, default: 1 (No sampling)
25 * @option useDefaultRoute {boolean} An optional boolean to use the default route on Linux. Useful for containers
26 * @constructor
27 */
28var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock,
29 globalTags, maxBufferSize, bufferFlushInterval, telegraf, sampleRate, protocol) {
30 var options = host || {},
31 self = this;
32
33 if (arguments.length > 1 || typeof(host) === 'string') {
34 options = {
35 host : host,
36 port : port,
37 prefix : prefix,
38 suffix : suffix,
39 globalize : globalize,
40 cacheDns : cacheDns,
41 mock : mock === true,
42 globalTags : globalTags,
43 maxBufferSize : maxBufferSize,
44 bufferFlushInterval: bufferFlushInterval,
45 telegraf : telegraf,
46 sampleRate : sampleRate,
47 protocol : protocol
48 };
49 }
50
51 var createSocket = function createSocket(instance, args) {
52 var socket;
53 var errMessage;
54
55 if (args.protocol === 'tcp') {
56 try {
57 socket = net.connect(args.port, args.host);
58 socket.setKeepAlive(true);
59 } catch (e) {
60 errMessage = 'Could not establish connection to ' + args.host + ':' + args.port;
61 if (instance.errorHandler) {
62 instance.errorHandler(new Error(errMessage));
63 } else {
64 console.log(errMessage);
65 }
66 }
67 } else {
68 socket = dgram.createSocket('udp4');
69 }
70
71 return socket;
72 };
73
74 // hidden global_tags option for backwards compatibility
75 options.globalTags = options.globalTags || options.global_tags;
76
77 this.protocol = (options.protocol && options.protocol.toLowerCase());
78 this.host = options.host || 'localhost';
79 this.port = options.port || 8125;
80 this.prefix = options.prefix || '';
81 this.suffix = options.suffix || '';
82 this.socket = options.isChild ? options.socket : createSocket(this, {
83 host: this.host,
84 port: this.port,
85 protocol: this.protocol
86 });
87 this.mock = options.mock;
88 this.globalTags = typeof options.globalTags === 'object' ?
89 helpers.formatTags(options.globalTags, options.telegraf) : [];
90 this.telegraf = options.telegraf || false;
91 this.maxBufferSize = options.maxBufferSize || 0;
92 this.sampleRate = options.sampleRate || 1;
93 this.bufferFlushInterval = options.bufferFlushInterval || 1000;
94 this.bufferHolder = options.isChild ? options.bufferHolder : { buffer: '' };
95 this.errorHandler = options.errorHandler;
96
97 // If we're mocking the client, create a buffer to record the outgoing calls.
98 if (this.mock) {
99 this.mockBuffer = [];
100 }
101
102 // We only want a single flush event per parent and all its child clients
103 if (!options.isChild && this.maxBufferSize > 0) {
104 this.intervalHandle = setInterval(this.onBufferFlushInterval.bind(this), this.bufferFlushInterval);
105 }
106
107 if (options.isChild) {
108 if (options.dnsError) {
109 this.dnsError = options.dnsError;
110 }
111 } else if (options.cacheDns === true) {
112 dns.lookup(options.host, function (err, address, family) {
113 if (err === null) {
114 self.host = address;
115 } else {
116 self.dnsError = err;
117 }
118 });
119 }
120
121 if (!options.isChild && options.errorHandler) {
122 this.socket.on('error', options.errorHandler);
123 }
124
125 if (options.globalize) {
126 global.statsd = this;
127 }
128
129 if (options.useDefaultRoute) {
130 var defaultRoute = helpers.getDefaultRoute();
131 if (defaultRoute) {
132 console.log('Got ' + defaultRoute + ' for the system\'s default route');
133 this.host = defaultRoute;
134 }
135 }
136
137 this.CHECKS = {
138 OK: 0,
139 WARNING: 1,
140 CRITICAL: 2,
141 UNKNOWN: 3,
142 };
143};
144
145applyStatsFns(Client);
146
147/**
148 * Checks if stats is an array and sends all stats calling back once all have sent
149 * @param stat {String|Array} The stat(s) to send
150 * @param value The value to send
151 * @param type The type of the metric
152 * @param sampleRate {Number=} The Number of times to sample (0 to 1). Optional.
153 * @param tags {Array=} The Array of tags to add to metrics. Optional.
154 * @param callback {Function=} Callback when message is done being delivered. Optional.
155 */
156Client.prototype.sendAll = function (stat, value, type, sampleRate, tags, callback) {
157 var completed = 0,
158 calledback = false,
159 sentBytes = 0,
160 self = this;
161
162 if (sampleRate && typeof sampleRate !== 'number') {
163 callback = tags;
164 tags = sampleRate;
165 sampleRate = undefined;
166 }
167
168 if (tags && typeof tags !== 'object') {
169 callback = tags;
170 tags = undefined;
171 }
172
173 /**
174 * Gets called once for each callback, when all callbacks return we will
175 * call back from the function
176 * @private
177 */
178 function onSend(error, bytes) {
179 completed += 1;
180 if (calledback) {
181 return;
182 }
183
184 if (error) {
185 if (typeof callback === 'function') {
186 calledback = true;
187 callback(error);
188 } else if (self.errorHandler) {
189 calledback = true;
190 self.errorHandler(error);
191 }
192 return;
193 }
194
195 if (bytes) {
196 sentBytes += bytes;
197 }
198
199 if (completed === stat.length && typeof callback === 'function') {
200 callback(null, sentBytes);
201 }
202 }
203
204 if (Array.isArray(stat)) {
205 stat.forEach(function (item) {
206 self.sendStat(item, value, type, sampleRate, tags, onSend);
207 });
208 } else {
209 this.sendStat(stat, value, type, sampleRate, tags, callback);
210 }
211};
212
213/**
214 * Sends a stat across the wire
215 * @param stat {String|Array} The stat(s) to send
216 * @param value The value to send
217 * @param type {String} The type of message to send to statsd
218 * @param sampleRate {Number} The Number of times to sample (0 to 1)
219 * @param tags {Array} The Array of tags to add to metrics
220 * @param callback {Function=} Callback when message is done being delivered. Optional.
221 */
222Client.prototype.sendStat = function (stat, value, type, sampleRate, tags, callback) {
223 var message = this.prefix + stat + this.suffix + ':' + value + '|' + type;
224
225 sampleRate = sampleRate || this.sampleRate;
226 if (sampleRate && sampleRate < 1) {
227 if (Math.random() < sampleRate) {
228 message += '|@' + sampleRate;
229 } else {
230 // don't want to send if we don't meet the sample ratio
231 return callback ? callback() : undefined;
232 }
233 }
234 this.send(message, tags, callback);
235};
236
237/**
238 * Send a stat or event across the wire
239 * @param message {String} The constructed message without tags
240 * @param tags {Array} The tags to include (along with global tags). Optional.
241 * @param callback {Function=} Callback when message is done being delivered (only if maxBufferSize == 0). Optional.
242 */
243Client.prototype.send = function (message, tags, callback) {
244 var mergedTags = this.globalTags;
245 if (tags && typeof tags === 'object') {
246 mergedTags = helpers.overrideTags(mergedTags, tags, this.telegraf);
247 }
248 if (mergedTags.length > 0) {
249 if (this.telegraf) {
250 message = message.split(':');
251 message = message[0] + ',' + mergedTags.join(',').replace(/:/g, '=') + ':' + message.slice(1).join(':');
252 } else {
253 message += '|#' + mergedTags.join(',');
254 }
255 }
256
257 this._send(message, callback);
258};
259
260/**
261 * Send a stat or event across the wire
262 * @param message {String} The constructed message without tags
263 * @param callback {Function=} Callback when message is done being delivered (only if maxBufferSize == 0). Optional.
264 */
265Client.prototype._send = function (message, callback) {
266 // we may have a cached error rather than a cached lookup, so
267 // throw it on
268 if (this.dnsError) {
269 if (callback) {
270 return callback(this.dnsError);
271 } else if (this.errorHandler) {
272 return this.errorHandler(this.dnsError);
273 }
274 throw this.dnsError;
275 }
276
277 // Only send this stat if we're not a mock Client.
278 if (!this.mock) {
279 if (this.maxBufferSize === 0) {
280 this.sendMessage(message, callback);
281 } else {
282 this.enqueue(message, callback);
283 }
284 } else {
285 this.mockBuffer.push(message);
286 if (typeof callback === 'function') {
287 callback(null, 0);
288 }
289 }
290};
291
292/**
293 * Add the message to the buffer and flush the buffer if needed
294 *
295 * @param message {String} The constructed message without tags
296 */
297Client.prototype.enqueue = function (message, callback) {
298 message += '\n';
299
300 if (this.bufferHolder.buffer.length + message.length > this.maxBufferSize) {
301 this.flushQueue(callback);
302 this.bufferHolder.buffer += message;
303 }
304 else {
305 this.bufferHolder.buffer += message;
306 if (callback) {
307 callback(null);
308 }
309 }
310};
311
312/**
313 * Flush the buffer, sending on the messages
314 */
315Client.prototype.flushQueue = function (callback) {
316 this.sendMessage(this.bufferHolder.buffer, callback);
317 this.bufferHolder.buffer = '';
318};
319
320/**
321 * Send on the message through the socket
322 *
323 * @param message {String} The constructed message without tags
324 * @param callback {Function=} Callback when message is done being delivered. Optional.
325 */
326Client.prototype.sendMessage = function (message, callback) {
327 // Guard against 'RangeError: Offset into buffer too large' in node 0.10
328 // https://github.com/nodejs/node-v0.x-archive/issues/7884
329 if (message === '') {
330 if (callback) {
331 callback(null);
332 }
333 return;
334 }
335
336 if (this.protocol === 'tcp' && message.lastIndexOf('\n') !== message.length - 1) {
337 message += '\n';
338 }
339
340 var buf = new Buffer(message);
341 try {
342 if (this.protocol === 'tcp') {
343 this.socket.write(buf, 'ascii', callback);
344 } else {
345 this.socket.send(buf, 0, buf.length, this.port, this.host, callback);
346 }
347 } catch (err) {
348 var errMessage = 'Error sending hot-shots message: ' + err;
349 if (callback) {
350 callback(new Error(errMessage));
351 } else if (this.errorHandler) {
352 this.errorHandler(new Error(errMessage));
353 } else {
354 console.log(errMessage);
355 }
356 }
357};
358
359/**
360 * Called every bufferFlushInterval to flush any buffer that is around
361 */
362Client.prototype.onBufferFlushInterval = function () {
363 this.flushQueue();
364};
365
366/**
367 * Close the underlying socket and stop listening for data on it.
368 */
369Client.prototype.close = function (callback) {
370 if (this.intervalHandle) {
371 clearInterval(this.intervalHandle);
372 }
373
374 this.flushQueue();
375
376 if (callback) {
377 // use the close event rather than adding a callback to close()
378 // because that API is not available in older Node versions
379 this.socket.on('close', callback);
380 }
381
382 try {
383 if (this.protocol === 'tcp') {
384 this.socket.destroy();
385 } else {
386 this.socket.close();
387 }
388 } catch (err) {
389 var errMessage = 'Error closing hot-shots socket: ' + err;
390 if (callback) {
391 callback(new Error(errMessage));
392 } else if (this.errorHandler) {
393 this.errorHandler(new Error(errMessage));
394 } else {
395 console.log(errMessage);
396 }
397 }
398};
399
400var ChildClient = function (parent, options) {
401 options = options || {};
402 Client.call(this, {
403 isChild : true,
404 socket : parent.socket, // Child inherits socket from parent. Parent itself can be a child.
405 // All children and parent share the same buffer via sharing an object (cannot mutate strings)
406 bufferHolder: parent.bufferHolder,
407 dnsError : parent.dnsError, // Child inherits an error from parent (if it is there)
408 errorHandler: options.errorHandler || parent.errorHandler, // Handler for callback errors
409 host : parent.host,
410 port : parent.port,
411 prefix : (options.prefix || '') + parent.prefix, // Child has its prefix prepended to parent's prefix
412 suffix : parent.suffix + (options.suffix || ''), // Child has its suffix appended to parent's suffix
413 globalize : false, // Only 'root' client can be global
414 mock : parent.mock,
415 // Append child's tags to parent's tags
416 globalTags : typeof options.globalTags === 'object' ?
417 helpers.overrideTags(parent.globalTags, options.globalTags, parent.telegraf) : parent.globalTags,
418 maxBufferSize : parent.maxBufferSize,
419 bufferFlushInterval: parent.bufferFlushInterval,
420 telegraf : parent.telegraf,
421 protocol : parent.protocol
422 });
423};
424util.inherits(ChildClient, Client);
425
426/**
427 * Creates a child client that adds prefix, suffix and/or tags to this client. Child client can itself have children.
428 * @param options
429 * @option prefix {String} An optional prefix to assign to each stat name sent
430 * @option suffix {String} An optional suffix to assign to each stat name sent
431 * @option globalTags {Array=} Optional tags that will be added to every metric
432 */
433Client.prototype.childClient = function (options) {
434 return new ChildClient(this, options);
435};
436
437exports = module.exports = Client;
438exports.StatsD = Client;