// Retrieved from UNPKG (7.54 kB, JavaScript).
const DevNull = require('dev-null-stream');
const axios = require('axios');
const EventEmitter = require('events');
const TweetParser = require('./parse_stream');
const http = require('http');
const https = require('https');
const { pipeline } = require('stream/promises');
/** Rate-limit window in milliseconds (15 minutes); also the upper bound for any backoff. */
const RATE_LIMIT_WINDOW = 15 * 60 * 1000;
/** Default timeout in milliseconds for both the request and the idle-stream timer. */
const DATA_TIMEOUT = 30000;
10
/**
 * Decide whether an error is a timeout: either an axios timeout
 * (`code === 'ECONNABORTED'`) or a response stream timeout (flagged with a
 * truthy `isTimeout` property).
 * @private
 * @param {Error} error Error to inspect; null/undefined is treated as "not a timeout"
 * @returns {boolean} true when the error represents a timeout
 */
function isTimedout(error) {
  if (error == null) {
    return false;
  }
  // Coerce so callers always get a real boolean rather than undefined/truthy values.
  return error.code === 'ECONNABORTED' || Boolean(error.isTimeout);
}
18
/**
 * Statuses that can be retried
 * @private
 * @param {Object} resp Response object defining resp.status
 * @returns {boolean} true when resp.status is one of 429, 420, 504, 503, 502, 500
 */
function retriableStatus(resp) {
  const retriable = [429, 420, 504, 503, 502, 500];
  return retriable.includes(resp.status);
}
28
/**
 * Returns def if num is NaN, otherwise num.
 * Uses the coercing global isNaN on purpose, so values that do not convert
 * to a number (e.g. undefined) also fall back to def.
 * @private
 * @param {*} num Value to check
 * @param {*} def Fallback returned when num is NaN
 * @returns {*} num, or def when num is NaN
 */
function defaultNaN(num, def) {
  return isNaN(num) ? def : num;
}
40
/**
 * Returns a Promise that resolves on timeout
 * @private
 * @param {number} milliseconds Delay handed to setTimeout
 * @returns {Promise<void>} Promise resolved (with no value) once the timer fires
 */
const sleep = (milliseconds) => new Promise((resolve) => setTimeout(resolve, milliseconds));
46
/**
 * Calculate rate limit time in milliseconds.
 *
 * When the response signals rate limiting (status 429/420, or a remaining
 * count of 0) the backoff is the time until `x-rate-limit-reset`. Otherwise
 * it is the time elapsed since the previous retry. Either way the result is
 * clamped to the range [1000, RATE_LIMIT_WINDOW], so it is never negative
 * or NaN.
 * @private
 * @param {Object} resp Response object that preferably defines resp.status and resp.headers
 * @param {(number|Date)} last_retry Time of previous retry attempt (epoch milliseconds or Date)
 * @returns {number} Backout time in milliseconds
 */
function rateLimiting(resp, last_retry) {
  const MIN_BACKOFF = 1000;
  const now = Date.now();
  const fallback_reset = now + RATE_LIMIT_WINDOW;
  let remaining = -1;
  let reset_at = fallback_reset;

  if (resp && resp.headers) {
    const headers = resp.headers;
    // NOTE(review): a missing/unparsable x-rate-limit-remaining header is
    // treated as 0, so any response that carries headers takes the
    // rate-limited branch below -- confirm this is intended.
    remaining = defaultNaN(parseInt(headers['x-rate-limit-remaining'], 10), 0);
    // The reset header is read as epoch seconds and converted to milliseconds.
    reset_at = defaultNaN(parseInt(headers['x-rate-limit-reset'], 10) * 1000, fallback_reset);
  }

  const rate_limited = remaining === 0
    || Boolean(resp && (resp.status === 429 || resp.status === 420));

  if (rate_limited) {
    // A reset time already in the past would give a negative wait; clamp it.
    return Math.max(Math.min(reset_at - now, RATE_LIMIT_WINDOW), MIN_BACKOFF);
  }

  // No usable last_retry means no known elapsed time: use the minimum backoff.
  const elapsed = defaultNaN(now - last_retry, 0);
  return Math.max(Math.min(elapsed, RATE_LIMIT_WINDOW), MIN_BACKOFF);
}
79
/**
 * Connect to the Twitter API v2 sampled stream endpoint and emit events for processing<br/>
 * For additional information see
 * [Twitter Sampled Stream]{@link https://developer.twitter.com/en/docs/twitter-api/tweets/sampled-stream/introduction}
 * @extends EventEmitter
 * @fires StreamClient#tweet
 * @fires StreamClient#connected
 * @fires StreamClient#reconnect
 * @fires StreamClient#disconnected
 * @fires StreamClient#stream-error
 * @fires StreamClient#api-errors
 * @fires StreamClient#heartbeat
 * @fires StreamClient#other
 */
class StreamClient extends EventEmitter {
  /**
   * Initializes the client
   * @param {Object} config Configuration for client
   * @param {string} config.token Set [OAUTH Bearer token]{@link https://developer.twitter.com/en/docs/authentication/oauth-2-0} from developer account
   * @param {number} [config.timeout] Request timeout in milliseconds for the axios instance
   *   (the streaming request itself overrides it with 0, see buildConnection)
   * @param {number} [config.stream_timeout] Milliseconds without data after which the
   *   response stream is destroyed with a timeout error
   */
  constructor({token, timeout = DATA_TIMEOUT, stream_timeout = DATA_TIMEOUT}) {
    super();
    this.url = 'tweets/sample/stream';
    this.timeout = timeout;
    this.twitrClient = axios.create({
      baseURL: 'https://api.twitter.com/2',
      headers: { 'Authorization': `Bearer ${token}`},
      timeout: timeout
    });
    this.stream_timeout = stream_timeout;
  }

  /**
   * Connect to twitter stream and emit events. Timeouts and retriable HTTP
   * statuses trigger a 'reconnect' event followed by a backoff sleep
   * (skipped when `this.skip_sleep` is truthy) and another attempt.
   * @param {Object} [config] Configuration for connection
   * @param {Object} [config.params] Set any filter parameters for stream, etc.
   * @param {number} [config.max_reconnects] Specify max number of reconnects. Default: -1 (infinity)
   * @param {Object} [config.writeable_stream] Optional writable stream appended to the pipeline;
   *   when omitted the parsed output is discarded
   * @returns {(Promise<void>)} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
   * -- the Promise rejects with the original error if an irrecoverable error occurs
   * -- the Promise rejects with an Error defining .reconnects and .wrapped_error
   *    (the last retriable error) if reconnects are exceeded
   * @see retriableStatus
   */
  async connect({ params = {}, max_reconnects = -1, writeable_stream = null} = {}) {
    let reconnects = -1;
    let last_try = Date.now();
    let last_error;

    // NOTE(review): `reconnects` is incremented before each attempt, so with
    // this `<=` bound max_reconnects = N allows N + 1 reconnects after the
    // initial connection -- confirm whether `<` was intended.
    while (max_reconnects === -1 || reconnects <= max_reconnects) {
      reconnects += 1;
      this.params = params;
      this.max_reconnects = max_reconnects;
      try {
        await this.buildConnection(this.params, writeable_stream);
      } catch (request_error) {
        const resp = request_error.response;
        if (axios.isCancel(request_error)) {
          // Cancelled through disconnect(): a normal shutdown.
          return;
        }
        if (!isTimedout(request_error) && !(resp && retriableStatus(resp))) {
          throw request_error;
        }
        // Remember the failure so it can be reported if retries run out.
        last_error = request_error;
        const timeout_wait = rateLimiting(resp, last_try);
        this.emit('reconnect', request_error, timeout_wait);
        if (!this.skip_sleep) {
          await sleep(timeout_wait);
        }
      }
      last_try = Date.now();
    }

    // Error takes a string message; extra details are attached as properties.
    const reject_error = new Error(`Max reconnects exceeded (${reconnects})`);
    reject_error.wrapped_error = last_error;
    reject_error.reconnects = reconnects;
    throw reject_error;
  }

  /**
   * Disconnects an active request if any. Always emits 'disconnected' first.
   * @returns {boolean} Returns true if there is a request to disconnect
   */
  disconnect() {
    this.emit('disconnected');
    if (this.cancelToken) {
      this.cancelToken.cancel('Disconnecting stream');
      this.cancelToken = null;
      return true;
    }
    return false;
  }

  /**
   * Build Promises for handling data stream in [.buildConnection]{@link StreamClient#buildConnection}
   * Pipes the response body through a TweetParser into the given writable
   * stream (or a DevNull sink), guarded by an idle timer.
   * @private
   * @param {Object} resp Response whose `data` property is the body stream
   * @param {Object} [writable_stream] Destination stream; a DevNull sink is used when falsy
   * @returns {Promise} Promise that initiates HTTP streaming; settles when the pipeline
   *   finishes or fails, after the timer and the 'disconnected' listener are released
   */
  buildStreamPromise(resp, writable_stream) {
    const hose = resp.data;
    const timer = setTimeout(() => {
      const e = new Error(`Timed out after ${this.stream_timeout / 1000} seconds of no data`);
      // Flag read by isTimedout() so connect() treats this as retriable.
      e.isTimeout = true;
      hose.destroy(e);
    }, this.stream_timeout);
    // The parser receives the timer -- presumably to refresh it as data arrives; verify in parse_stream.
    const jsonStream = new TweetParser({
      emitter: this,
      timer: timer
    });
    const sink = writable_stream ? writable_stream : new DevNull();
    const listener = () => {
      // Plain Readable streams have no end(); only call it when available so
      // disconnect() can still go on to cancel the request.
      if (typeof hose.end === 'function') {
        hose.end();
      }
    };

    this.once('disconnected', listener);
    // Return the promise produced by finally(): leaving it unreturned would
    // surface every pipeline failure as an unhandled rejection.
    const streaming = pipeline(hose, jsonStream, sink).finally(() => {
      clearTimeout(timer);
      // Drop the listener so reconnects do not accumulate stale handlers.
      this.removeListener('disconnected', listener);
    });
    this.emit('connected');
    return streaming;
  }

  /**
   * Connect to twitter stream and emit events. Errors unhandled.
   * Any previously stored cancel token is disconnected first.
   * @private
   * @param {Object} params Query parameters sent with the request
   * @param {Object} [writable_stream] Forwarded to buildStreamPromise
   * @returns {Promise} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
   * -- the Promise chain is unhandled so consider using [.connect]{@link StreamClient#connect}
   */
  buildConnection(params, writable_stream) {
    if (this.cancelToken) {
      this.disconnect();
    }
    this.cancelToken = axios.CancelToken.source();

    const streamReq = this.twitrClient.get(this.url, {
      responseType: 'stream',
      cancelToken: this.cancelToken.token,
      params: params,
      decompress: true,
      httpAgent: new http.Agent({ keepAlive: true }),
      httpsAgent: new https.Agent({ keepAlive: true }),
      // 0 disables the request timeout here; idle detection is done by the
      // stream timer in buildStreamPromise instead.
      timeout: 0,
      maxContentLength: Infinity,
    });

    return streamReq.then((resp) => this.buildStreamPromise(resp, writable_stream));
  }
}
237
module.exports = StreamClient;