1 | /*
|
2 |
|
3 | Scala implementation of Andrew Gwozdziewycz's StatsdClient.java
|
4 |
|
5 | Copyright (c) 2013 Joshua Garnett
|
6 |
|
7 | Permission is hereby granted, free of charge, to any person obtaining a copy
|
8 | of this software and associated documentation files (the "Software"), to deal
|
9 | in the Software without restriction, including without limitation the rights
|
10 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
11 | copies of the Software, and to permit persons to whom the Software is
|
12 | furnished to do so, subject to the following conditions:
|
13 |
|
14 | The above copyright notice and this permission notice shall be included in
|
15 | all copies or substantial portions of the Software.
|
16 |
|
17 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
18 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
19 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
20 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
21 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
22 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
23 | THE SOFTWARE.
|
24 | */
|
25 |
|
26 | package com.statsd
|
27 |
|
28 | import java.io.IOException
|
29 | import java.net._
|
30 | import java.nio.ByteBuffer
|
31 | import java.nio.channels.DatagramChannel
|
32 | import java.util.Random
|
33 | import org.slf4j.LoggerFactory
|
34 | import akka.actor._
|
35 |
|
/**
 * Client for sending stats to StatsD. Uses Akka to manage concurrency: every stat that
 * passes sampling is handed as a message to an actor created from the given context,
 * and that actor performs the actual UDP send.
 *
 * @param context The Akka ActorContext
 * @param host The statsd host
 * @param port The statsd port
 * @param multiMetrics If true, multiple stats will be sent in a single UDP packet
 * @param packetBufferSize If multiMetrics is true, this is the max buffer size before sending the UDP packet
 */
class StatsD(context: ActorContext,
             host: String,
             port: Int,
             multiMetrics: Boolean = true,
             packetBufferSize: Int = 1024) {

  // Source of randomness for the sampling decision in send().
  private val rand = new Random()

  // Actor that receives the formatted stats and sends them out.
  private val actorRef = context.actorOf(Props(new StatsDActor(host, port, multiMetrics, packetBufferSize)))

  /**
   * Sends timing stats in milliseconds to StatsD
   *
   * @param key name of the stat
   * @param value time in milliseconds
   * @param sampleRate fraction of calls that should actually be sent; values >= 1 always send
   * @return true if the stat was handed to the actor, false if it was sampled out
   */
  def timing(key: String, value: Int, sampleRate: Double = 1.0): Boolean =
    send(key, value.toString, StatsDProtocol.TIMING_METRIC, sampleRate)

  /**
   * Decrement StatsD counter
   *
   * The sign of `magnitude` is ignored: `decrement(key, 5)` and `decrement(key, -5)`
   * both lower the counter by 5.
   *
   * @param key name of the stat
   * @param magnitude how much to decrement
   * @param sampleRate fraction of calls that should actually be sent; values >= 1 always send
   * @return true if the stat was handed to the actor, false if it was sampled out
   */
  def decrement(key: String, magnitude: Int = -1, sampleRate: Double = 1.0): Boolean = {
    // Always forward a non-positive delta; a positive magnitude used to be passed
    // straight through, which incremented the counter instead of decrementing it.
    val delta = if (magnitude < 0) magnitude else -magnitude
    increment(key, delta, sampleRate)
  }

  /**
   * Increment StatsD counter
   *
   * @param key name of the stat
   * @param magnitude how much to increment
   * @param sampleRate fraction of calls that should actually be sent; values >= 1 always send
   * @return true if the stat was handed to the actor, false if it was sampled out
   */
  def increment(key: String, magnitude: Int = 1, sampleRate: Double = 1.0): Boolean =
    send(key, magnitude.toString, StatsDProtocol.COUNTER_METRIC, sampleRate)

  /**
   * StatsD now also supports gauges, arbitrary values, which can be recorded.
   *
   * @param key name of the stat
   * @param value Can be a fixed value or increase or decrease (Ex: "10" "-1" "+5")
   * @param sampleRate fraction of calls that should actually be sent; values >= 1 always send
   * @return true if the stat was handed to the actor, false if it was sampled out
   */
  def gauge(key: String, value: String = "1", sampleRate: Double = 1.0): Boolean =
    send(key, value, StatsDProtocol.GAUGE_METRIC, sampleRate)

  /**
   * StatsD supports counting unique occurrences of events between flushes, using a Set to store all occurring events.
   *
   * @param key name of the stat
   * @param value value of the set
   * @param sampleRate fraction of calls that should actually be sent; values >= 1 always send
   * @return true if the stat was handed to the actor, false if it was sampled out
   */
  def set(key: String, value: Int, sampleRate: Double = 1.0): Boolean =
    send(key, value.toString, StatsDProtocol.SET_METRIC, sampleRate)

  /**
   * Checks the sample rate and sends the stat to the actor if it passes
   *
   * @return true if the stat was handed to the actor, false if it was sampled out
   */
  private def send(key: String, value: String, metric: String, sampleRate: Double): Boolean = {
    // nextDouble is uniform on [0, 1), so a strict comparison sends with probability
    // exactly sampleRate and never sends when sampleRate is 0.
    val sampled = sampleRate >= 1 || rand.nextDouble < sampleRate
    if (sampled) {
      actorRef ! SendStat(StatsDProtocol.stat(key, value, metric, sampleRate))
    }
    sampled
  }
}
|
118 |
|
object StatsDProtocol {
  /** Metric type suffix for timers. */
  val TIMING_METRIC = "ms"
  /** Metric type suffix for counters. */
  val COUNTER_METRIC = "c"
  /** Metric type suffix for gauges. */
  val GAUGE_METRIC = "g"
  /** Metric type suffix for sets. */
  val SET_METRIC = "s"

  /**
   * Formats one stat as a protocol line. The sample rate is appended only when it is
   * below 1, and is always written as a plain decimal (never in scientific notation).
   *
   * @param key name of the stat
   * @param value value of the stat, already rendered as text
   * @param metric metric type suffix, e.g. one of the constants of this object
   * @param sampleRate sample rate the stat was recorded with
   * @return Returns a string that conforms to the StatsD protocol:
   *         KEY:VALUE|METRIC or KEY:VALUE|METRIC|@SAMPLE_RATE
   */
  def stat(key: String, value: String, metric: String, sampleRate: Double): String = {
    val sampleRateString = if (sampleRate < 1) "|@" + plainDecimal(sampleRate) else ""
    s"$key:$value|$metric$sampleRateString"
  }

  /**
   * Renders a rate without an exponent. Double.toString switches to scientific
   * notation (e.g. "1.0E-4") for small magnitudes, which is not the plain decimal form
   * of the protocol line above. Values that already print without an exponent
   * (including non-finite ones) are returned exactly as Double.toString prints them.
   */
  private def plainDecimal(rate: Double): String = {
    val text = rate.toString
    if (text.indexOf('E') < 0) text
    else new java.math.BigDecimal(text).stripTrailingZeros.toPlainString
  }
}
|
134 |
|
/**
 * Message for the StatsDActor: asks it to send one stat.
 *
 * Declared final so the message type cannot be subclassed.
 *
 * @param stat the stat to send, already formatted as a StatsD protocol line
 */
private final case class SendStat(stat: String)
|
139 |
|
/**
 * Actor that accumulates stats in a byte buffer and sends them to StatsD over a
 * DatagramChannel. Sending is best effort: failures are logged and the affected
 * stats are discarded.
 *
 * @param host The statsd host
 * @param port The statsd port
 * @param multiMetrics If true, multiple stats will be sent in a single UDP packet
 * @param packetBufferSize If multiMetrics is true, this is the max buffer size before sending the UDP packet
 */
private class StatsDActor(host: String,
                          port: Int,
                          multiMetrics: Boolean,
                          packetBufferSize: Int) extends Actor {

  private val log = LoggerFactory.getLogger(getClass())

  // Pending stats, separated by '\n'. Between calls the buffer is always in "write"
  // mode: position = number of pending bytes, limit = capacity.
  private val sendBuffer = ByteBuffer.allocate(packetBufferSize)

  private val address = new InetSocketAddress(InetAddress.getByName(host), port)
  private val channel = DatagramChannel.open()

  def receive = {
    case msg: SendStat => doSend(msg.stat)
    case _ => log.error("Unknown message")
  }

  /**
   * Sends whatever is still buffered, then closes the channel.
   */
  override def postStop(): Unit = {
    // Send any remaining data to StatsD
    flush()

    // Close the channel
    if (channel.isOpen()) {
      channel.close()
    }

    sendBuffer.clear()
  }

  /**
   * Appends one stat to the buffer, flushing first when it would not fit and
   * flushing afterwards when multiMetrics is disabled.
   *
   * @param stat a StatsD protocol line
   */
  private def doSend(stat: String): Unit = {
    try {
      val data = stat.getBytes("utf-8")

      if (data.length > sendBuffer.capacity()) {
        // A stat that can never fit would make put() throw BufferOverflowException,
        // which is not an IOException and would escape receive. Drop it instead.
        log.error("Dropping stat {}: {} bytes do not fit in the packet buffer of {} bytes",
          stat, data.length.toString, sendBuffer.capacity().toString)
      }
      else {
        // If we're going to go past the threshold of the buffer then flush.
        // The +1 is for the potential '\n' separator added below.
        if (sendBuffer.remaining() < (data.length + 1)) {
          flush()
        }

        // Multiple metrics are separated by '\n'
        if (sendBuffer.position() > 0) {
          sendBuffer.put('\n'.toByte)
        }

        // Append the data
        sendBuffer.put(data)

        if (!multiMetrics) {
          flush()
        }
      }
    }
    catch {
      case e: IOException =>
        log.error("Could not send stat {} to host {}:{}", stat, address.getHostName(), address.getPort().toString, e)
    }
  }

  /**
   * Sends the buffered bytes as one datagram and empties the buffer. Does nothing
   * when the buffer is empty. The buffer is emptied even if the send fails, so a
   * failed packet is discarded rather than retried.
   */
  private def flush(): Unit = {
    val sizeOfBuffer = sendBuffer.position()

    if (sizeOfBuffer > 0) {
      try {
        sendBuffer.flip()
        val nbSentBytes = channel.send(sendBuffer, address)

        if (sizeOfBuffer != nbSentBytes) {
          log.error("Could not send entirely stat {} to host {}:{}. Only sent {} bytes out of {} bytes", pendingStats(sizeOfBuffer),
            address.getHostName(), address.getPort().toString, nbSentBytes.toString, sizeOfBuffer.toString)
        }
      }
      catch {
        case e: IOException =>
          log.error("Could not send stat {} to host {}:{}", pendingStats(sizeOfBuffer), address.getHostName(), address.getPort().toString, e)
      }
      finally {
        // Always return to write mode (position 0, limit = capacity). Without this a
        // failed send left the buffer flipped, with its limit stuck at the old size.
        sendBuffer.clear()
      }
    }
  }

  /**
   * Decodes the first `length` bytes of the buffer's backing array for log messages,
   * so that logs show the stats themselves rather than the buffer's pos/lim/cap.
   */
  private def pendingStats(length: Int): String =
    new String(sendBuffer.array(), 0, length, "utf-8")
}
\ | No newline at end of file |