UNPKG

8.84 kBtext/x-java-sourceView Raw
1/**
2 * StatsdClient.java
3 *
4 * (C) 2011 Meetup, Inc.
5 * Author: Andrew Gwozdziewycz <andrew@meetup.com>, @apgwoz
6 *
7 *
8 *
9 * Example usage:
10 *
11 * StatsdClient client = new StatsdClient("statsd.example.com", 8125);
12 * // increment by 1
13 * client.increment("foo.bar.baz");
14 * // increment by 10
15 * client.increment("foo.bar.baz", 10);
16 * // sample rate
17 * client.increment("foo.bar.baz", 10, .1);
18 * // increment multiple keys by 1
19 * client.increment("foo.bar.baz", "foo.bar.boo", "foo.baz.bar");
20 * // increment multiple keys by 10 -- yeah, it's "backwards"
21 * client.increment(10, "foo.bar.baz", "foo.bar.boo", "foo.baz.bar");
22 * // multiple keys with a sample rate
23 * client.increment(10, .1, "foo.bar.baz", "foo.bar.boo", "foo.baz.bar");
24 *
25 * // To enable multi metrics (aka more than 1 metric in a UDP packet) (disabled by default)
26 * client.enableMultiMetrics(true); //disable by passing in false
27 * // To fine-tune udp packet buffer size (default=1500)
28 * client.setBufferSize((short) 1500);
29 * // To force flush the buffer out (good idea to add to your shutdown path)
30 * client.flush();
31 *
32 *
33 * Note: For best results, and greater availability, you'll probably want to
34 * create a wrapper class which creates a static client and proxies to it.
35 *
36 * You know... the "Java way."
37 */
38
39import java.io.IOException;
40import java.net.InetAddress;
41import java.net.InetSocketAddress;
42import java.net.UnknownHostException;
43import java.nio.ByteBuffer;
44import java.nio.channels.DatagramChannel;
45import java.util.Locale;
46import java.util.Random;
47import java.util.Timer;
48import java.util.TimerTask;
49
50import org.apache.log4j.Logger;
51
52public class StatsdClient extends TimerTask {
53 private ByteBuffer sendBuffer;
54 private Timer flushTimer;
55 private boolean multi_metrics = false;
56
57 private static final Random RNG = new Random();
58 private static final Logger log = Logger.getLogger(StatsdClient.class.getName());
59
60 private final InetSocketAddress _address;
61 private final DatagramChannel _channel;
62
63 public StatsdClient(String host, int port) throws UnknownHostException, IOException {
64 this(InetAddress.getByName(host), port);
65 }
66
67 public StatsdClient(InetAddress host, int port) throws IOException {
68 _address = new InetSocketAddress(host, port);
69 _channel = DatagramChannel.open();
70 /* Put this in non-blocking mode so send does not block forever. */
71 _channel.configureBlocking(false);
72 /* Increase the size of the output buffer so that the size is larger than our buffer size. */
73 _channel.setOption(StandardSocketOptions.SO_SNDBUF, 4096);
74 setBufferSize((short) 1500);
75 }
76
77 protected void finalize() {
78 flush();
79 }
80
81 public synchronized void setBufferSize(short packetBufferSize) {
82 if(sendBuffer != null) {
83 flush();
84 }
85 sendBuffer = ByteBuffer.allocate(packetBufferSize);
86 }
87
88 public synchronized void enableMultiMetrics(boolean enable) {
89 multi_metrics = enable;
90 }
91
92 public synchronized boolean startFlushTimer(long period) {
93 if(flushTimer == null) {
94 // period is in msecs
95 if(period <= 0) { period = 2000; }
96 flushTimer = new Timer();
97
98 // We pass this object in as the TimerTask (which calls run())
99 flushTimer.schedule((TimerTask)this, period, period);
100 return true;
101 }
102 return false;
103 }
104
105 public synchronized void stopFlushTimer() {
106 if(flushTimer != null) {
107 flushTimer.cancel();
108 flushTimer = null;
109 }
110 }
111
112 public void run() { // used by Timer, we're a Runnable TimerTask
113 flush();
114 }
115
116
117 public boolean timing(String key, int value) {
118 return timing(key, value, 1.0);
119 }
120
121 public boolean timing(String key, int value, double sampleRate) {
122 return send(sampleRate, String.format(Locale.ENGLISH, "%s:%d|ms", key, value));
123 }
124
125 public boolean decrement(String key) {
126 return increment(key, -1, 1.0);
127 }
128
129 public boolean decrement(String key, int magnitude) {
130 return decrement(key, magnitude, 1.0);
131 }
132
133 public boolean decrement(String key, int magnitude, double sampleRate) {
134 magnitude = magnitude < 0 ? magnitude : -magnitude;
135 return increment(key, magnitude, sampleRate);
136 }
137
138 public boolean decrement(String... keys) {
139 return increment(-1, 1.0, keys);
140 }
141
142 public boolean decrement(int magnitude, String... keys) {
143 magnitude = magnitude < 0 ? magnitude : -magnitude;
144 return increment(magnitude, 1.0, keys);
145 }
146
147 public boolean decrement(int magnitude, double sampleRate, String... keys) {
148 magnitude = magnitude < 0 ? magnitude : -magnitude;
149 return increment(magnitude, sampleRate, keys);
150 }
151
152 public boolean increment(String key) {
153 return increment(key, 1, 1.0);
154 }
155
156 public boolean increment(String key, int magnitude) {
157 return increment(key, magnitude, 1.0);
158 }
159
160 public boolean increment(String key, int magnitude, double sampleRate) {
161 String stat = String.format(Locale.ENGLISH, "%s:%s|c", key, magnitude);
162 return send(sampleRate, stat);
163 }
164
165 public boolean increment(int magnitude, double sampleRate, String... keys) {
166 String[] stats = new String[keys.length];
167 for (int i = 0; i < keys.length; i++) {
168 stats[i] = String.format(Locale.ENGLISH, "%s:%s|c", keys[i], magnitude);
169 }
170 return send(sampleRate, stats);
171 }
172
173 public boolean gauge(String key, double magnitude){
174 return gauge(key, magnitude, 1.0);
175 }
176
177 public boolean gauge(String key, double magnitude, double sampleRate){
178 final String stat = String.format(Locale.ENGLISH, "%s:%s|g", key, magnitude);
179 return send(sampleRate, stat);
180 }
181
182 private boolean send(double sampleRate, String... stats) {
183
184 boolean retval = false; // didn't send anything
185 if (sampleRate < 1.0) {
186 for (String stat : stats) {
187 if (RNG.nextDouble() <= sampleRate) {
188 stat = String.format(Locale.ENGLISH, "%s|@%f", stat, sampleRate);
189 if (doSend(stat)) {
190 retval = true;
191 }
192 }
193 }
194 } else {
195 for (String stat : stats) {
196 if (doSend(stat)) {
197 retval = true;
198 }
199 }
200 }
201
202 return retval;
203 }
204
205 private synchronized boolean doSend(String stat) {
206 try {
207 final byte[] data = stat.getBytes("utf-8");
208
209 // If we're going to go past the threshold of the buffer then flush.
210 // the +1 is for the potential '\n' in multi_metrics below
211 if(sendBuffer.remaining() < (data.length + 1)) {
212 flush();
213 }
214
215 if(sendBuffer.position() > 0) { // multiple metrics are separated by '\n'
216 sendBuffer.put( (byte) '\n');
217 }
218
219 sendBuffer.put(data); // append the data
220
221 if(! multi_metrics) {
222 flush();
223 }
224
225 return true;
226
227 } catch (IOException e) {
228 log.error(
229 String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
230 _address.getPort()), e);
231 return false;
232 }
233 }
234
235 public synchronized boolean flush() {
236 try {
237 final int sizeOfBuffer = sendBuffer.position();
238
239 if(sizeOfBuffer <= 0) { return false; } // empty buffer
240
241 // send and reset the buffer
242 sendBuffer.flip();
243 final int nbSentBytes = _channel.send(sendBuffer, _address);
244 sendBuffer.limit(sendBuffer.capacity());
245 sendBuffer.rewind();
246
247 if (sizeOfBuffer == nbSentBytes) {
248 return true;
249 } else {
250 log.error(String.format(
251 "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", sendBuffer.toString(),
252 _address.getHostName(), _address.getPort(), nbSentBytes, sizeOfBuffer));
253 return false;
254 }
255
256 } catch (IOException e) {
257 /* This would be a good place to close the channel down and recreate it. */
258 log.error(
259 String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
260 _address.getPort()), e);
261 return false;
262 }
263 }
264}