1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.log4j.Logger;
|
51 |
|
52 | public class StatsdClient extends TimerTask {
|
53 | private ByteBuffer sendBuffer;
|
54 | private Timer flushTimer;
|
55 | private boolean multi_metrics = false;
|
56 |
|
57 | private static final Random RNG = new Random();
|
58 | private static final Logger log = Logger.getLogger(StatsdClient.class.getName());
|
59 |
|
60 | private final InetSocketAddress _address;
|
61 | private final DatagramChannel _channel;
|
62 |
|
63 | public StatsdClient(String host, int port) throws UnknownHostException, IOException {
|
64 | this(InetAddress.getByName(host), port);
|
65 | }
|
66 |
|
67 | public StatsdClient(InetAddress host, int port) throws IOException {
|
68 | _address = new InetSocketAddress(host, port);
|
69 | _channel = DatagramChannel.open();
|
70 |
|
71 | _channel.configureBlocking(false);
|
72 |
|
73 | _channel.setOption(StandardSocketOptions.SO_SNDBUF, 4096);
|
74 | setBufferSize((short) 1500);
|
75 | }
|
76 |
|
77 | protected void finalize() {
|
78 | flush();
|
79 | }
|
80 |
|
81 | public synchronized void setBufferSize(short packetBufferSize) {
|
82 | if(sendBuffer != null) {
|
83 | flush();
|
84 | }
|
85 | sendBuffer = ByteBuffer.allocate(packetBufferSize);
|
86 | }
|
87 |
|
88 | public synchronized void enableMultiMetrics(boolean enable) {
|
89 | multi_metrics = enable;
|
90 | }
|
91 |
|
92 | public synchronized boolean startFlushTimer(long period) {
|
93 | if(flushTimer == null) {
|
94 |
|
95 | if(period <= 0) { period = 2000; }
|
96 | flushTimer = new Timer();
|
97 |
|
98 |
|
99 | flushTimer.schedule((TimerTask)this, period, period);
|
100 | return true;
|
101 | }
|
102 | return false;
|
103 | }
|
104 |
|
105 | public synchronized void stopFlushTimer() {
|
106 | if(flushTimer != null) {
|
107 | flushTimer.cancel();
|
108 | flushTimer = null;
|
109 | }
|
110 | }
|
111 |
|
112 | public void run() {
|
113 | flush();
|
114 | }
|
115 |
|
116 |
|
117 | public boolean timing(String key, int value) {
|
118 | return timing(key, value, 1.0);
|
119 | }
|
120 |
|
121 | public boolean timing(String key, int value, double sampleRate) {
|
122 | return send(sampleRate, String.format(Locale.ENGLISH, "%s:%d|ms", key, value));
|
123 | }
|
124 |
|
125 | public boolean decrement(String key) {
|
126 | return increment(key, -1, 1.0);
|
127 | }
|
128 |
|
129 | public boolean decrement(String key, int magnitude) {
|
130 | return decrement(key, magnitude, 1.0);
|
131 | }
|
132 |
|
133 | public boolean decrement(String key, int magnitude, double sampleRate) {
|
134 | magnitude = magnitude < 0 ? magnitude : -magnitude;
|
135 | return increment(key, magnitude, sampleRate);
|
136 | }
|
137 |
|
138 | public boolean decrement(String... keys) {
|
139 | return increment(-1, 1.0, keys);
|
140 | }
|
141 |
|
142 | public boolean decrement(int magnitude, String... keys) {
|
143 | magnitude = magnitude < 0 ? magnitude : -magnitude;
|
144 | return increment(magnitude, 1.0, keys);
|
145 | }
|
146 |
|
147 | public boolean decrement(int magnitude, double sampleRate, String... keys) {
|
148 | magnitude = magnitude < 0 ? magnitude : -magnitude;
|
149 | return increment(magnitude, sampleRate, keys);
|
150 | }
|
151 |
|
152 | public boolean increment(String key) {
|
153 | return increment(key, 1, 1.0);
|
154 | }
|
155 |
|
156 | public boolean increment(String key, int magnitude) {
|
157 | return increment(key, magnitude, 1.0);
|
158 | }
|
159 |
|
160 | public boolean increment(String key, int magnitude, double sampleRate) {
|
161 | String stat = String.format(Locale.ENGLISH, "%s:%s|c", key, magnitude);
|
162 | return send(sampleRate, stat);
|
163 | }
|
164 |
|
165 | public boolean increment(int magnitude, double sampleRate, String... keys) {
|
166 | String[] stats = new String[keys.length];
|
167 | for (int i = 0; i < keys.length; i++) {
|
168 | stats[i] = String.format(Locale.ENGLISH, "%s:%s|c", keys[i], magnitude);
|
169 | }
|
170 | return send(sampleRate, stats);
|
171 | }
|
172 |
|
173 | public boolean gauge(String key, double magnitude){
|
174 | return gauge(key, magnitude, 1.0);
|
175 | }
|
176 |
|
177 | public boolean gauge(String key, double magnitude, double sampleRate){
|
178 | final String stat = String.format(Locale.ENGLISH, "%s:%s|g", key, magnitude);
|
179 | return send(sampleRate, stat);
|
180 | }
|
181 |
|
182 | private boolean send(double sampleRate, String... stats) {
|
183 |
|
184 | boolean retval = false;
|
185 | if (sampleRate < 1.0) {
|
186 | for (String stat : stats) {
|
187 | if (RNG.nextDouble() <= sampleRate) {
|
188 | stat = String.format(Locale.ENGLISH, "%s|@%f", stat, sampleRate);
|
189 | if (doSend(stat)) {
|
190 | retval = true;
|
191 | }
|
192 | }
|
193 | }
|
194 | } else {
|
195 | for (String stat : stats) {
|
196 | if (doSend(stat)) {
|
197 | retval = true;
|
198 | }
|
199 | }
|
200 | }
|
201 |
|
202 | return retval;
|
203 | }
|
204 |
|
205 | private synchronized boolean doSend(String stat) {
|
206 | try {
|
207 | final byte[] data = stat.getBytes("utf-8");
|
208 |
|
209 |
|
210 |
|
211 | if(sendBuffer.remaining() < (data.length + 1)) {
|
212 | flush();
|
213 | }
|
214 |
|
215 | if(sendBuffer.position() > 0) {
|
216 | sendBuffer.put( (byte) '\n');
|
217 | }
|
218 |
|
219 | sendBuffer.put(data);
|
220 |
|
221 | if(! multi_metrics) {
|
222 | flush();
|
223 | }
|
224 |
|
225 | return true;
|
226 |
|
227 | } catch (IOException e) {
|
228 | log.error(
|
229 | String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
|
230 | _address.getPort()), e);
|
231 | return false;
|
232 | }
|
233 | }
|
234 |
|
235 | public synchronized boolean flush() {
|
236 | try {
|
237 | final int sizeOfBuffer = sendBuffer.position();
|
238 |
|
239 | if(sizeOfBuffer <= 0) { return false; }
|
240 |
|
241 |
|
242 | sendBuffer.flip();
|
243 | final int nbSentBytes = _channel.send(sendBuffer, _address);
|
244 | sendBuffer.limit(sendBuffer.capacity());
|
245 | sendBuffer.rewind();
|
246 |
|
247 | if (sizeOfBuffer == nbSentBytes) {
|
248 | return true;
|
249 | } else {
|
250 | log.error(String.format(
|
251 | "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", sendBuffer.toString(),
|
252 | _address.getHostName(), _address.getPort(), nbSentBytes, sizeOfBuffer));
|
253 | return false;
|
254 | }
|
255 |
|
256 | } catch (IOException e) {
|
257 |
|
258 | log.error(
|
259 | String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
|
260 | _address.getPort()), e);
|
261 | return false;
|
262 | }
|
263 | }
|
264 | }
|