UNPKG

7.65 kBJavaScriptView Raw
1var fs = require('fs'),
2 net = require('net'),
3 temp = require('temp'),
4 cp = require('child_process'),
5 util = require('util'),
6 urlparse = require('url').parse,
7 _ = require('underscore'),
8 dgram = require('dgram'),
9 qsparse = require('querystring').parse,
10 http = require('http');
11
12var spawn = cp.spawn;
13
/**
 * Write `text` into a freshly created temp file (suffix '-statsdconf.js') and,
 * once the file has been closed, hand its path to `worker`.
 *
 * @param {string} text - content written to the temp file
 * @param {function} worker - invoked as worker(path, cb, obj) after the close
 * @param {*} cb - passed through to `worker` untouched
 * @param {*} obj - passed through to `worker` untouched
 *
 * Errors reported by temp.open or fs.close are thrown from inside their
 * callbacks; they are not delivered to `cb`.
 */
var writeconfig = function(text, worker, cb, obj) {
  function on_tempfile(open_err, tmpfile) {
    if (open_err) {
      throw open_err;
    }
    // The write is synchronous, so the content is on the descriptor before
    // the close is requested.
    fs.writeSync(tmpfile.fd, text);
    fs.close(tmpfile.fd, function(close_err) {
      if (close_err) {
        throw close_err;
      }
      worker(tmpfile.path, cb, obj);
    });
  }

  temp.open({suffix: '-statsdconf.js'}, on_tempfile);
};
24
/**
 * Send one statsd message over a datagram socket.
 *
 * @param {string} data - the message text, e.g. 'a_test_value:100|ms'
 * @param {object} sock - object exposing send(buf, offset, length, port, host, callback)
 *                        (the tests pass a dgram socket)
 * @param {string} host - destination host
 * @param {number} port - destination port
 * @param {function} cb - called with no arguments once the send has completed
 * @throws the error reported by sock.send, rethrown from inside the send callback
 */
var statsd_send = function(data, sock, host, port, cb) {
  // Must be a local: an undeclared assignment would create a global shared by
  // every call and is a ReferenceError under strict mode.
  var send_data = Buffer.from(data);
  sock.send(send_data, 0, send_data.length, port, host, function(err) {
    if (err) {
      throw err;
    }
    cb();
  });
};
34
// Keep collecting data until a specified timeout period has elapsed.
// This lets us capture all data chunks so we don't miss one.
//
// server  - emitter of 'connection' events whose argument emits 'data' and
//           'end' (the tests pass a net.Server)
// timeout - collection window in milliseconds
// cb      - called once with a Buffer holding every received chunk, in
//           arrival order, after the window has elapsed AND no connection is
//           still in flight
var collect_for = function(server, timeout, cb) {
  // We have binary data arriving over the wire. Avoid strings.
  var received = Buffer.alloc(0);
  var in_flight = 0;
  var timed_out = false;

  // Single exit path: detach from the same event we subscribed to, so the
  // collector cannot fire again (and re-trigger cb) for later connections.
  var finish = function() {
    server.removeListener('connection', collector);
    cb(received);
  };

  var collector = function(conn) {
    in_flight += 1;
    conn.on('data', function(data) {
      received = Buffer.concat([received, data]);
    });
    conn.on('end', function() {
      in_flight -= 1;
      // Only the last connection to end after the window closed reports back.
      if ((in_flight < 1) && timed_out) {
        finish();
      }
    });
  };

  setTimeout(function() {
    timed_out = true;
    // Nothing in flight: report now. Otherwise the 'end' handler above will.
    if (in_flight < 1) {
      finish();
    }
  }, timeout);

  server.on('connection', collector);
};
64
// Python helper that converts the graphite pickle-based wire protocol into
// JSON on stdout. It reads the file named by its first argument in binary
// mode, unpacks a length header using struct format '!L', unpickles the
// `payload_length` bytes that follow that header, and prints the result with
// json.dumps. Only the first length-prefixed batch in the file is decoded.
var script = [
  "import sys",
  "import pickle",
  "import struct",
  "import json",
  "payload = open(sys.argv[1], 'rb').read()",
  "pack_format = '!L'",
  "header_length = struct.calcsize(pack_format)",
  "payload_length, = struct.unpack(pack_format, payload[:header_length])",
  "batch_length = header_length + payload_length",
  "metrics = pickle.loads(payload[header_length:batch_length])",
  "print(json.dumps(metrics))",
  ""  // trailing empty entry so the joined text ends with a newline
].join("\n");
79
// Write our binary payload and unpickling script to disk,
// then process the unserialized results.
//
// payload - Buffer of bytes received on the pickle port
// cb      - called with an array of single-key objects, one per decoded
//           metric: { <m[0]>: <m[1][1]> }
//           (presumably each entry is (path, (timestamp, value)) as in the
//           Graphite pickle protocol - confirm against the sender)
//
// Any failure (temp file creation, close, python exit status, JSON parsing)
// is thrown from inside the corresponding callback rather than passed to cb.
var unpickle = function(payload, cb) {
  temp.open({suffix: '-payload.pickle'}, function(err, payload_info) {
    if (err) throw err;

    // the header may contain null characters. explicit length is necessary.
    fs.writeSync(payload_info.fd, payload, 0, payload.length);
    fs.close(payload_info.fd, function(err) {
      if (err) throw err;

      temp.open({suffix: '-unpickle.py'}, function(err, unpickle_info) {
        if (err) throw err;

        fs.writeSync(unpickle_info.fd, script);
        fs.close(unpickle_info.fd, function(err) {
          if (err) throw err;

          // execFile hands both paths to python as separate argv entries, so
          // a temp directory containing spaces or shell metacharacters cannot
          // break (or alter) the command the way a shell string would.
          var args = [unpickle_info.path, payload_info.path];
          cp.execFile('python', args, function(err, stdout) {
            if (err) throw err;
            var metrics = JSON.parse(stdout);
            // Transform the output into the same list of dictionaries
            // used by the other graphite_* tests so our tests look
            // the same.
            var hashes = _.map(metrics, function(m) {
              var data = {};
              data[m[0]] = m[1][1];
              return data;
            });
            cb(hashes);
          });
        });
      });
    });
  });
};
117
118module.exports = {
119 setUp: function (callback) {
120 this.testport = 31337;
121 this.myflush = 200;
122 var configfile = "{graphService: \"graphite\"\n\
123 , batch: 200 \n\
124 , flushInterval: " + this.myflush + " \n\
125 , percentThreshold: 90\n\
126 , histogram: [ { metric: \"a_test_value\", bins: [1000] } ]\n\
127 , port: 8125\n\
128 , dumpMessages: false \n\
129 , debug: false\n\
130 , graphite: { legacyNamespace: false }\n\
131 , graphitePicklePort: " + this.testport + "\n\
132 , graphiteHost: \"127.0.0.1\"\n\
133 , graphiteProtocol: \"pickle\"}";
134
135 this.acceptor = net.createServer();
136 this.acceptor.listen(this.testport);
137 this.sock = dgram.createSocket('udp4');
138
139 this.server_up = true;
140 this.ok_to_die = false;
141 this.exit_callback_callback = process.exit;
142
143 writeconfig(configfile,function(path,cb,obj){
144 obj.path = path;
145 obj.server = spawn('node',['stats.js', path]);
146 obj.exit_callback = function (code) {
147 obj.server_up = false;
148 if(!obj.ok_to_die){
149 console.log('node server unexpectedly quit with code: ' + code);
150 process.exit(1);
151 }
152 else {
153 obj.exit_callback_callback();
154 }
155 };
156 obj.server.on('exit', obj.exit_callback);
157 obj.server.stderr.on('data', function (data) {
158 console.log('stderr: ' + data.toString().replace(/\n$/,''));
159 });
160 /*
161 obj.server.stdout.on('data', function (data) {
162 console.log('stdout: ' + data.toString().replace(/\n$/,''));
163 });
164 */
165 obj.server.stdout.on('data', function (data) {
166 // wait until server is up before we finish setUp
167 if (data.toString().match(/server is up/)) {
168 cb();
169 }
170 });
171
172 },callback,this);
173 },
174
175 tearDown: function (callback) {
176 this.sock.close();
177 this.acceptor.close();
178 this.ok_to_die = true;
179 if(this.server_up){
180 this.exit_callback_callback = callback;
181 this.server.kill();
182 } else {
183 callback();
184 }
185 },
186
187 timers_are_valid: function (test) {
188 test.expect(6);
189
190 var testvalue = 100;
191 var me = this;
192 this.acceptor.once('connection',function(c){
193 statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
194 collect_for(me.acceptor,me.myflush*2,function(payload){
195 test.ok(payload.length > 0,'should receive some data');
196 unpickle(payload, function(hashes) {
197 var numstat_test = function(post){
198 var mykey = 'stats.statsd.numStats';
199 return _.include(_.keys(post),mykey) && (post[mykey] == 5);
200 };
201 test.ok(_.any(hashes,numstat_test), 'stats.statsd.numStats should be 5');
202
203 var testtimervalue_test = function(post){
204 var mykey = 'stats.timers.a_test_value.mean_90';
205 return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
206 };
207 var testtimerhistogramvalue_test = function(post){
208 var mykey = 'stats.timers.a_test_value.histogram.bin_1000';
209 return _.include(_.keys(post),mykey) && (post[mykey] == 1);
210 };
211 test.ok(_.any(hashes,testtimerhistogramvalue_test), 'stats.timers.a_test_value.histogram.bin_1000 should be ' + 1);
212 test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean_90 should be ' + testvalue);
213
214 var count_test = function(post, metric){
215 var mykey = 'stats.timers.a_test_value.' + metric;
216 return _.first(_.filter(_.pluck(post, mykey), function (e) { return e; }));
217 };
218 test.equals(count_test(hashes, 'count_ps'), 5, 'count_ps should be 5');
219 test.equals(count_test(hashes, 'count'), 1, 'count should be 1');
220
221 test.done();
222 });
223 });
224 });
225 });
226 }
227};