UNPKG

1.99 kBJavaScriptView Raw
1'use strict'
2
3const EventEmitter = require('events')
4const pull = require('pull-stream')
5const Pushable = require('pull-pushable')
6
7const PROTOCOL = require('./protocol')
8const encoding = require('./encoding')
9const getPeerId = require('./peer-id')
10const libp2p = require('./libp2p')
11
12module.exports = class Connection extends EventEmitter {
13 constructor (id, ipfs, room) {
14 super()
15 this._id = id
16 this._ipfs = ipfs
17 this._room = room
18 this._connection = null
19 this._connecting = false
20 }
21
22 push (message) {
23 if (this._connection) {
24 this._connection.push(encoding(message))
25 } else {
26 this.once('connect', () => this.push(message))
27 if (!this._connecting) {
28 this._getConnection()
29 }
30 }
31 }
32
33 stop () {
34 if (this._connection) {
35 this._connection.end()
36 }
37 }
38
39 _getConnection () {
40 this._connecting = true
41 this._getPeerAddresses(this._id, (err, peerAddresses) => {
42 if (err) {
43 this.emit('error', err)
44 return // early
45 }
46
47 if (!peerAddresses.length) {
48 this.emit('disconnect')
49 return // early
50 }
51
52 libp2p(this._ipfs).dialProtocol(peerAddresses[0], PROTOCOL, (err, conn) => {
53 if (err) {
54 this.emit('disconnect')
55 return // early
56 }
57 this._connecting = false
58 const pushable = Pushable()
59 this._connection = pushable
60 pull(
61 pushable,
62 conn,
63 pull.onEnd(() => {
64 delete this._connection
65 this.emit('disconnect')
66 })
67 )
68 this.emit('connect', pushable)
69 })
70 })
71 }
72
73 _getPeerAddresses (peerId, callback) {
74 this._ipfs.swarm.peers((err, peersAddresses) => {
75 if (err) {
76 callback(err)
77 return // early
78 }
79
80 callback(
81 null,
82 peersAddresses
83 .filter((peerAddress) => getPeerId(peerAddress.peer) === peerId)
84 .map(peerAddress => peerAddress.peer))
85 })
86 }
87}