Source: speech-to-text/timing-stream.js

'use strict';

var Duplex = require('stream').Duplex;
var util = require('util');
var clone = require('clone');
var defaults = require('defaults');

/**
 * Slows results down to no faster than real time.
 *
 * Useful when running recognizeFile because the text can otherwise appear before the words are spoken.
 *
 * By default the readable side emits transcript strings; pass `objectMode: true` or `readableObjectMode: true`
 * to receive (possibly cropped) result objects instead. All options are also handed to the Duplex constructor.
 *
 * @param {Object} [opts]
 * @param {*} [opts.emitAt=TimingStream.START] - set to TimingStream.END to only emit text that has been completely spoken.
 * @param {Number} [opts.delay=0] - Additional delay (in seconds) to apply before emitting words, useful for precise syncing to audio tracks. May be negative
 * @param {Boolean} [opts.allowHalfOpen=true] - keep the readable side open after the source closes
 * @param {Boolean} [opts.writableObjectMode=true] - the writable side accepts recognition result objects
 * @constructor
 */
function TimingStream(opts) {
  this.options = defaults(opts, {
    emitAt: TimingStream.START,
    delay: 0,
    allowHalfOpen: true, // keep the readable side open after the source closes
    writableObjectMode: true
  });
  // Hand the *defaulted* options to Duplex. `opts` may be undefined, in which case passing it through
  // would drop writableObjectMode and writing result objects to this stream would fail.
  Duplex.call(this, this.options);

  // reference point for cutoff() and for scheduling timers
  this.startTime = Date.now();
  // buffers of results waiting for their words to become due
  this.final = [];
  this.interim = [];
  // handle of the pending setTimeout, or null when nothing is scheduled
  this.nextTick = null;
  this.sourceEnded = false;

  var self = this;
  this.on('pipe', function(source) {
    source.on('end', function() {
      self.sourceEnded = true; // todo: see if there's anything built-in that does this for us
      self.checkForEnd();
    });
  });
}
util.inherits(TimingStream, Duplex);

// emitAt values: each one is the index into a word timestamp tuple ["word", startTime, endTime]
Object.assign(TimingStream, {
  START: 1, // a word is due once its start time is reached
  END: 2 // a word is due only once its end time is reached
});

/**
 * Writable-side hook: receives one recognition result per write and hands it to handleResult().
 *
 * @param {Object} result - recognition result object (Buffers are rejected)
 * @param {String} encoding - unused
 * @param {Function} next - write callback; called with an Error when the chunk is a Buffer
 * @private
 */
TimingStream.prototype._write = function(result, encoding, next) {
  if (Buffer.isBuffer(result)) {
    // Writable implementations must report failures through the callback; emitting 'error'
    // manually from inside _write (and never calling next) leaves the stream in an undefined state.
    return next(new Error('TimingStream requires the source to be in objectMode'));
  }
  this.handleResult(result);
  next();
};

/**
 * Readable-side hook required by stream.Duplex. Deliberately empty: output is not produced on demand,
 * tick() pushes each result once its timestamp has been reached.
 */
TimingStream.prototype._read = function(/* size */) {
  // no-op
};

/**
 * Audio position (in seconds) up to which words are currently due: the time elapsed since
 * startTime, shifted back by options.delay.
 *
 * @returns {Number}
 */
TimingStream.prototype.cutoff = function cutoff() {
  var elapsedSeconds = (Date.now() - this.startTime) / 1000;
  return elapsedSeconds - this.options.delay;
};

/**
 * True when at least one alternative has a first word that is already due, i.e. the result is
 * at least partially ready to be emitted.
 *
 * Timestamp tuples look like ["word", startTime, endTime]; options.emitAt selects which time is compared.
 *
 * @param {Object} result
 * @param {Number} cutoff - seconds
 * @returns {Boolean}
 */
TimingStream.prototype.withinRange = function(result, cutoff) {
  var alternatives = result.alternatives;
  for (var i = 0; i < alternatives.length; i++) {
    var firstWord = alternatives[i].timestamps[0];
    if (firstWord && firstWord[this.options.emitAt] <= cutoff) {
      return true;
    }
  }
  return false;
};

/**
 * True when every alternative's last word is already due, i.e. the whole result can be emitted.
 *
 * Timestamp tuples look like ["word", startTime, endTime]; options.emitAt selects which time is compared.
 * Expects every alternative to have at least one timestamp.
 *
 * @param {Object} result
 * @param {Number} cutoff - seconds
 * @returns {Boolean}
 */
TimingStream.prototype.completelyWithinRange = function(result, cutoff) {
  var self = this;
  // "all are due" expressed as "none is not due"
  var anyPending = result.alternatives.some(function(alternative) {
    var lastWord = alternative.timestamps[alternative.timestamps.length - 1];
    return !(lastWord[self.options.emitAt] <= cutoff);
  });
  return !anyPending;
};

/**
 * Clones the given result and then crops out any words that occur later than the current cutoff.
 *
 * Each alternative keeps only its leading run of due words, and its transcript is rebuilt from those words.
 * The input result is not modified.
 *
 * @param {Object} result
 * @param {Number} cutoff timestamp (in seconds)
 * @returns {Object} cropped clone, always flagged `final: false`
 */
// Defined on TimingStream.prototype (not Duplex.prototype) so Node's built-in Duplex class isn't modified
// for every other stream in the process.
TimingStream.prototype.crop = function crop(result, cutoff) {
  var emitAt = this.options.emitAt;
  var cropped = clone(result);
  cropped.alternatives = cropped.alternatives.map(function(alt) {
    var timestamps = [];
    // stop at the first word that is not yet due (words are assumed to be in chronological order)
    for (var i = 0; i < alt.timestamps.length && alt.timestamps[i][emitAt] <= cutoff; i++) {
      timestamps.push(alt.timestamps[i]);
    }
    alt.timestamps = timestamps;
    alt.transcript = timestamps.map(function(ts) {
      return ts[0];
    }).join(' ');
    return alt;
  });
  // "final" signifies both that the text won't change, and that we're at the end of a sentence. Only one of those is true here.
  cropped.final = false;
  return cropped;
};

/**
 * Decides what (if anything) from the head of `results` should be emitted right now.
 *
 * - Nothing (undefined) when the buffer is empty or its first result has no due words yet.
 * - In object mode: the head result itself (removed from the buffer) when all of it is due,
 *   otherwise a cropped clone that leaves the buffer untouched.
 * - In string mode: the head result (removed from the buffer) when all of it is due, or when
 *   emitAt is TimingStream.START and it has started; otherwise nothing.
 *
 * @param {Array} results - buffer of pending results (the head may be shifted off)
 * @param {Number} cutoff - seconds
 * @returns {Object|undefined}
 */
TimingStream.prototype.getCurrentResult = function getCurrentResult(results, cutoff) {
  var head = results[0];
  if (!results.length || !this.withinRange(head, cutoff)) {
    return undefined;
  }

  var isComplete = this.completelyWithinRange(head, cutoff);
  var objectOutput = this.options.objectMode || this.options.readableObjectMode;

  if (objectOutput) {
    if (isComplete) {
      return results.shift();
    }
    return this.crop(head, cutoff);
  }

  if (isComplete || this.options.emitAt === TimingStream.START) {
    return results.shift();
  }
  return undefined;
};


/**
 * Emits at most one due result, then arranges the next tick.
 *
 * Final results take priority over interim ones. After a final result is emitted, another tick is
 * queued immediately (in case more final results are already due); otherwise scheduleNextTick()
 * decides when to run again.
 */
TimingStream.prototype.tick = function tick() {
  var cutoff = this.cutoff();
  clearTimeout(this.nextTick);

  var result = this.getCurrentResult(this.final, cutoff) || this.getCurrentResult(this.interim, cutoff);

  if (!result) {
    this.scheduleNextTick(cutoff);
    return;
  }

  var objectOutput = this.options.objectMode || this.options.readableObjectMode;
  this.push(objectOutput ? result : result.alternatives[0].transcript);

  if (result.final) {
    // we may be several final results behind - keep draining before scheduling by timestamp
    this.nextTick = setTimeout(this.tick.bind(this), 0);
    return;
  }

  this.scheduleNextTick(cutoff);
};

/**
 * Schedules next tick if possible. Requires previous stream to emit recognize objects (objectMode or readableObjectMode)
 *
 * triggers the 'close' and 'end' events if the buffer is empty and no further results are expected
 *
 * @param {Number} cutoff
 *
 */
TimingStream.prototype.scheduleNextTick = function scheduleNextTick(cutoff) {

  // prefer final results over interim - when final results are added, any older interim ones are automatically deleted.
  var nextResult = this.final[0] || this.interim[0];
  if (nextResult) {
    // loop through the timestamps until we find one that comes after the current cutoff (there should always be one)
    var timestamps = nextResult.alternatives[0].timestamps;
    for (var i = 0; i < timestamps.length; i++) {
      var wordOffset = timestamps[i][this.options.emitAt];
      if (wordOffset > cutoff) {
        var nextTime = this.startTime + (wordOffset * 1000);
        this.nextTick = setTimeout(this.tick.bind(this), nextTime - Date.now());
        return;
      }
    }
    throw new Error('No future words found'); // this shouldn't happen ever - getCurrentResult should automatically delete the result from the buffer if all of it's words are consumed
  } else {
    // clear the next tick
    this.nextTick = null;
    this.checkForEnd();
  }
};

/**
 * Ends the readable side once nothing more can arrive or be emitted: emits 'close' and pushes null.
 *
 * Both must hold:
 *  - the piped source has already emitted its 'end' event (sourceEnded)
 *  - no tick is scheduled (nextTick is empty), i.e. no buffered results are awaiting delivery
 *
 * NOTE(review): 'close' is emitted before the readable side's 'end'; on Node versions with autoDestroy
 * the stream may additionally emit its own 'close' - confirm consumers tolerate this.
 */
TimingStream.prototype.checkForEnd = function() {
  var hasPendingTick = Boolean(this.nextTick);
  if (!this.sourceEnded || hasPendingTick) {
    return;
  }
  this.emit('close');
  this.push(null);
};

/**
 * Returns true if the result has transcript text but is missing its timestamps.
 *
 * Results without any text (no alternatives, or an empty/whitespace-only transcript) are not an
 * error here; handleResult() simply drops them because there are no words to time.
 *
 * @param {Object} result
 * @returns {Boolean}
 */
function noTimestamps(result) {
  var alt = result && result.alternatives && result.alternatives[0];
  // The timestamp checks are parenthesised so they only apply to results that contain text.
  // Previously `a && b && !c || !d` also evaluated `!alt.timestamps.length` when alt was missing
  // (TypeError) and flagged results with an empty transcript as errors.
  return !!(alt && alt.transcript && alt.transcript.trim() && (!alt.timestamps || !alt.timestamps.length));
}

/**
 * Validates an incoming result, buffers it for timed delivery, and then runs tick().
 *
 * - Emits 'error' (and buffers nothing) if the result has text but no timestamps.
 * - Keeps only the first alternative (on a shallow copy; the caller's object is not modified).
 * - Drops buffered interim results whose index is the same or lower, since this result supersedes them.
 * - Results with no timed words are not buffered (there is nothing to emit for them).
 *
 * @param {Object} result
 */
TimingStream.prototype.handleResult = function handleResult(result) {
  if (noTimestamps(result)) {
    this.emit('error', new Error('TimingStream requires timestamps'));
    return;
  }

  // additional alternatives do not include timestamps, so we can't process and emit them correctly
  if (result.alternatives && result.alternatives.length > 1) {
    // shallow copy so the source's object (possibly shared with other consumers) keeps all alternatives
    result = Object.assign({}, result, { alternatives: result.alternatives.slice(0, 1) });
  }

  // loop through the buffer and delete any interim results with the same or lower index
  while (this.interim.length && this.interim[0].index <= result.index) {
    this.interim.shift();
  }

  var alt = result.alternatives && result.alternatives[0];
  // scheduleNextTick() needs at least one timestamp to work with; word-less results are dropped
  if (alt && alt.timestamps && alt.timestamps.length) {
    if (result.final) {
      this.final.push(result);
    } else {
      this.interim.push(result);
    }
  }

  this.tick();
};

// Exposes the shared helper from ./to-promise as an instance method on every TimingStream
// (its exact behavior is defined in that module).
TimingStream.prototype.promise = require('./to-promise');

module.exports = TimingStream;