Source: rtc/monitor.js

const EventEmitter = require('events').EventEmitter;
const getStatistics = require('./stats');
const inherits = require('util').inherits;
const Mos = require('./mos');

// Number of most recent samples inspected when testing min/max thresholds.
// It is also the maximum number of samples kept in the sample buffer.
const SAMPLE_COUNT_METRICS = 5;

// A min/max warning is raised once at least SAMPLE_COUNT_RAISE of the
// inspected samples cross the threshold, and becomes eligible for clearing
// once at most SAMPLE_COUNT_CLEAR of them do.
const SAMPLE_COUNT_CLEAR = 0;
const SAMPLE_COUNT_RAISE = 3;

// Delay, in milliseconds, between two consecutive stats polls.
const SAMPLE_INTERVAL = 1000;
// Minimum time, in milliseconds, a warning stays active before it may be cleared.
const WARNING_TIMEOUT = 5 * 1000;

/**
 * @typedef {Object} RTCMonitor.ThresholdOptions
 * @property {RTCMonitor.ThresholdOption} [audioInputLevel] - Rules to apply to sample.audioInputLevel
 * @property {RTCMonitor.ThresholdOption} [audioOutputLevel] - Rules to apply to sample.audioOutputLevel
 * @property {RTCMonitor.ThresholdOption} [packetsLostFraction] - Rules to apply to sample.packetsLostFraction
 * @property {RTCMonitor.ThresholdOption} [jitter] - Rules to apply to sample.jitter
 * @property {RTCMonitor.ThresholdOption} [rtt] - Rules to apply to sample.rtt
 * @property {RTCMonitor.ThresholdOption} [mos] - Rules to apply to sample.mos
 *//**
 * @typedef {Object} RTCMonitor.ThresholdOption
 * @property {?Number} [min] - Warning will be raised if tracked metric falls below this value.
 * @property {?Number} [max] - Warning will be raised if tracked metric rises above this value.
 * @property {?Number} [maxDuration] - Warning will be raised if tracked metric stays constant for
 *   the specified number of consecutive samples.
 */
// Thresholds used for every stat the caller does not override. The constructor
// merges overrides one level deep only: a custom entry for a stat replaces that
// stat's whole default rule object instead of being combined with it.
const DEFAULT_THRESHOLDS = {
  // Input/output levels are only checked for staying unchanged too long.
  audioInputLevel: { maxDuration: 10 },
  audioOutputLevel: { maxDuration: 10 },
  // sample.packetsLostFraction is expressed as a percentage (0-100).
  packetsLostFraction: { max: 1 },
  jitter: { max: 30 },
  rtt: { max: 400 },
  mos: { min: 3 }
};

/**
 * RTCMonitor periodically samples statistics from a PeerConnection and emits
 * 'sample', 'warning', 'warning-cleared' and 'error' events, raising warnings
 * when stats cross the configured threshold values.
 *
 * May be called with or without `new`. When a PeerConnection is supplied in
 * the options, sampling starts immediately.
 * @constructor
 * @param {RTCMonitor.Options} [options] - Config options for RTCMonitor.
 *//**
 * @typedef {Object} RTCMonitor.Options
 * @property {PeerConnection} [peerConnection] - The PeerConnection to monitor.
 * @property {RTCMonitor.ThresholdOptions} [thresholds] - Optional custom threshold values.
 */
function RTCMonitor(options) {
  if (!(this instanceof RTCMonitor)) {
    return new RTCMonitor(options);
  }

  // Initialise the EventEmitter base before any instance method runs, so
  // that enable() (or an override of it) never sees a half-built emitter.
  EventEmitter.call(this);

  const settings = options || { };

  // Shallow merge: a custom rule object for a stat replaces the default one.
  const thresholds = Object.assign({ }, DEFAULT_THRESHOLDS, settings.thresholds);

  Object.defineProperties(this, {
    // "<stat>:<threshold>" -> { timeRaised } for every warning currently raised.
    _activeWarnings: { value: new Map() },
    // stat name -> number of consecutive samples with an unchanged value.
    _currentStreaks: { value: new Map() },
    _peerConnection: { value: settings.peerConnection, writable: true },
    // Most recent samples, oldest first.
    _sampleBuffer: { value: [] },
    // Handle returned by setInterval while sampling is active, otherwise null.
    _sampleInterval: { value: null, writable: true },
    _thresholds: { value: thresholds },
    _warningsEnabled: { value: true, writable: true }
  });

  if (settings.peerConnection) {
    this.enable();
  }
}

inherits(RTCMonitor, EventEmitter);

/**
 * Build a sample from a raw stats object. Packet counters in `stats` are
 *   cumulative; the top-level packet fields of the returned sample are the
 *   differences against the previous sample's totals (or against 0 when
 *   there is no previous sample), while `totals` carries the cumulative values.
 * @param {Object} stats - Stats retrieved from getStatistics
 * @param {?Object} [previousSample=null] - The previous sample to use to calculate deltas.
 * @returns {RTCSample} The sample; it is built synchronously.
 */
RTCMonitor.createSample = function createSample(stats, previousSample) {
  // Cumulative counters of the previous sample; missing counters count as 0.
  const earlier = previousSample ? previousSample.totals : { };

  // Lost packets as a percentage of inbound packets, or `fallback` when
  // nothing was inbound.
  const toPercent = (lost, inbound, fallback) =>
    ((inbound > 0) ? (lost / inbound) * 100 : fallback);

  const packetsSent = stats.packetsSent - (earlier.packetsSent || 0);
  const packetsReceived = stats.packetsReceived - (earlier.packetsReceived || 0);
  const packetsLost = stats.packetsLost - (earlier.packetsLost || 0);

  // For this interval an idle connection reports 0% loss ...
  const packetsLostFraction = toPercent(packetsLost, packetsReceived + packetsLost, 0);

  const totals = {
    packetsReceived: stats.packetsReceived,
    packetsLost: stats.packetsLost,
    packetsSent: stats.packetsSent,
    // ... whereas the cumulative figure reports 100% when no packet was ever inbound.
    packetsLostFraction: toPercent(stats.packetsLost, stats.packetsReceived + stats.packetsLost, 100),
    bytesReceived: stats.bytesReceived,
    bytesSent: stats.bytesSent
  };

  return {
    timestamp: stats.timestamp,
    totals,
    packetsSent,
    packetsReceived,
    packetsLost,
    packetsLostFraction,
    audioInputLevel: stats.audioInputLevel,
    audioOutputLevel: stats.audioOutputLevel,
    jitter: stats.jitter,
    rtt: stats.rtt,
    // Without a previous sample the falsy `previousSample` value itself is
    // handed to Mos.calculate in place of the loss percentage.
    mos: Mos.calculate(stats, previousSample && packetsLostFraction)
  };
};

/**
 * Begin periodic sampling of RTC statistics for this {@link RTCMonitor}.
 * Calling it while sampling is already running keeps the existing timer.
 * @param {PeerConnection} [peerConnection] - A PeerConnection to monitor. May be
 *   omitted when one was already supplied earlier.
 * @throws {Error} Attempted to replace an existing PeerConnection in RTCMonitor.enable
 * @throws {Error} Can not enable RTCMonitor without a PeerConnection
 * @returns {RTCMonitor} This RTCMonitor instance.
 */
RTCMonitor.prototype.enable = function enable(peerConnection) {
  const current = this._peerConnection;

  // A monitor stays bound to the first PeerConnection it was given.
  if (peerConnection && current && peerConnection !== current) {
    throw new Error('Attempted to replace an existing PeerConnection in RTCMonitor.enable');
  }

  if (peerConnection) {
    this._peerConnection = peerConnection;
  }

  if (!this._peerConnection) {
    throw new Error('Can not enable RTCMonitor without a PeerConnection');
  }

  // Only start a timer when none is running yet.
  if (!this._sampleInterval) {
    this._sampleInterval = setInterval(this._fetchSample.bind(this), SAMPLE_INTERVAL);
  }

  return this;
};

/**
 * Halt periodic sampling of RTC statistics for this {@link RTCMonitor}.
 * Safe to call when sampling is not running.
 * @returns {RTCMonitor} This RTCMonitor instance.
 */
RTCMonitor.prototype.disable = function disable() {
  const timer = this._sampleInterval;

  // Forget the handle so that a later enable() starts a fresh timer.
  this._sampleInterval = null;
  clearInterval(timer);

  return this;
};

/**
 * Fetch stats for the monitored PeerConnection and convert them into a sample,
 * using the newest buffered sample (if any) as the baseline for deltas.
 * The new sample is not added to the buffer.
 * @returns {Promise<RTCSample>} A universally-formatted version of RTC stats.
 */
RTCMonitor.prototype.getSample = function getSample() {
  return getStatistics(this._peerConnection).then(stats => {
    const buffer = this._sampleBuffer;

    // Evaluates to 0 (not null) while the buffer is still empty.
    const previousSample = buffer.length && buffer[buffer.length - 1];

    return RTCMonitor.createSample(stats, previousSample);
  });
};

/**
 * Take one sample, store it, evaluate the thresholds against the updated
 * buffer and emit a 'sample' event. If the sample cannot be obtained,
 * sampling is stopped and an 'error' event is emitted instead.
 * @private
 * @returns {Promise<Object>} Resolves with the sample, or with undefined
 *   when fetching it failed.
 */
RTCMonitor.prototype._fetchSample = function _fetchSample() {
  const onSample = sample => {
    this._addSample(sample);
    this._raiseWarnings();
    this.emit('sample', sample);
    return sample;
  };

  // Only failures of getSample() land here; errors thrown by onSample do not.
  const onFailure = error => {
    this.disable();
    this.emit('error', error);
  };

  return this.getSample().then(onSample, onFailure);
};

/**
 * Append a sample to the sample buffer, then drop the oldest entries so
 *   that at most SAMPLE_COUNT_METRICS samples remain.
 * @private
 * @param {Object} sample - Sample to add
 */
RTCMonitor.prototype._addSample = function _addSample(sample) {
  const buffer = this._sampleBuffer;
  buffer.push(sample);

  // The buffer is trimmed in place; the newest sample is always kept.
  const overflow = buffer.length - SAMPLE_COUNT_METRICS;
  if (overflow > 0) {
    buffer.splice(0, overflow);
  }
};

/**
 * Evaluate every configured threshold against the buffered samples.
 * Does nothing while warnings are disabled.
 * @private
 */
RTCMonitor.prototype._raiseWarnings = function _raiseWarnings() {
  if (!this._warningsEnabled) { return; }

  // Object.keys visits own keys only; a for...in loop would also pick up
  // enumerable properties inherited through the prototype chain and treat
  // them as stat names.
  Object.keys(this._thresholds).forEach(statName => {
    this._raiseWarningsForStat(statName);
  });
};

/**
 * Enable warning functionality. Thresholds are evaluated again the next time
 * _raiseWarnings runs; warnings that were active before a previous
 * disableWarnings call are not restored by this method.
 * @returns {RTCMonitor} This RTCMonitor instance.
 */
RTCMonitor.prototype.enableWarnings = function enableWarnings() {
  this._warningsEnabled = true;
  return this;
};

/**
 * Turn warning functionality off. When warnings were enabled up to now, all
 * currently active warnings are forgotten without emitting 'warning-cleared'.
 * @returns {RTCMonitor} This RTCMonitor instance.
 */
RTCMonitor.prototype.disableWarnings = function disableWarnings() {
  const wasEnabled = this._warningsEnabled;
  this._warningsEnabled = false;

  // Repeated calls leave the warning map alone.
  if (wasEnabled) {
    this._activeWarnings.clear();
  }

  return this;
};

/**
 * Apply the thresholds configured for one stat to the buffered samples and
 *   raise or clear the associated warnings.
 *
 * - `max` / `min`: looks at the last SAMPLE_COUNT_METRICS samples. A warning
 *   is raised when at least SAMPLE_COUNT_RAISE of them cross the bound and
 *   cleared when at most SAMPLE_COUNT_CLEAR of them do; in between nothing
 *   changes.
 * - `maxDuration`: tracks how many consecutive samples kept the same value.
 *   A warning is raised once that streak reaches the limit and cleared when
 *   the value changes.
 *
 * Nothing happens when the stat has no threshold entry, or when any of the
 * inspected samples lacks a value for the stat.
 * @private
 * @param {String} statName - Name of the stat to compare.
 */
RTCMonitor.prototype._raiseWarningsForStat = function _raiseWarningsForStat(statName) {
  const samples = this._sampleBuffer;
  const limits = this._thresholds[statName];

  // A caller may have nulled out a stat's entry (e.g. `{ jitter: null }`);
  // there is nothing to check then, and reading `limits.max` would throw.
  if (!limits) { return; }

  const values = samples
    .slice(-SAMPLE_COUNT_METRICS)
    .map(sample => sample[statName]);

  // (rrowland) If we have a bad or missing value in the set, we don't
  // have enough information to throw or clear a warning. Bail out.
  const hasMissingValue = values.some(value => typeof value === 'undefined' || value === null);
  if (hasMissingValue) { return; }

  // Both bounds follow the same raise/clear rule and differ only in how
  // crossings are counted. `max` is evaluated before `min`.
  const boundChecks = [
    ['max', countHigh],
    ['min', countLow]
  ];

  boundChecks.forEach(([thresholdName, countCrossings]) => {
    const bound = limits[thresholdName];
    if (typeof bound !== 'number') { return; }

    const crossings = countCrossings(bound, values);
    if (crossings >= SAMPLE_COUNT_RAISE) {
      this._raiseWarning(statName, thresholdName, { values });
    } else if (crossings <= SAMPLE_COUNT_CLEAR) {
      this._clearWarning(statName, thresholdName, { values });
    }
  });

  if (typeof limits.maxDuration === 'number' && samples.length > 1) {
    const prevValue = samples[samples.length - 2][statName];
    const curValue = samples[samples.length - 1][statName];

    // The streak counts repeated values, so the first repeat yields 1.
    const prevStreak = this._currentStreaks.get(statName) || 0;
    const streak = (prevValue === curValue) ? prevStreak + 1 : 0;

    this._currentStreaks.set(statName, streak);

    if (streak >= limits.maxDuration) {
      this._raiseWarning(statName, 'maxDuration', { value: streak });
    } else if (streak === 0) {
      // Report the length of the streak that just ended.
      this._clearWarning(statName, 'maxDuration', { value: prevStreak });
    }
  }
};

/**
 * Count how many values lie strictly below the min threshold.
 * @private
 * @param {Number} min - The minimum allowable value.
 * @param {Array<Number>} values - The values to iterate over.
 * @returns {Number} The number of values that fell below the threshold.
 */
function countLow(min, values) {
  const tooLow = values.filter(value => value < min);
  return tooLow.length;
}

/**
 * Count how many values lie strictly above the max threshold.
 * @private
 * @param {Number} max - The max allowable value.
 * @param {Array<Number>} values - The values to iterate over.
 * @returns {Number} The number of values that rose above the threshold.
 */
function countHigh(max, values) {
  const tooHigh = values.filter(value => value > max);
  return tooHigh.length;
}

/**
 * Clear an active warning and emit 'warning-cleared'. Nothing happens when
 * the warning is not active, or when it was raised less than WARNING_TIMEOUT
 * milliseconds ago.
 * @param {String} statName - The name of the stat to clear.
 * @param {String} thresholdName - The name of the threshold to clear
 * @param {?Object} [data] - Any relevant sample data; its properties are
 *   merged into the emitted event payload.
 * @private
 */
RTCMonitor.prototype._clearWarning = function _clearWarning(statName, thresholdName, data) {
  const id = `${statName}:${thresholdName}`;
  const warnings = this._activeWarnings;
  const warning = warnings.get(id);

  if (!warning) { return; }

  // Keep a freshly raised warning around for a while to avoid flapping.
  const age = Date.now() - warning.timeRaised;
  if (age < WARNING_TIMEOUT) { return; }

  warnings.delete(id);

  const payload = {
    name: statName,
    threshold: {
      name: thresholdName,
      value: this._thresholds[statName][thresholdName]
    }
  };

  this.emit('warning-cleared', Object.assign(payload, data));
};

/**
 * Raise a warning, remember when it was raised and emit 'warning'.
 * A warning that is already active is not raised again.
 * @param {String} statName - The name of the stat to raise.
 * @param {String} thresholdName - The name of the threshold to raise
 * @param {?Object} [data] - Any relevant sample data; its properties are
 *   merged into the emitted event payload.
 * @private
 */
RTCMonitor.prototype._raiseWarning = function _raiseWarning(statName, thresholdName, data) {
  const id = `${statName}:${thresholdName}`;
  const warnings = this._activeWarnings;

  if (warnings.has(id)) { return; }

  // The timestamp lets _clearWarning enforce a minimum warning lifetime.
  warnings.set(id, { timeRaised: Date.now() });

  const payload = {
    name: statName,
    threshold: {
      name: thresholdName,
      value: this._thresholds[statName][thresholdName]
    }
  };

  this.emit('warning', Object.assign(payload, data));
};

module.exports = RTCMonitor;