1 |
|
2 |
|
3 |
|
4 |
|
5 | import EventEmitter from 'events'
|
6 |
|
7 | import { httpReq } from './uninstrumentedHttpUtils'
|
8 |
|
9 | import type { MinimalMetricDimensions } from './types'
|
10 |
|
11 |
|
12 | export type Metric = {
|
13 | statName: string,
|
14 | value: number,
|
15 | dims?: ?{},
|
16 | timestampMs?: ?number,
|
17 | }
|
18 |
|
19 | export type StatNameMode = 'measurements' | 'fields'
|
20 |
|
21 | export default class InfluxDBClient extends EventEmitter {
|
22 |
|
23 | _influxDBHost: string
|
24 | _influxDBDBName: string
|
25 | _statNameMode: StatNameMode
|
26 | _flushIntervalMS: number
|
27 | _counterPoints: Map<string, number>
|
28 | _gaugePoints: Map<string, number>
|
29 | _timerPoints: Map<string, Array<number>>
|
30 | _baseDims: Object
|
31 |
|
32 |
|
33 | constructor(influxDBHost: string, influxDBDBName: string, statNameMode: StatNameMode, flushIntervalMS: ?number, baseDims?: ?Object) {
|
34 | super()
|
35 |
|
36 | this._influxDBHost = influxDBHost
|
37 | this._influxDBDBName = influxDBDBName
|
38 | this._statNameMode = statNameMode
|
39 | this._flushIntervalMS = flushIntervalMS || 0
|
40 | this._baseDims = baseDims || {}
|
41 | this._counterPoints = new Map()
|
42 | this._gaugePoints = new Map()
|
43 | this._timerPoints = new Map()
|
44 |
|
45 | if (this._flushIntervalMS) {
|
46 | setInterval(this.flushMetrics, this._flushIntervalMS)
|
47 | }
|
48 | }
|
49 |
|
50 |
|
51 | trackMetrics = async (metrics: Array<Metric>) => {
|
52 | metrics.forEach(({ statName, value, dims }) => {
|
53 | const filledDims = this._fillDims(dims)
|
54 | const metricKey = this._statNameMode === 'fields' ?
|
55 | `stat${`${Object.keys(filledDims).map(k => `,${k}=${filledDims[k]}`).join('')}`} ${statName}` :
|
56 | `${statName}${`${Object.keys(filledDims).map(k => `,${k}=${filledDims[k]}`).join('')}`} value`
|
57 | switch (filledDims.metricType) {
|
58 | case 'counters':
|
59 | this._counterPoints.set(metricKey, Number(this._counterPoints.get(metricKey) || 0) + Number(value))
|
60 | break
|
61 | case 'gauges':
|
62 | this._gaugePoints.set(metricKey, Number(value))
|
63 | break
|
64 | case 'timers':
|
65 | this._timerPoints.set(metricKey, [ ...(this._timerPoints.get(metricKey) || []), Number(value) ])
|
66 | break
|
67 | default:
|
68 | throw new Error(`Illegal metricType: '${filledDims.metricType}'`)
|
69 | }
|
70 | })
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 | if (!this._flushIntervalMS) {
|
78 | await this.flushMetrics()
|
79 | }
|
80 | }
|
81 |
|
82 | flushMetrics = async () => {
|
83 | try {
|
84 | if (this._counterPoints.size + this._gaugePoints.size + this._timerPoints.size === 0)
|
85 | return
|
86 |
|
87 | const pointsStr = [
|
88 | Array.from(this._counterPoints.keys()).map(key => `${key}=${ this._counterPoints.get(key) || 0 }`).join('\n'),
|
89 | Array.from(this._gaugePoints.keys()).map(key => `${key}=${ this._gaugePoints.get(key) || 0 }`).join('\n'),
|
90 | Array.from(this._timerPoints.keys()).map(key => `${key}=${ (this._timerPoints.get(key) || []).reduce((a, b) => a + b, 0) / (this._timerPoints.get(key) || []).length }`).join('\n'),
|
91 | ].join('\n')
|
92 |
|
93 | this._counterPoints = new Map()
|
94 | this._gaugePoints = new Map()
|
95 | this._timerPoints = new Map()
|
96 |
|
97 |
|
98 |
|
99 | await httpReq(`http://${this._influxDBHost}/write?db=${this._influxDBDBName}`, {
|
100 | method: 'POST',
|
101 | body: pointsStr,
|
102 | maxRetries: 1,
|
103 | timeout: 10000,
|
104 | })
|
105 | } catch (err) {
|
106 | console.error('InfluxDB error', (new Date()).toUTCString(), err.toString())
|
107 |
|
108 |
|
109 | }
|
110 | }
|
111 |
|
112 | _fillDims = (dims: ?{}): MinimalMetricDimensions => {
|
113 | return {
|
114 | ...this._baseDims,
|
115 | metricType: 'counters',
|
116 | ...dims,
|
117 | }
|
118 | }
|
119 |
|
120 | }
|