Source: speech-to-text/timing-stream.js

'use strict';

var Duplex = require('stream').Duplex;
var util = require('util');
var defaults = require('defaults');
var noTimestamps = require('./no-timestamps');

/**
 * Slows results down to no faster than real time.
 *
 * Useful when running recognizeFile because the text can otherwise appear before the words are spoken.
 *
 * Note: when combined with a SpeakerStream, the TimingStream must come first.
 * @todo: fix TimingStream to work with the output of the SpeakerStream
 *
 * @param {Object} [opts] - options for this stream; the merged result (with defaults applied) is also handed to the Duplex constructor
 * @param {*} [opts.emitAt=TimingStream.END] - set to TimingStream.START for a more subtitles-like output where results are returned as soon as the utterance begins
 * @param {Number} [opts.delay=0] - Additional delay (in seconds) to apply before emitting words, useful for precise syncing to audio tracks. May be negative
 * @constructor
 */
function TimingStream(opts) {
  this.options = defaults(opts, {
    emitAt: TimingStream.END,
    delay: 0,
    allowHalfOpen: true, // keep the readable side open after the source closes
    writableObjectMode: true
  });
  // Hand the merged options (not the raw opts) to Duplex so the defaults above -
  // in particular writableObjectMode - also apply when the caller passes no options.
  Duplex.call(this, this.options);

  // reference point for all timing; message timestamps are measured in seconds from here
  this.startTime = Date.now();

  // queue to store future messages
  this.messages = [];

  // setTimeout handle. if null, next tick will occur whenever new data arrives
  this.nextTick = null;

  // this stream cannot end until both the messages queue is empty and the source stream has ended
  this.sourceEnded = false;

  var self = this;
  // 'finish' fires once the writable side has received everything the source will send
  this.on('finish', function() {
    self.sourceEnded = true; // todo: see if there's anything built-in that does this for us
    self.checkForEnd();
  });
}
util.inherits(TimingStream, Duplex);

// values accepted by opts.emitAt
TimingStream.START = 1;
TimingStream.END = 2;

/**
 * Validates an incoming recognition message, queues it, and schedules a tick if none is pending.
 *
 * Validation failures (a Buffer instead of an object, or results without word timestamps) are passed to
 * the write callback. The stream then reports them as an 'error' and the write is completed
 * instead of being left pending forever.
 *
 * @private
 * @param {Object} msg - recognition message from the source stream
 * @param {String} encoding - unused; the writable side is in object mode
 * @param {Function} next - write callback
 */
TimingStream.prototype._write = function(msg, encoding, next) {
  if (msg instanceof Buffer) {
    return next(new Error('TimingStream requires the source to be in objectMode'));
  }
  if (Array.isArray(msg.results) && msg.results.length && noTimestamps(msg)) {
    var err = new Error('TimingStream requires timestamps');
    err.name = noTimestamps.ERROR_NO_TIMESTAMPS;
    return next(err);
  }

  this.messages.push(msg);

  // a falsy nextTick means no timer is pending; stop() parks it at -1 to block new scheduling
  if (!this.nextTick) {
    this.scheduleNextTick();
  }
  next();
};

/**
 * Readable-side hook, deliberately a no-op: tick() pushes data on its own schedule, not on demand.
 *
 * @private
 */
TimingStream.prototype._read = function(/* size */) {
  // nothing to do here - results are pushed once their time has come
};

/**
 * The current position, in seconds, that message timestamps are compared against.
 * It is the time elapsed since the stream was constructed minus options.delay.
 *
 * @private
 * @returns {Number}
 */
TimingStream.prototype.cutoff = function cutoff() {
  var elapsedSeconds = (Date.now() - this.startTime) / 1000;
  return elapsedSeconds - this.options.delay;
};

/**
 * Picks the timestamp (in seconds) at which the given message becomes due.
 *
 * With options.emitAt === TimingStream.START this is the start of the first word of the first result
 * (or the `from` of the first speaker label). Otherwise it is the end of the last word of the last result
 * (or the `to` of the last speaker label). Messages with neither results nor speaker labels are treated as
 * timestamp 0.
 *
 * Note: TimingStream.START / TimingStream.END double as indices into each word-timestamp entry
 * (presumably shaped [word, start, end] - confirm against the service response format).
 *
 * @private
 * @param {Object} msg
 * @returns {Number} timestamp
 */
TimingStream.prototype.getMessageTime = function(msg) {
  var fromStart = this.options.emitAt === TimingStream.START;
  var results = msg.results;
  var labels = msg.speaker_labels;

  if (Array.isArray(results) && results.length) {
    if (fromStart) {
      return results[0].alternatives[0].timestamps[0][TimingStream.START];
    }
    var wordTimes = results[results.length - 1].alternatives[0].timestamps;
    return wordTimes[wordTimes.length - 1][TimingStream.END];
  }

  if (Array.isArray(labels) && labels.length) {
    return fromStart ? labels[0].from : labels[labels.length - 1].to;
  }

  return 0; // failsafe for unknown message types
};

/**
 * Removes and returns the oldest buffered message if it is due, otherwise returns null.
 *
 * A message is due when its timestamp (from getMessageTime(), which honors options.emitAt) is at or
 * before the current cutoff().
 *
 * @private
 * @returns {Object|null} the due message, or null when the queue is empty or its head is still in the future
 */
TimingStream.prototype.getCurrentResult = function getCurrentResult() {
  if (!this.messages.length) {
    return null;
  }
  if (this.getMessageTime(this.messages[0]) <= this.cutoff()) {
    return this.messages.shift();
  }
  // head of the queue is not due yet - honor the documented null contract instead of falling off the end
  return null;
};


/**
 * Emits every buffered message that is already due, then calls scheduleNextTick().
 *
 * In object mode (options.objectMode or options.readableObjectMode) whole messages are pushed.
 * Otherwise the first result's top transcript is pushed, and messages without results are skipped.
 *
 * @private
 */
TimingStream.prototype.tick = function tick() {
  var objectMode = this.options.objectMode || this.options.readableObjectMode;
  var msg;
  // eslint-disable-next-line no-cond-assign
  while (msg = this.getCurrentResult()) {
    if (objectMode) {
      this.push(msg);
    } else if (Array.isArray(msg.results) && msg.results.length) {
      // check the array itself (not its length) so string mode actually emits transcripts
      this.push(msg.results[0].alternatives[0].transcript);
    }
  }

  this.scheduleNextTick();
};

/**
 * Returns the `to` time of the last entry in a speaker-labels message.
 *
 * @private
 * @param {Object} msg - message with a non-empty `speaker_labels` array
 * @returns {Number}
 */
function getEnd(msg) {
  var labels = msg.speaker_labels;
  var last = labels[labels.length - 1];
  return last.to;
}

/**
 * Emits the oldest buffered speaker-labels message once its final `to` time is at or before cutoff().
 * It then re-schedules itself for the next buffered message, or checks for the end of the stream when none remain.
 *
 * NOTE(review): nothing visible in this file initializes `this.speakerLabels` or calls this method, so it
 * appears to be unused here. The misspelled method name is kept for backward compatibility.
 *
 * @private
 */
TimingStream.prototype.tickSpeakerLables = function tickSpeakerLabels() {
  clearTimeout(this.nextSpeakerLabelsTick);
  if (this.speakerLabels.length && getEnd(this.speakerLabels[0]) <= this.cutoff()) {
    this.push(this.speakerLabels.shift());
  }
  if (this.speakerLabels.length) {
    var nextMsg = this.speakerLabels[0];
    // include options.delay so the timer agrees with cutoff()
    var nextTime = this.startTime + ((getEnd(nextMsg) + this.options.delay) * 1000);
    // the delay is setTimeout's second argument; passing it to bind() made the timer fire immediately
    this.nextSpeakerLabelsTick = setTimeout(this.tickSpeakerLables.bind(this), Math.max(0, nextTime - Date.now()));
  } else {
    this.nextSpeakerLabelsTick = null;
    this.checkForEnd();
  }
};

/**
 * Schedules the next tick for when the oldest buffered message becomes due. If nothing is buffered,
 * it clears the timer handle and checks for the end of the stream instead.
 *
 * A message is due once cutoff() (elapsed seconds minus options.delay) reaches its timestamp. That happens at
 * startTime + (messageTime + delay) seconds.
 *
 * @private
 */
TimingStream.prototype.scheduleNextTick = function scheduleNextTick() {
  clearTimeout(this.nextTick); // just in case
  if (this.messages.length) {
    var messageTime = this.getMessageTime(this.messages[0]);
    // include the configured delay so the timer fires when cutoff() actually reaches the message
    var nextTickTime = this.startTime + ((messageTime + this.options.delay) * 1000); // ms since epoch
    // Clamp at 0 (overdue messages go out on the next timer turn) rather than capping at 0, which
    // turned every wait into an immediate re-poll. Math.ceil avoids waking a fraction of a ms early.
    var nextTickOffset = Math.max(0, Math.ceil(nextTickTime - Date.now())); // ms from right now
    this.nextTick = setTimeout(this.tick.bind(this), nextTickOffset);
  } else {
    this.nextTick = null;
    this.checkForEnd();
  }
};

/**
 * Emits 'close' and then ends the readable side, but only when both pre-conditions hold:
 *  - the source has ended (sourceEnded is set by the 'finish' handler installed in the constructor)
 *  - no tick is scheduled, meaning no results are buffered for later delivery
 *
 * @private
 */
TimingStream.prototype.checkForEnd = function() {
  var tickPending = Boolean(this.nextTick);
  if (!this.sourceEnded || tickPending) {
    return;
  }
  this.emit('close');
  this.push(null);
};


// Expose the shared helper from ./to-promise as a method (presumably resolves with the stream's output once it ends - see that module).
TimingStream.prototype.promise = require('./to-promise');

/**
 * Immediately stops emitting results.
 *
 * Emits 'stop', cancels any pending tick and discards buffered results. If the source has already ended,
 * the readable side is ended right away. Afterwards nextTick is parked at -1 so _write() never schedules
 * another tick.
 *
 * NOTE(review): if the source has not ended yet, the -1 sentinel also keeps checkForEnd() from ending the
 * stream when 'finish' arrives later. This matches the previous behavior - confirm whether it is intended.
 */
TimingStream.prototype.stop = function stop() {
  this.emit('stop');
  if (this.nextTick !== -1) {
    // cancel the pending tick first; otherwise it blocks checkForEnd() and the stream never ends
    clearTimeout(this.nextTick);
    this.nextTick = null;
    this.messages = [];
    this.checkForEnd(); // in case the RecognizeStream already ended
  }
  this.nextTick = -1; // fake timer to prevent _write from scheduling new ticks
};

// Public API: the TimingStream constructor, with its TimingStream.START / TimingStream.END emitAt values attached.
module.exports = TimingStream;