1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 | import org.apache.log4j.Logger
|
25 | import java.io.IOException
|
26 | import java.net.InetAddress
|
27 | import java.net.InetSocketAddress
|
28 | import java.net.StandardSocketOptions
|
29 | import java.net.UnknownHostException
|
30 | import java.nio.ByteBuffer
|
31 | import java.nio.channels.DatagramChannel
|
32 | import java.util.Locale
|
33 | import java.util.Random
|
34 | import java.util.Timer
|
35 | import java.util.TimerTask
|
36 |
|
/**
 * StatsD client that formats metrics as text lines and sends them as UDP datagrams
 * through a non-blocking [DatagramChannel].
 *
 * By default every metric is sent in its own datagram. After [enableMultiMetrics]`(true)`,
 * metrics are accumulated in an internal packet buffer (newline separated) and sent when the
 * buffer is full, when [flush] is called, or periodically once [startFlushTimer] is running.
 *
 * All methods that touch the packet buffer or the flush timer are synchronized on this instance.
 *
 * The class still extends [TimerTask] (its [run] flushes the buffer) so existing callers that
 * schedule the client themselves keep working; the built-in flush timer no longer relies on it.
 *
 * @param host address of the StatsD server
 * @param port UDP port of the StatsD server
 * @throws IOException if the datagram channel cannot be opened or configured
 */
class StatsdClient @Throws(IOException::class)
constructor(host: InetAddress, port: Int) : TimerTask(), java.io.Closeable {
    // Never null: starts with the default size and is only ever replaced by setBufferSize().
    private var sendBuffer: ByteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)
    private var flushTimer: Timer? = null
    private var multiMetrics = false

    private val address: InetSocketAddress = InetSocketAddress(host, port)
    private val channel: DatagramChannel = DatagramChannel.open()

    /**
     * Creates a client for a host given by name.
     *
     * @throws UnknownHostException if [host] cannot be resolved
     * @throws IOException if the datagram channel cannot be opened or configured
     */
    @Throws(UnknownHostException::class, IOException::class)
    constructor(host: String, port: Int) : this(InetAddress.getByName(host), port)

    init {
        try {
            // Non-blocking: a send must never stall the calling thread.
            channel.configureBlocking(false)
            channel.setOption(StandardSocketOptions.SO_SNDBUF, SOCKET_SEND_BUFFER_SIZE)
        } catch (e: Exception) {
            // The channel was already opened by the property initializer; do not leak it.
            try {
                channel.close()
            } catch (ignored: IOException) {
                // Keep the original configuration failure as the reported error.
            }
            throw e
        }
    }

    /**
     * Replaces the packet buffer with a new one of [packetBufferSize] bytes.
     * Anything still pending in the old buffer is flushed first.
     *
     * @throws IllegalArgumentException if [packetBufferSize] is not positive
     */
    @Synchronized
    fun setBufferSize(packetBufferSize: Short) {
        require(packetBufferSize > 0) { "packetBufferSize must be positive, was $packetBufferSize" }
        flush()
        sendBuffer = ByteBuffer.allocate(packetBufferSize.toInt())
    }

    /**
     * Turns batching on or off. When off (the default), every metric is flushed immediately.
     */
    @Synchronized
    fun enableMultiMetrics(enable: Boolean) {
        multiMetrics = enable
    }

    /**
     * Starts a timer that calls [flush] every [period] milliseconds.
     *
     * @return `true` if the timer was started, `false` if one was already running
     * @throws IllegalArgumentException if [period] is not positive (raised by [Timer.schedule])
     */
    @Synchronized
    fun startFlushTimer(period: Long = 2000): Boolean {
        if (flushTimer != null) {
            return false
        }

        val timer = Timer()
        try {
            // A TimerTask can only ever be scheduled once, so a fresh task is created per start;
            // scheduling `this` again after stopFlushTimer() would throw IllegalStateException.
            timer.schedule(object : TimerTask() {
                override fun run() {
                    this@StatsdClient.flush()
                }
            }, period, period)
        } catch (e: RuntimeException) {
            // Do not leave a timer thread behind when scheduling is rejected.
            timer.cancel()
            throw e
        }

        flushTimer = timer
        return true
    }

    /**
     * Stops the flush timer if it is running; otherwise does nothing.
     */
    @Synchronized
    fun stopFlushTimer() {
        flushTimer?.cancel()
        flushTimer = null
    }

    /**
     * [TimerTask] entry point: flushes the packet buffer.
     */
    override fun run() {
        flush()
    }

    /**
     * Sends a timing metric formatted as `key:value|ms`.
     *
     * @return `true` if the metric was handed to the buffer/socket, `false` if it was
     * sampled out or could not be sent
     */
    fun timing(key: String, value: Int, sampleRate: Double = 1.0): Boolean {
        return send(sampleRate, String.format(Locale.ENGLISH, "%s:%d|ms", key, value))
    }

    /**
     * Decrements each counter in [keys]. The sign of [magnitude] is ignored: the emitted
     * delta is always non-positive, so `decrement("k", magnitude = 5)` sends `k:-5|c`.
     *
     * @return `true` if at least one counter was sent
     */
    fun decrement(vararg keys: String, magnitude: Int = -1, sampleRate: Double = 1.0): Boolean {
        val delta = if (magnitude > 0) -magnitude else magnitude

        return send(sampleRate, *counterStats(keys, delta))
    }

    /**
     * Adds [magnitude] to each counter in [keys] (formatted as `key:magnitude|c`).
     *
     * @return `true` if at least one counter was sent
     */
    fun increment(vararg keys: String, magnitude: Int = 1, sampleRate: Double = 1.0): Boolean {
        return send(sampleRate, *counterStats(keys, magnitude))
    }

    /**
     * Sends a gauge metric formatted as `key:magnitude|g`.
     *
     * @return `true` if the metric was handed to the buffer/socket
     */
    fun gauge(key: String, magnitude: Double, sampleRate: Double = 1.0): Boolean {
        val stat = String.format(Locale.ENGLISH, "%s:%s|g", key, magnitude)

        return send(sampleRate, stat)
    }

    /** Formats one counter line per key. */
    private fun counterStats(keys: Array<out String>, magnitude: Int): Array<String> =
        keys.map { String.format(Locale.ENGLISH, "%s:%s|c", it, magnitude) }.toTypedArray()

    /**
     * Sends every stat, applying sampling when [sampleRate] is below 1.0: each stat is then
     * sent with probability [sampleRate] and suffixed with `|@rate`.
     *
     * A plain loop is used on purpose: `any { ... }` would stop at the first successful stat
     * and silently drop the remaining ones.
     *
     * @return `true` if at least one stat was sent
     */
    private fun send(sampleRate: Double, vararg stats: String): Boolean {
        var sentAny = false

        for (stat in stats) {
            val payload = if (sampleRate < 1.0) {
                if (RNG.nextDouble() > sampleRate) {
                    continue // sampled out
                }
                String.format(Locale.ENGLISH, "%s|@%f", stat, sampleRate)
            } else {
                stat
            }

            if (doSend(payload)) {
                sentAny = true
            }
        }

        return sentAny
    }

    /**
     * Appends one stat to the packet buffer, flushing first when it would not fit, and
     * flushes right away when batching is disabled.
     *
     * @return in batching mode `true` once the stat is buffered; otherwise the result of the
     * immediate flush. `false` if the stat can never fit in the packet buffer.
     */
    @Synchronized
    private fun doSend(stat: String): Boolean {
        val data = stat.toByteArray(Charsets.UTF_8)

        if (data.size > sendBuffer.capacity()) {
            // Putting it anyway would throw BufferOverflowException; report and drop instead.
            log.error(String.format(
                "Could not send stat %s to host %s:%d. It needs %d bytes but the packet buffer holds %d bytes",
                stat,
                address.hostName,
                address.port,
                data.size,
                sendBuffer.capacity()
            ))
            return false
        }

        // +1 leaves room for the newline separator.
        if (sendBuffer.remaining() < data.size + 1) {
            flush()
        }

        // Stats sharing a datagram are separated by a newline.
        if (sendBuffer.position() > 0) {
            sendBuffer.put(NEWLINE)
        }

        sendBuffer.put(data)

        return if (multiMetrics) true else flush()
    }

    /**
     * Sends the buffered stats as one datagram and empties the buffer. The buffer is emptied
     * even when sending fails, so a failed datagram is dropped rather than retried.
     *
     * @return `true` if the whole buffer was sent; `false` if the buffer was empty, the
     * datagram was not (fully) sent, or an I/O error occurred
     */
    @Synchronized
    fun flush(): Boolean {
        val buffer = sendBuffer
        val sizeOfBuffer = buffer.position()

        if (sizeOfBuffer <= 0) {
            return false
        }

        buffer.flip()

        try {
            // On a non-blocking channel this returns 0 when the socket has no room.
            val nbSentBytes = channel.send(buffer, address)

            if (sizeOfBuffer == nbSentBytes) {
                return true
            }

            log.error(String.format(
                "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes",
                String(buffer.array(), 0, sizeOfBuffer, Charsets.UTF_8),
                address.hostName,
                address.port,
                nbSentBytes,
                sizeOfBuffer
            ))

            return false
        } catch (e: IOException) {
            log.error(
                String.format(
                    "Could not send stat %s to host %s:%d",
                    String(buffer.array(), 0, sizeOfBuffer, Charsets.UTF_8),
                    address.hostName,
                    address.port
                ),
                e
            )

            return false
        } finally {
            // Always return to write mode (position 0, limit = capacity); leaving the buffer
            // flipped after a failure would make later put() calls overflow.
            buffer.clear()
        }
    }

    /**
     * Stops the flush timer, sends anything still buffered and closes the datagram channel.
     * Calling it more than once is harmless.
     *
     * @throws IOException if closing the channel fails
     */
    @Synchronized
    @Throws(IOException::class)
    override fun close() {
        stopFlushTimer()

        if (channel.isOpen) {
            flush()
            channel.close()
        }
    }

    companion object {
        /** Default packet buffer size in bytes. */
        private const val DEFAULT_BUFFER_SIZE = 1500

        /** Value requested for the socket's SO_SNDBUF option, in bytes. */
        private const val SOCKET_SEND_BUFFER_SIZE = 4096

        /** ASCII line feed, the separator between stats in one datagram. */
        private const val NEWLINE: Byte = 10

        private val RNG = Random()
        private val log = Logger.getLogger(StatsdClient::class.java.name)
    }
}
|