Source: speech-to-text/speaker-stream.js

/**
 * Copyright 2014 IBM Corp. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

'use strict';

var Transform = require('stream').Transform;
var util = require('util');
var pullAllWith = require('lodash.pullallwith');
var noTimestamps = require('./no-timestamps');
var clone = require('clone');

/**
 * Object-Mode stream that splits up results by speaker.
 *
 * Output format is similar to existing results formats, but with an extra speaker field.
 *
 * Output results array will usually contain multiple results.
 * All results are interim until the final batch; the text may change (if options.speakerlessInterim is enabled) or move from one interim result to another.
 *
 * Keywords, words_alternatives, and other features may appear on results that come slightly earlier than the timestamp due to the way things are split up.
 *
 * Ignores interim results from the service unless options.speakerlessInterim is enabled.
 *
 * The options object passed in is copied, never modified; the copy (with objectMode forced to true)
 * is exposed as `this.options` and handed to the Transform constructor.
 *
 * @constructor
 * @param {Object} [options]
 * @param {boolean} [options.speakerlessInterim=false] - emit interim results before initial speaker has been identified (allows UI to update more quickly)
 */
function SpeakerStream(options) {
  // Work on a shallow copy so the caller's object is left untouched.
  // Inherited enumerable properties are copied too, so Transform sees the same
  // values it would have read from the original object.
  var opts = {};
  var key;
  // eslint-disable-next-line guard-for-in
  for (key in options) {
    opts[key] = options[key];
  }
  opts.objectMode = true;
  this.options = opts;
  Transform.call(this, opts);
  /**
   * Final results received so far, in arrival order.
   * Each result's alternatives[0].timestamps is a 2-d array where
   * the sub-array is [word, from time, to time]
   * Example:
   * [
       ["Yes", 28.92, 29.17],
       ["that's", 29.17, 29.37],
       ["right", 29.37, 29.64]
    ]
   * @type {Array<Object>}
   * @private
   */
  this.results = [];
  /**
   * speaker_labels is an array of objects, kept sorted by from/to time.
   * Example:
   * [{
      "from": 28.92,
      "to": 29.17,
      "speaker": 1,
      "confidence": 0.641,
      "final": false
    }, {
      "from": 29.17,
      "to": 29.37,
      "speaker": 1,
      "confidence": 0.641,
      "final": false
    }, {
      "from": 29.37,
      "to": 29.64,
      "speaker": 1,
      "confidence": 0.641,
      "final": false
    }]
   * @type {Array<Object>}
   * @private
   */
  this.speaker_labels = [];

  // set once the first label/timestamp mismatch error has been emitted, so it is only reported once
  this.mismatchErrorEmitted = false;

  // flag to signal that labels were received before results, and therefore
  // the stream needs to emit on the next batch of final results
  this.extraLabels = false;
}
util.inherits(SpeakerStream, Transform);

/**
 * Reports whether the most recent speaker label is marked final.
 *
 * @return {boolean} true only when at least one label exists and the last one has a truthy `final`
 */
SpeakerStream.prototype.isFinal = function() {
  var labels = this.speaker_labels;
  // coerce so callers always get a real boolean (never 0 or undefined)
  return Boolean(labels.length && labels[labels.length - 1].final);
};

// positions in the timestamps 2d array: each entry is [word, from time, to time]
var WORD = 0;
var FROM = 1;
var TO = 2;

// value assigned to `name` on the errors this stream emits when speaker_labels and word timestamps do not line up
SpeakerStream.ERROR_MISMATCH = 'MISMATCH';

/**
 * Builds a results object with everything we've got so far.
 *
 * Each speaker_label is paired, in order, with the next word timestamp from the stored
 * results. Consecutive words that share a speaker and an original result are grouped into
 * one output result carrying a `speaker` field.
 *
 * Resets `this.extraLabels`, and sets it to true again when there are more labels than
 * word timestamps. Emits a single 'error' event (name `SpeakerStream.ERROR_MISMATCH`) the
 * first time a label's from/to does not equal its timestamp's from/to.
 *
 * @return {Object|undefined} `{results, result_index: 0}`, or undefined when labels are
 *   still waiting on results that have not arrived yet
 */
SpeakerStream.prototype.buildMessage = function() {
  // coerce so the `final` field on emitted results is always a real boolean
  var final = Boolean(this.isFinal());
  this.extraLabels = false;

  // first match all speaker_labels to the appropriate word and result
  // assumes that each speaker_label will have a matching word timestamp at the same index
  // emits an error (once) and drops the label if this assumption is violated
  var resultIndex = 0;
  var timestampIndex = -1;
  var words = this.speaker_labels.map(
    // eslint-disable-next-line camelcase
    function(speaker_label) {
      if (this.extraLabels) {
        // already ran out of results on an earlier label; nothing left to match against
        return null;
      }
      timestampIndex++;
      var result = this.results[resultIndex];
      var timestamp = result && result.alternatives[0].timestamps[timestampIndex];
      // current result is used up (or has no timestamps): move on to the next result that has one
      while (result && !timestamp) {
        timestampIndex = 0;
        resultIndex++;
        result = this.results[resultIndex];
        timestamp = result && result.alternatives[0].timestamps[timestampIndex];
      }
      if (!timestamp) {
        // this shouldn't happen normally, but the TimingStream could inadvertently cause a
        // speaker_labels to be emitted before a result
        this.extraLabels = true;
        return null;
      }
      if (timestamp[FROM] !== speaker_label.from || timestamp[TO] !== speaker_label.to) {
        if (!this.mismatchErrorEmitted) {
          var err = new Error('Mismatch between speaker_label and word timestamp');
          err.name = SpeakerStream.ERROR_MISMATCH;
          // eslint-disable-next-line camelcase
          err.speaker_label = speaker_label;
          err.timestamp = timestamp;
          // eslint-disable-next-line camelcase
          err.speaker_labels = this.speaker_labels;
          err.results = this.results;
          this.emit('error', err);
          this.mismatchErrorEmitted = true; // If one is off, then a bunch probably are. Just emit one error.
        }
        return null;
      }
      return {
        timestamp: timestamp,
        speaker: speaker_label.speaker,
        result: result
      };
    },
    this
  );

  // assume that there's nothing new to emit right now,
  // wait for new results to match our new labels
  if (this.extraLabels) {
    return;
  }

  // filter out any nulls (labels dropped because of a mismatch)
  words = words.filter(function(w) {
    return w;
  });

  // group the words together into utterances by speaker
  var utterances = words.reduce(
    function(arr, word) {
      var utterance = arr[arr.length - 1];
      // any time the speaker changes or the (original) result changes, create a new utterance
      if (!utterance || utterance.speaker !== word.speaker || utterance.result !== word.result) {
        utterance = {
          speaker: word.speaker,
          timestamps: [word.timestamp],
          result: word.result
        };
        // and add it to the list
        arr.push(utterance);
      } else {
        // otherwise just append the current word to the current result
        utterance.timestamps.push(word.timestamp);
      }
      return arr;
    },
    []
  );

  // create new results
  var results = utterances.map(function(utterance, i) {
    // if this is the first usage of this result, clone the original (to keep keywords and such)
    // otherwise create a new one
    var result;
    var lastUtterance = utterances[i - 1] || {};
    if (utterance.result === lastUtterance.result) {
      result = { alternatives: [{}] };
    } else {
      result = clone(utterance.result);
    }

    // update the result object
    // set the speaker
    result.speaker = utterance.speaker;
    // overwrite the transcript and timestamps on the first alternative
    var alt = result.alternatives[0];
    alt.transcript = utterance.timestamps
      .map(function(ts) {
        return ts[WORD];
      })
      .join(' ') +
      ' ';
    alt.timestamps = utterance.timestamps;
    // overwrite the final value
    result.final = final;

    // time span covered by this utterance: start of its first word to end of its last word
    var start = utterance.timestamps[0][FROM];
    var end = utterance.timestamps[utterance.timestamps.length - 1][TO];

    // true for items (word alternatives, keyword spottings) that fall entirely inside the utterance
    var withinUtterance = function(item) {
      return item.start_time >= start && item.end_time <= end;
    };

    // overwrite the word_alternatives
    if (utterance.result.word_alternatives) {
      result.word_alternatives = utterance.result.word_alternatives.filter(withinUtterance);
    }

    // overwrite the keywords spotted
    /* eslint-disable camelcase */
    var original_keywords_result = utterance.result.keywords_result;
    if (original_keywords_result) {
      var keywords_result = {};
      Object.keys(original_keywords_result).forEach(function(keyword) {
        var spottings = original_keywords_result[keyword].filter(withinUtterance);
        // keywords with no spottings inside this utterance are left out entirely
        if (spottings.length) {
          keywords_result[keyword] = spottings;
        }
      });
      result.keywords_result = keywords_result;
    }
    /* eslint-enable camelcase */

    return result;
  });

  // result_index is always 0 because the results always includes the entire conversation so far.
  return { results: results, result_index: 0 };
};

/**
 * Stores the final results from a message so their word timestamps can later be matched
 * to speaker labels. Interim (non-final) results are not stored.
 *
 * Emits an 'error' event (name `noTimestamps.ERROR_NO_TIMESTAMPS`) and stores nothing when
 * `noTimestamps(data)` reports a problem.
 *
 * @param {Object} data - message whose `results` array should be captured
 */
SpeakerStream.prototype.handleResults = function(data) {
  if (noTimestamps(data)) {
    var missing = new Error('SpeakerStream requires that timestamps and speaker_labels be enabled');
    missing.name = noTimestamps.ERROR_NO_TIMESTAMPS;
    this.emit('error', missing);
    return;
  }
  var stored = this.results;
  data.results.forEach(function(candidate) {
    // only final results are kept
    if (candidate.final) {
      stored.push(candidate);
    }
  });
};

/**
 * Array#sort comparator for speaker labels: orders by `from`, breaking ties by `to`.
 *
 * @param {Object} a
 * @param {Object} b
 * @return {number} -1, 0 or 1
 */
SpeakerStream.speakerLabelsSorter = function(a, b) {
  if (a.from !== b.from) {
    return a.from < b.from ? -1 : 1;
  }
  if (a.to !== b.to) {
    return a.to < b.to ? -1 : 1;
  }
  return 0;
};

/**
 * Merges a new batch of speaker labels into the stored list.
 *
 * Only the very last labeled word gets final: true. Up until that point, all speaker_labels are
 * considered interim and may be repeated with a new speaker selected in a later set of
 * speaker_labels, so a stored label with the same from/to as an incoming one is replaced by it.
 *
 * @private
 * @param {Object} data - message carrying a `speaker_labels` array
 */
SpeakerStream.prototype.handleSpeakerLabels = function(data) {
  var incoming = data.speaker_labels;
  var known = this.speaker_labels;

  // drop stored labels that the new batch supersedes (same time span); mutates `known` in place
  pullAllWith(known, incoming, function(existing, fresh) {
    return existing.from === fresh.from && existing.to === fresh.to;
  });

  // add the whole new batch after the surviving labels
  Array.prototype.push.apply(known, incoming);

  // restore chronological order
  known.sort(SpeakerStream.speakerLabelsSorter);
};

/**
 * Transform implementation: consumes results and/or speaker_labels messages and pushes a
 * combined, speaker-split message whenever one can be built.
 *
 * @private
 * @param {Object} data - message that may contain a `results` array, a `speaker_labels` array, or both
 * @param {string} encoding - unused (object mode)
 * @param {Function} next - called once the message has been processed
 */
SpeakerStream.prototype._transform = function(data, encoding, next) {
  var message;
  if (Array.isArray(data.results)) {
    this.handleResults(data);
    if (this.options.speakerlessInterim && data.results.length && data.results[0].final === false) {
      message = this.buildMessage();
      // buildMessage returns nothing while labels are ahead of results; in that case there is
      // no labeled history to append the interim results to, so skip this update
      if (message) {
        message.results = message.results.concat(data.results);
      }
    }
    // clean up if things got out of order
    if (this.extraLabels && data.results.length && data.results[0].final === true) {
      message = this.buildMessage();
    }
  }
  if (Array.isArray(data.speaker_labels)) {
    this.handleSpeakerLabels(data);
    message = this.buildMessage();
  }
  if (message) {
    /**
     * Emit an object similar to the normal results object, only with multiple entries in the results Array (a new one
     * each time the speaker changes), and with a speaker field on the results.
     *
     * result_index is always 0 because the results always includes the entire conversation so far.
     *
     * @event SpeakerStream#data
     * @param {Object} results-format message with multiple results and an extra speaker field on each result
     */
    this.push(message);
  }
  next();
};

/**
 * End-of-stream sanity check: the number of word timestamps across all stored results should
 * equal the number of speaker labels. When it does not, an 'error' event named
 * `SpeakerStream.ERROR_MISMATCH` is emitted; this catches cases where speaker_labels was not
 * enabled and internal errors that cause data loss.
 *
 * @param {Function} done - always called, whether or not an error was emitted
 * @private
 */
SpeakerStream.prototype._flush = function(done) {
  var perResult = this.results.map(function(result) {
    return result.alternatives[0].timestamps;
  });
  // flatten one level to get every word timestamp in a single list
  var wordCount = Array.prototype.concat.apply([], perResult).length;
  var labelCount = this.speaker_labels.length;

  if (wordCount === labelCount) {
    done();
    return;
  }

  var description;
  if (wordCount > 0 && labelCount === 0) {
    description = 'No speaker_labels found. SpeakerStream requires speaker_labels to be enabled.';
  } else {
    description = 'Mismatch between number of word timestamps (' +
      wordCount +
      ') and number of speaker_labels (' +
      labelCount +
      ') - some data may be lost.';
  }
  var failure = new Error(description);
  failure.name = SpeakerStream.ERROR_MISMATCH;
  // attach the raw data to help with debugging
  failure.speaker_labels = this.speaker_labels;
  failure.results = this.results;
  this.emit('error', failure);
  done();
};

// Attach whatever ./to-promise exports as the `promise` instance method.
// NOTE(review): presumably returns a Promise that settles when the stream ends or errors — confirm in ./to-promise
SpeakerStream.prototype.promise = require('./to-promise');

// The constructor is the module's only export
module.exports = SpeakerStream;