UNPKG

4.12 kBJavaScriptView Raw
1// @flow
2
3// NOTE: don't use Module here, since Module uses this class
4
5import EventEmitter from 'events'
6
7import { httpReq } from './uninstrumentedHttpUtils'
8
9import type { MinimalMetricDimensions } from './types'
10
11
/**
 * One metric sample handed to `InfluxDBClient#trackMetrics`.
 *
 * - `statName`: used as the measurement name when the client runs in
 *   'measurements' mode, and as the field key when it runs in 'fields' mode.
 * - `value`: the sample; the client converts it with `Number()` before buffering.
 * - `dims`: optional dimensions, merged over the client's base dimensions and
 *   written as tags. The `metricType` entry ('counters' | 'gauges' | 'timers')
 *   selects how samples are aggregated; the client defaults it to 'counters'.
 * - `timestampMs`: optional; `trackMetrics` as written in this file does not
 *   read it.
 */
12export type Metric = {
13 statName: string,
14 value: number,
15 dims?: ?{},
16 timestampMs?: ?number,
17}
18
/**
 * How a metric's `statName` is placed in the emitted line:
 * - 'measurements': `statName` is the measurement and the field key is `value`.
 * - 'fields': the measurement is the literal `stat` and `statName` is the field key.
 */
19export type StatNameMode = 'measurements' | 'fields'
20
21export default class InfluxDBClient extends EventEmitter {
22
23 _influxDBHost: string
24 _influxDBDBName: string
25 _statNameMode: StatNameMode
26 _flushIntervalMS: number
27 _counterPoints: Map<string, number>
28 _gaugePoints: Map<string, number>
29 _timerPoints: Map<string, Array<number>>
30 _baseDims: Object
31
32
33 constructor(influxDBHost: string, influxDBDBName: string, statNameMode: StatNameMode, flushIntervalMS: ?number, baseDims?: ?Object) {
34 super()
35
36 this._influxDBHost = influxDBHost
37 this._influxDBDBName = influxDBDBName
38 this._statNameMode = statNameMode
39 this._flushIntervalMS = flushIntervalMS || 0
40 this._baseDims = baseDims || {}
41 this._counterPoints = new Map()
42 this._gaugePoints = new Map()
43 this._timerPoints = new Map()
44
45 if (this._flushIntervalMS) {
46 setInterval(this.flushMetrics, this._flushIntervalMS)
47 }
48 }
49
50
51 trackMetrics = async (metrics: Array<Metric>) => {
52 metrics.forEach(({ statName, value, dims }) => {
53 const filledDims = this._fillDims(dims)
54 const metricKey = this._statNameMode === 'fields' ?
55 `stat${`${Object.keys(filledDims).map(k => `,${k}=${filledDims[k]}`).join('')}`} ${statName}` :
56 `${statName}${`${Object.keys(filledDims).map(k => `,${k}=${filledDims[k]}`).join('')}`} value`
57 switch (filledDims.metricType) {
58 case 'counters':
59 this._counterPoints.set(metricKey, Number(this._counterPoints.get(metricKey) || 0) + Number(value))
60 break
61 case 'gauges':
62 this._gaugePoints.set(metricKey, Number(value))
63 break
64 case 'timers':
65 this._timerPoints.set(metricKey, [ ...(this._timerPoints.get(metricKey) || []), Number(value) ])
66 break
67 default:
68 throw new Error(`Illegal metricType: '${filledDims.metricType}'`)
69 }
70 })
71
72 //const body = `${statName}${ dims ? `${Object.keys(dims).map(k => `,${k}=${dims[k]}`).join('')}` : '' } value=${value}${ timestampMs ? ` ${timestampMs}000000` : '' }`
73
74 // $FlowIgnore
75 //const newMetricStrs = metrics.map(({ statName, value, dims, timestampMs }) => `stat${ dims ? `${Object.keys(dims).map(k => `,${k}=${dims[k]}`).join('')}` : '' } ${statName}=${value}${ timestampMs ? ` ${timestampMs}000000` : '' }`)
76
77 if (!this._flushIntervalMS) {
78 await this.flushMetrics()
79 }
80 }
81
82 flushMetrics = async () => {
83 try {
84 if (this._counterPoints.size + this._gaugePoints.size + this._timerPoints.size === 0)
85 return
86
87 const pointsStr = [
88 Array.from(this._counterPoints.keys()).map(key => `${key}=${ this._counterPoints.get(key) || 0 }`).join('\n'),
89 Array.from(this._gaugePoints.keys()).map(key => `${key}=${ this._gaugePoints.get(key) || 0 }`).join('\n'),
90 Array.from(this._timerPoints.keys()).map(key => `${key}=${ (this._timerPoints.get(key) || []).reduce((a, b) => a + b, 0) / (this._timerPoints.get(key) || []).length }`).join('\n'),
91 ].join('\n')
92
93 this._counterPoints = new Map()
94 this._gaugePoints = new Map()
95 this._timerPoints = new Map()
96
97 // console.log('Tracking:\n' + pointsStr) // eslint-disable-line no-console
98
99 await httpReq(`http://${this._influxDBHost}/write?db=${this._influxDBDBName}`, {
100 method: 'POST',
101 body: pointsStr,
102 maxRetries: 1,
103 timeout: 10000,
104 })
105 } catch (err) {
106 console.error('InfluxDB error', (new Date()).toUTCString(), err.toString()) // eslint-disable-line no-console
107 // TODO: make sure these errors are logged
108 //this.emit('error', { err: err })
109 }
110 }
111
112 _fillDims = (dims: ?{}): MinimalMetricDimensions => {
113 return {
114 ...this._baseDims,
115 metricType: 'counters', // a metric is a counter by default
116 ...dims,
117 }
118 }
119
120}