1 | var fs = require('fs'),
|
2 | net = require('net'),
|
3 | temp = require('temp'),
|
4 | cp = require('child_process'),
|
5 | util = require('util'),
|
6 | urlparse = require('url').parse,
|
7 | _ = require('underscore'),
|
8 | dgram = require('dgram'),
|
9 | qsparse = require('querystring').parse,
|
10 | http = require('http');
|
11 |
|
12 | var spawn = cp.spawn;
|
13 |
|
14 | var writeconfig = function(text,worker,cb,obj){
|
15 | temp.open({suffix: '-statsdconf.js'}, function(err, info) {
|
16 | if (err) throw err;
|
17 | fs.writeSync(info.fd, text);
|
18 | fs.close(info.fd, function(err) {
|
19 | if (err) throw err;
|
20 | worker(info.path,cb,obj);
|
21 | });
|
22 | });
|
23 | };
|
24 |
|
25 | var statsd_send = function(data,sock,host,port,cb){
|
26 | send_data = new Buffer(data);
|
27 | sock.send(send_data,0,send_data.length,port,host,function(err,bytes){
|
28 | if (err) {
|
29 | throw err;
|
30 | }
|
31 | cb();
|
32 | });
|
33 | };
|
34 |
|
35 |
|
36 |
|
37 | var collect_for = function(server,timeout,cb){
|
38 |
|
39 | var received = new Buffer(0);
|
40 | var in_flight = 0;
|
41 | var timed_out = false;
|
42 | var collector = function(req,res){
|
43 | in_flight += 1;
|
44 | req.on('data',function(data){ received = Buffer.concat([received,data]); });
|
45 | req.on('end',function(){
|
46 | in_flight -= 1;
|
47 | if((in_flight < 1) && timed_out){
|
48 | server.removeListener('request',collector);
|
49 | cb(received);
|
50 | }
|
51 | });
|
52 | };
|
53 |
|
54 | setTimeout(function (){
|
55 | timed_out = true;
|
56 | if((in_flight < 1)) {
|
57 | server.removeListener('connection',collector);
|
58 | cb(received);
|
59 | }
|
60 | },timeout);
|
61 |
|
62 | server.on('connection',collector);
|
63 | };
|
64 |
|
65 |
|
66 |
|
67 | var script =
|
68 | "import sys\n" +
|
69 | "import pickle\n" +
|
70 | "import struct\n" +
|
71 | "import json\n" +
|
72 | "payload = open(sys.argv[1], 'rb').read()\n" +
|
73 | "pack_format = '!L'\n" +
|
74 | "header_length = struct.calcsize(pack_format)\n" +
|
75 | "payload_length, = struct.unpack(pack_format, payload[:header_length])\n" +
|
76 | "batch_length = header_length + payload_length\n" +
|
77 | "metrics = pickle.loads(payload[header_length:batch_length])\n" +
|
78 | "print(json.dumps(metrics))\n";
|
79 |
|
80 |
|
81 |
|
82 | var unpickle = function(payload, cb) {
|
83 | temp.open({suffix: '-payload.pickle'}, function(err, payload_info) {
|
84 | if (err) throw err;
|
85 |
|
86 |
|
87 | var len = fs.writeSync(payload_info.fd, payload, 0, payload.length);
|
88 | fs.close(payload_info.fd, function(err) {
|
89 | if (err) throw err;
|
90 |
|
91 | temp.open({suffix:'-unpickle.py'}, function(err, unpickle_info) {
|
92 | if (err) throw err;
|
93 |
|
94 | fs.writeSync(unpickle_info.fd, script);
|
95 | fs.close(unpickle_info.fd, function(err) {
|
96 | if (err) throw err;
|
97 |
|
98 | var cmd = 'python ' + unpickle_info.path + ' ' + payload_info.path;
|
99 | var python = cp.exec(cmd, function(err, stdout, stderr) {
|
100 | if (err) throw err;
|
101 | var metrics = JSON.parse(stdout);
|
102 |
|
103 |
|
104 |
|
105 | var hashes = _.map(metrics, function(m) {
|
106 | var data = {};
|
107 | data[m[0]] = m[1][1];
|
108 | return data;
|
109 | });
|
110 | cb(hashes);
|
111 | });
|
112 | });
|
113 | });
|
114 | });
|
115 | });
|
116 | };
|
117 |
|
118 | module.exports = {
|
119 | setUp: function (callback) {
|
120 | this.testport = 31337;
|
121 | this.myflush = 200;
|
122 | var configfile = "{graphService: \"graphite\"\n\
|
123 | , batch: 200 \n\
|
124 | , flushInterval: " + this.myflush + " \n\
|
125 | , percentThreshold: 90\n\
|
126 | , histogram: [ { metric: \"a_test_value\", bins: [1000] } ]\n\
|
127 | , port: 8125\n\
|
128 | , dumpMessages: false \n\
|
129 | , debug: false\n\
|
130 | , graphite: { legacyNamespace: false }\n\
|
131 | , graphitePicklePort: " + this.testport + "\n\
|
132 | , graphiteHost: \"127.0.0.1\"\n\
|
133 | , graphiteProtocol: \"pickle\"}";
|
134 |
|
135 | this.acceptor = net.createServer();
|
136 | this.acceptor.listen(this.testport);
|
137 | this.sock = dgram.createSocket('udp4');
|
138 |
|
139 | this.server_up = true;
|
140 | this.ok_to_die = false;
|
141 | this.exit_callback_callback = process.exit;
|
142 |
|
143 | writeconfig(configfile,function(path,cb,obj){
|
144 | obj.path = path;
|
145 | obj.server = spawn('node',['stats.js', path]);
|
146 | obj.exit_callback = function (code) {
|
147 | obj.server_up = false;
|
148 | if(!obj.ok_to_die){
|
149 | console.log('node server unexpectedly quit with code: ' + code);
|
150 | process.exit(1);
|
151 | }
|
152 | else {
|
153 | obj.exit_callback_callback();
|
154 | }
|
155 | };
|
156 | obj.server.on('exit', obj.exit_callback);
|
157 | obj.server.stderr.on('data', function (data) {
|
158 | console.log('stderr: ' + data.toString().replace(/\n$/,''));
|
159 | });
|
160 | |
161 |
|
162 |
|
163 |
|
164 |
|
165 | obj.server.stdout.on('data', function (data) {
|
166 |
|
167 | if (data.toString().match(/server is up/)) {
|
168 | cb();
|
169 | }
|
170 | });
|
171 |
|
172 | },callback,this);
|
173 | },
|
174 |
|
175 | tearDown: function (callback) {
|
176 | this.sock.close();
|
177 | this.acceptor.close();
|
178 | this.ok_to_die = true;
|
179 | if(this.server_up){
|
180 | this.exit_callback_callback = callback;
|
181 | this.server.kill();
|
182 | } else {
|
183 | callback();
|
184 | }
|
185 | },
|
186 |
|
187 | timers_are_valid: function (test) {
|
188 | test.expect(6);
|
189 |
|
190 | var testvalue = 100;
|
191 | var me = this;
|
192 | this.acceptor.once('connection',function(c){
|
193 | statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
|
194 | collect_for(me.acceptor,me.myflush*2,function(payload){
|
195 | test.ok(payload.length > 0,'should receive some data');
|
196 | unpickle(payload, function(hashes) {
|
197 | var numstat_test = function(post){
|
198 | var mykey = 'stats.statsd.numStats';
|
199 | return _.include(_.keys(post),mykey) && (post[mykey] == 5);
|
200 | };
|
201 | test.ok(_.any(hashes,numstat_test), 'stats.statsd.numStats should be 5');
|
202 |
|
203 | var testtimervalue_test = function(post){
|
204 | var mykey = 'stats.timers.a_test_value.mean_90';
|
205 | return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
|
206 | };
|
207 | var testtimerhistogramvalue_test = function(post){
|
208 | var mykey = 'stats.timers.a_test_value.histogram.bin_1000';
|
209 | return _.include(_.keys(post),mykey) && (post[mykey] == 1);
|
210 | };
|
211 | test.ok(_.any(hashes,testtimerhistogramvalue_test), 'stats.timers.a_test_value.histogram.bin_1000 should be ' + 1);
|
212 | test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean_90 should be ' + testvalue);
|
213 |
|
214 | var count_test = function(post, metric){
|
215 | var mykey = 'stats.timers.a_test_value.' + metric;
|
216 | return _.first(_.filter(_.pluck(post, mykey), function (e) { return e; }));
|
217 | };
|
218 | test.equals(count_test(hashes, 'count_ps'), 5, 'count_ps should be 5');
|
219 | test.equals(count_test(hashes, 'count'), 1, 'count should be 1');
|
220 |
|
221 | test.done();
|
222 | });
|
223 | });
|
224 | });
|
225 | });
|
226 | }
|
227 | };
|