UNPKG

15.3 kB · JavaScript · View Raw
1/*jshint node:true, laxcomma:true */
2
3const util = require('util');
4const config = require('./lib/config');
5const helpers = require('./lib/helpers');
6const fs = require('fs');
7const events = require('events');
8const logger = require('./lib/logger');
9const set = require('./lib/set');
10const pm = require('./lib/process_metrics');
11const process_mgmt = require('./lib/process_mgmt');
12const mgmt_server = require('./lib/mgmt_server');
13const mgmt = require('./lib/mgmt_console');
14
// ---- Module-level state ----------------------------------------------------
// Metric stores shared by the packet handler, flushMetrics and the management
// console. The objects are mutated in place and never rebound.
const counters = {};
const timers = {};
const timer_counters = {};
const gauges = {};
const sets = {};
const counter_rates = {};
const timer_data = {};

// Rebound elsewhere in this file: keyCounter is replaced after every key-flush
// report, and gaugesTTL is replaced when gauges are never deleted.
let keyCounter = {};
let gaugesTTL = {};

// Percentile thresholds; filled in from config.percentThreshold on load.
let pctThreshold = null;

// Assigned during startup.
let flushInterval;
let keyFlushInt;
let serversLoaded;
let mgmtServer; // NOTE(review): never assigned or read in this file.

// Process start time, in whole seconds since the epoch.
const startup_time = Math.round(Date.now() / 1000);

// Event bus shared with backends ('flush', 'status' and 'packet' events).
const backendEvents = new events.EventEmitter();

// Health state reported and changed by the management "health" command.
// NOTE(review): `config` here is the ./lib/config module object, not the
// parsed config file, so this normally evaluates to 'up'.
let healthStatus = config.healthStatus || 'up';

// Time (seconds) of the previous flush; 0 until the first flush happens.
let old_timestamp = 0;
// Gauge name for the flush-lag metric; set once prefixStats is known.
let timestamp_lag_namespace;
// Whether incoming metric keys are sanitized (overridable via config).
let keyNameSanitize = true;
33
34// Load and init the backend from the backends/ directory.
35function loadBackend(config, name) {
36 const backendmod = require(name);
37
38 if (config.debug) {
39 l.log("Loading backend: " + name, 'DEBUG');
40 }
41
42 const ret = backendmod.init(startup_time, config, backendEvents, l);
43 if (!ret) {
44 l.log("Failed to load backend: " + name, "ERROR");
45 process.exit(1);
46 }
47}
48
49// Load and init the server from the servers/ directory.
50// The callback mimics the dgram 'message' event parameters (msg, rinfo)
51// msg: the message received by the server. may contain more than one metric
52// rinfo: contains remote address information and message length
53// (attributes are .address, .port, .family, .size - you're welcome)
54function startServer(config, name, callback) {
55 const servermod = require(name);
56
57 if (config.debug) {
58 l.log("Loading server: " + name, 'DEBUG');
59 }
60
61 const ret = servermod.start(config, callback);
62 if (!ret) {
63 l.log("Failed to load server: " + name, "ERROR");
64 process.exit(1);
65 }
66}
67
// Parsed configuration, assigned by the config.configFile callback; undefined
// until that callback has run.
let conf;

/**
 * Flush metrics to every backend, reset them, and schedule the next flush.
 *
 * Steps:
 * 1. Records the flush lag as a gauge.
 * 2. Hands the metric stores to pm.process_metrics.
 * 3. Emits the processed result on backendEvents as 'flush'.
 * 4. Resets or deletes stats according to conf.deleteCounters, deleteTimers,
 *    deleteSets and deleteGauges.
 *
 * Reads the module-level `conf` unconditionally, so it must not run before the
 * configuration has been loaded.
 */
function flushMetrics() {
  const flushTime = Math.round(Date.now() / 1000);

  // Lag = seconds elapsed since the previous flush beyond the configured
  // interval. Skipped on the first flush, when there is no previous time.
  if (old_timestamp > 0) {
    const expectedSeconds = Number(conf.flushInterval) / 1000;
    gauges[timestamp_lag_namespace] = flushTime - old_timestamp - expectedSeconds;
  }
  old_timestamp = flushTime;

  const snapshot = {
    counters,
    gauges,
    timers,
    timer_counters,
    sets,
    counter_rates,
    timer_data,
    pctThreshold,
    histogram: conf.histogram
  };

  // EventEmitter invokes listeners in registration order. This one-shot
  // listener is added after the backends attached theirs, so it resets the
  // stats only once every backend has seen them.
  backendEvents.once('flush', function clear_metrics(ts, metrics) {
    // statsd's own bookkeeping counters are zeroed rather than deleted.
    const isInternalCounter = function (name) {
      return name.includes("packets_received") ||
        name.includes("metrics_received") ||
        name.includes("bad_lines_seen");
    };

    conf.deleteCounters = conf.deleteCounters || false;
    for (const name in metrics.counters) {
      if (conf.deleteCounters && !isInternalCounter(name)) {
        delete metrics.counters[name];
      } else {
        metrics.counters[name] = 0;
      }
    }

    conf.deleteTimers = conf.deleteTimers || false;
    for (const name in metrics.timers) {
      if (conf.deleteTimers) {
        delete metrics.timers[name];
        delete metrics.timer_counters[name];
      } else {
        metrics.timers[name] = [];
        metrics.timer_counters[name] = 0;
      }
    }

    conf.deleteSets = conf.deleteSets || false;
    for (const name in metrics.sets) {
      if (conf.deleteSets) {
        delete metrics.sets[name];
      } else {
        metrics.sets[name] = new set.Set();
      }
    }

    // Gauges normally keep their last value. With deleteGauges on, every
    // flush consumes one TTL cycle. A gauge whose TTL drops below 1 is
    // removed; the packet handler restores the TTL whenever the gauge is
    // updated.
    conf.deleteGauges = conf.deleteGauges || false;
    if (!conf.deleteGauges) {
      return;
    }
    for (const name in gaugesTTL) {
      gaugesTTL[name] -= 1;
      if (gaugesTTL[name] < 1) {
        delete metrics.gauges[name];
        delete gaugesTTL[name];
      }
    }
  });

  pm.process_metrics(snapshot, conf.calculatedTimerMetrics, flushInterval, flushTime, function emitFlush(metrics) {
    backendEvents.emit('flush', flushTime, metrics);
  });

  // Rescheduling after the work above (rather than first) lets this method's
  // own latency absorb small negative clock skew.
  setTimeout(flushMetrics, getFlushTimeout(flushInterval));
}
156
// Message-handling statistics reported by the management "stats" command.
// last_msg_seen is a time in whole seconds since the epoch; the command
// prints it as "seconds ago".
const stats = {};
stats.messages = {
  last_msg_seen: startup_time,
  bad_lines_seen: 0
};
163
/**
 * Normalise a metric key, unless sanitizing has been disabled via config.
 *
 * Rules:
 * - Runs of whitespace become "_".
 * - "/" becomes "-".
 * - Any character other than ASCII letters, digits, "_", "-" or "." is dropped.
 *
 * @param {string} key - raw metric name taken from the packet.
 * @returns {string} the sanitized key, or `key` unchanged when disabled.
 */
function sanitizeKeyName(key) {
  if (!keyNameSanitize) {
    return key;
  }
  const withUnderscores = key.replace(/\s+/g, '_');
  const withDashes = withUnderscores.replace(/\//g, '-');
  return withDashes.replace(/[^a-zA-Z_\-0-9\.]/g, '');
}
173
/**
 * Milliseconds to wait until the next flush boundary.
 *
 * Boundaries are multiples of `interval` measured from startup_time. The
 * elapsed time is rounded (not floored), so a call landing slightly before a
 * boundary targets the one after it instead of firing again almost at once.
 *
 * @param {number} interval - flush interval in milliseconds.
 * @returns {number} delay in ms; for a positive interval it lies in
 *   (0.5 * interval, 1.5 * interval].
 */
function getFlushTimeout(interval) {
  const nowMs = Date.now();
  const startMs = startup_time * 1000;
  const nextBoundary = Math.round((nowMs - startMs) / interval) + 1;
  return startMs + nextBoundary * interval - nowMs;
}
182
// Logger instance. It is created inside the config.configFile callback below,
// so it is undefined until the configuration has been read.
let l;

// Fully-qualified names of statsd's own bookkeeping counters (for example
// "statsd.bad_lines_seen"), derived from prefixStats in the callback below.
// They used to be assigned without any declaration (implicit globals).
// Keeping them at module scope, rather than local to the callback, means the
// packet handler installed on the first call still sees updated names if the
// callback runs again with a different prefixStats.
let bad_lines_seen;
let packets_received;
let metrics_received;

// Load the config file named on the command line; all startup work happens in
// the callback. Work after the `serversLoaded` guard only happens on the first
// call.
config.configFile(process.argv[2], function (config) {
  conf = config;

  process_mgmt.init(config);

  l = new logger.Logger(config.log || {});

  // Force conf.gaugesMaxTTL to 1 unless it is an integer greater than 1.
  if (!(helpers.isInteger(conf.gaugesMaxTTL) && conf.gaugesMaxTTL > 1)) {
    conf.gaugesMaxTTL = 1;
  }

  // deleteIdleStats turns on every delete* flag at once; an individually set
  // flag still takes precedence.
  conf.deleteIdleStats = conf.deleteIdleStats !== undefined ? conf.deleteIdleStats : false;
  if (conf.deleteIdleStats) {
    conf.deleteCounters = conf.deleteCounters !== undefined ? conf.deleteCounters : true;
    conf.deleteTimers = conf.deleteTimers !== undefined ? conf.deleteTimers : true;
    conf.deleteSets = conf.deleteSets !== undefined ? conf.deleteSets : true;
    conf.deleteGauges = conf.deleteGauges !== undefined ? conf.deleteGauges : true;
  }

  // Gauge TTLs are only consulted when gauges are deleted; otherwise drop the
  // bookkeeping to save memory.
  if (!conf.deleteGauges) {
    gaugesTTL = {};
  }

  // Names of the internal stats kept in counters{}, plus the flush-lag gauge.
  const prefixStats = config.prefixStats !== undefined ? config.prefixStats : "statsd";
  bad_lines_seen = prefixStats + ".bad_lines_seen";
  packets_received = prefixStats + ".packets_received";
  metrics_received = prefixStats + ".metrics_received";
  timestamp_lag_namespace = prefixStats + ".timestamp_lag";

  // Start the internal counters at zero so the packet handler can increment them.
  counters[bad_lines_seen] = 0;
  counters[packets_received] = 0;
  counters[metrics_received] = 0;

  if (config.keyNameSanitize !== undefined) {
    keyNameSanitize = config.keyNameSanitize;
  }

  // Servers, backends and timers are only set up once.
  if (serversLoaded) {
    return;
  }

  // Seed the initial health state from the parsed config. The module-level
  // initialiser reads `healthStatus` off the ./lib/config loader module rather
  // than the parsed file, so a configured value was never applied. This runs
  // only on the first call, so a later call cannot undo a `health up|down`
  // management command.
  if (config.healthStatus) {
    healthStatus = config.healthStatus;
  }

  // Per-key hit counting is enabled by a positive keyFlush.interval (ms).
  const keyFlushInterval = Number((config.keyFlush && config.keyFlush.interval) || 0);

  // Parse one received packet. The signature mirrors dgram's 'message' event.
  const handlePacket = function (msg, rinfo) {
    backendEvents.emit('packet', msg, rinfo);
    counters[packets_received]++;

    // A packet may carry several newline-separated metrics; empty lines are skipped.
    const lines = msg.toString().split("\n");

    for (const line of lines) {
      if (line.length === 0) {
        continue;
      }

      counters[metrics_received]++;
      if (config.dumpMessages) {
        l.log(line);
      }

      // Line layout: <key>:<value>|<type>[|@<rate>][:<value>|<type>...]
      const bits = line.split(':');
      const key = sanitizeKeyName(bits.shift());

      if (keyFlushInterval > 0) {
        if (!keyCounter[key]) {
          keyCounter[key] = 0;
        }
        keyCounter[key] += 1;
      }

      // A key with no ':'-separated values gets a single default value of "1".
      if (bits.length === 0) {
        bits.push("1");
      }

      for (const bit of bits) {
        let sampleRate = 1;
        const fields = bit.split("|");
        if (!helpers.is_valid_packet(fields)) {
          l.log('Bad line: ' + fields + ' in msg "' + line + '"');
          counters[bad_lines_seen]++;
          stats.messages.bad_lines_seen++;
          continue;
        }
        if (fields[2]) {
          // NOTE(review): relies on is_valid_packet having rejected sample
          // rates that do not match this pattern; otherwise match() is null.
          sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
        }

        const metric_type = fields[1].trim();
        if (metric_type === "ms") {
          if (!timers[key]) {
            timers[key] = [];
            timer_counters[key] = 0;
          }
          timers[key].push(Number(fields[0] || 0));
          timer_counters[key] += (1 / sampleRate);
        } else if (metric_type === "g") {
          // if deleteGauges is true reset the max TTL to its initial value
          if (conf.deleteGauges) {
            gaugesTTL[key] = conf.gaugesMaxTTL;
          }
          // A leading sign on an existing gauge means "adjust by", otherwise "set to".
          if (gauges[key] && fields[0].match(/^[-+]/)) {
            gauges[key] += Number(fields[0] || 0);
          } else {
            gauges[key] = Number(fields[0] || 0);
          }
        } else if (metric_type === "s") {
          if (!sets[key]) {
            sets[key] = new set.Set();
          }
          sets[key].insert(fields[0] || '0');
        } else {
          if (!counters[key]) {
            counters[key] = 0;
          }
          counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
        }
      }
    }

    stats.messages.last_msg_seen = Math.round(new Date().getTime() / 1000);
  };

  // If config.servers isn't specified, use the top-level config for backwards-compatibility
  const server_config = config.servers || [config];
  for (let i = 0; i < server_config.length; i++) {
    // The default server is UDP
    const server = server_config[i].server || './servers/udp';
    startServer(server_config[i], server, handlePacket);
  }

  mgmt_server.start(
    config,
    function (cmd, parameters, stream) {
      switch (cmd) {
        case "help":
          stream.write("Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, health, config, quit\n\n");
          break;

        case "config":
          helpers.writeConfig(config, stream);
          break;

        case "health":
          if (parameters.length > 0) {
            const cmdaction = parameters[0].toLowerCase();
            if (cmdaction === 'up') {
              healthStatus = 'up';
            } else if (cmdaction === 'down') {
              healthStatus = 'down';
            }
          }
          stream.write("health: " + healthStatus + "\n");
          break;

        case "stats": {
          // Braced so these lexical declarations are scoped to this case.
          const now = Math.round(new Date().getTime() / 1000);
          const uptime = now - startup_time;

          stream.write("uptime: " + uptime + "\n");

          // "last_*" stats hold timestamps and are reported as seconds ago.
          const stat_writer = function (group, metric, val) {
            const delta = metric.match("^last_") ? now - val : val;
            stream.write(group + "." + metric + ": " + delta + "\n");
          };

          // Loop through the base stats
          for (const group in stats) {
            for (const metric in stats[group]) {
              stat_writer(group, metric, stats[group][metric]);
            }
          }

          // Registered before the emit, so it runs after every backend's
          // status listener and terminates the output.
          backendEvents.once('status', function () {
            stream.write("END\n\n");
          });

          // Let each backend contribute its status
          backendEvents.emit('status', function (err, name, stat, val) {
            if (err) {
              l.log("Failed to read stats for backend " +
                    name + ": " + err);
            } else {
              stat_writer(name, stat, val);
            }
          });

          break;
        }

        case "counters":
          stream.write(util.inspect(counters) + "\n");
          stream.write("END\n\n");
          break;

        case "timers":
          stream.write(util.inspect(timers) + "\n");
          stream.write("END\n\n");
          break;

        case "gauges":
          stream.write(util.inspect(gauges) + "\n");
          stream.write("END\n\n");
          break;

        case "delcounters":
          mgmt.delete_stats(counters, parameters, stream);
          break;

        case "deltimers":
          mgmt.delete_stats(timers, parameters, stream);
          break;

        case "delgauges":
          mgmt.delete_stats(gauges, parameters, stream);
          break;

        case "quit":
          stream.end();
          break;

        default:
          stream.write("ERROR\n");
          break;
      }
    },
    function (err, stream) {
      l.log('MGMT: Caught ' + err + ', Moving on', 'WARNING');
    }
  );

  serversLoaded = true;
  // Use the project logger: util.log is deprecated and ignores the level
  // argument, which matches l.log's (message, level) signature.
  l.log("server is up", "INFO");

  pctThreshold = config.percentThreshold || 90;
  if (!Array.isArray(pctThreshold)) {
    pctThreshold = [ pctThreshold ]; // listify percentiles so single values work the same
  }

  flushInterval = Number(config.flushInterval || 10000);
  config.flushInterval = flushInterval;

  if (config.backends) {
    for (let j = 0; j < config.backends.length; j++) {
      loadBackend(config, config.backends[j]);
    }
  } else {
    // The default backend is graphite
    loadBackend(config, './backends/graphite');
  }

  // Setup the flush timer; flushMetrics re-arms itself after each run.
  setTimeout(flushMetrics, getFlushTimeout(flushInterval));

  if (keyFlushInterval > 0) {
    const keyFlushPercent = Number((config.keyFlush && config.keyFlush.percent) || 100);
    const keyFlushLog = config.keyFlush && config.keyFlush.log;

    keyFlushInt = setInterval(function () {
      const sortedKeys = [];

      for (const key in keyCounter) {
        sortedKeys.push([key, keyCounter[key]]);
      }

      // Most frequently seen keys first.
      sortedKeys.sort(function (a, b) { return b[1] - a[1]; });

      let logMessage = "";
      const timeString = (new Date()) + "";

      // Only show the top "keyFlushPercent" keys. Clamp to the number of keys
      // so a percentage above 100 cannot index past the end of sortedKeys.
      const limit = Math.min(sortedKeys.length, sortedKeys.length * (keyFlushPercent / 100));
      for (let i = 0; i < limit; i++) {
        logMessage += timeString + " count=" + sortedKeys[i][1] + " key=" + sortedKeys[i][0] + "\n";
      }

      if (keyFlushLog) {
        const logFile = fs.createWriteStream(keyFlushLog, {flags: 'a+'});
        logFile.write(logMessage);
        logFile.end();
      } else {
        process.stdout.write(logMessage);
      }

      // clear the counter
      keyCounter = {};
    }, keyFlushInterval);
  }
});
490
491process.on('exit', function () {
492 flushMetrics();
493});