// Retrieved from UNPKG (raw view, JavaScript, 16.9 kB)

1const Buffer = require('safe-buffer').Buffer
2const debug = require('debug')('uwt')
3const EventEmitter = require('events').EventEmitter
4const http = require('http')
5const peerid = require('bittorrent-peerid')
6const series = require('run-series')
7const WebSocketServer = require('uws').Server
8
9const common = require('./lib/common')
10const Swarm = require('./lib/swarm')
11const parseWebSocketRequest = require('./lib/parse-websocket')
12
// Server name reported in the '/stats' output (HTML footer and JSON `server` field).
const NAME = 'µWebTorrentTracker'
// Package version read from package.json, reported next to NAME in the '/stats' output.
const VERSION = require('./package.json').version
15
16/**
17 * WebTorrent tracker server.
18 *
19 * HTTP service which responds to GET requests from torrent clients. Requests include
20 * metrics from clients that help the tracker keep overall statistics about the torrent.
21 * Responses include a peer list that helps the client participate in the torrent.
22 *
23 * @param {Object} opts options object
24 * @param {Number} opts.interval tell clients to announce on this interval (ms)
25 * @param {Number} opts.trustProxy trust 'x-forwarded-for' and 'x-real-ip' headers from reverse proxy
26 * @param {boolean} opts.stats enable web-based statistics? (default: true)
27 * @param {function} opts.filter black/whitelist fn for disallowing/allowing torrents
28 */
29class Server extends EventEmitter {
30 constructor (opts) {
31 super()
32 if (!opts) opts = {}
33
34 debug('new server %o')
35
36 this.intervalMs = opts.interval
37 ? opts.interval
38 : 2 * 60 * 1000 // 2 min
39
40 this._trustProxy = !!opts.trustProxy
41 if (typeof opts.filter === 'function') this._filter = opts.filter
42
43 this.peersCacheLength = opts.peersCacheLength
44 this.peersCacheTtl = opts.peersCacheTtl
45
46 this._listenCalled = false
47 this.listening = false
48 this.destroyed = false
49 this.torrents = {}
50
51 this.http = http.createServer()
52 this.http.on('error', err => { this._onError(err) })
53 this.http.on('listening', () => {
54 this.listening = true
55 debug('listening')
56 this.emit('listening')
57 })
58
59 // Add default http request handler on next tick to give user the chance to add
60 // their own handler first. Handle requests untouched by user's handler.
61 process.nextTick(() => {
62 this.http.on('request', (req, res) => {
63 if (res.headersSent) return
64 // For websocket trackers, we only need to handle the UPGRADE http method.
65 // Return 404 for all other request types.
66 res.statusCode = 404
67 res.end('404 Not Found')
68 })
69 })
70
71 this.ws = new WebSocketServer({ server: this.http })
72 this.ws.on('error', err => { this._onError(err) })
73 this.ws.on('connection', socket => { this.onWebSocketConnection(socket) })
74
75 if (opts.stats !== false) {
76 // Http handler for '/stats' route
77 this.http.on('request', (req, res) => {
78 if (res.headersSent) return
79
80 const infoHashes = Object.keys(this.torrents)
81 const allPeers = {}
82
83 if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) {
84 infoHashes.forEach(infoHash => {
85 const peers = this.torrents[infoHash].peers
86 const keys = peers.keys
87
88 keys.forEach(peerId => {
89 // Don't mark the peer as most recently used for stats
90 const peer = peers.peek(peerId)
91 if (peer == null) return // peers.peek() can evict the peer
92
93 if (!allPeers[peerId]) {
94 allPeers[peerId] = {
95 ipv4: false,
96 ipv6: false,
97 seeder: false,
98 leecher: false
99 }
100 }
101
102 if (peer.ip.indexOf(':') >= 0) {
103 allPeers[peerId].ipv6 = true
104 } else {
105 allPeers[peerId].ipv4 = true
106 }
107
108 if (peer.complete) {
109 allPeers[peerId].seeder = true
110 } else {
111 allPeers[peerId].leecher = true
112 }
113
114 allPeers[peerId].peerId = peer.peerId
115 allPeers[peerId].client = peerid(peer.peerId)
116 })
117 })
118
119 const stats = {
120 torrents: infoHashes.length,
121 peersAll: Object.keys(allPeers).length,
122 peersSeederOnly: countPeers(isSeederOnly, allPeers),
123 peersLeecherOnly: countPeers(isLeecherOnly, allPeers),
124 peersSeederAndLeecher: countPeers(isSeederAndLeecher, allPeers),
125 peersIPv4: countPeers(isIPv4, allPeers),
126 peersIPv6: countPeers(isIPv6, allPeers),
127 clients: groupByClient(allPeers),
128 server: NAME,
129 serverVersion: VERSION
130 }
131
132 if (req.url === '/stats.json' || req.headers['accept'] === 'application/json') {
133 res.setHeader('Content-Type', 'application/json')
134 res.end(JSON.stringify(stats))
135 } else if (req.url === '/stats') {
136 res.end('<h1>' + stats.torrents + ' active torrents</h1>\n' +
137 '<h2>Connected Peers: ' + stats.peersAll + '</h2>\n' +
138 '<h3>Peers Seeding Only: ' + stats.peersSeederOnly + '</h3>\n' +
139 '<h3>Peers Leeching Only: ' + stats.peersLeecherOnly + '</h3>\n' +
140 '<h3>Peers Seeding & Leeching: ' + stats.peersSeederAndLeecher + '</h3>\n' +
141 '<h3>IPv4 Peers: ' + stats.peersIPv4 + '</h3>\n' +
142 '<h3>IPv6 Peers: ' + stats.peersIPv6 + '</h3>\n' +
143 '<h3>Clients:</h3>\n' +
144 printClients(stats.clients) +
145 '<small>Running <a href="https://www.npmjs.com/package/uwt">' + NAME + '</a> v' + VERSION + '</small>'
146 )
147 }
148 }
149 })
150 }
151 }
152
153 _onError (err) {
154 this.emit('error', err)
155 }
156
157 listen () /* port, onlistening */{
158 if (this._listenCalled || this.listening) throw new Error('server already listening')
159 this._listenCalled = true
160
161 const lastArg = arguments[arguments.length - 1]
162 if (typeof lastArg === 'function') this.once('listening', lastArg)
163
164 const port = toNumber(arguments[0]) || arguments[0] || 0
165
166 debug('listen (port: %o)', port)
167
168 const isObject = obj => {
169 return typeof obj === 'object' && obj !== null
170 }
171
172 const httpPort = isObject(port) ? (port.http || 0) : port
173
174 if (this.http) this.http.listen(httpPort)
175 }
176
177 close (cb) {
178 if (!cb) cb = noop
179 debug('close')
180
181 this.listening = false
182 this.destroyed = true
183
184 if (this.ws) {
185 try {
186 this.ws.close()
187 } catch (err) {}
188 }
189
190 if (this.http) this.http.close(cb)
191 else cb(null)
192 }
193
194 createSwarm (infoHash) {
195 if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex')
196
197 const swarm = this.torrents[infoHash] = new Swarm(infoHash, this)
198 return swarm
199 }
200
201 deleteSwarm (infoHash) {
202 process.nextTick(() => {
203 delete this.torrents[infoHash]
204 })
205 }
206
207 getSwarm (infoHash) {
208 if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex')
209
210 return this.torrents[infoHash]
211 }
212
213 onWebSocketConnection (socket, opts) {
214 if (!opts) opts = {}
215 opts.trustProxy = opts.trustProxy || this._trustProxy
216
217 socket.headers = socket.upgradeReq.headers
218 socket.realIPAddress = opts.trustProxy ? socket.headers['x-forwarded-for'] || socket._socket.remoteAddress : socket._socket.remoteAddress.replace(common.REMOVE_IPV4_MAPPED_IPV6_RE, '') // force ipv4
219 socket.port = socket._socket.remotePort
220
221 socket.peerId = null // as hex
222 socket.infoHashes = [] // swarms that this socket is participating in
223 socket.onSend = err => {
224 this._onWebSocketSend(socket, err)
225 }
226
227 socket.onMessageBound = params => {
228 this._onWebSocketRequest(socket, opts, params)
229 }
230 socket.on('message', socket.onMessageBound)
231
232 socket.onErrorBound = err => {
233 this._onWebSocketError(socket, err)
234 }
235 socket.on('error', socket.onErrorBound)
236
237 socket.onCloseBound = () => {
238 this._onWebSocketClose(socket)
239 }
240 socket.on('close', socket.onCloseBound)
241 }
242
243 _onWebSocketRequest (socket, opts, params) {
244 try {
245 params = parseWebSocketRequest(socket, opts, params)
246 } catch (err) {
247 socket.send(JSON.stringify({
248 'failure reason': err.message
249 }), socket.onSend)
250
251 // even though it's an error for the client, it's just a warning for the server.
252 // don't crash the server because a client sent bad data :)
253 this.emit('warning', err)
254 return
255 }
256
257 if (!socket.peerId) socket.peerId = params.peer_id // as hex
258
259 this._onRequest(params, (err, response) => {
260 if (this.destroyed) return
261 if (err) {
262 socket.send(JSON.stringify({
263 action: params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape',
264 'failure reason': err.message,
265 info_hash: common.hexToBinary(params.info_hash)
266 }), socket.onSend)
267
268 this.emit('warning', err)
269 return
270 }
271
272 response.action = params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape'
273
274 let peers
275 if (response.action === 'announce') {
276 peers = response.peers
277 delete response.peers
278
279 if (socket.infoHashes.indexOf(params.info_hash) === -1) {
280 socket.infoHashes.push(params.info_hash)
281 }
282
283 response.info_hash = common.hexToBinary(params.info_hash)
284 }
285
286 // Skip sending update back for 'answer' announce messages – not needed
287 if (!params.answer) {
288 socket.send(JSON.stringify(response), socket.onSend)
289 debug('sent response %s to %s', JSON.stringify(response), params.peer_id)
290 }
291
292 if (Array.isArray(params.offers)) {
293 debug('got %s offers from %s', params.offers.length, params.peer_id)
294 debug('got %s peers from swarm %s', peers.length, params.info_hash)
295 peers.forEach((peer, i) => {
296 peer.socket.send(JSON.stringify({
297 action: 'announce',
298 offer: params.offers[i].offer,
299 offer_id: params.offers[i].offer_id,
300 peer_id: common.hexToBinary(params.peer_id),
301 info_hash: common.hexToBinary(params.info_hash)
302 }), peer.socket.onSend)
303 debug('sent offer to %s from %s', peer.peerId, params.peer_id)
304 })
305 }
306
307 const done = () => {
308 // emit event once the announce is fully "processed"
309 if (params.action === common.ACTIONS.ANNOUNCE) {
310 this.emit(common.EVENT_NAMES[params.event], params.peer_id, params)
311 }
312 }
313
314 if (params.answer) {
315 debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id)
316
317 const swarm = this.getSwarm(params.info_hash)
318
319 if (!swarm) {
320 return this.emit('warning', new Error('no swarm with that `info_hash`'))
321 }
322 // Mark the destination peer as recently used in cache
323 const toPeer = swarm.peers.get(params.to_peer_id)
324 if (!toPeer) {
325 return this.emit('warning', new Error('no peer with that `to_peer_id`'))
326 }
327
328 toPeer.socket.send(JSON.stringify({
329 action: 'announce',
330 answer: params.answer,
331 offer_id: params.offer_id,
332 peer_id: common.hexToBinary(params.peer_id),
333 info_hash: common.hexToBinary(params.info_hash)
334 }), toPeer.socket.onSend)
335 debug('sent answer to %s from %s', toPeer.peerId, params.peer_id)
336
337 done()
338 } else {
339 done()
340 }
341 })
342 }
343
344 _onWebSocketSend (socket, err) {
345 if (err) this._onWebSocketError(socket, err)
346 }
347
348 _onWebSocketClose (socket) {
349 debug('websocket close %s', socket.peerId)
350
351 if (socket.peerId) {
352 socket.infoHashes.slice(0).forEach(infoHash => {
353 const swarm = this.torrents[infoHash]
354 if (swarm) {
355 swarm.announce({
356 event: 'stopped',
357 numwant: 0,
358 peer_id: socket.peerId
359 }, noop)
360 }
361 })
362 }
363
364 // ignore all future errors
365 socket.onSend = noop
366 socket.on('error', noop)
367
368 if (typeof socket.onMessageBound === 'function') {
369 socket.removeListener('message', socket.onMessageBound)
370 }
371 socket.onMessageBound = null
372
373 if (typeof socket.onErrorBound === 'function') {
374 socket.removeListener('error', socket.onErrorBound)
375 }
376 socket.onErrorBound = null
377
378 if (typeof socket.onCloseBound === 'function') {
379 socket.removeListener('close', socket.onCloseBound)
380 }
381 socket.onCloseBound = null
382
383 socket.peerId = null
384 socket.infoHashes = null
385 }
386
387 _onWebSocketError (socket, err) {
388 debug('websocket error %s', err.message || err)
389 this.emit('warning', err)
390 this._onWebSocketClose(socket)
391 }
392
393 _onRequest (params, cb) {
394 if (params && params.action === common.ACTIONS.CONNECT) {
395 cb(null, { action: common.ACTIONS.CONNECT })
396 } else if (params && params.action === common.ACTIONS.ANNOUNCE) {
397 this._onAnnounce(params, cb)
398 } else if (params && params.action === common.ACTIONS.SCRAPE) {
399 this._onScrape(params, cb)
400 } else {
401 cb(new Error('Invalid action'))
402 }
403 }
404
405 _onAnnounce (params, cb) {
406 const createSwarm = () => {
407 const swarm = this.createSwarm(params.info_hash)
408 announce(swarm)
409 }
410
411 const createSwarmFilter = () => {
412 if (this._filter) {
413 this._filter(params.info_hash, params, err => {
414 if (err) {
415 cb(err)
416 } else {
417 createSwarm()
418 }
419 })
420 } else {
421 createSwarm()
422 }
423 }
424
425 const announce = (swarm) => {
426 if (!params.event || params.event === 'empty') params.event = 'update'
427 swarm.announce(params, (err, response) => {
428 if (err) return cb(err)
429
430 if (!response.action) response.action = common.ACTIONS.ANNOUNCE
431 if (!response.interval) response.interval = Math.ceil(this.intervalMs / 1000)
432
433 cb(null, response)
434 })
435 }
436
437 const swarm = this.getSwarm(params.info_hash)
438
439 if (swarm) {
440 announce(swarm)
441 } else {
442 createSwarmFilter()
443 }
444 }
445
446 _onScrape (params, cb) {
447 if (params.info_hash == null) {
448 // if info_hash param is omitted, stats for all torrents are returned
449 params.info_hash = Object.keys(this.torrents)
450 }
451
452 series(params.info_hash.map(infoHash => cb => {
453 const swarm = this.getSwarm(infoHash)
454
455 if (swarm) {
456 swarm.scrape(params, (err, scrapeInfo) => {
457 if (err) return cb(err)
458 cb(null, {
459 infoHash: infoHash,
460 complete: (scrapeInfo && scrapeInfo.complete) || 0,
461 incomplete: (scrapeInfo && scrapeInfo.incomplete) || 0
462 })
463 })
464 } else {
465 cb(null, { infoHash: infoHash, complete: 0, incomplete: 0 })
466 }
467 }), (err, results) => {
468 if (err) return cb(err)
469
470 const response = {
471 action: common.ACTIONS.SCRAPE,
472 files: {},
473 flags: { min_request_interval: Math.ceil(this.intervalMs / 1000) }
474 }
475
476 results.forEach(result => {
477 response.files[common.hexToBinary(result.infoHash)] = {
478 complete: result.complete || 0,
479 incomplete: result.incomplete || 0,
480 downloaded: result.complete || 0
481 }
482 })
483
484 cb(null, response)
485 })
486 }
487}
488
/**
 * Count the entries of `allPeers` accepted by `filterFunction`.
 *
 * @param {function(Object): *} filterFunction called with each peer summary; a truthy result counts
 * @param {Object} allPeers map of key -> peer summary
 * @returns {number} number of accepted entries
 */
function countPeers (filterFunction, allPeers) {
  let count = 0

  // Object.keys() yields own enumerable keys only, so no hasOwnProperty() call on
  // the map itself is needed (that call fails for prototype-less maps and for maps
  // that contain a key named 'hasOwnProperty').
  Object.keys(allPeers).forEach(key => {
    if (filterFunction(allPeers[key])) count++
  })

  return count
}
501
/**
 * Group peers by client name and version.
 *
 * @param {Object} allPeers map of key -> peer summary; each summary carries
 *   `client` ({ client, version }) and `peerId` (hex string)
 * @returns {Object} map of client name -> { version -> number of peers }
 */
function groupByClient (allPeers) {
  const hasOwn = Object.prototype.hasOwnProperty
  const clients = {}

  Object.keys(allPeers).forEach(key => {
    const peer = allPeers[key]
    const name = peer.client.client

    // Test for own properties: a plain truthiness check would hit inherited
    // members such as 'toString' and corrupt the counters.
    if (!hasOwn.call(clients, name)) {
      clients[name] = {}
    }
    const client = clients[name]

    // If the client is not known show 8 chars from peerId as version
    const version = peer.client.version || Buffer.from(peer.peerId, 'hex').toString().substring(0, 8)
    if (!hasOwn.call(client, version)) {
      client[version] = 0
    }
    client[version]++
  })

  return clients
}
522
/**
 * Render the client/version breakdown as an HTML list.
 *
 * Names and versions are HTML-escaped: for unknown clients the version is taken
 * from bytes of the peer id, which is supplied by the remote peer.
 *
 * @param {Object} clients map of client name -> { version -> count }
 * @returns {string} '<ul>' markup with one '<li>' per name/version pair
 */
function printClients (clients) {
  const hasOwn = Object.prototype.hasOwnProperty
  const entities = { '&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;' }
  const escapeHtml = value => String(value).replace(/[&<>"']/g, ch => entities[ch])

  let html = '<ul>\n'
  for (const name in clients) {
    if (!hasOwn.call(clients, name)) continue

    const client = clients[name]
    for (const version in client) {
      if (!hasOwn.call(client, version)) continue

      html += '<li><strong>' + escapeHtml(name) + '</strong> ' + escapeHtml(version) + ' : ' + escapeHtml(client[version]) + '</li>\n'
    }
  }
  html += '</ul>\n'
  return html
}
538
// Predicates over the per-peer summaries built for the '/stats' route
// ({ ipv4, ipv6, seeder, leecher } flags); used as filters for countPeers().

/** Peer was seen as a seeder and never as a leecher. */
function isSeederOnly (peer) { return peer.seeder && peer.leecher === false }

/** Peer was seen as a leecher and never as a seeder. */
function isLeecherOnly (peer) { return peer.leecher && peer.seeder === false }

/** Peer was seen both as a seeder and as a leecher. */
function isSeederAndLeecher (peer) { return peer.seeder && peer.leecher }

/** Peer was seen with an IPv4 address. */
function isIPv4 (peer) { return peer.ipv4 }

/** Peer was seen with an IPv6 address. */
function isIPv6 (peer) { return peer.ipv6 }
548
/**
 * Coerce `x` with Number() and accept it only when it is non-negative.
 *
 * @param {*} x value to convert
 * @returns {number|false} the converted number when it is >= 0, otherwise false
 *   (NaN and negative numbers both yield false)
 */
function toNumber (x) {
  const n = Number(x)
  return n >= 0 ? n : false
}
553
/** Do-nothing placeholder used where a callback is required but nothing has to happen. */
function noop () {}
555
// The Server class is the module's sole export.
module.exports = Server