// @flow // NOTE: don't use Module here, since Module uses this class import EventEmitter from 'events' import { httpReq } from './uninstrumentedHttpUtils' import type { MinimalMetricDimensions } from './types' export type Metric = { statName: string, value: number, dims?: ?{}, timestampMs?: ?number, } export type StatNameMode = 'measurements' | 'fields' export default class InfluxDBClient extends EventEmitter { _influxDBHost: string _influxDBDBName: string _statNameMode: StatNameMode _flushIntervalMS: number _counterPoints: Map _gaugePoints: Map _timerPoints: Map> _baseDims: Object constructor(influxDBHost: string, influxDBDBName: string, statNameMode: StatNameMode, flushIntervalMS: ?number, baseDims?: ?Object) { super() this._influxDBHost = influxDBHost this._influxDBDBName = influxDBDBName this._statNameMode = statNameMode this._flushIntervalMS = flushIntervalMS || 0 this._baseDims = baseDims || {} this._counterPoints = new Map() this._gaugePoints = new Map() this._timerPoints = new Map() if (this._flushIntervalMS) { setInterval(this.flushMetrics, this._flushIntervalMS) } } trackMetrics = async (metrics: Array) => { metrics.forEach(({ statName, value, dims }) => { const filledDims = this._fillDims(dims) const metricKey = this._statNameMode === 'fields' ? `stat${`${Object.keys(filledDims).map(k => `,${k}=${filledDims[k]}`).join('')}`} ${statName}` : `${statName}${`${Object.keys(filledDims).map(k => `,${k}=${filledDims[k]}`).join('')}`} value` switch (filledDims.metricType) { case 'counters': this._counterPoints.set(metricKey, Number(this._counterPoints.get(metricKey) || 0) + Number(value)) break case 'gauges': this._gaugePoints.set(metricKey, Number(value)) break case 'timers': this._timerPoints.set(metricKey, [ ...(this._timerPoints.get(metricKey) || []), Number(value) ]) break default: throw new Error(`Illegal metricType: '${filledDims.metricType}'`) } }) //const body = `${statName}${ dims ? `${Object.keys(dims).map(k => `,${k}=${dims[k]}`).join('')}` : '' } value=${value}${ timestampMs ? ` ${timestampMs}000000` : '' }` // $FlowIgnore //const newMetricStrs = metrics.map(({ statName, value, dims, timestampMs }) => `stat${ dims ? `${Object.keys(dims).map(k => `,${k}=${dims[k]}`).join('')}` : '' } ${statName}=${value}${ timestampMs ? ` ${timestampMs}000000` : '' }`) if (!this._flushIntervalMS) { await this.flushMetrics() } } flushMetrics = async () => { try { if (this._counterPoints.size + this._gaugePoints.size + this._timerPoints.size === 0) return const pointsStr = [ Array.from(this._counterPoints.keys()).map(key => `${key}=${ this._counterPoints.get(key) || 0 }`).join('\n'), Array.from(this._gaugePoints.keys()).map(key => `${key}=${ this._gaugePoints.get(key) || 0 }`).join('\n'), Array.from(this._timerPoints.keys()).map(key => `${key}=${ (this._timerPoints.get(key) || []).reduce((a, b) => a + b, 0) / (this._timerPoints.get(key) || []).length }`).join('\n'), ].join('\n') this._counterPoints = new Map() this._gaugePoints = new Map() this._timerPoints = new Map() // console.log('Tracking:\n' + pointsStr) // eslint-disable-line no-console await httpReq(`http://${this._influxDBHost}/write?db=${this._influxDBDBName}`, { method: 'POST', body: pointsStr, maxRetries: 1, timeout: 10000, }) } catch (err) { console.error('InfluxDB error', (new Date()).toUTCString(), err.toString()) // eslint-disable-line no-console // TODO: make sure these errors are logged //this.emit('error', { err: err }) } } _fillDims = (dims: ?{}): MinimalMetricDimensions => { return { ...this._baseDims, metricType: 'counters', // a metric is a counter by default ...dims, } } }