UNPKG

8.08 kBJavaScriptView Raw
1const axios = require('axios');
2const EventEmitter = require('events');
3const TweetParser = require('./lib/parse_stream');
4const { finished } = require('stream');
// Length of the rate-limit window in milliseconds (15 minutes).
// rateLimiting() uses it as the fallback reset delay and as the cap on any backoff.
5const RATE_LIMIT_WINDOW = 15 * 60 * 1000;
// Default, in milliseconds, for both the request timeout and the stream
// (no-data) timeout accepted by the StreamClient constructor.
6const DATA_TIMEOUT = 30000;
7
/**
 * Decide whether an error represents a timeout: either an axios request
 * timeout (`code === 'ECONNABORTED'`) or a response-stream timeout, which is
 * flagged with a truthy `isTimeout` property.
 * @private
 * @param {?Object} error Error (or error-like object) to inspect; may be nullish
 * @returns {boolean} true only when the error is one of the timeout kinds above
 */
function isTimedout(error) {
  // Anything thrown can end up here, so tolerate null/undefined instead of throwing.
  if (error == null) {
    return false;
  }
  // Coerce so callers always receive a real boolean, never `undefined`.
  return error.code === 'ECONNABORTED' || Boolean(error.isTimeout);
}
15
/**
 * Tell whether a response carries an HTTP status for which the request
 * may be attempted again (rate limiting and server-side failures).
 * @private
 * @param {Object} resp Response object exposing a numeric `status`
 * @returns {boolean} true when `resp.status` is one of the retriable codes
 */
function retriableStatus(resp) {
  switch (resp.status) {
    case 429:
    case 420:
    case 504:
    case 503:
    case 502:
    case 500:
      return true;
    default:
      return false;
  }
}
25
/**
 * Substitute a default for a value that is not a number.
 * Uses the coercing global `isNaN`, so anything that coerces to NaN
 * (e.g. `undefined`) is also replaced by the default.
 * @private
 * @param {*} num Value to check
 * @param {*} def Value returned when `num` is NaN
 * @returns {*} `num` unchanged, or `def` when `num` is NaN
 */
function defaultNaN(num, def) {
  return isNaN(num) ? def : num;
}
37
/**
 * Build a Promise that fulfils (with no value) once the given delay has elapsed.
 * @private
 * @param {number} milliseconds Delay handed to setTimeout
 * @returns {Promise<undefined>} Promise fulfilled when the timer fires
 */
const sleep = (milliseconds) => new Promise((wake) => {
  setTimeout(wake, milliseconds);
});
43
/**
 * Calculate how long to wait, in milliseconds, before the next connection attempt.
 *
 * When the response signals rate limiting (status 429/420, or a remaining
 * quota of 0) the wait lasts until the advertised reset time, capped at
 * RATE_LIMIT_WINDOW. Otherwise the wait equals the time elapsed since the
 * previous attempt, also capped at RATE_LIMIT_WINDOW. The result is never
 * below one second, so a reset time that already passed cannot cause an
 * immediate retry.
 * @private
 * @param {Object} [resp] Response object that preferably defines resp.status and resp.headers
 * @param {(Date|number)} last_retry Time of the previous attempt
 * @returns {number} Backoff time in milliseconds (1000 <= value <= RATE_LIMIT_WINDOW)
 */
function rateLimiting(resp, last_retry) {
  // Shortest wait ever returned.
  const MIN_BACKOFF = 1000;
  const now = Date.now();
  // Used when no reset time is advertised: assume a full window from now.
  const fallback_reset = now + RATE_LIMIT_WINDOW;

  // -1 means "quota unknown" (no response / no headers at all).
  let remaining = -1;
  let reset_at = fallback_reset;

  if (resp && resp.headers) {
    const headers = resp.headers;
    // NOTE: a missing or unparsable remaining header is treated as 0 (exhausted).
    remaining = defaultNaN(parseInt(headers['x-rate-limit-remaining'], 10), 0);
    // The reset header is parsed as epoch seconds; convert to milliseconds.
    reset_at = defaultNaN(parseInt(headers['x-rate-limit-reset'], 10) * 1000, fallback_reset);
  }

  const rate_limited = remaining === 0 ||
    Boolean(resp && (resp.status === 429 || resp.status === 420));

  if (rate_limited) {
    // Clamp from below: a reset time in the past would otherwise be negative.
    return Math.max(Math.min(reset_at - now, RATE_LIMIT_WINDOW), MIN_BACKOFF);
  }

  const elapsed = now - last_retry;
  if (!Number.isFinite(elapsed)) {
    // No usable previous-attempt time; fall back to the minimum wait.
    return MIN_BACKOFF;
  }
  return Math.max(Math.min(elapsed, RATE_LIMIT_WINDOW), MIN_BACKOFF);
}
76
/**
 * Connect to the Twitter API v2 sampled stream endpoint and emit events for processing<br/>
 * For additional information see
 * [Twitter Sampled Stream]{@link https://developer.twitter.com/en/docs/twitter-api/tweets/sampled-stream/introduction}
 * @extends EventEmitter
 * @fires StreamClient#tweet
 * @fires StreamClient#connected
 * @fires StreamClient#reconnect
 * @fires StreamClient#disconnected
 * @fires StreamClient#close
 * @fires StreamClient#stream-error
 * @fires StreamClient#api-errors
 * @fires StreamClient#heartbeat
 * @fires StreamClient#other
 */
class StreamClient extends EventEmitter {
  /**
   * Initializes the client
   * @param {Object} config Configuration for client
   * @param {string} config.token Set [OAUTH Bearer token]{@link https://developer.twitter.com/en/docs/authentication/oauth-2-0} from developer account
   * @param {number} [config.timeout] Request timeout in milliseconds passed to axios. Default: DATA_TIMEOUT
   * @param {number} [config.stream_timeout] Milliseconds before the response stream is failed with a timeout error. Default: DATA_TIMEOUT
   */
  constructor({token, timeout = DATA_TIMEOUT, stream_timeout = DATA_TIMEOUT}) {
    super();
    this.timeout = timeout;
    this.twitrClient = axios.create({
      baseURL: 'https://api.twitter.com/2',
      headers: { 'Authorization': `Bearer ${token}`},
      timeout: timeout
    });
    this.stream_timeout = stream_timeout;
  }

  /**
   * Connect to twitter stream and emit events, reconnecting on retriable failures.
   * @param {Object} [config] Configuration for connection
   * @param {Object} [config.params] Query parameters sent with the stream request. Default: {}
   * @param {number} [config.max_reconnects] Specify max number of reconnects. Default: -1 (infinity)
   * @param {?Object} [config.writeable_stream] Optional writable stream the parsed output is piped into. Default: null
   * @returns {Promise<undefined>} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
   * or when the request is cancelled
   * -- the Promise rejects if the number of reconnects exceeds the max or an irrecoverable error occurs
   * -- the rejection Error defines .reconnects if reconnects are exceeded
   * @see retriableStatus
   */
  async connect({ params = {}, max_reconnects = -1, writeable_stream = null} = {}) {
    let reconnects = -1;
    let last_try = Date.now();
    let last_error;

    // `reconnects` counts attempts after the first one, so the first pass
    // brings it to 0. Strict `<` keeps clean closes within the same budget
    // as failures (1 initial attempt + max_reconnects reconnects).
    while(max_reconnects === -1 || reconnects < max_reconnects) {
      reconnects += 1;
      try {
        const disconnected = await this.buildConnection(params, writeable_stream);

        if(disconnected) {
          // The stream ended because disconnect() was called: stop for good.
          return undefined;
        }
        // Otherwise the stream closed on its own: loop and connect again.
      } catch(request_error) {
        last_error = request_error;
        const resp = request_error.response;
        if(axios.isCancel(request_error)) {
          // The pending request was cancelled through the cancel token.
          return undefined;
        } else if(max_reconnects !== -1 && reconnects >= max_reconnects) {
          break;
        } else if(isTimedout(request_error) || resp && retriableStatus(resp)) {
          const timeout_wait = rateLimiting(resp, last_try);
          this.emit('reconnect', request_error, timeout_wait);
          await sleep(timeout_wait);
        } else {
          // Irrecoverable: surface the message, keeping the original as `cause`.
          const error = new Error(`${request_error.message}`);
          error.cause = request_error;
          throw error;
        }
      }

      last_try = Date.now();
    }

    // The budget can run out without any error (streams that only closed),
    // in which case there is no last error message to append.
    const detail = last_error ? `:\n${last_error.message}` : '';
    const reject_error = new Error(`Max reconnects exceeded (${reconnects})${detail}`);
    reject_error.reconnects = reconnects;
    throw reject_error;
  }

  /**
   * Disconnects an active request if any and emits `disconnected`
   * (the event is emitted even when there was nothing to cancel)
   * @returns {boolean} Returns true if there is a request to disconnect
   */
  disconnect() {
    const had_request = Boolean(this.cancelToken);
    if(had_request) {
      this.cancelToken.cancel('Disconnecting stream');
      this.cancelToken = null;
    }
    this.emit('disconnected');
    return had_request;
  }

  /**
   * Build Promises for handling data stream in [.buildConnection]{@link StreamClient#buildConnection}
   * @private
   * @param {Object} resp Response whose `data` is the readable HTTP stream
   * @param {?Object} writable_stream Optional writable stream the parsed output is piped into
   * @returns {Promise<boolean>} Resolves when the stream closes without error: true if the
   * close was caused by a `disconnected` event, false otherwise. Rejects with the stream error.
   */
  buildStreamPromise(resp, writable_stream) {
    return new Promise((resolve, reject) => {
      this.emit('connected');
      let error;
      let disconnected = false;
      const hose = resp.data;
      // Fails the stream with a flagged timeout error once stream_timeout elapses.
      // The timer is handed to the parser, which presumably refreshes it as data arrives.
      const timer = setTimeout(() => {
        const e = new Error(`Timed out after ${this.stream_timeout / 1000} seconds of no data`);
        e.isTimeout = true;
        hose.emit('error', e);
      }, this.stream_timeout);
      const jsonStream = hose.pipe(new TweetParser({
        emitter: this,
        timer: timer
      }));
      const streams = [hose, jsonStream];
      if(writable_stream) {
        streams.push(jsonStream.pipe(writable_stream));
      }
      // finished() returns a function that removes the listeners it registered.
      const s_cleanup_fns = streams.map((s) => {
        return finished(s, (err) => {
          if(!s.destroyed) {
            s.destroy(err);
          }
        });
      });
      const stream_cleanup = () => s_cleanup_fns.forEach((fn) => fn());
      const disconnect_cb = () => {
        disconnected = true;
        hose.destroy();
      };
      this.once('disconnected', disconnect_cb);
      hose.on('close', () => {
        clearTimeout(timer);
        stream_cleanup();
        this.removeListener('disconnected', disconnect_cb);
        // After an error the Promise was already rejected; only a clean close resolves.
        if(!error) {
          this.emit('close');
          resolve(disconnected);
        }
      });
      hose.on('error', stream_error => {
        error = stream_error;
        if(!hose.destroyed) {
          hose.destroy(error);
        }
        reject(error);
      });
    });
  }

  /**
   * Connect to twitter stream and emit events. Errors unhandled.
   * Any previous request is disconnected first and a fresh cancel token is stored.
   * @private
   * @param {Object} params Query parameters sent with the stream request
   * @param {?Object} writable_stream Optional writable stream the parsed output is piped into
   * @returns {Promise<boolean>} Promise that resolves when the stream closes
   * (true on [disconnect]{@link StreamClient#disconnect})
   * -- the Promise chain is unhandled so consider using [.connect]{@link StreamClient#connect}
   */
  buildConnection(params, writable_stream) {
    this.disconnect();
    this.cancelToken = axios.CancelToken.source();

    const streamReq = this.twitrClient.get('tweets/sample/stream', {
      responseType: 'stream',
      cancelToken: this.cancelToken.token,
      params: params,
      decompress: true,
    });

    return streamReq.then((resp) => {
      return this.buildStreamPromise(resp, writable_stream);
    });
  }
}
253
254module.exports = StreamClient;