UNPKG

11.1 kBHTMLView Raw
1<!DOCTYPE html>
2<html lang="en">
3<head>
4
5 <meta charset="utf-8">
6 <title>index.js - Documentation</title>
7
8 <meta name="description" content="Implementation of the v2 twitter streaming api" />
9
10 <meta name="keywords" content="twitter api v2 node.js rest http stream client" />
11 <meta name="keyword" content="twitter api v2 node.js rest http stream client" />
12
13
14
15 <script src="scripts/prettify/prettify.js"></script>
16 <script src="scripts/prettify/lang-css.js"></script>
17 <!--[if lt IE 9]>
18 <script src="//html5shiv.googlecode.com/svn/trunk/html5.js"></script>
19 <![endif]-->
20 <link type="text/css" rel="stylesheet" href="styles/prettify.css">
21 <link type="text/css" rel="stylesheet" href="styles/jsdoc.css">
22 <script src="scripts/nav.js" defer></script>
23 <meta name="viewport" content="width=device-width, initial-scale=1.0">
24</head>
25<body>
26
27<input type="checkbox" id="nav-trigger" class="nav-trigger" />
28<label for="nav-trigger" class="navicon-button x">
29 <div class="navicon"></div>
30</label>
31
32<label for="nav-trigger" class="overlay"></label>
33
34<nav >
35
36 <input type="text" id="nav-search" placeholder="Search" />
37
38 <h2><a href="index.html">Home</a></h2><h2><a href="https://github.com/CrunchwrapSupreme/twit-stream-v2" target="_blank" class="menu-item" id="website_link" >Github Repo</a></h2><h3>Classes</h3><ul><li><a href="StreamClient.html">StreamClient</a><ul class='methods'><li data-type='method'><a href="StreamClient.html#connect">connect</a></li><li data-type='method'><a href="StreamClient.html#disconnect">disconnect</a></li></ul></li></ul><h3>Events</h3><ul><li><a href="StreamClient.html#event:tweet">tweet</a></li><li><a href="StreamClient.html#event:heartbeat">heartbeat</a></li><li><a href="StreamClient.html#event:stream-error">stream-error</a></li><li><a href="StreamClient.html#event:api-errors">api-errors</a></li><li><a href="StreamClient.html#event:other">other</a></li><li><a href="StreamClient.html#event:connected">connected</a></li><li><a href="StreamClient.html#event:reconnect">reconnect</a></li><li><a href="StreamClient.html#event:disconnected">disconnected</a></li><li><a href="StreamClient.html#event:close">close</a></li></ul>
39</nav>
40
41<div id="main">
42
43 <h1 class="page-title">index.js</h1>
44
45
46
47
48
49
50
51 <section>
52 <article>
53 <pre class="prettyprint source linenums"><code>const axios = require('axios');
54const EventEmitter = require('events');
55const TweetParser = require('./lib/parse_stream');
56const { finished } = require('stream');
57const RATE_LIMIT_WINDOW = 15 * 60 * 1000;
58const DATA_TIMEOUT = 30000;
59
60/**
61 * Process error to determine if it's either an axios timeout or response stream timeout
62 * @private
63 */
64function isTimedout(error) {
65 return error.code === 'ECONNABORTED' || error.isTimeout
66}
67
68/**
69 * Statuses that can be retried
70 * @private
71 */
72function retriableStatus(resp) {
73 let status = resp.status;
74 const valid_statuses = [429, 420, 504, 503, 502, 500];
75 return valid_statuses.includes(status);
76}
77
78/**
79 * Returns def if num is NaN
80 * @private
81 */
82function defaultNaN(num, def) {
83 if(isNaN(num)) {
84 return def;
85 } else {
86 return num;
87 }
88}
89
90/**
91 * Returns a Promise that resolves on timeout
92 * @private
93 */
94const sleep = (milliseconds) => { return new Promise(resolve => setTimeout(resolve, milliseconds)); }
95
96/**
97 * Calculate rate limit time in milliseconds
98 * @private
99 * @param {Object} resp Response object that preferably defines resp.status and resp.headers
100 * @param {Date} last_retry Date of previous retry attempt
101 * @returns {number} Backout time
102 */
103function rateLimiting(resp, last_retry) {
104 const now = Date.now()
105 const fallback_rate = now + RATE_LIMIT_WINDOW;
106 let backoff, ratelimit, remaining;
107
108 if(resp &amp;&amp; resp.headers) {
109 const headers = resp.headers;
110 remaining = defaultNaN(parseInt(headers['x-rate-limit-remaining']), 0);
111 ratelimit = defaultNaN(parseInt(headers['x-rate-limit-reset']), fallback_rate / 1000);
112 ratelimit = new Date(ratelimit * 1000);
113 } else {
114 remaining = -1;
115 ratelimit = fallback_rate;
116 }
117
118 if(remaining === 0 || resp &amp;&amp; (resp.status === 429 || resp.status === 420)) {
119 backoff = Math.min(ratelimit - now, RATE_LIMIT_WINDOW);
120 } else {
121 let delta = Math.min(RATE_LIMIT_WINDOW, (now - last_retry)) / RATE_LIMIT_WINDOW;
122 //delta = 1.0 - delta;
123 backoff = Math.max(Math.floor(delta * RATE_LIMIT_WINDOW), 1000);
124 }
125
126 return backoff;
127}
128
129/**
130 * Connect to the Twitter API v2 sampled stream endpoint and emit events for processing&lt;br/>
131 * For additional information see
132 * [Twitter Sampled Stream]{@link https://developer.twitter.com/en/docs/twitter-api/tweets/sampled-stream/introduction}
133 * @extends EventEmitter
134 * @fires StreamClient#tweet
135 * @fires StreamClient#connected
136 * @fires StreamClient#reconnect
137 * @fires StreamClient#disconnected
138 * @fires StreamClient#close
139 * @fires StreamClient#stream-error
140 * @fires StreamClient#api-errors
141 * @fires StreamClient#heartbeat
142 * @fires StreamClient#other
143 */
144class StreamClient extends EventEmitter {
145 /**
146 * Initializes the client
147 * @param {Object} config Configuration for client
148 * @param {number} config.timeout Set request and response timeout
149 * @param {string} config.token Set [OAUTH Bearer token]{@link https://developer.twitter.com/en/docs/authentication/oauth-2-0} from developer account
150 */
151 constructor({token, timeout = DATA_TIMEOUT, stream_timeout = DATA_TIMEOUT}) {
152 super();
153 this.timeout = timeout;
154 this.twitrClient = axios.create({
155 baseURL: 'https://api.twitter.com/2',
156 headers: { 'Authorization': `Bearer ${token}`},
157 timeout: timeout
158 });
159 this.stream_timeout = stream_timeout;
160 }
161
162 /**
163 * Connect to twitter stream and emit events.
164 * @param {Object} config Configuration for connection
165 * @param {number} config.params Set any filter parameters for stream, etc.
166 * @param {string} config.max_reconnects Specify max number of reconnects. Default: -1 (infinity)
167 * @returns {(Promise&lt;object>|Promise&lt;Error>)} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
168 * -- the Promise rejects if the number of reconnects exceeds the max or an irrecoverable error occurs
169 * -- the Promise resolves with that last error returned. Error object defines .reconnects if reconnects are exceeded
170 * @see retriableStatus
171 */
172 async connect({ params = {}, max_reconnects = -1, writeable_stream = null} = {}) {
173 let reconnects = -1;
174 let last_try = Date.now();
175 let last_error;
176
177 while(max_reconnects === -1 || reconnects &lt;= max_reconnects)
178 {
179 let disconnected = false;
180 try
181 {
182 reconnects += 1;
183 disconnected = await this.buildConnection(params, writeable_stream);
184
185 if(disconnected) {
186 Promise.resolve();
187 }
188 }
189 catch(request_error)
190 {
191 last_error = request_error;
192 let resp = request_error.response;
193 if(axios.isCancel(request_error)) {
194 return Promise.resolve();
195 } else if(max_reconnects !== -1 &amp;&amp; reconnects >= max_reconnects) {
196 break;
197 } else if(isTimedout(request_error) || resp &amp;&amp; retriableStatus(resp)) {
198 let timeout_wait = rateLimiting(resp, last_try);
199 this.emit('reconnect', request_error, timeout_wait);
200 await sleep(timeout_wait);
201 } else {
202 let error = new Error(`${request_error.message}`);
203 return Promise.reject(error);
204 }
205 }
206
207 last_try = Date.now();
208 }
209
210 let reject_error = new Error(`Max reconnects exceeded (${reconnects}):\n${last_error.message}`);
211 reject_error.reconnects = reconnects;
212 return Promise.reject(reject_error);
213 }
214
215 /**
216 * Disconnects an active request if any
217 * @returns {boolean} Returns true if there is a request to disconnect
218 */
219 disconnect() {
220 if(this.cancelToken) {
221 this.cancelToken.cancel('Disconnecting stream');
222 this.cancelToken = null;
223 }
224 this.emit('disconnected');
225 }
226
227 /**
228 * Build Promises for handling data stream in [.buildConnection]{@link StreamClient#buildConnection}
229 * @private
230 * @returns {Promise} Promise that initiates HTTP streaming
231 */
232 buildStreamPromise(resp, writable_stream) {
233 return new Promise((resolve, reject) => {
234 this.emit('connected');
235 let error;
236 let disconnected = false;
237 let hose = resp.data;
238 const timer = setTimeout(() => {
239 const e = new Error(`Timed out after ${this.stream_timeout / 1000} seconds of no data`);
240 e.isTimeout = true;
241 hose.emit('error', e);
242 }, this.stream_timeout);
243 const jsonStream = hose.pipe(new TweetParser({
244 emitter: this,
245 timer: timer
246 }));
247 const streams = [hose, jsonStream];
248 if(writable_stream) {
249 streams.push(jsonStream.pipe(writable_stream));
250 }
251 const s_cleanup_fns = streams.map((s) => {
252 return finished(s, (err) => {
253 if(!s.destroyed) {
254 s.destroy(err);
255 }
256 });
257 });
258 const stream_cleanup = () => s_cleanup_fns.forEach((fn) => fn());
259 const disconnect_cb = () => {
260 disconnected = true;
261 hose.destroy();
262 };
263 this.once('disconnected', disconnect_cb);
264 hose.on('close', () => {
265 clearTimeout(timer);
266 stream_cleanup();
267 this.removeListener('disconnected', disconnect_cb)
268 if(!error) {
269 this.emit('close');
270 resolve(disconnected);
271 }
272 });
273 hose.on('error', stream_error => {
274 error = stream_error;
275 if(!hose.destroyed) {
276 hose.destroy(error);
277 }
278 reject(error);
279 });
280 });
281 }
282
283 /**
284 * Connect to twitter stream and emit events. Errors unhandled.
285 * @private
286 * @returns {Promise} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
287 * -- the Promise chain is unhandled so consider using [.connect]{@link StreamClient#connect}
288 */
289 buildConnection(params, writable_stream) {
290 this.disconnect();
291 this.cancelToken = axios.CancelToken.source();
292
293 let streamReq = this.twitrClient.get('tweets/sample/stream', {
294 responseType: 'stream',
295 cancelToken: this.cancelToken.token,
296 params: params,
297 decompress: true,
298 });
299
300 return streamReq.then((resp) => {
301 return this.buildStreamPromise(resp, writable_stream);
302 });
303 }
304}
305
306module.exports = StreamClient;
307</code></pre>
308 </article>
309 </section>
310
311
312
313
314
315
316</div>
317
318<br class="clear">
319
320<footer>
321 Documentation generated by <a href="https://github.com/jsdoc3/jsdoc">JSDoc 3.6.6</a> on Mon Oct 19 2020 05:12:10 GMT-0500 (Central Daylight Time) using the <a href="https://github.com/clenemt/docdash">docdash</a> theme.
322</footer>
323
324<script>prettyPrint();</script>
325<script src="scripts/polyfill.js"></script>
326<script src="scripts/linenumber.js"></script>
327
328<script src="scripts/search.js" defer></script>
329
330
331
332</body>
333</html>