UNPKG

13.5 kBJavaScriptView Raw
1//
2// Create and execute http requests to an Oauth-protected resource
3//
4var EventEmitter = require('events').EventEmitter
5 , querystring = require('querystring')
6 , util = require('util')
7 , Parser = require('./parser')
8 , Auth = require('./auth')
9
10// set of status codes where we don't attempt reconnects
11var STATUS_CODES_TO_ABORT_ON = [ 400, 401, 403, 404, 406, 410, 422 ];
12
13/**
14 * OAuth http request
15 *
16 * @param {Object} oauth
17 * @param {String} method GET or POST
18 * @param {String} path REST resource
19 * @param {Object} params query params
20 */
21function OARequest (oauth, method, path, params) {
22 if (method !== 'GET' && method !== 'POST') {
23 throw new Error('method `'+method+'` not supported.\n')
24 }
25 this.oauth = oauth
26 this.method = method
27
28 this.twit_options = {}
29
30 if (params && params.twit_options && typeof params.twit_options !== 'object') {
31 // invalid `twit_options` specified
32 throw new Error('Invalid twit_options supplied: '+params.twit_options)
33 } else if (params.twit_options) {
34 // valid `twit_options` specified - override the default
35 this.twit_options = JSON.parse(JSON.stringify(params.twit_options))
36 delete params.twit_options
37 }
38
39 if (method === 'GET') {
40 this.path = path + (params ? '?' + querystring.stringify(params) : '')
41 this.params = null
42 } else if (method === 'POST') {
43 this.path = path
44 this.params = params
45 }
46
47 EventEmitter.call(this)
48}
49
50util.inherits(OARequest, EventEmitter)
51
52/**
53 * Perform http request & persist the connection
54 * Emit events as they come in from twitter
55 */
56OARequest.prototype.persist = function () {
57 var self = this
58
59 this.parser = new Parser()
60
61 //handle twitter objects as they come in
62 this.parser.on('element', function (msg) {
63 // first msg; successful connection => reset reconnect interval
64 self.connectInterval = 0
65 if (msg.delete) { self.emit('delete', msg) }
66 else if (msg.disconnect) { self.handleDisconnect(msg) }
67 else if (msg.limit) { self.emit('limit', msg) }
68 else if (msg.scrub_geo) { self.emit('scrub_geo', msg) }
69 else if (msg.warning) { self.emit('warning', msg) }
70 else if (msg.status_withheld) { self.emit('status_withheld', msg) }
71 else if (msg.user_withheld) { self.emit('user_withheld', msg) }
72 else if (msg.friends) { self.emit('friends', msg) }
73 else if (msg.direct_message) { self.emit('direct_message', msg) }
74 else if (msg.event) {
75 self.emit('user_event', msg)
76 // reference: https://dev.twitter.com/docs/streaming-apis/messages#User_stream_messages
77 var ev = msg.event
78
79 if (ev === 'blocked') { self.emit('blocked', msg) }
80 else if (ev === 'unblocked') { self.emit('unblocked', msg) }
81 else if (ev === 'favorite') { self.emit('favorite', msg) }
82 else if (ev === 'unfavorite') { self.emit('unfavorite', msg) }
83 else if (ev === 'follow') { self.emit('follow', msg) }
84 else if (ev === 'unfollow') { self.emit('unfollow', msg) }
85 else if (ev === 'user_update') { self.emit('user_update', msg) }
86 else if (ev === 'list_created') { self.emit('list_created', msg) }
87 else if (ev === 'list_destroyed') { self.emit('list_destroyed', msg) }
88 else if (ev === 'list_updated') { self.emit('list_updated', msg) }
89 else if (ev === 'list_member_added') { self.emit('list_member_added', msg) }
90 else if (ev === 'list_member_removed') { self.emit('list_member_removed', msg) }
91 else if (ev === 'list_user_subscribed') { self.emit('list_user_subscribed', msg) }
92 else if (ev === 'list_user_unsubscribed') { self.emit('list_user_unsubscribed', msg) }
93 else { self.emit('unknown_user_event', msg) }
94 } else { self.emit('tweet', msg) }
95 })
96
97 this.parser.on('error', function (err) {
98 self.emit('error', err)
99 })
100
101 //kick off the persisting http request - call it on nextTick so events
102 //can be listened on in current event loop tick before we connect
103 process.nextTick(function () {
104 // don't call `.start()` so that `.stop()` can be called in the
105 // same event loop tick and set the `abortedBy` flag to stop a request attempt.
106 self.keepAlive()
107 })
108
109 return this
110}
111
112/**
113 * Kick off the http request, and persist the connection
114 *
115 */
116OARequest.prototype.start = function () {
117 this.abortedBy = null;
118 this.keepAlive()
119 return this
120}
121
122/**
123 * Abort the http request
124 *
125 */
126OARequest.prototype.stop = function () {
127 this.abortedBy = 'twit-client'
128
129 if (this.request) {
130 this.request.abort()
131 this.request.removeAllListeners()
132 delete this.request
133 }
134
135 if (this.response) {
136 this.response.removeAllListeners()
137 }
138
139 return this
140}
141
142/**
143 * Make http request and keep the connection alive. Handles the req events here.
144 *
145 * @return {Object} req http request object
146 */
147OARequest.prototype.keepAlive = function () {
148
149 var self = this;
150 // make request - passing in `null` causes request to be kept alive.
151 this.request = this.makeRequest(null);
152
153 this.response = null;
154
155 this.request.once('response', function (response) {
156
157 // reset our reconnection attempt flag
158 self.usedFirstReconnect = false;
159 self.response = response;
160 self.response.setEncoding('utf8');
161
162 //pass all response data to parser
163 self.response.on('data', function (chunk) {
164 self.emit('connected', self.response);
165 // stop stall abort timer, and start a new one
166 self.resetStallAbortTimeout();
167 self.parser.parse(chunk);
168 });
169
170 self.response.once('close', function () {
171 self.handleCloseEvent()
172 })
173
174 self.response.on('error', function (err) {
175 // expose response errors on twit instance
176 self.emit('error', err)
177 })
178
179 if (STATUS_CODES_TO_ABORT_ON.indexOf(response.statusCode) !== -1) {
180 self.abortedBy = 'twit-client'
181 self.stop()
182
183 var error = new Error('Bad Twitter streaming request: ' + response.statusCode)
184 error.response = response
185 self.emit('error', error);
186 }
187 });
188
189 this.request.once('close', function () {
190 self.handleCloseEvent()
191 });
192
193 this.request.once('error', function (err) {
194 self.stopStallAbortTimeout();
195 self.emit('error', err);
196 });
197 // send off the request
198 this.request.end();
199 this.emit('connect', this.request);
200 return this;
201}
202
203/**
204 * Handle when the request or response closes.
205 * Schedule a keep-alive reconnect according to Twitter's reconnect guidelines
206 * @param {http.ClientRequest} lastRequest http request that we last made
207 * @param {http.IncomingMessage} response response to the last request we made
208 *
209 */
210OARequest.prototype.handleCloseEvent = function () {
211 var self = this;
212
213 self.stopStallAbortTimeout();
214
215 // We closed it explicitly - don't reconnect
216 if (self.abortedBy === 'twit-client')
217 return
218
219 // we got disconnected by twitter - reconnect according to their guidelines
220 self.abortedBy = 'twitter';
221
222 if (self.scheduledReconnect) {
223 // if we already have a reconnect scheduled, don't schedule another one.
224 // this race condition can happen if self.request and self.response both emit `close`
225 return
226 }
227
228 if (self.response && self.response.statusCode === 420) {
229 // we are being rate limited
230 // start with a 1 minute wait and double each attempt
231 if (!self.connectInterval) {
232 self.connectInterval = 60000;
233 } else {
234 self.connectInterval *= 2;
235 }
236 } else if (self.response && String(self.response.statusCode).charAt(0) === '5') {
237 // twitter 5xx errors
238 // start with a 5s wait, double each attempt up to 320s
239 if (!self.connectInterval) {
240 self.connectInterval = 5000;
241 } else if (self.connectInterval < 320000) {
242 self.connectInterval *= 2;
243 } else {
244 self.connectInterval = 320000;
245 }
246 } else {
247 // tcp error, or a stall in the stream (and stall timer closed the connection)
248 if (!self.usedFirstReconnect) {
249 // first reconnection attempt on a valid connection should occur immediately
250 self.connectInterval = 0;
251 self.usedFirstReconnect = true;
252 } else if (self.connectInterval < 16000) {
253 // linearly increase delay by 250ms up to 16s
254 self.connectInterval += 250;
255 } else {
256 // cap out reconnect interval at 16s
257 self.connectInterval = 16000;
258 }
259 }
260
261 self.emit('reconnect', self.request, self.response, self.connectInterval);
262
263 // schedule the reconnect
264 self.scheduledReconnect = setTimeout(function () {
265 self.start();
266 delete self.scheduledReconnect;
267 }, self.connectInterval);
268}
269
270/**
271 * Handles a disconnect message from twitter. Closes the connection, and emits descriptive info
272 *
273 *
274 * @param {Object} msg - disconnect message received from twitter
275 *
276 */
277OARequest.prototype.handleDisconnect = function (msg) {
278 this.emit('disconnect', msg)
279 this.stop()
280 return this
281}
282
283/**
284 *
285 * Make an HTTP request to twitter, pass the parsed reply to @callback
286 *
287 * @param {Function} callback - user's supplied callback
288 *
289 */
290OARequest.prototype.end = function (callback) {
291 var self = this
292
293 // handle the http response - maybe retry if specified `retry`
294 var responseHandler = function (err, raw, response) {
295 if (err instanceof Error) {
296 // socket error
297 // add default values to error
298 err.twitterReply = raw || null
299 // no code sent from twitter
300 err.code = null;
301 err.allErrors = [];
302
303 return callback(err, err.twitterReply, response)
304 }
305
306 // handle http errors
307 if (err && !(err instanceof Error) && err.statusCode) {
308 if (
309 self.twit_options.retry &&
310 STATUS_CODES_TO_ABORT_ON.indexOf(err.statusCode) === -1
311 ) {
312 // we should retry the request
313 self.makeRequest(responseHandler)
314 return
315 }
316
317 var error = new Error()
318 // default error message which will get overwritten
319 error.message = 'Unknown Twitter API Error';
320 error.statusCode = err.statusCode
321 // keep consistent with the rest of the error handling by passing the raw response data here
322 error.twitterReply = err.data
323 // code to be overwritten by parsing twitter error objects
324 error.code = null;
325 // array of error objects returned by twitter
326 error.allErrors = [];
327
328 // try to populate `message`, `code` and `allErrors`
329 try {
330 var parsedError = JSON.parse(err.data)
331
332 if (parsedError.errors && parsedError.errors.length) {
333 // default to the first error
334 error.message = parsedError.errors[0].message
335 error.code = parsedError.errors[0].code
336
337 // save reference to all errors
338 error.allErrors = parsedError.errors;
339 }
340 } catch (e) {
341 // if twitter error reply is not as-expected, just go with what we have
342 }
343
344 return callback(error, undefined, response)
345 } else if (err) {
346 // handle non-http errors
347 err.twitterReply = raw
348 return callback(err, undefined, response)
349 }
350
351 // non-error case - parse response and pass to user's callback
352
353 // parse response
354 var parsed = null
355 , parseError = null
356
357 try {
358 parsed = JSON.parse(raw)
359 } catch(e) {
360 parseError = new Error('twitter reply is not a valid JSON string.')
361 parseError.twitterReply = raw
362 }
363
364 // handle parsing errors
365 if (parseError) {
366 return callback(parseError, undefined, response)
367 } else if (!parsed) {
368 // null, false or empty reply
369 var badReplyError = new Error('twitter sent bad reply: `'+parsed+'`.')
370 badReplyError.twitterReply = raw
371
372 return callback(badReplyError, undefined, response)
373 }
374
375 return callback(null, parsed, response)
376 }
377
378 // make the http request and handle it before calling user's cb
379 this.makeRequest(responseHandler)
380
381 return this
382}
383
384/**
385 * Send off the HTTP request, passing back the reply to @cb
386 * If no @cb, persists the connection.
387 *
388 * For use by OARequest.end() and OARequest.persist()
389 *
390 * @param {Function} cb
391 *
392 */
393OARequest.prototype.makeRequest = function (cb) {
394 var action = this.method.toLowerCase()
395 var self = this
396
397 if (action !== 'get' && action !== 'post')
398 throw new Error('method `'+action+'` not supported')
399
400 // GET
401 if (action === 'get') {
402 return self.oauth.oa.get(
403 self.path
404 , self.oauth.config.access_token
405 , self.oauth.config.access_token_secret
406 , cb
407 )
408 }
409 // POST
410 return self.oauth.oa.post(
411 self.path
412 , self.oauth.config.access_token
413 , self.oauth.config.access_token_secret
414 , self.params
415 , cb
416 )
417}
418
419
420
421/**
422 * Stop and restart the stall abort timer (called when new data is received)
423 *
424 */
425OARequest.prototype.resetStallAbortTimeout = function () {
426 var self = this;
427 // stop the previous stall abort timer
428 this.stopStallAbortTimeout();
429 //start a new 90s timeout to trigger a close & reconnect if no data received
430 this.stallAbortTimeout = setTimeout(function () {
431 if (typeof self.request !== 'undefined') {
432 self.request.abort();
433 delete self.request;
434 }
435 }, 90000);
436 return this;
437}
438
439/**
440 * Stop stall timer
441 *
442 */
443OARequest.prototype.stopStallAbortTimeout = function () {
444 if (this.stallAbortTimeout) clearTimeout(this.stallAbortTimeout);
445 return this;
446}
447
448module.exports = OARequest
449