Source: speech-to-text/speaker-stream.js

/**
 * Copyright 2014 IBM Corp. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

'use strict';


var Transform = require('stream').Transform;
var util = require('util');
var pullAllWith = require('lodash.pullallwith');
var noTimestamps = require('./no-timestamps');

/**
 * Object-mode stream that splits up results by speaker.
 *
 * The output format is similar to the regular results format, but each result carries an extra `speaker` field.
 *
 * The output results array will usually contain multiple results (a new one each time the speaker changes).
 * All results are interim until the final batch: the text will not change, but the speaker may, so a word may move
 * from one interim result to another.
 *
 * Note: when combined with a TimingStream, data events may contain a combination of final and interim results (with
 * the last one sometimes being interim).
 *
 * Ignores interim results from the service; only results marked `final` contribute word timestamps.
 *
 * @constructor
 * @param {Object} [options] - passed through to the underlying Transform stream. `objectMode` is always forced to
 *   true. The caller's object is copied, not modified.
 */
function SpeakerStream(options) {
  // Copy rather than mutate: writing objectMode onto the caller's own options object would leak into anything
  // else they construct with the same object.
  var streamOptions = Object.assign({}, options, {objectMode: true});
  Transform.call(this, streamOptions);
  /**
   * Every word timestamp from final results received so far, in arrival order.
   * Each entry is [word, from time, to time].
   * Example:
   * [
       ["Yes", 28.92, 29.17],
       ["that's", 29.17, 29.37],
       ["right", 29.37, 29.64]
    ]
   * @type {Array<Array>}
   * @private
   */
  this.timestamps = [];
  /**
   * Every speaker label received so far, kept sorted chronologically by from/to time.
   * A label for an already-seen from/to window replaces the earlier one.
   * Example:
   * [{
      "from": 28.92,
      "to": 29.17,
      "speaker": 1,
      "confidence": 0.641,
      "final": false
    }, {
      "from": 29.17,
      "to": 29.37,
      "speaker": 1,
      "confidence": 0.641,
      "final": false
    }, {
      "from": 29.37,
      "to": 29.64,
      "speaker": 1,
      "confidence": 0.641,
      "final": false
    }]
   * @type {Array<Object>}
   * @private
   */
  this.speaker_labels = [];

}
util.inherits(SpeakerStream, Transform);

/**
 * Reports whether the chronologically last speaker label received so far is marked final.
 *
 * @return {boolean} true only when at least one label is stored and the last one has a truthy `final` field;
 *   false otherwise (including when no labels have arrived yet).
 */
SpeakerStream.prototype.isFinal = function() {
  var labels = this.speaker_labels;
  // Coerce to a real boolean. The previous form returned 0 for an empty list and `undefined` when the label lacked
  // `final`, and that value is copied verbatim into every emitted result's `final` field.
  return labels.length > 0 && Boolean(labels[labels.length - 1].final);
};

// Column offsets into each [word, from time, to time] entry of the timestamps 2-d array.
var WORD = 0; // the word text
var FROM = 1; // time the word starts
var TO = 2; // time the word ends

/**
 * Value of `err.name` on errors emitted when speaker_labels and word timestamps cannot be lined up.
 */
SpeakerStream.ERROR_MISMATCH = 'MISMATCH';


/**
 * Rebuilds the speaker-separated view of everything received so far and pushes it downstream.
 *
 * Each entry of this.speaker_labels is paired with the word timestamp at the same index in this.timestamps.
 * If any label has no timestamp at its index, or its from/to times differ from that timestamp's, one
 * SpeakerStream.ERROR_MISMATCH error is emitted and nothing is pushed.
 *
 * @private
 */
SpeakerStream.prototype.process = function() {
  var isFinalBatch = this.isFinal();
  var labels = this.speaker_labels;
  var words = this.timestamps;
  var results = [];
  var current = null;

  for (var i = 0; i < labels.length; i++) {
    var label = labels[i];
    var word = words[i];
    var aligned = Boolean(word) && word[FROM] === label.from && word[TO] === label.to;

    if (!aligned) {
      // the index-for-index pairing assumption broke; report the first offending pair and give up
      var err = new Error('Mismatch between speaker_label and word timestamp');
      err.name = SpeakerStream.ERROR_MISMATCH;
      err.speaker_label = label;
      err.timestamp = word;
      err.speaker_labels = this.speaker_labels;
      err.timestamps = this.timestamps;
      this.emit('error', err);
      return;
    }

    // One result (a "line") per consecutive run of words from the same speaker.
    // todo: consider also splitting results up at pauses (where they are split when they arrive from the service) - FormatStream helps here
    if (current === null || current.speaker !== label.speaker) {
      // todo: consider trying to include word alternatives and other features in these results
      current = {
        speaker: label.speaker,
        alternatives: [{
          transcript: '',
          timestamps: []
        }],
        final: isFinalBatch
      };
      results.push(current);
    }
    current.alternatives[0].transcript += word[WORD] + ' ';
    current.alternatives[0].timestamps.push(word);
  }

  if (results.length === 0) {
    return;
  }

  /**
   * Emits a results-format object whose results Array holds one entry per speaker change, each with a speaker field.
   *
   * result_index is always 0 because the results always cover the entire conversation so far.
   *
   * @event SpeakerStream#data
   * @param {Object} results-format message with multiple results and an extra speaker field on each result
   */
  this.push({results: results, result_index: 0});
};

/**
 * Collects word timestamps from the final results in a message; interim results are skipped.
 * If noTimestamps() reports the message lacks timestamps, emits an error named noTimestamps.ERROR_NO_TIMESTAMPS
 * and records nothing.
 *
 * @param {Object} data - message with a `results` array
 */
SpeakerStream.prototype.handleResults = function(data) {
  if (!noTimestamps(data)) {
    var self = this;
    data.results.forEach(function(result) {
      if (result.final) {
        // concat builds a fresh array rather than growing the existing one in place
        self.timestamps = self.timestamps.concat(result.alternatives[0].timestamps);
      }
    });
    return;
  }

  var err = new Error('SpeakerStream requires that timestamps and speaker_labels be enabled');
  err.name = noTimestamps.ERROR_NO_TIMESTAMPS;
  this.emit('error', err);
};

/**
 * Array.prototype.sort comparator for speaker labels: orders by `from`, then by `to`.
 * Labels with identical from and to compare as equal (0); the `final` flag is not considered.
 *
 * @param {Object} a
 * @param {Object} b
 * @return {number} -1, 0 or 1
 */
SpeakerStream.speakerLabelsSorter = function(a, b) {
  if (a.from !== b.from) {
    return a.from < b.from ? -1 : 1;
  }
  if (a.to !== b.to) {
    return a.to < b.to ? -1 : 1;
  }
  return 0;
};

/**
 * Merges a message's speaker_labels into the stored list, in place.
 *
 * Only the very last labeled word gets final: true. Until then, all speaker_labels are considered interim and
 * may be repeated with a different speaker in a later set of speaker_labels. So any stored label covering the same
 * from/to window as an incoming one is dropped in favor of the incoming one. The list is then re-sorted
 * chronologically.
 *
 * @private
 * @param {Object} data - message with a `speaker_labels` array
 */
SpeakerStream.prototype.handleSpeakerLabels = function(data) {
  var incoming = data.speaker_labels;
  var stored = this.speaker_labels;

  // pullAllWith mutates `stored` in place, so this.speaker_labels keeps the same array identity
  pullAllWith(stored, incoming, function sameWindow(previous, next) {
    return previous.from === next.from && previous.to === next.to;
  });

  Array.prototype.push.apply(stored, incoming);
  stored.sort(SpeakerStream.speakerLabelsSorter);
};

/**
 * Routes each incoming object-mode message. A `results` array is mined for word timestamps. A `speaker_labels`
 * array is merged in and triggers a re-emit of the speaker-separated conversation. A message may carry either,
 * both, or neither.
 *
 * @private
 * @param {Object} data
 * @param {string} encoding - not used
 * @param {Function} next
 */
SpeakerStream.prototype._transform = function(data, encoding, next) {
  var hasResults = Array.isArray(data.results);
  if (hasResults) {
    this.handleResults(data);
  }

  var hasSpeakerLabels = Array.isArray(data.speaker_labels);
  if (hasSpeakerLabels) {
    this.handleSpeakerLabels(data);
    this.process();
  }

  next();
};

/**
 * End-of-input consistency check. The number of collected word timestamps should equal the number of speaker labels.
 * When they differ, emits a SpeakerStream.ERROR_MISMATCH error before signalling completion. This catches speaker_labels
 * never having been enabled (words but no labels) as well as internal errors that could lose data.
 *
 * @param {Function} done
 * @private
 */
SpeakerStream.prototype._flush = function(done) {
  var wordCount = this.timestamps.length;
  var labelCount = this.speaker_labels.length;

  if (wordCount === labelCount) {
    done();
    return;
  }

  var labelsMissing = wordCount > 0 && labelCount === 0;
  var msg = labelsMissing ?
    'No speaker_labels found. SpeakerStream requires speaker_labels to be enabled.' :
    'Mismatch between number of word timestamps (' + wordCount + ') and number of speaker_labels (' + labelCount + ') - some data may be lost.';

  var err = new Error(msg);
  err.name = SpeakerStream.ERROR_MISMATCH;
  err.speaker_labels = this.speaker_labels;
  err.timestamps = this.timestamps;
  this.emit('error', err);
  done();
};

// Attach the shared promise() helper from ./to-promise to every SpeakerStream instance.
// NOTE(review): presumably resolves with the stream's collected output once it ends; confirm against ./to-promise.
SpeakerStream.prototype.promise = require('./to-promise');

module.exports = SpeakerStream;