UNPKG

1.97 kBJavaScriptView Raw
1'use strict'
2
3const EventEmitter = require('events')
4const pull = require('pull-stream')
5const Pushable = require('pull-pushable')
6
7const PROTOCOL = require('./protocol')
8const encoding = require('./encoding')
9
10module.exports = class Connection extends EventEmitter {
11 constructor (id, ipfs, room) {
12 super()
13 this._id = id
14 this._ipfs = ipfs
15 this._room = room
16 this._connection = null
17 this._connecting = false
18 }
19
20 push (message) {
21 if (this._connection) {
22 this._connection.push(encoding(message))
23 } else {
24 if (!this._connecting) {
25 this.once('connect', () => this.push(message))
26 this._getConnection()
27 }
28 }
29 }
30
31 stop () {
32 if (this._connection) {
33 this._connection.end()
34 }
35 }
36
37 _getConnection () {
38 this._connecting = true
39 this._getPeerAddresses(this._id, (err, peerAddresses) => {
40 if (err) {
41 this.emit('error', err)
42 return // early
43 }
44
45 if (!peerAddresses.length) {
46 this.emit('error', new Error('could not connect to ' + this._id))
47 return // early
48 }
49
50 this._ipfs._libp2pNode.dial(peerAddresses[0], PROTOCOL, (err, conn) => {
51 if (err) {
52 this.emit('error', err)
53 return // early
54 }
55 this._connecting = false
56 const pushable = Pushable()
57 this._connection = pushable
58 pull(
59 pushable,
60 conn,
61 pull.onEnd(() => {
62 delete this._connection
63 this.emit('disconnect')
64 })
65 )
66 this.emit('connect', pushable)
67 })
68 })
69 }
70
71 _getPeerAddresses (peerId, callback) {
72 this._ipfs.swarm.peers((err, peersAddresses) => {
73 if (err) {
74 callback(err)
75 return // early
76 }
77
78 callback(
79 null,
80 peersAddresses
81 .filter((peerAddress) => peerAddress.peer.id.toB58String() === peerId)
82 .map(peerAddress => peerAddress.peer))
83 })
84 }
85}