1 |
|
2 |
|
3 | const util = require('util');
|
4 | const config = require('./lib/config');
|
5 | const helpers = require('./lib/helpers');
|
6 | const fs = require('fs');
|
7 | const events = require('events');
|
8 | const logger = require('./lib/logger');
|
9 | const set = require('./lib/set');
|
10 | const pm = require('./lib/process_metrics');
|
11 | const process_mgmt = require('./lib/process_mgmt');
|
12 | const mgmt_server = require('./lib/mgmt_server');
|
13 | const mgmt = require('./lib/mgmt_console');
|
14 |
|
// Module-level aggregation state for statsd stats, initialised to empty defaults.

// Per-key hit counts; only populated when key flushing is enabled.
let keyCounter = {};

// Metric stores, each keyed by metric name.
let counters = {};
let timers = {};
let timer_counters = {};
let gauges = {};
// Remaining flush cycles before an idle gauge is dropped (used with deleteGauges).
let gaugesTTL = {};
let sets = {};
let counter_rates = {};
let timer_data = {};

// Percentile threshold(s); assigned once the configuration has been loaded.
let pctThreshold = null;

// Runtime handles and flags, assigned later in the file.
let flushInterval;
let keyFlushInt;
let serversLoaded;
let mgmtServer;

// Process start time in whole seconds since the epoch.
let startup_time = Math.round(Date.now() / 1000);

// Event bus shared with the backends.
let backendEvents = new events.EventEmitter();

// NOTE(review): `config` here is the ./lib/config module required at the top of
// this file, not the loaded settings object handed to the configFile callback —
// confirm that healthStatus is really meant to be read from it.
let healthStatus = config.healthStatus || 'up';

// Timestamp (seconds) of the previous flush; 0 until the first flush has run.
let old_timestamp = 0;

// Gauge name under which the flush timestamp lag is reported.
let timestamp_lag_namespace;

// Whether incoming key names are sanitised before use.
let keyNameSanitize = true;
|
33 |
|
34 |
|
/**
 * Load a backend module and initialise it; terminate the process if the
 * backend reports failure.
 *
 * @param {object} config - Loaded configuration; `config.debug` enables a debug log line.
 * @param {string} name - Module path/name passed to `require`.
 */
function loadBackend(config, name) {
  const backend = require(name);

  if (config.debug) {
    l.log(`Loading backend: ${name}`, 'DEBUG');
  }

  // The backend receives the start time, the config, the shared event bus and the logger.
  const initialised = backend.init(startup_time, config, backendEvents, l);
  if (initialised) {
    return;
  }

  // A falsy return from init() is treated as fatal.
  l.log(`Failed to load backend: ${name}`, "ERROR");
  process.exit(1);
}
|
48 |
|
49 |
|
50 |
|
51 |
|
52 |
|
53 |
|
/**
 * Load a server module and start it; terminate the process if the server
 * reports failure.
 *
 * @param {object} config - Configuration for this server; `config.debug` enables a debug log line.
 * @param {string} name - Module path/name passed to `require`.
 * @param {Function} callback - Handler handed to the server's `start` function.
 */
function startServer(config, name, callback) {
  const server = require(name);

  if (config.debug) {
    l.log(`Loading server: ${name}`, 'DEBUG');
  }

  const started = server.start(config, callback);
  if (started) {
    return;
  }

  // A falsy return from start() is treated as fatal.
  l.log(`Failed to load server: ${name}`, "ERROR");
  process.exit(1);
}
|
67 |
|
68 |
|
// Loaded configuration object; assigned by the configFile callback.
let conf;

/**
 * Flush the accumulated metrics to the backends and schedule the next flush.
 *
 * Steps, in order:
 *  1. record the flush timestamp lag as a gauge (from the second flush on),
 *  2. register a one-shot 'flush' listener that resets/deletes the stores,
 *  3. hand the metrics to pm.process_metrics, which emits 'flush' via callback,
 *  4. re-arm the flush timer.
 */
function flushMetrics() {
  const time_stamp = Math.round(new Date().getTime() / 1000);

  // Lag = actual seconds between flushes minus the configured interval (in seconds).
  if (old_timestamp > 0) {
    const expectedSeconds = Number(conf.flushInterval) / 1000;
    gauges[timestamp_lag_namespace] = (time_stamp - old_timestamp - expectedSeconds);
  }
  old_timestamp = time_stamp;

  const metrics_hash = {
    counters: counters,
    gauges: gauges,
    timers: timers,
    timer_counters: timer_counters,
    sets: sets,
    counter_rates: counter_rates,
    timer_data: timer_data,
    pctThreshold: pctThreshold,
    histogram: conf.histogram
  };

  // Registered on every flush with once(), so it fires for this flush only.
  backendEvents.once('flush', function clear_metrics(ts, metrics) {
    // Counters: zero them, or delete them when deleteCounters is set.
    // The internal bookkeeping counters are always kept and zeroed.
    conf.deleteCounters = conf.deleteCounters || false;
    for (const name in metrics.counters) {
      const isInternal =
        name.indexOf("packets_received") != -1 ||
        name.indexOf("metrics_received") != -1 ||
        name.indexOf("bad_lines_seen") != -1;

      if (conf.deleteCounters && !isInternal) {
        delete(metrics.counters[name]);
      } else {
        metrics.counters[name] = 0;
      }
    }

    // Timers and their sample counters are reset or deleted together.
    conf.deleteTimers = conf.deleteTimers || false;
    for (const name in metrics.timers) {
      if (!conf.deleteTimers) {
        metrics.timers[name] = [];
        metrics.timer_counters[name] = 0;
        continue;
      }
      delete(metrics.timers[name]);
      delete(metrics.timer_counters[name]);
    }

    // Sets: replace with an empty set, or delete.
    conf.deleteSets = conf.deleteSets || false;
    for (const name in metrics.sets) {
      if (!conf.deleteSets) {
        metrics.sets[name] = new set.Set();
        continue;
      }
      delete(metrics.sets[name]);
    }

    // Gauges normally persist their previous value; they are only dropped when
    // deleteGauges is set and their TTL has counted down below 1.
    conf.deleteGauges = conf.deleteGauges || false;
    if (!conf.deleteGauges) {
      return;
    }
    for (const name in gaugesTTL) {
      gaugesTTL[name]--;
      if (gaugesTTL[name] >= 1) {
        continue;
      }
      delete(metrics.gauges[name]);
      delete(gaugesTTL[name]);
    }
  });

  pm.process_metrics(metrics_hash, conf.calculatedTimerMetrics, flushInterval, time_stamp, function emitFlush(metrics) {
    backendEvents.emit('flush', time_stamp, metrics);
  });

  // Re-arm with a freshly computed delay rather than a fixed interval.
  setTimeout(flushMetrics, getFlushTimeout(flushInterval));
}
|
156 |
|
// Daemon-level statistics reported by the management console's "stats" command.
const stats = {
  messages: {
    // Time (seconds) a packet was last handled; starts at process start time.
    last_msg_seen: startup_time,
    // Running total of lines rejected as invalid.
    bad_lines_seen: 0,
  },
};
|
163 |
|
/**
 * Normalise a metric key when key sanitising is enabled.
 *
 * Whitespace runs become "_", "/" becomes "-", and every character other than
 * letters, digits, "_", "-" and "." is removed. With sanitising disabled the
 * key is returned untouched.
 *
 * @param {string} key - Raw metric key.
 * @returns {string} The sanitised (or original) key.
 */
function sanitizeKeyName(key) {
  if (!keyNameSanitize) {
    return key;
  }

  const underscored = key.replace(/\s+/g, '_');
  const dashed = underscored.replace(/\//g, '-');
  return dashed.replace(/[^a-zA-Z_\-0-9\.]/g, '');
}
|
173 |
|
/**
 * Compute the delay until the next flush so that flushes stay aligned to
 * whole multiples of `interval` counted from the process start time.
 *
 * @param {number} interval - Flush interval in milliseconds.
 * @returns {number} Milliseconds to wait before the next flush.
 */
function getFlushTimeout(interval) {
  const startMs = startup_time * 1000;
  const nowMs = Date.now();

  // Index of the next flush slot: elapsed intervals rounded to nearest, plus one.
  const nextSlot = Math.round((nowMs - startMs) / interval) + 1;

  return (startMs + nextSlot * interval) - nowMs;
}
|
182 |
|
183 |
|
// Logger instance; (re)created every time the configuration callback runs.
let l;

// Names of the daemon's own bookkeeping counters. They are declared here so the
// assignments in the configuration callback do not create implicit globals
// (which throws under strict mode). They stay at module level so that a later
// run of the callback still updates the names read by the packet handler.
let bad_lines_seen;
let packets_received;
let metrics_received;

/**
 * Configuration callback: applies the loaded settings and, the first time it
 * runs, starts the packet servers, the management server, the backends, the
 * flush timer and the optional key-flush timer.
 *
 * The `serversLoaded` guard means everything inside it happens once only; the
 * statements before the guard run on every invocation of the callback.
 */
config.configFile(process.argv[2], function (config) {
  conf = config;

  process_mgmt.init(config);

  l = new logger.Logger(config.log || {});

  // Anything other than an integer greater than 1 falls back to a TTL of 1.
  if (!(helpers.isInteger(conf.gaugesMaxTTL) && conf.gaugesMaxTTL > 1)) {
    conf.gaugesMaxTTL = 1;
  }

  // deleteIdleStats only supplies defaults: an explicitly set delete* option wins.
  conf.deleteIdleStats = conf.deleteIdleStats !== undefined ? conf.deleteIdleStats : false;
  if (conf.deleteIdleStats) {
    conf.deleteCounters = conf.deleteCounters !== undefined ? conf.deleteCounters : true;
    conf.deleteTimers = conf.deleteTimers !== undefined ? conf.deleteTimers : true;
    conf.deleteSets = conf.deleteSets !== undefined ? conf.deleteSets : true;
    conf.deleteGauges = conf.deleteGauges !== undefined ? conf.deleteGauges : true;
  }

  // TTL bookkeeping is only meaningful while gauges may be deleted.
  if (!conf.deleteGauges) {
    gaugesTTL = {};
  }

  const prefixStats = config.prefixStats !== undefined ? config.prefixStats : "statsd";

  bad_lines_seen = prefixStats + ".bad_lines_seen";
  packets_received = prefixStats + ".packets_received";
  metrics_received = prefixStats + ".metrics_received";
  timestamp_lag_namespace = prefixStats + ".timestamp_lag";

  counters[bad_lines_seen] = 0;
  counters[packets_received] = 0;
  counters[metrics_received] = 0;

  if (config.keyNameSanitize !== undefined) {
    keyNameSanitize = config.keyNameSanitize;
  }

  if (!serversLoaded) {
    // Key-flush interval in milliseconds; 0 disables per-key hit counting.
    const keyFlushInterval = Number((config.keyFlush && config.keyFlush.interval) || 0);

    /**
     * Handle one incoming packet. A packet holds newline-separated lines of
     * the form `key:value|type[|@rate]`, optionally with several `:`-separated
     * value sections per key.
     */
    const handlePacket = function (msg, rinfo) {
      backendEvents.emit('packet', msg, rinfo);
      counters[packets_received]++;

      // split() on a string without "\n" yields a single-element array.
      const lines = msg.toString().split("\n");

      for (const line of lines) {
        if (line.length === 0) {
          continue;
        }

        counters[metrics_received]++;
        if (config.dumpMessages) {
          l.log(line.toString());
        }
        const bits = line.toString().split(':');
        const key = sanitizeKeyName(bits.shift());

        if (keyFlushInterval > 0) {
          if (!keyCounter[key]) {
            keyCounter[key] = 0;
          }
          keyCounter[key] += 1;
        }

        // A bare key with no value section is given the section "1".
        if (bits.length === 0) {
          bits.push("1");
        }

        for (let i = 0; i < bits.length; i++) {
          let sampleRate = 1;
          const fields = bits[i].split("|");
          if (!helpers.is_valid_packet(fields)) {
            l.log('Bad line: ' + fields + ' in msg "' + line + '"');
            counters[bad_lines_seen]++;
            stats.messages.bad_lines_seen++;
            continue;
          }
          if (fields[2]) {
            // NOTE(review): relies on helpers.is_valid_packet having rejected any
            // third field that does not match this pattern — confirm.
            sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
          }

          const metric_type = fields[1].trim();
          if (metric_type === "ms") {
            if (!timers[key]) {
              timers[key] = [];
              timer_counters[key] = 0;
            }
            timers[key].push(Number(fields[0] || 0));
            // Each sampled timing stands for 1/sampleRate real events.
            timer_counters[key] += (1 / sampleRate);
          } else if (metric_type === "g") {
            // Receiving a value refreshes the gauge's time-to-live.
            if (conf.deleteGauges) {
              gaugesTTL[key] = conf.gaugesMaxTTL;
            }
            // A leading sign adjusts an existing gauge; otherwise the value replaces it.
            if (gauges[key] && fields[0].match(/^[-+]/)) {
              gauges[key] += Number(fields[0] || 0);
            } else {
              gauges[key] = Number(fields[0] || 0);
            }
          } else if (metric_type === "s") {
            if (!sets[key]) {
              sets[key] = new set.Set();
            }
            sets[key].insert(fields[0] || '0');
          } else {
            // Every other type is treated as a counter, scaled by the sample rate.
            if (!counters[key]) {
              counters[key] = 0;
            }
            counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
          }
        }
      }

      stats.messages.last_msg_seen = Math.round(new Date().getTime() / 1000);
    };

    // Without a `servers` list the top-level config doubles as the only server config.
    const server_config = config.servers || [config];
    for (let i = 0; i < server_config.length; i++) {
      const server = server_config[i].server || './servers/udp';
      startServer(server_config[i], server, handlePacket);
    }

    mgmt_server.start(
      config,
      function (cmd, parameters, stream) {
        switch (cmd) {
          case "help":
            stream.write("Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, health, config, quit\n\n");
            break;

          case "config":
            helpers.writeConfig(config, stream);
            break;

          case "health":
            // An optional "up"/"down" parameter sets the status before it is reported.
            if (parameters.length > 0) {
              const cmdaction = parameters[0].toLowerCase();
              if (cmdaction === 'up') {
                healthStatus = 'up';
              } else if (cmdaction === 'down') {
                healthStatus = 'down';
              }
            }
            stream.write("health: " + healthStatus + "\n");
            break;

          case "stats": {
            const now = Math.round(new Date().getTime() / 1000);
            const uptime = now - startup_time;

            stream.write("uptime: " + uptime + "\n");

            // Metrics named "last_*" hold timestamps and are reported as an age in seconds.
            const stat_writer = function (group, metric, val) {
              const delta = metric.match("^last_") ? now - val : val;
              stream.write(group + "." + metric + ": " + delta + "\n");
            };

            for (const group in stats) {
              for (const metric in stats[group]) {
                stat_writer(group, metric, stats[group][metric]);
              }
            }

            // Registered just before the emit, so it runs after the listeners
            // that were already attached and terminates the output.
            backendEvents.once('status', function () {
              stream.write("END\n\n");
            });

            // Each 'status' listener gets this writer to report its own stats.
            backendEvents.emit('status', function (err, name, stat, val) {
              if (err) {
                l.log("Failed to read stats for backend " +
                  name + ": " + err);
              } else {
                stat_writer(name, stat, val);
              }
            });

            break;
          }

          case "counters":
            stream.write(util.inspect(counters) + "\n");
            stream.write("END\n\n");
            break;

          case "timers":
            stream.write(util.inspect(timers) + "\n");
            stream.write("END\n\n");
            break;

          case "gauges":
            stream.write(util.inspect(gauges) + "\n");
            stream.write("END\n\n");
            break;

          case "delcounters":
            mgmt.delete_stats(counters, parameters, stream);
            break;

          case "deltimers":
            mgmt.delete_stats(timers, parameters, stream);
            break;

          case "delgauges":
            mgmt.delete_stats(gauges, parameters, stream);
            break;

          case "quit":
            stream.end();
            break;

          default:
            stream.write("ERROR\n");
            break;
        }
      },
      function (err, stream) {
        l.log('MGMT: Caught ' + err + ', Moving on', 'WARNING');
      }
    );

    serversLoaded = true;
    // Goes through the configured logger, like every other message in this
    // file, instead of the deprecated util.log (which ignored the level).
    l.log("server is up", "INFO");

    pctThreshold = config.percentThreshold || 90;
    if (!Array.isArray(pctThreshold)) {
      pctThreshold = [pctThreshold];
    }

    flushInterval = Number(config.flushInterval || 10000);
    config.flushInterval = flushInterval;

    if (config.backends) {
      for (let j = 0; j < config.backends.length; j++) {
        loadBackend(config, config.backends[j]);
      }
    } else {
      // Graphite is the backend used when none is configured.
      loadBackend(config, './backends/graphite');
    }

    // flushMetrics re-arms its own timer after each run.
    setTimeout(flushMetrics, getFlushTimeout(flushInterval));

    if (keyFlushInterval > 0) {
      const keyFlushPercent = Number((config.keyFlush && config.keyFlush.percent) || 100);
      const keyFlushLog = config.keyFlush && config.keyFlush.log;

      keyFlushInt = setInterval(function () {
        const sortedKeys = [];

        for (const key in keyCounter) {
          sortedKeys.push([key, keyCounter[key]]);
        }

        // Most frequently seen keys first.
        sortedKeys.sort(function (a, b) { return b[1] - a[1]; });

        let logMessage = "";
        const timeString = (new Date()) + "";

        // Report the top keyFlushPercent of keys. Clamped to the number of keys
        // so a percentage above 100 cannot index past the end of the array.
        const limit = Math.min(sortedKeys.length, sortedKeys.length * (keyFlushPercent / 100));
        for (let i = 0; i < limit; i++) {
          logMessage += timeString + " count=" + sortedKeys[i][1] + " key=" + sortedKeys[i][0] + "\n";
        }

        if (keyFlushLog) {
          const logFile = fs.createWriteStream(keyFlushLog, {flags: 'a+'});
          logFile.write(logMessage);
          logFile.end();
        } else {
          process.stdout.write(logMessage);
        }

        // Start a fresh counting window.
        keyCounter = {};
      }, keyFlushInterval);
    }
  }
});
|
490 |
|
491 | process.on('exit', function () {
|
492 | flushMetrics();
|
493 | });
|