1 | <!DOCTYPE html>
|
2 | <html lang="en">
|
3 | <head>
|
4 |
|
5 | <meta charset="utf-8">
|
6 | <title>index.js - Documentation</title>
|
7 |
|
8 | <meta name="description" content="Implementation of the v2 twitter streaming api" />
|
9 |
|
10 | <meta name="keywords" content="twitter api v2 node.js rest http stream client" />
|
11 | <meta name="keyword" content="twitter api v2 node.js rest http stream client" />
|
12 |
|
13 |
|
14 |
|
15 | <script src="scripts/prettify/prettify.js"></script>
|
16 | <script src="scripts/prettify/lang-css.js"></script>
|
17 | |
18 |
|
19 |
|
20 | <link type="text/css" rel="stylesheet" href="styles/prettify.css">
|
21 | <link type="text/css" rel="stylesheet" href="styles/jsdoc.css">
|
22 | <script src="scripts/nav.js" defer></script>
|
23 | <meta name="viewport" content="width=device-width, initial-scale=1.0">
|
24 | </head>
|
25 | <body>
|
26 |
|
27 | <input type="checkbox" id="nav-trigger" class="nav-trigger" />
|
28 | <label for="nav-trigger" class="navicon-button x">
|
29 | <div class="navicon"></div>
|
30 | </label>
|
31 |
|
32 | <label for="nav-trigger" class="overlay"></label>
|
33 |
|
34 | <nav >
|
35 |
|
36 | <input type="text" id="nav-search" placeholder="Search" />
|
37 |
|
38 | <h2><a href="index.html">Home</a></h2><h2><a href="https://github.com/CrunchwrapSupreme/twit-stream-v2" target="_blank" class="menu-item" id="website_link" >Github Repo</a></h2><h3>Classes</h3><ul><li><a href="StreamClient.html">StreamClient</a><ul class='methods'><li data-type='method'><a href="StreamClient.html#connect">connect</a></li><li data-type='method'><a href="StreamClient.html#disconnect">disconnect</a></li></ul></li></ul><h3>Events</h3><ul><li><a href="StreamClient.html#event:tweet">tweet</a></li><li><a href="StreamClient.html#event:heartbeat">heartbeat</a></li><li><a href="StreamClient.html#event:stream-error">stream-error</a></li><li><a href="StreamClient.html#event:api-errors">api-errors</a></li><li><a href="StreamClient.html#event:other">other</a></li><li><a href="StreamClient.html#event:connected">connected</a></li><li><a href="StreamClient.html#event:reconnect">reconnect</a></li><li><a href="StreamClient.html#event:disconnected">disconnected</a></li><li><a href="StreamClient.html#event:close">close</a></li></ul>
|
39 | </nav>
|
40 |
|
41 | <div id="main">
|
42 |
|
43 | <h1 class="page-title">index.js</h1>
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
50 |
|
51 | <section>
|
52 | <article>
|
53 | <pre class="prettyprint source linenums"><code>const axios = require('axios');
|
54 | const EventEmitter = require('events');
|
55 | const TweetParser = require('./lib/parse_stream');
|
56 | const { finished } = require('stream');
|
57 | const RATE_LIMIT_WINDOW = 15 * 60 * 1000;
|
58 | const DATA_TIMEOUT = 30000;
|
59 |
|
60 | /**
|
61 | * Process error to determine if it's either an axios timeout or response stream timeout
|
62 | * @private
|
63 | */
|
64 | function isTimedout(error) {
|
65 | return error.code === 'ECONNABORTED' || error.isTimeout
|
66 | }
|
67 |
|
68 | /**
|
69 | * Statuses that can be retried
|
70 | * @private
|
71 | */
|
72 | function retriableStatus(resp) {
|
73 | let status = resp.status;
|
74 | const valid_statuses = [429, 420, 504, 503, 502, 500];
|
75 | return valid_statuses.includes(status);
|
76 | }
|
77 |
|
78 | /**
|
79 | * Returns def if num is NaN
|
80 | * @private
|
81 | */
|
82 | function defaultNaN(num, def) {
|
83 | if(isNaN(num)) {
|
84 | return def;
|
85 | } else {
|
86 | return num;
|
87 | }
|
88 | }
|
89 |
|
90 | /**
|
91 | * Returns a Promise that resolves on timeout
|
92 | * @private
|
93 | */
|
94 | const sleep = (milliseconds) => { return new Promise(resolve => setTimeout(resolve, milliseconds)); }
|
95 |
|
96 | /**
|
97 | * Calculate rate limit time in milliseconds
|
98 | * @private
|
99 | * @param {Object} resp Response object that preferably defines resp.status and resp.headers
|
100 | * @param {Date} last_retry Date of previous retry attempt
|
101 | * @returns {number} Backout time
|
102 | */
|
103 | function rateLimiting(resp, last_retry) {
|
104 | const now = Date.now()
|
105 | const fallback_rate = now + RATE_LIMIT_WINDOW;
|
106 | let backoff, ratelimit, remaining;
|
107 |
|
108 | if(resp && resp.headers) {
|
109 | const headers = resp.headers;
|
110 | remaining = defaultNaN(parseInt(headers['x-rate-limit-remaining']), 0);
|
111 | ratelimit = defaultNaN(parseInt(headers['x-rate-limit-reset']), fallback_rate / 1000);
|
112 | ratelimit = new Date(ratelimit * 1000);
|
113 | } else {
|
114 | remaining = -1;
|
115 | ratelimit = fallback_rate;
|
116 | }
|
117 |
|
118 | if(remaining === 0 || resp && (resp.status === 429 || resp.status === 420)) {
|
119 | backoff = Math.min(ratelimit - now, RATE_LIMIT_WINDOW);
|
120 | } else {
|
121 | let delta = Math.min(RATE_LIMIT_WINDOW, (now - last_retry)) / RATE_LIMIT_WINDOW;
|
122 | //delta = 1.0 - delta;
|
123 | backoff = Math.max(Math.floor(delta * RATE_LIMIT_WINDOW), 1000);
|
124 | }
|
125 |
|
126 | return backoff;
|
127 | }
|
128 |
|
129 | /**
|
130 | * Connect to the Twitter API v2 sampled stream endpoint and emit events for processing<br/>
|
131 | * For additional information see
|
132 | * [Twitter Sampled Stream]{@link https://developer.twitter.com/en/docs/twitter-api/tweets/sampled-stream/introduction}
|
133 | * @extends EventEmitter
|
134 | * @fires StreamClient#tweet
|
135 | * @fires StreamClient#connected
|
136 | * @fires StreamClient#reconnect
|
137 | * @fires StreamClient#disconnected
|
138 | * @fires StreamClient#close
|
139 | * @fires StreamClient#stream-error
|
140 | * @fires StreamClient#api-errors
|
141 | * @fires StreamClient#heartbeat
|
142 | * @fires StreamClient#other
|
143 | */
|
144 | class StreamClient extends EventEmitter {
|
145 | /**
|
146 | * Initializes the client
|
147 | * @param {Object} config Configuration for client
|
148 | * @param {number} config.timeout Set request and response timeout
|
149 | * @param {string} config.token Set [OAUTH Bearer token]{@link https://developer.twitter.com/en/docs/authentication/oauth-2-0} from developer account
|
150 | */
|
151 | constructor({token, timeout = DATA_TIMEOUT, stream_timeout = DATA_TIMEOUT}) {
|
152 | super();
|
153 | this.timeout = timeout;
|
154 | this.twitrClient = axios.create({
|
155 | baseURL: 'https://api.twitter.com/2',
|
156 | headers: { 'Authorization': `Bearer ${token}`},
|
157 | timeout: timeout
|
158 | });
|
159 | this.stream_timeout = stream_timeout;
|
160 | }
|
161 |
|
162 | /**
|
163 | * Connect to twitter stream and emit events.
|
164 | * @param {Object} config Configuration for connection
|
165 | * @param {number} config.params Set any filter parameters for stream, etc.
|
166 | * @param {string} config.max_reconnects Specify max number of reconnects. Default: -1 (infinity)
|
167 | * @returns {(Promise<object>|Promise<Error>)} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
|
168 | * -- the Promise rejects if the number of reconnects exceeds the max or an irrecoverable error occurs
|
169 | * -- the Promise resolves with that last error returned. Error object defines .reconnects if reconnects are exceeded
|
170 | * @see retriableStatus
|
171 | */
|
172 | async connect({ params = {}, max_reconnects = -1, writeable_stream = null} = {}) {
|
173 | let reconnects = -1;
|
174 | let last_try = Date.now();
|
175 | let last_error;
|
176 |
|
177 | while(max_reconnects === -1 || reconnects <= max_reconnects)
|
178 | {
|
179 | let disconnected = false;
|
180 | try
|
181 | {
|
182 | reconnects += 1;
|
183 | disconnected = await this.buildConnection(params, writeable_stream);
|
184 |
|
185 | if(disconnected) {
|
186 | Promise.resolve();
|
187 | }
|
188 | }
|
189 | catch(request_error)
|
190 | {
|
191 | last_error = request_error;
|
192 | let resp = request_error.response;
|
193 | if(axios.isCancel(request_error)) {
|
194 | return Promise.resolve();
|
195 | } else if(max_reconnects !== -1 && reconnects >= max_reconnects) {
|
196 | break;
|
197 | } else if(isTimedout(request_error) || resp && retriableStatus(resp)) {
|
198 | let timeout_wait = rateLimiting(resp, last_try);
|
199 | this.emit('reconnect', request_error, timeout_wait);
|
200 | await sleep(timeout_wait);
|
201 | } else {
|
202 | let error = new Error(`${request_error.message}`);
|
203 | return Promise.reject(error);
|
204 | }
|
205 | }
|
206 |
|
207 | last_try = Date.now();
|
208 | }
|
209 |
|
210 | let reject_error = new Error(`Max reconnects exceeded (${reconnects}):\n${last_error.message}`);
|
211 | reject_error.reconnects = reconnects;
|
212 | return Promise.reject(reject_error);
|
213 | }
|
214 |
|
215 | /**
|
216 | * Disconnects an active request if any
|
217 | * @returns {boolean} Returns true if there is a request to disconnect
|
218 | */
|
219 | disconnect() {
|
220 | if(this.cancelToken) {
|
221 | this.cancelToken.cancel('Disconnecting stream');
|
222 | this.cancelToken = null;
|
223 | }
|
224 | this.emit('disconnected');
|
225 | }
|
226 |
|
227 | /**
|
228 | * Build Promises for handling data stream in [.buildConnection]{@link StreamClient#buildConnection}
|
229 | * @private
|
230 | * @returns {Promise} Promise that initiates HTTP streaming
|
231 | */
|
232 | buildStreamPromise(resp, writable_stream) {
|
233 | return new Promise((resolve, reject) => {
|
234 | this.emit('connected');
|
235 | let error;
|
236 | let disconnected = false;
|
237 | let hose = resp.data;
|
238 | const timer = setTimeout(() => {
|
239 | const e = new Error(`Timed out after ${this.stream_timeout / 1000} seconds of no data`);
|
240 | e.isTimeout = true;
|
241 | hose.emit('error', e);
|
242 | }, this.stream_timeout);
|
243 | const jsonStream = hose.pipe(new TweetParser({
|
244 | emitter: this,
|
245 | timer: timer
|
246 | }));
|
247 | const streams = [hose, jsonStream];
|
248 | if(writable_stream) {
|
249 | streams.push(jsonStream.pipe(writable_stream));
|
250 | }
|
251 | const s_cleanup_fns = streams.map((s) => {
|
252 | return finished(s, (err) => {
|
253 | if(!s.destroyed) {
|
254 | s.destroy(err);
|
255 | }
|
256 | });
|
257 | });
|
258 | const stream_cleanup = () => s_cleanup_fns.forEach((fn) => fn());
|
259 | const disconnect_cb = () => {
|
260 | disconnected = true;
|
261 | hose.destroy();
|
262 | };
|
263 | this.once('disconnected', disconnect_cb);
|
264 | hose.on('close', () => {
|
265 | clearTimeout(timer);
|
266 | stream_cleanup();
|
267 | this.removeListener('disconnected', disconnect_cb)
|
268 | if(!error) {
|
269 | this.emit('close');
|
270 | resolve(disconnected);
|
271 | }
|
272 | });
|
273 | hose.on('error', stream_error => {
|
274 | error = stream_error;
|
275 | if(!hose.destroyed) {
|
276 | hose.destroy(error);
|
277 | }
|
278 | reject(error);
|
279 | });
|
280 | });
|
281 | }
|
282 |
|
283 | /**
|
284 | * Connect to twitter stream and emit events. Errors unhandled.
|
285 | * @private
|
286 | * @returns {Promise} Promise that resolves on [disconnect]{@link StreamClient#disconnect}
|
287 | * -- the Promise chain is unhandled so consider using [.connect]{@link StreamClient#connect}
|
288 | */
|
289 | buildConnection(params, writable_stream) {
|
290 | this.disconnect();
|
291 | this.cancelToken = axios.CancelToken.source();
|
292 |
|
293 | let streamReq = this.twitrClient.get('tweets/sample/stream', {
|
294 | responseType: 'stream',
|
295 | cancelToken: this.cancelToken.token,
|
296 | params: params,
|
297 | decompress: true,
|
298 | });
|
299 |
|
300 | return streamReq.then((resp) => {
|
301 | return this.buildStreamPromise(resp, writable_stream);
|
302 | });
|
303 | }
|
304 | }
|
305 |
|
306 | module.exports = StreamClient;
|
307 | </code></pre>
|
308 | </article>
|
309 | </section>
|
310 |
|
311 |
|
312 |
|
313 |
|
314 |
|
315 |
|
316 | </div>
|
317 |
|
318 | <br class="clear">
|
319 |
|
320 | <footer>
|
321 | Documentation generated by <a href="https://github.com/jsdoc3/jsdoc">JSDoc 3.6.6</a> on Mon Oct 19 2020 05:12:10 GMT-0500 (Central Daylight Time) using the <a href="https://github.com/clenemt/docdash">docdash</a> theme.
|
322 | </footer>
|
323 |
|
324 | <script>prettyPrint();</script>
|
325 | <script src="scripts/polyfill.js"></script>
|
326 | <script src="scripts/linenumber.js"></script>
|
327 |
|
328 | <script src="scripts/search.js" defer></script>
|
329 |
|
330 |
|
331 |
|
332 | </body>
|
333 | </html>
|