// Source: speech-to-text/timing-stream.js

'use strict';

var Transform = require('stream').Transform;
var util = require('util');
var defaults = require('defaults');
var noTimestamps = require('./no-timestamps');

/**
 * Slows results down to no faster than real time.
 *
 * Useful when running recognizeFile because the text can otherwise appear before the words are spoken
 *
 * The writable side defaults to object mode, since incoming messages are inspected as objects.
 * All options (caller-supplied plus the defaults below) are also forwarded to the underlying Transform stream.
 *
 * @param {Object} [opts]
 * @param {*} [opts.emitAt=TimingStream.END] - set to TimingStream.START for a more subtitles-like output where results are returned as soon as the utterance begins
 * @param {Number} [opts.delay=0] - Additional delay (in seconds) to apply before emitting words, useful for precise syncing to audio tracks. May be negative
 * @constructor
 */
function TimingStream(opts) {
  this.options = defaults(opts, {
    emitAt: TimingStream.END,
    delay: 0,
    allowHalfOpen: true, // keep the readable side open after the source closes
    writableObjectMode: true
  });
  // Pass the merged options rather than the raw `opts`: when the caller omits `opts`
  // entirely, the raw value is undefined and the stream would otherwise be built
  // without writableObjectMode / allowHalfOpen.
  Transform.call(this, this.options);

  // reference point (ms since epoch) against which message timestamps are measured
  this.startTime = Date.now();

  // to support stopping mid-stream
  this.stopped = false;
  this.timeout = null;
}
util.inherits(TimingStream, Transform);

// Values for opts.emitAt. They are also used as the index into each timestamps entry
// when reading the start/end time of a word (see getMessageTime).
TimingStream.START = 1;
TimingStream.END = 2;

/**
 * Transform implementation: holds each incoming message until the delay computed by
 * getDelayMs() has elapsed, then passes it downstream.
 *
 * In object mode (options.objectMode or options.readableObjectMode) every message is
 * forwarded unchanged. In text mode only messages carrying results are forwarded, as
 * the transcript of the first alternative of the first result; other messages are dropped.
 *
 * @private
 * @param {Object} msg - message from the source stream
 * @param {String} encoding - unused
 * @param {Function} next - stream callback; receives an Error if msg is a Buffer or has results without timestamps
 */
TimingStream.prototype._transform = function(msg, encoding, next) {
  if (msg instanceof Buffer) {
    return next(new Error('TimingStream requires the source to be in objectMode'));
  }

  // The array check and the length check must be two separate tests; wrapping both
  // in Array.isArray() would test a number and always yield false.
  var hasTranscript = Array.isArray(msg.results) && msg.results.length > 0;

  if (hasTranscript && noTimestamps(msg)) {
    var err = new Error('TimingStream requires timestamps');
    err.name = noTimestamps.ERROR_NO_TIMESTAMPS;
    return next(err);
  }

  if (this.stopped) {
    // next() is intentionally not invoked: after stop() the message is neither
    // emitted nor acknowledged.
    return;
  }

  var delayMs = this.getDelayMs(msg);

  var objectMode = this.options.objectMode || this.options.readableObjectMode;

  if (!objectMode && !hasTranscript) {
    // text mode has nothing to print for this message type
    return next();
  }

  // to support text mode
  var output = objectMode ? msg : msg.results[0].alternatives[0].transcript;

  // keep the handle so stop() can cancel the pending emit
  this.timeout = setTimeout(function() {
    next(null, output);
  }, delayMs);
};

/**
 * Picks the timestamp that decides when a message should be emitted.
 *
 * With options.emitAt === TimingStream.START the earliest time in the message is used,
 * otherwise the latest. For messages with results, only the last result's first
 * alternative is consulted; for speaker_labels messages the first label's `from`
 * or the last label's `to` is used.
 *
 * @private
 * @param {Object} msg
 * @return {Number} timestamp, or 0 for message types that carry neither results nor speaker_labels
 */
TimingStream.prototype.getMessageTime = function(msg) {
  var useStart = this.options.emitAt === TimingStream.START;

  var results = msg.results;
  if (Array.isArray(results) && results.length) {
    var words = results[results.length - 1].alternatives[0].timestamps;
    if (useStart) {
      return words[0][TimingStream.START];
    }
    return words[words.length - 1][TimingStream.END];
  }

  var labels = msg.speaker_labels;
  if (Array.isArray(labels) && labels.length) {
    return useStart ? labels[0].from : labels[labels.length - 1].to;
  }

  // failsafe for unknown message types
  return 0;
};

/**
 * Gets the length of time to delay (in ms) before emitting the given message.
 *
 * The target emit time is startTime plus the message's timestamp plus options.delay
 * (both in seconds, converted to ms here). options.delay may be negative; the result
 * is clamped so it is never below zero.
 *
 * @private
 * @param {Object} msg
 * @return {Number} ms to delay
 */
TimingStream.prototype.getDelayMs = function(msg) {
  var messageTime = this.getMessageTime(msg);
  // coerce so a missing or non-numeric delay behaves like the default of 0
  var extraDelay = Number(this.options.delay) || 0;
  var nextTickTime = this.startTime + (messageTime + extraDelay) * 1000; // ms since epoch
  var delayMs = nextTickTime - Date.now(); // ms from right now
  return Math.max(0, delayMs); // never return a negative number
};

/**
 * Replaces the reference start time used when computing delays for subsequent results.
 *
 * A result whose timer was already scheduled is not rescheduled, so the stream may
 * still emit up to 1 more result based on the older time after this is called.
 *
 * @param {Number} [time=Date.now()] Start time in milliseconds since epoch; any falsy value means "now"
 */
TimingStream.prototype.setStartTime = function(time) {
  if (time) {
    this.startTime = time;
    return;
  }
  this.startTime = Date.now();
};

// Attach the function exported by ./to-promise as an instance method.
// NOTE(review): judging by the name it presumably returns a promise for the stream's output — confirm against ./to-promise.
TimingStream.prototype.promise = require('./to-promise');

/**
 * Halts output immediately: flags the stream as stopped, cancels the pending
 * emit timer (if any) and then fires a 'stop' event.
 *
 * The transform callback belonging to a cancelled timer is not invoked.
 */
TimingStream.prototype.stop = function stop() {
  var pendingTimer = this.timeout;
  this.stopped = true;
  clearTimeout(pendingTimer);
  this.emit('stop');
};

// The TimingStream constructor is this module's export.
module.exports = TimingStream;