UNPKG

10.9 kBJavaScriptView Raw
1
2var EventEmitter = require('events').EventEmitter;
3var util = require('util');
4
5var helpers = require('./helpers')
6var Parser = require('./parser');
7var request = require('request');
8var STATUS_CODES_TO_ABORT_ON = require('./settings').STATUS_CODES_TO_ABORT_ON
9
/**
 * A connection to a streaming endpoint that re-emits what it receives.
 *
 * Instances inherit from EventEmitter; the prototype methods in this file
 * emit connection lifecycle events and parsed stream messages on the instance.
 *
 * @param reqOpts      request options, later passed unchanged to `request.post()`
 * @param twitOptions  client options, stored on the instance as-is
 */
var StreamingAPIConnection = function (reqOpts, twitOptions) {
  this.reqOpts = reqOpts;
  this.twitOptions = twitOptions;
  // initialise the EventEmitter state on this instance
  EventEmitter.call(this);
};

util.inherits(StreamingAPIConnection, EventEmitter);
17
/**
 * Resets the connection.
 * - detaches and destroys the request and response, drops the parser
 * - removes scheduled reconnect handle (if one was scheduled)
 * - stops the stall abort timeout handle (if one was scheduled)
 *
 * Safe to call when nothing is connected.
 */
StreamingAPIConnection.prototype._resetConnection = function () {
  // Detach a stream we are done with and tear it down. A no-op `error`
  // listener is left in place: an emitter with no `error` listener throws
  // when `error` is emitted, and destroying an in-flight stream can emit one.
  var discard = function (stream) {
    stream.removeAllListeners();
    stream.on('error', function () {});
    stream.destroy();
  };

  if (this.request) {
    // clear our reference to the `request` instance
    discard(this.request);
    delete this.request;
  }

  if (this.response) {
    // clear our reference to the http.IncomingMessage instance
    discard(this.response);
    delete this.response;
  }

  if (this.parser) {
    this.parser.removeAllListeners();
    delete this.parser;
  }

  // ensure a scheduled reconnect does not occur (if one was scheduled)
  // this can happen if we get a close event before .stop() is called
  clearTimeout(this._scheduledReconnect);
  delete this._scheduledReconnect;

  // clear our stall abort timeout
  this._stopStallAbortTimeout();
};
52
/**
 * Resets the parameters used in determining the next reconnect time.
 *
 * After this call the next transport-level reconnect is attempted with no delay.
 */
StreamingAPIConnection.prototype._resetRetryParams = function () {
  // delay (in ms) for the next reconnection attempt
  this._connectInterval = 0;
  // flag indicating whether we already used the 0-delay reconnect
  this._usedFirstReconnect = false;
};
62
/**
 * Opens a fresh streaming request and wires up its event handlers.
 *
 * Any previous connection state is reset first. Emits `connect` with the new
 * request. Once a response arrives:
 * - a status code listed in STATUS_CODES_TO_ABORT_ON: the body is collected,
 *   an `error` is emitted and the stream is stopped (no reconnect)
 * - status 420: a reconnect is scheduled and the response is otherwise ignored
 * - anything else: response data is fed to the parser and `connected` is emitted
 *
 * @return {StreamingAPIConnection} this
 */
StreamingAPIConnection.prototype._startPersistentConnection = function () {
  var self = this;
  self._resetConnection();
  self._setupParser();
  self._resetStallAbortTimeout();
  self.request = request.post(self.reqOpts);
  self.emit('connect', self.request);

  self.request.on('response', function (response) {
    // reset our reconnection attempt flag so next attempt goes through with 0 delay
    // if we get a transport-level error
    self._usedFirstReconnect = false;
    // start a stall abort timeout handle
    self._resetStallAbortTimeout();
    self.response = response;
    self.response.setEncoding('utf8');

    if (STATUS_CODES_TO_ABORT_ON.indexOf(response.statusCode) !== -1) {
      // We got a status code telling us we should abort the connection.
      // Read the body from the response and return an error to the user.
      var body = '';
      response.on('data', function (chunk) {
        body += chunk.toString('utf8');
      });

      response.on('end', function () {
        try {
          body = JSON.parse(body);
        } catch (jsonDecodeError) {
          // Twitter may send an HTML body
          // if non-JSON text was returned, we'll just attach it to the error as-is
        }
        // surface the error to the user
        var error = helpers.makeTwitError('Bad Twitter streaming request: ' + response.statusCode);
        error.statusCode = response.statusCode;
        helpers.attachBodyInfoToError(error, body);
        self.emit('error', error);
        // stop the stream explicitly so we don't reconnect
        self.stop();
      });
      return;
    }

    if (response.statusCode === 420) {
      // We are being rate limited: schedule the reconnect directly.
      // The stall timer is stopped first; otherwise it would fire during a
      // backoff longer than its own timeout and schedule a second reconnect.
      self._stopStallAbortTimeout();
      self._scheduleReconnect();
      return;
    }

    // pass all response data to parser
    response.on('data', function (chunk) {
      self._connectInterval = 0;
      // stop stall timer, and start a new one
      self._resetStallAbortTimeout();
      self.parser.parse(chunk);
    });

    response.on('close', self._onClose.bind(self));
    response.on('error', function (err) {
      // expose response errors on twit instance
      self.emit('error', err);
    });

    // connected without an error response from Twitter, emit `connected` event
    // this must be emitted after all its event handlers are bound
    // so the reference to `self.response` is not interfered-with by the user until it is emitted
    self.emit('connected', response);
  });

  self.request.on('close', self._onClose.bind(self));
  // transport-level failure: go straight to the reconnect schedule
  self.request.on('error', self._scheduleReconnect.bind(self));
  return self;
};
133
/**
 * Handle when the request or response closes.
 * Schedule a reconnect according to Twitter's reconnect guidelines,
 * unless the connection was closed by `stop()` or a reconnect is already pending.
 */
StreamingAPIConnection.prototype._onClose = function () {
  var self = this;
  self._stopStallAbortTimeout();

  // We closed it explicitly - don't reconnect
  if (self._abortedBy === 'twit-client') {
    return;
  }

  // we got disconnected by twitter - reconnect according to their guidelines
  self._abortedBy = 'twitter';

  if (self._scheduledReconnect) {
    // if we already have a reconnect scheduled, don't schedule another one.
    // this race condition can happen if the http.ClientRequest and http.IncomingMessage both emit `close`
    return;
  }

  self._scheduleReconnect();
};
158
/**
 * Kick off the http request, and persist the connection.
 *
 * Also clears the marker left behind by a previous `stop()`. Without that,
 * `_onClose` would still see `'twit-client'` and a restarted stream would
 * never reconnect after the remote side closes it.
 *
 * @return {StreamingAPIConnection} this
 */
StreamingAPIConnection.prototype.start = function () {
  delete this._abortedBy;
  this._resetRetryParams();
  this._startPersistentConnection();
  return this;
};
168
/**
 * Abort the http request, stop scheduled reconnect (if one was scheduled) and clear state.
 *
 * Marks the connection as closed by the client so that `_onClose` does not
 * schedule a reconnect.
 *
 * @return {StreamingAPIConnection} this
 */
StreamingAPIConnection.prototype.stop = function () {
  this._abortedBy = 'twit-client';
  // clear connection variables and timeout handles
  this._resetConnection();
  return this;
};
179
/**
 * Stop and restart the stall abort timer (called when new data is received).
 *
 * If we go 90s without this being called again, a reconnect is scheduled.
 *
 * @return {StreamingAPIConnection} this
 */
StreamingAPIConnection.prototype._resetStallAbortTimeout = function () {
  var self = this;
  var STALL_ABORT_TIMEOUT_MS = 90000;

  // stop the previous stall abort timer
  self._stopStallAbortTimeout();
  // start a new 90s timeout that schedules a reconnect if it is not reset in time
  self._stallAbortTimeout = setTimeout(function () {
    self._scheduleReconnect();
  }, STALL_ABORT_TIMEOUT_MS);
  return this;
};
195
/**
 * Stop stall timeout.
 *
 * Safe to call when no stall timer is scheduled.
 *
 * @return {StreamingAPIConnection} this
 */
StreamingAPIConnection.prototype._stopStallAbortTimeout = function () {
  clearTimeout(this._stallAbortTimeout);
  // remove the handle so it is clear via introspection that the timeout is not scheduled
  delete this._stallAbortTimeout;
  return this;
};
206
/**
 * Computes the next time a reconnect should occur (based on the last HTTP response received)
 * and starts a timeout handle to begin reconnecting after `self._connectInterval` passes.
 *
 * Backoff applied to `self._connectInterval`:
 * - last response was 420: start at 1 minute, double each attempt
 * - last response was 5xx: start at 5s, double each attempt up to 320s
 * - otherwise: first attempt immediately, then +250ms per attempt up to 16s
 *
 * Any reconnect that is already pending is replaced, and the stall timer is
 * stopped, so that only one reconnect can be outstanding at a time.
 *
 * Emits `reconnect` with (request, response, connectInterval).
 *
 * @return {undefined}
 */
StreamingAPIConnection.prototype._scheduleReconnect = function () {
  var self = this;

  if (self.response && self.response.statusCode === 420) {
    // we are being rate limited
    // start with a 1 minute wait and double each attempt
    if (!self._connectInterval) {
      self._connectInterval = 60000;
    } else {
      self._connectInterval *= 2;
    }
  } else if (self.response && String(self.response.statusCode).charAt(0) === '5') {
    // twitter 5xx errors
    // start with a 5s wait, double each attempt up to 320s
    if (!self._connectInterval) {
      self._connectInterval = 5000;
    } else if (self._connectInterval < 320000) {
      self._connectInterval *= 2;
    } else {
      self._connectInterval = 320000;
    }
  } else {
    // no 420/5xx response on the last connection:
    // DNS/TCP error, a remote close, or a stall in the stream
    if (!self._usedFirstReconnect) {
      // first reconnection attempt on a valid connection should occur immediately
      self._connectInterval = 0;
      self._usedFirstReconnect = true;
    } else if (self._connectInterval < 16000) {
      // linearly increase delay by 250ms up to 16s
      self._connectInterval += 250;
    } else {
      // cap out reconnect interval at 16s
      self._connectInterval = 16000;
    }
  }

  // Replace, rather than add to, any reconnect that is already pending, and
  // keep the stall timer of the connection being replaced from firing while
  // we wait out the backoff (it would schedule yet another reconnect).
  clearTimeout(self._scheduledReconnect);
  self._stopStallAbortTimeout();

  // schedule the reconnect
  self._scheduledReconnect = setTimeout(function () {
    self._startPersistentConnection();
  }, self._connectInterval);
  self.emit('reconnect', self.request, self.response, self._connectInterval);
};
255
/**
 * Creates a new parser and re-emits everything it produces on this connection.
 *
 * Every parsed element is emitted as `message`, followed by one more specific
 * event chosen by the first matching rule:
 * - `delete`
 * - disconnect notice: handed to `this.handleDisconnect(msg)` when such a
 *   method exists; otherwise `disconnect` is emitted and the stream is stopped
 * - `limit`, `scrub_geo`, `warning`, `status_withheld`, `user_withheld`,
 *   `friends`, `direct_message`
 * - user stream events: `user_event`, then the event's own name if it is a
 *   known one, else `unknown_user_event`
 * - anything else: `tweet`
 *
 * Parser errors are re-emitted as `error`.
 */
StreamingAPIConnection.prototype._setupParser = function () {
  var self = this;

  // message keys that are re-emitted under their own name, in priority order
  var passthroughKeys = [
    'limit', 'scrub_geo', 'warning', 'status_withheld',
    'user_withheld', 'friends', 'direct_message'
  ];

  // reference: https://dev.twitter.com/docs/streaming-apis/messages#User_stream_messages
  var knownUserEvents = [
    'blocked', 'unblocked', 'favorite', 'unfavorite', 'follow', 'unfollow',
    'user_update', 'list_created', 'list_destroyed', 'list_updated',
    'list_member_added', 'list_member_removed',
    'list_user_subscribed', 'list_user_unsubscribed'
  ];

  self.parser = new Parser();

  // handle twitter objects as they come in - emit the generic `message` event
  // along with the specific event corresponding to the message
  self.parser.on('element', function (msg) {
    self.emit('message', msg);

    if (msg.delete) {
      self.emit('delete', msg);
      return;
    }

    if (msg.disconnect) {
      if (typeof self.handleDisconnect === 'function') {
        self.handleDisconnect(msg);
      } else {
        // no handler is defined in this file: surface the notice and stop,
        // instead of throwing a TypeError from inside the parser callback
        self.emit('disconnect', msg);
        self.stop();
      }
      return;
    }

    for (var i = 0; i < passthroughKeys.length; i++) {
      if (msg[passthroughKeys[i]]) {
        self.emit(passthroughKeys[i], msg);
        return;
      }
    }

    if (!msg.event) {
      self.emit('tweet', msg);
      return;
    }

    self.emit('user_event', msg);
    var ev = msg.event;
    if (knownUserEvents.indexOf(ev) !== -1) {
      self.emit(ev, msg);
    } else {
      self.emit('unknown_user_event', msg);
    }
  });

  self.parser.on('error', function (err) {
    self.emit('error', err);
  });
};
301
// the connection constructor is this module's only export
module.exports = StreamingAPIConnection;