UNPKG

7.35 kBPlain TextView Raw
1/*
2
3Scala implementation of Andrew Gwozdziewycz's StatsdClient.java
4
5Copyright (c) 2013 Joshua Garnett
6
7Permission is hereby granted, free of charge, to any person obtaining a copy
8of this software and associated documentation files (the "Software"), to deal
9in the Software without restriction, including without limitation the rights
10to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11copies of the Software, and to permit persons to whom the Software is
12furnished to do so, subject to the following conditions:
13
14The above copyright notice and this permission notice shall be included in
15all copies or substantial portions of the Software.
16
17THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23THE SOFTWARE.
24*/
25
26package com.statsd
27
28import java.io.IOException
29import java.net._
30import java.nio.ByteBuffer
31import java.nio.channels.DatagramChannel
32import java.util.Random
33import org.slf4j.LoggerFactory
34import akka.actor._
35
36/**
37 * Client for sending stats to StatsD uses Akka to manage concurrency
38 *
39 * @param context The Akka ActorContext
40 * @param host The statsd host
41 * @param port The statsd port
42 * @param multiMetrics If true, multiple stats will be sent in a single UDP packet
43 * @param packetBufferSize If multiMetrics is true, this is the max buffer size before sending the UDP packet
44 */
45class StatsD(context: ActorContext,
46 host: String,
47 port: Int,
48 multiMetrics: Boolean = true,
49 packetBufferSize: Int = 1024) {
50
51 private val rand = new Random()
52
53 private val actorRef = context.actorOf(Props(new StatsDActor(host, port, multiMetrics, packetBufferSize)))
54
55 /**
56 * Sends timing stats in milliseconds to StatsD
57 *
58 * @param key name of the stat
59 * @param value time in milliseconds
60 */
61 def timing(key: String, value: Int, sampleRate: Double = 1.0) = {
62 send(key, value.toString, StatsDProtocol.TIMING_METRIC, sampleRate)
63 }
64
65 /**
66 * Decrement StatsD counter
67 *
68 * @param key name of the stat
69 * @param magnitude how much to decrement
70 */
71 def decrement(key: String, magnitude: Int = -1, sampleRate: Double = 1.0) = {
72 increment(key, magnitude, sampleRate)
73 }
74
75 /**
76 * Increment StatsD counter
77 *
78 * @param key name of the stat
79 * @param magnitude how much to increment
80 */
81 def increment(key: String, magnitude: Int = 1, sampleRate: Double = 1.0) = {
82 send(key, magnitude.toString, StatsDProtocol.COUNTER_METRIC, sampleRate)
83 }
84
85 /**
86 * StatsD now also supports gauges, arbitrary values, which can be recorded.
87 *
88 * @param key name of the stat
89 * @param value Can be a fixed value or increase or decrease (Ex: "10" "-1" "+5")
90 */
91 def gauge(key: String, value: String = "1", sampleRate: Double = 1.0) = {
92 send(key, value, StatsDProtocol.GAUGE_METRIC, sampleRate)
93 }
94
95 /**
96 * StatsD supports counting unique occurrences of events between flushes, using a Set to store all occurring events.
97 *
98 * @param key name of the stat
99 * @param value value of the set
100 */
101 def set(key: String, value: Int, sampleRate: Double = 1.0) = {
102 send(key, value.toString, StatsDProtocol.SET_METRIC, sampleRate)
103 }
104
105 /**
106 * Checks the sample rate and sends the stat to the actor if it passes
107 */
108 private def send(key: String, value: String, metric: String, sampleRate: Double): Boolean = {
109 if (sampleRate >= 1 || rand.nextDouble <= sampleRate) {
110 actorRef ! SendStat(StatsDProtocol.stat(key, value, metric, sampleRate))
111 true
112 }
113 else {
114 false
115 }
116 }
117}
118
119object StatsDProtocol {
120 val TIMING_METRIC = "ms"
121 val COUNTER_METRIC = "c"
122 val GAUGE_METRIC = "g"
123 val SET_METRIC = "s"
124
125 /**
126 * @return Returns a string that conforms to the StatsD protocol:
127 * KEY:VALUE|METRIC or KEY:VALUE|METRIC|@SAMPLE_RATE
128 */
129 def stat(key: String, value: String, metric: String, sampleRate: Double) = {
130 val sampleRateString = if (sampleRate < 1) "|@" + sampleRate else ""
131 key + ":" + value + "|" + metric + sampleRateString
132 }
133}
134
135/**
136 * Message for the StatsDActor
137 */
138private case class SendStat(stat: String)
139
140/**
141 * @param host The statsd host
142 * @param port The statsd port
143 * @param multiMetrics If true, multiple stats will be sent in a single UDP packet
144 * @param packetBufferSize If multiMetrics is true, this is the max buffer size before sending the UDP packet
145 */
146private class StatsDActor(host: String,
147 port: Int,
148 multiMetrics: Boolean,
149 packetBufferSize: Int) extends Actor {
150
151 private val log = LoggerFactory.getLogger(getClass())
152
153 private val sendBuffer = ByteBuffer.allocate(packetBufferSize)
154
155 private val address = new InetSocketAddress(InetAddress.getByName(host), port)
156 private val channel = DatagramChannel.open()
157
158 def receive = {
159 case msg: SendStat => doSend(msg.stat)
160 case _ => log.error("Unknown message")
161 }
162
163 override def postStop() = {
164 //save any remaining data to StatsD
165 flush
166
167 //Close the channel
168 if (channel.isOpen()) {
169 channel.close()
170 }
171
172 sendBuffer.clear()
173 }
174
175 private def doSend(stat: String) = {
176 try {
177 val data = stat.getBytes("utf-8")
178
179 // If we're going to go past the threshold of the buffer then flush.
180 // the +1 is for the potential '\n' in multi_metrics below
181 if (sendBuffer.remaining() < (data.length + 1)) {
182 flush
183 }
184
185 // multiple metrics are separated by '\n'
186 if (sendBuffer.position() > 0) {
187 sendBuffer.put('\n'.asInstanceOf[Byte])
188 }
189
190 // append the data
191 sendBuffer.put(data)
192
193 if (!multiMetrics) {
194 flush
195 }
196
197 }
198 catch {
199 case e: IOException => {
200 log.error("Could not send stat {} to host {}:{}", sendBuffer.toString, address.getHostName(), address.getPort().toString, e)
201 }
202 }
203 }
204
205 private def flush(): Unit = {
206 try {
207 val sizeOfBuffer = sendBuffer.position()
208
209 if (sizeOfBuffer <= 0) {
210 // empty buffer
211 return
212 }
213
214 // send and reset the buffer
215 sendBuffer.flip()
216 val nbSentBytes = channel.send(sendBuffer, address)
217 sendBuffer.limit(sendBuffer.capacity())
218 sendBuffer.rewind()
219
220 if (sizeOfBuffer != nbSentBytes) {
221 log.error("Could not send entirely stat {} to host {}:{}. Only sent {} bytes out of {} bytes", sendBuffer.toString(),
222 address.getHostName(), address.getPort().toString, nbSentBytes.toString, sizeOfBuffer.toString)
223 }
224
225 }
226 catch {
227 case e: IOException => {
228 log.error("Could not send stat {} to host {}:{}", sendBuffer.toString, address.getHostName(), address.getPort().toString, e)
229 }
230 }
231 }
232}
\No newline at end of file