index.js

const axios = require('axios');
const EventEmitter = require('events');
const TweetParser = require('./lib/parse_stream');
const { finished } = require('stream');
const RATE_LIMIT_WINDOW = 15 * 60 * 1000;
const DATA_TIMEOUT = 30000;

/**
 * Process error to determine if it's either an axios timeout or response stream timeout
 * @private
 */
function isTimedout(error) {
  return error.code === 'ECONNABORTED' || error.isTimeout
}

/**
 * Statuses that can be retried
 * @private
 */
function retriableStatus(resp) {
  let status = resp.status;
  const valid_statuses = [429, 420, 504, 503, 502, 500];
  return valid_statuses.includes(status);
}

/**
 * Returns def if num is NaN
 * @private
 */
function defaultNaN(num, def) {
  if(isNaN(num)) {
    return def;
  } else {
    return num;
  }
}

/**
 * Returns a Promise that resolves on timeout
 * @private
 */
const sleep = (milliseconds) => { return new Promise(resolve => setTimeout(resolve, milliseconds)); }

/**
 * Calculate rate limit time in milliseconds
 * @private
 * @param {Object} resp Response object that preferably defines resp.status and resp.headers
 * @param {Date} last_retry Date of previous retry attempt
 * @returns {number} Backout time
 */
function rateLimiting(resp, last_retry) {
  const now = Date.now()
  const fallback_rate = now + RATE_LIMIT_WINDOW;
  let backoff, ratelimit, remaining;

  if(resp && resp.headers) {
    const headers = resp.headers;
    remaining = defaultNaN(parseInt(headers['x-rate-limit-remaining']), 0);
    ratelimit = defaultNaN(parseInt(headers['x-rate-limit-reset']), fallback_rate / 1000);
    ratelimit = new Date(ratelimit * 1000);
  } else {
    remaining = -1;
    ratelimit = fallback_rate;
  }

  if(remaining === 0 || resp && (resp.status === 429 || resp.status === 420)) {
    backoff = Math.min(ratelimit - now, RATE_LIMIT_WINDOW);
  } else {
    let delta = Math.min(RATE_LIMIT_WINDOW, (now - last_retry)) / RATE_LIMIT_WINDOW;
    //delta = 1.0 - delta;
    backoff = Math.max(Math.floor(delta * RATE_LIMIT_WINDOW), 1000);
  }

  return backoff;
}

/**
 * Connect to the Twitter API v2 sampled stream endpoint and emit events for processing<br/>
 * For additional information see
 * [Twitter Sampled Stream]{@link https://developer.twitter.com/en/docs/twitter-api/tweets/sampled-stream/introduction}
 * @extends EventEmitter
 * @fires StreamClient#tweet
 * @fires StreamClient#connected
 * @fires StreamClient#reconnect
 * @fires StreamClient#disconnected
 * @fires StreamClient#close
 * @fires StreamClient#stream-error
 * @fires StreamClient#api-errors
 * @fires StreamClient#heartbeat
 * @fires StreamClient#other
 */
class StreamClient extends EventEmitter {
  /**
   * Initializes the client
   * @param {Object} config Configuration for client
   * @param {number} config.timeout Set request and response timeout
   * @param {string} config.token Set [OAUTH Bearer token]{@link https://developer.twitter.com/en/docs/authentication/oauth-2-0} from developer account
   */
  constructor({token, timeout = DATA_TIMEOUT, stream_timeout = DATA_TIMEOUT}) {
    super();
    this.timeout = timeout;
    this.twitrClient = axios.create({
      baseURL: 'https://api.twitter.com/2',
      headers: { 'Authorization': `Bearer ${token}`},
      timeout: timeout
    });
    this.stream_timeout = stream_timeout;
  }

  /**
   * Connect to twitter stream and emit events.
   * @param {Object} config Configuration for connection
   * @param {number} config.params Set any filter parameters for stream, etc.
   * @param {string} config.max_reconnects Specify max number of reconnects. Default: -1 (infinity)
   * @returns {(Promise<object>|Promise<Error>)} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
   * -- the Promise rejects if the number of reconnects exceeds the max or an irrecoverable error occurs
   * -- the Promise resolves with that last error returned. Error object defines .reconnects if reconnects are exceeded
   * @see retriableStatus
   */
  async connect({ params = {}, max_reconnects = -1, writeable_stream = null} = {}) {
    let reconnects = -1;
    let last_try = Date.now();
    let last_error;

    while(max_reconnects === -1 || reconnects <= max_reconnects)
    {
      let disconnected = false;
      try
      {
        reconnects += 1;
        disconnected = await this.buildConnection(params, writeable_stream);

        if(disconnected) {
          Promise.resolve();
        }
      }
      catch(request_error)
      {
        last_error = request_error;
        let resp = request_error.response;
        if(axios.isCancel(request_error)) {
          return Promise.resolve();
        } else if(max_reconnects !== -1 && reconnects >= max_reconnects) {
          break;
        } else if(isTimedout(request_error) || resp && retriableStatus(resp)) {
          let timeout_wait = rateLimiting(resp, last_try);
          this.emit('reconnect', request_error, timeout_wait);
          await sleep(timeout_wait);
        } else {
          let error = new Error(`${request_error.message}`);
          return Promise.reject(error);
        }
      }

      last_try = Date.now();
    }

    let reject_error = new Error(`Max reconnects exceeded (${reconnects}):\n${last_error.message}`);
    reject_error.reconnects = reconnects;
    return Promise.reject(reject_error);
  }

  /**
   * Disconnects an active request if any
   * @returns {boolean} Returns true if there is a request to disconnect
   */
  disconnect() {
    if(this.cancelToken) {
      this.cancelToken.cancel('Disconnecting stream');
      this.cancelToken = null;
    }
    this.emit('disconnected');
  }

  /**
   * Build Promises for handling data stream in [.buildConnection]{@link StreamClient#buildConnection}
   * @private
   * @returns {Promise} Promise that initiates HTTP streaming
   */
  buildStreamPromise(resp, writable_stream) {
    return new Promise((resolve, reject) => {
      this.emit('connected');
      let error;
      let disconnected = false;
      let hose = resp.data;
      const timer = setTimeout(() => {
        const e = new Error(`Timed out after ${this.stream_timeout / 1000} seconds of no data`);
        e.isTimeout = true;
        hose.emit('error', e);
      }, this.stream_timeout);
      const jsonStream = hose.pipe(new TweetParser({
        emitter: this,
        timer: timer
      }));
      const streams = [hose, jsonStream];
      if(writable_stream) {
        streams.push(jsonStream.pipe(writable_stream));
      }
      const s_cleanup_fns = streams.map((s) => {
        return finished(s, (err) => {
          if(!s.destroyed) {
            s.destroy(err);
          }
        });
      });
      const stream_cleanup = () => s_cleanup_fns.forEach((fn) => fn());
      const disconnect_cb = () => {
        disconnected = true;
        hose.destroy();
      };
      this.once('disconnected', disconnect_cb);
      hose.on('close', () => {
        clearTimeout(timer);
        stream_cleanup();
        this.removeListener('disconnected', disconnect_cb)
        if(!error) {
          this.emit('close');
          resolve(disconnected);
        }
      });
      hose.on('error', stream_error => {
        error = stream_error;
        if(!hose.destroyed) {
          hose.destroy(error);
        }
        reject(error);
      });
    });
  }

  /**
   * Connect to twitter stream and emit events. Errors unhandled.
   * @private
   * @returns {Promise} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
   * -- the Promise chain is unhandled so consider using [.connect]{@link StreamClient#connect}
   */
  buildConnection(params, writable_stream) {
    this.disconnect();
    this.cancelToken = axios.CancelToken.source();

    let streamReq = this.twitrClient.get('tweets/sample/stream', {
      responseType: 'stream',
      cancelToken: this.cancelToken.token,
      params: params,
      decompress: true,
    });

    return streamReq.then((resp) => {
      return this.buildStreamPromise(resp, writable_stream);
    });
  }
}

module.exports = StreamClient;