UNPKG

12.1 kBJavaScriptView Raw
1
2var EventEmitter = require('events').EventEmitter;
3var util = require('util');
4
5var helpers = require('./helpers')
6var Parser = require('./parser');
7var request = require('request');
8var zlib = require('zlib');
9
10var STATUS_CODES_TO_ABORT_ON = require('./settings').STATUS_CODES_TO_ABORT_ON
11
12var StreamingAPIConnection = function (reqOpts, twitOptions) {
13 this.reqOpts = reqOpts
14 this.twitOptions = twitOptions
15 EventEmitter.call(this)
16}
17
18util.inherits(StreamingAPIConnection, EventEmitter)
19
20/**
21 * Resets the connection.
22 * - clears request, response, parser
23 * - removes scheduled reconnect handle (if one was scheduled)
24 * - stops the stall abort timeout handle (if one was scheduled)
25 */
26StreamingAPIConnection.prototype._resetConnection = function () {
27 if (this.request) {
28 // clear our reference to the `request` instance
29 this.request.removeAllListeners();
30 this.request.destroy();
31 }
32
33 if (this.response) {
34 // clear our reference to the http.IncomingMessage instance
35 this.response.removeAllListeners();
36 this.response.destroy();
37 }
38
39 if (this.parser) {
40 this.parser.removeAllListeners()
41 }
42
43 // ensure a scheduled reconnect does not occur (if one was scheduled)
44 // this can happen if we get a close event before .stop() is called
45 clearTimeout(this._scheduledReconnect)
46 delete this._scheduledReconnect
47
48 // clear our stall abort timeout
49 this._stopStallAbortTimeout()
50}
51
52/**
53 * Resets the parameters used in determining the next reconnect time
54 */
55StreamingAPIConnection.prototype._resetRetryParams = function () {
56 // delay for next reconnection attempt
57 this._connectInterval = 0
58 // flag indicating whether we used a 0-delay reconnect
59 this._usedFirstReconnect = false
60}
61
62StreamingAPIConnection.prototype._startPersistentConnection = function () {
63 var self = this;
64 self._resetConnection();
65 self._setupParser();
66 self._resetStallAbortTimeout();
67 self.request = request.post(this.reqOpts);
68 self.emit('connect', self.request);
69 self.request.on('response', function (response) {
70 // reset our reconnection attempt flag so next attempt goes through with 0 delay
71 // if we get a transport-level error
72 self._usedFirstReconnect = false;
73 // start a stall abort timeout handle
74 self._resetStallAbortTimeout();
75 self.response = response
76 if (STATUS_CODES_TO_ABORT_ON.indexOf(self.response.statusCode) !== -1) {
77 // We got a status code telling us we should abort the connection.
78 // Read the body from the response and return an error to the user.
79 var body = '';
80 var compressedBody = '';
81
82 self.response.on('data', function (chunk) {
83 compressedBody += chunk.toString('utf8');
84 })
85
86 var gunzip = zlib.createGunzip();
87 self.response.pipe(gunzip);
88 gunzip.on('data', function (chunk) {
89 body += chunk.toString('utf8')
90 })
91
92 gunzip.on('end', function () {
93 try {
94 body = JSON.parse(body)
95 } catch (jsonDecodeError) {
96 // Twitter may send an HTML body
97 // if non-JSON text was returned, we'll just attach it to the error as-is
98 }
99 // surface the error to the user
100 var error = helpers.makeTwitError('Bad Twitter streaming request: ' + self.response.statusCode)
101 error.statusCode = response ? response.statusCode: null;
102 helpers.attachBodyInfoToError(error, body)
103 self.emit('error', error);
104 // stop the stream explicitly so we don't reconnect
105 self.stop()
106 body = null;
107 });
108 gunzip.on('error', function (err) {
109 // If Twitter sends us back an uncompressed HTTP response, gzip will error out.
110 // Handle this by emitting an error with the uncompressed response body.
111 var errMsg = 'Gzip error: ' + err.message;
112 var twitErr = helpers.makeTwitError(errMsg);
113 twitErr.statusCode = self.response.statusCode;
114 helpers.attachBodyInfoToError(twitErr, compressedBody);
115 self.emit('error', twitErr);
116 });
117 } else if (self.response.statusCode === 420) {
118 // close the connection forcibly so a reconnect is scheduled by `self.onClose()`
119 self._scheduleReconnect();
120 } else {
121 // We got an OK status code - the response should be valid.
122 // Read the body from the response and return to the user.
123 var gunzip = zlib.createGunzip();
124 self.response.pipe(gunzip);
125
126 //pass all response data to parser
127 gunzip.on('data', function (chunk) {
128 self._connectInterval = 0
129 // stop stall timer, and start a new one
130 self._resetStallAbortTimeout();
131 self.parser.parse(chunk.toString('utf8'));
132 });
133
134 gunzip.on('close', self._onClose.bind(self))
135 gunzip.on('error', function (err) {
136 self.emit('error', err);
137 })
138 self.response.on('error', function (err) {
139 // expose response errors on twit instance
140 self.emit('error', err);
141 })
142
143 // connected without an error response from Twitter, emit `connected` event
144 // this must be emitted after all its event handlers are bound
145 // so the reference to `self.response` is not interfered-with by the user until it is emitted
146 self.emit('connected', self.response);
147 }
148 });
149 self.request.on('close', self._onClose.bind(self));
150 self.request.on('error', function (err) { self._scheduleReconnect.bind(self) });
151 return self;
152}
153
154/**
155 * Handle when the request or response closes.
156 * Schedule a reconnect according to Twitter's reconnect guidelines
157 *
158 */
159StreamingAPIConnection.prototype._onClose = function () {
160 var self = this;
161 self._stopStallAbortTimeout();
162 // We closed it explicitly - don't reconnect
163 if (self._abortedBy === 'twit-client')
164 return
165
166 // we got disconnected by twitter - reconnect according to their guidelines
167 self._abortedBy = 'twitter';
168
169 if (self._scheduledReconnect) {
170 // if we already have a reconnect scheduled, don't schedule another one.
171 // this race condition can happen if the http.ClientRequest and http.IncomingMessage both emit `close`
172 return
173 }
174
175 self._scheduleReconnect();
176}
177
178/**
179 * Kick off the http request, and persist the connection
180 *
181 */
182StreamingAPIConnection.prototype.start = function () {
183 this._resetRetryParams();
184 this._startPersistentConnection();
185 return this;
186}
187
188/**
189 * Abort the http request, stop scheduled reconnect (if one was scheduled) and clear state
190 *
191 */
192StreamingAPIConnection.prototype.stop = function () {
193 this._abortedBy = 'twit-client';
194 // clear connection variables and timeout handles
195 this._resetConnection();
196 this._resetRetryParams();
197 return this;
198}
199
200/**
201 * Stop and restart the stall abort timer (called when new data is received)
202 *
203 * If we go 90s without receiving data from twitter, we abort the request & reconnect.
204 */
205StreamingAPIConnection.prototype._resetStallAbortTimeout = function () {
206 var self = this;
207 // stop the previous stall abort timer
208 self._stopStallAbortTimeout();
209 //start a new 90s timeout to trigger a close & reconnect if no data received
210 self._stallAbortTimeout = setTimeout(function () {
211 self._scheduleReconnect()
212 }, 90000);
213 return this;
214}
215
216/**
217 * Stop stall timeout
218 *
219 */
220StreamingAPIConnection.prototype._stopStallAbortTimeout = function () {
221 clearTimeout(this._stallAbortTimeout);
222 // mark the timer as `null` so it is clear via introspection that the timeout is not scheduled
223 delete this._stallAbortTimeout;
224 return this;
225}
226
227/**
228 * Computes the next time a reconnect should occur (based on the last HTTP response received)
229 * and starts a timeout handle to begin reconnecting after `self._connectInterval` passes.
230 *
231 * @return {Undefined}
232 */
233StreamingAPIConnection.prototype._scheduleReconnect = function () {
234 var self = this;
235 if (self.response && self.response.statusCode === 420) {
236 // we are being rate limited
237 // start with a 1 minute wait and double each attempt
238 if (!self._connectInterval) {
239 self._connectInterval = 60000;
240 } else {
241 self._connectInterval *= 2;
242 }
243 } else if (self.response && String(self.response.statusCode).charAt(0) === '5') {
244 // twitter 5xx errors
245 // start with a 5s wait, double each attempt up to 320s
246 if (!self._connectInterval) {
247 self._connectInterval = 5000;
248 } else if (self._connectInterval < 320000) {
249 self._connectInterval *= 2;
250 } else {
251 self._connectInterval = 320000;
252 }
253 } else {
254 // we did not get an HTTP response from our last connection attempt.
255 // DNS/TCP error, or a stall in the stream (and stall timer closed the connection)
256 if (!self._usedFirstReconnect) {
257 // first reconnection attempt on a valid connection should occur immediately
258 self._connectInterval = 0;
259 self._usedFirstReconnect = true;
260 } else if (self._connectInterval < 16000) {
261 // linearly increase delay by 250ms up to 16s
262 self._connectInterval += 250;
263 } else {
264 // cap out reconnect interval at 16s
265 self._connectInterval = 16000;
266 }
267 }
268
269 // schedule the reconnect
270 self._scheduledReconnect = setTimeout(function () {
271 self._startPersistentConnection();
272 }, self._connectInterval);
273 self.emit('reconnect', self.request, self.response, self._connectInterval);
274}
275
276StreamingAPIConnection.prototype._setupParser = function () {
277 var self = this
278 self.parser = new Parser()
279
280 // handle twitter objects as they come in - emit the generic `message` event
281 // along with the specific event corresponding to the message
282 self.parser.on('element', function (msg) {
283 self.emit('message', msg)
284
285 if (msg.delete) { self.emit('delete', msg) }
286 else if (msg.disconnect) { self._handleDisconnect(msg) }
287 else if (msg.limit) { self.emit('limit', msg) }
288 else if (msg.scrub_geo) { self.emit('scrub_geo', msg) }
289 else if (msg.warning) { self.emit('warning', msg) }
290 else if (msg.status_withheld) { self.emit('status_withheld', msg) }
291 else if (msg.user_withheld) { self.emit('user_withheld', msg) }
292 else if (msg.friends) { self.emit('friends', msg) }
293 else if (msg.direct_message) { self.emit('direct_message', msg) }
294 else if (msg.event) {
295 self.emit('user_event', msg)
296 // reference: https://dev.twitter.com/docs/streaming-apis/messages#User_stream_messages
297 var ev = msg.event
298
299 if (ev === 'blocked') { self.emit('blocked', msg) }
300 else if (ev === 'unblocked') { self.emit('unblocked', msg) }
301 else if (ev === 'favorite') { self.emit('favorite', msg) }
302 else if (ev === 'unfavorite') { self.emit('unfavorite', msg) }
303 else if (ev === 'follow') { self.emit('follow', msg) }
304 else if (ev === 'unfollow') { self.emit('unfollow', msg) }
305 else if (ev === 'user_update') { self.emit('user_update', msg) }
306 else if (ev === 'list_created') { self.emit('list_created', msg) }
307 else if (ev === 'list_destroyed') { self.emit('list_destroyed', msg) }
308 else if (ev === 'list_updated') { self.emit('list_updated', msg) }
309 else if (ev === 'list_member_added') { self.emit('list_member_added', msg) }
310 else if (ev === 'list_member_removed') { self.emit('list_member_removed', msg) }
311 else if (ev === 'list_user_subscribed') { self.emit('list_user_subscribed', msg) }
312 else if (ev === 'list_user_unsubscribed') { self.emit('list_user_unsubscribed', msg) }
313 else if (ev === 'quoted_tweet') { self.emit('quoted_tweet', msg) }
314 else { self.emit('unknown_user_event', msg) }
315 } else { self.emit('tweet', msg) }
316 })
317
318 self.parser.on('error', function (err) {
319 self.emit('parser-error', err)
320 })
321}
322
323StreamingAPIConnection.prototype._handleDisconnect = function (twitterMsg) {
324 this.emit('disconnect', twitterMsg);
325 this.stop();
326}
327
328module.exports = StreamingAPIConnection