UNPKG

6.13 kBPlain TextView Raw
/**
 * StatsdClient.kt
 *
 *
 * Example usage:
 *
 *    val client = StatsdClient("statsd.example.com", 8125)
 *    // increment by 1
 *    client.increment("foo.bar.baz")
 *    // increment by 10
 *    client.increment("foo.bar.baz", magnitude = 10)
 *    // sample rate
 *    client.increment("foo.bar.baz", sampleRate = 0.1)
 *    // magnitude and sample rate
 *    client.increment("foo.bar.baz", magnitude = 10, sampleRate = 0.1)
 *    // increment multiple keys by 1
 *    client.increment("foo.bar.baz", "foo.bar.boo", "foo.baz.bar")
 *    // increment multiple keys by 10
 *    client.increment("foo.bar.baz", "foo.bar.boo", "foo.baz.bar", magnitude = 10)
 *    // multiple keys with a sample rate and magnitude
 *    client.increment("foo.bar.baz", "foo.bar.boo", "foo.baz.bar", magnitude = 10, sampleRate = 0.1)
 */

import org.apache.log4j.Logger
import java.io.Closeable
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.StandardSocketOptions
import java.net.UnknownHostException
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
import java.util.Locale
import java.util.Random
import java.util.Timer
import java.util.TimerTask

/**
 * Small StatsD client that sends counters, timers and gauges over a non-blocking UDP channel.
 *
 * Stats are staged in an internal packet buffer. By default every stat is sent immediately;
 * after [enableMultiMetrics] `(true)` stats are batched, separated by `'\n'`, and sent when the
 * buffer fills up, when [flush] is called, or periodically once [startFlushTimer] is running.
 *
 * Every method that touches the packet buffer or the flush timer is `@Synchronized` on this
 * instance. Call [close] when the client is no longer needed to release the UDP channel.
 *
 * The class still extends [TimerTask] so existing callers can schedule it on their own [Timer];
 * [startFlushTimer] itself no longer schedules this instance (see there for why).
 *
 * @param host address of the StatsD server
 * @param port UDP port of the StatsD server
 * @throws IOException if the UDP channel cannot be opened or configured
 */
class StatsdClient @Throws(IOException::class)
constructor(host: InetAddress, port: Int) : TimerTask(), Closeable {
    // Never null: allocated up front and only ever replaced by setBufferSize().
    private var sendBuffer: ByteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)
    private var flushTimer: Timer? = null
    private var multiMetrics = false

    private val address: InetSocketAddress = InetSocketAddress(host, port)
    private val channel: DatagramChannel = DatagramChannel.open()

    /**
     * Creates a client for a server given by host name.
     *
     * @throws UnknownHostException if [host] cannot be resolved
     * @throws IOException if the UDP channel cannot be opened or configured
     */
    @Throws(UnknownHostException::class, IOException::class)
    constructor(host: String, port: Int) : this(InetAddress.getByName(host), port)

    init {
        // Put this in non-blocking mode so send does not block forever.
        channel.configureBlocking(false)
        // Increase the size of the output buffer so that the size is larger than our buffer size.
        channel.setOption(StandardSocketOptions.SO_SNDBUF, 4096)
    }

    /**
     * Replaces the packet buffer with a new one of [packetBufferSize] bytes.
     * Anything still pending in the old buffer is flushed first.
     *
     * @throws IllegalArgumentException if [packetBufferSize] is not positive
     */
    @Synchronized
    fun setBufferSize(packetBufferSize: Short) {
        // A zero-sized buffer could never hold a stat, so reject it up front.
        require(packetBufferSize > 0) { "packetBufferSize must be positive, was $packetBufferSize" }
        flush()
        sendBuffer = ByteBuffer.allocate(packetBufferSize.toInt())
    }

    /**
     * Turns batching on or off. When off (the default) every stat is sent in its own packet.
     */
    @Synchronized
    fun enableMultiMetrics(enable: Boolean) {
        multiMetrics = enable
    }

    /**
     * Starts a timer that calls [flush] every [period] milliseconds.
     *
     * @return `true` if the timer was started, `false` if one was already running
     * @throws IllegalArgumentException if [period] is not positive
     */
    @Synchronized
    fun startFlushTimer(period: Long = 2000): Boolean {
        if (flushTimer != null) {
            return false
        }
        // Validate before creating the Timer so a rejected period does not leak its thread.
        require(period > 0) { "period must be positive, was $period" }

        // A TimerTask may be scheduled only once in its lifetime, so scheduling `this` would make
        // start -> stop -> start throw IllegalStateException. Use a fresh task per timer instead.
        val timer = Timer()
        timer.schedule(
            object : TimerTask() {
                override fun run() {
                    this@StatsdClient.flush()
                }
            },
            period,
            period
        )
        flushTimer = timer
        return true
    }

    /**
     * Stops the timer started by [startFlushTimer]; does nothing if none is running.
     */
    @Synchronized
    fun stopFlushTimer() {
        flushTimer?.cancel()
        flushTimer = null
    }

    /**
     * [TimerTask] entry point: flushes the packet buffer.
     */
    override fun run() {
        flush()
    }

    /**
     * Stops the flush timer, sends whatever is still buffered and closes the UDP channel.
     *
     * @throws IOException if closing the channel fails
     */
    @Synchronized
    @Throws(IOException::class)
    override fun close() {
        stopFlushTimer()
        flush()
        channel.close()
    }

    /**
     * Records a timing of [value] milliseconds for [key].
     *
     * @param sampleRate values below 1.0 send the stat only with that probability
     * @return `true` if the stat was accepted (see [increment])
     */
    fun timing(key: String, value: Int, sampleRate: Double = 1.0): Boolean {
        return send(sampleRate, String.format(Locale.ENGLISH, "%s:%d|ms", key, value))
    }

    /**
     * Decrements every counter in [keys] by [magnitude].
     *
     * The sign of [magnitude] is ignored: both `magnitude = 10` and `magnitude = -10`
     * lower the counter by 10.
     *
     * @param sampleRate values below 1.0 send each stat only with that probability
     * @return `true` if at least one stat was accepted
     */
    fun decrement(vararg keys: String, magnitude: Int = -1, sampleRate: Double = 1.0): Boolean {
        // Only positive values are negated, so this cannot overflow.
        val delta = if (magnitude > 0) -magnitude else magnitude

        return count(keys, delta, sampleRate)
    }

    /**
     * Increments every counter in [keys] by [magnitude].
     *
     * @param sampleRate values below 1.0 send each stat only with that probability
     * @return `true` if at least one stat was accepted
     */
    fun increment(vararg keys: String, magnitude: Int = 1, sampleRate: Double = 1.0): Boolean {
        return count(keys, magnitude, sampleRate)
    }

    /**
     * Sets the gauge [key] to [magnitude].
     *
     * @param sampleRate values below 1.0 send the stat only with that probability
     * @return `true` if the stat was accepted
     */
    fun gauge(key: String, magnitude: Double, sampleRate: Double = 1.0): Boolean {
        val stat = String.format(Locale.ENGLISH, "%s:%s|g", key, magnitude)

        return send(sampleRate, stat)
    }

    /** Formats one counter stat per key and hands them all to [send]. */
    private fun count(keys: Array<out String>, delta: Int, sampleRate: Double): Boolean {
        val stats = keys.map { String.format(Locale.ENGLISH, "%s:%s|c", it, delta) }.toTypedArray()

        return send(sampleRate, *stats)
    }

    /**
     * Applies sampling and passes each surviving stat to [doSend].
     *
     * Every stat is attempted. This is deliberately a plain loop rather than `stats.any { ... }`:
     * `any` stops at the first `true`, which would silently drop all remaining stats.
     *
     * @return `true` if at least one stat was accepted
     */
    private fun send(sampleRate: Double, vararg stats: String): Boolean {
        val sampled = sampleRate < 1.0
        var sentAny = false

        for (stat in stats) {
            if (sampled && RNG.nextDouble() > sampleRate) {
                continue // sampled out
            }
            val payload = if (sampled) {
                String.format(Locale.ENGLISH, "%s|@%f", stat, sampleRate)
            } else {
                stat
            }
            if (doSend(payload)) {
                sentAny = true
            }
        }

        return sentAny
    }

    /**
     * Appends [stat] to the packet buffer, flushing first if it would not fit.
     *
     * @return in batching mode `true` once the stat is buffered; otherwise the result of the
     * immediate [flush]. `false` if the stat is larger than the whole packet buffer.
     */
    @Synchronized
    private fun doSend(stat: String): Boolean {
        val data = stat.toByteArray(Charsets.UTF_8)

        // A stat that cannot fit even into an empty buffer would overflow it; drop it instead.
        if (data.size > sendBuffer.capacity()) {
            log.error(
                "Could not send stat $stat to host ${address.hostName}:${address.port}: " +
                    "${data.size} bytes exceed the packet buffer of ${sendBuffer.capacity()} bytes"
            )
            return false
        }

        // If we're going to go past the threshold of the buffer then flush.
        // The +1 is for the potential '\n' separator below.
        if (sendBuffer.remaining() < data.size + 1) {
            flush()
        }

        // multiple metrics are separated by '\n'
        if (sendBuffer.position() > 0) {
            sendBuffer.put(NEWLINE)
        }

        sendBuffer.put(data)

        return if (multiMetrics) true else flush()
    }

    /**
     * Sends the buffered stats as a single datagram and empties the buffer.
     *
     * The buffer is emptied even when sending fails, so a failed packet is dropped rather than
     * left behind in a half-consumed buffer.
     *
     * @return `true` if the whole buffer was sent; `false` if it was empty, only partially sent,
     * or sending failed (failures are logged)
     */
    @Synchronized
    fun flush(): Boolean {
        val pendingBytes = sendBuffer.position()

        if (pendingBytes <= 0) {
            return false // empty buffer
        }

        sendBuffer.flip()

        return try {
            val sentBytes = channel.send(sendBuffer, address)

            if (sentBytes == pendingBytes) {
                true
            } else {
                log.error(
                    "Could not send entirely stat $sendBuffer to host ${address.hostName}:${address.port}. " +
                        "Only sent $sentBytes bytes out of $pendingBytes bytes"
                )

                false
            }
        } catch (e: IOException) {
            /* This would be a good place to close the channel down and recreate it. */
            log.error("Could not send stat $sendBuffer to host ${address.hostName}:${address.port}", e)
            false
        } finally {
            // Reset position and limit on every path, including when send() throws.
            sendBuffer.clear()
        }
    }

    companion object {
        private const val DEFAULT_BUFFER_SIZE = 1500
        private const val NEWLINE: Byte = 0x0A

        private val RNG = Random()
        private val log = Logger.getLogger(StatsdClient::class.java.name)
    }
}