UNPKG

3.79 kBJavaScriptView Raw
1'use strict'
2
3const diff = require('hyperdiff')
4const EventEmitter = require('events')
5const clone = require('lodash.clonedeep')
6
7const PROTOCOL = require('./protocol')
8const Connection = require('./connection')
9const encoding = require('./encoding')
10const directConnection = require('./direct-connection-handler')
11
// Defaults that the PubSubRoom constructor merges beneath caller-supplied
// options. Frozen so the shared defaults cannot be altered at runtime; the
// constructor deep-clones them before merging, so rooms never see this object.
const DEFAULT_OPTIONS = Object.freeze({
  // Delay, in milliseconds, between two polls of the topic's subscriber list.
  pollInterval: 1000
})

// Module-wide counter: each new room stores the current value as its `_idx`
// and then increments it.
let index = 0
17
/**
 * A "room" built on top of a libp2p pubsub topic.
 *
 * The room subscribes to the topic, polls the topic's subscriber list on a
 * timer to track membership, and supports both broadcasting to the topic and
 * sending direct messages to a single peer over a dedicated connection.
 *
 * Events emitted:
 *  - 'peer joined' (peer): a peer appeared in the subscriber list
 *  - 'peer left'   (peer): a peer vanished from the list or its direct
 *                          connection emitted 'disconnect'
 *  - 'message' (message):  a pubsub message, or a direct message addressed to
 *                          this node (with its `to` field removed)
 *  - 'error' (err):        a direct connection error or a failed peer poll
 */
class PubSubRoom extends EventEmitter {
  /**
   * @param {object} libp2p - libp2p node; must expose `pubsub`, and is also
   *   used through `handle`, `unhandle` and `peerInfo`
   * @param {string} topic - pubsub topic that identifies the room
   * @param {object} [options] - merged over DEFAULT_OPTIONS
   * @param {number} [options.pollInterval] - delay in ms between peer polls
   * @throws {Error} when `libp2p.pubsub` is missing
   */
  constructor (libp2p, topic, options) {
    super()
    this._libp2p = libp2p
    this._topic = topic
    this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
    this._peers = []
    this._connections = {}

    // Bound once so the exact same function references can be removed again
    // in leave().
    this._handleDirectMessage = this._handleDirectMessage.bind(this)
    this._handleMessage = this._onMessage.bind(this)

    // Checked before any timer or listener is installed so a failed
    // construction leaves nothing behind.
    if (!this._libp2p.pubsub) {
      throw new Error('pubsub has not been configured')
    }

    // _pollPeers is async; going through _pollPeersSafely keeps a failed poll
    // from turning into an unhandled promise rejection.
    this._interval = setInterval(
      () => this._pollPeersSafely(),
      this._options.pollInterval
    )

    this._libp2p.handle(PROTOCOL, directConnection.handler)
    directConnection.emitter.on(this._topic, this._handleDirectMessage)

    this._libp2p.pubsub.subscribe(this._topic, this._handleMessage)

    this._idx = index++
  }

  /**
   * @returns {Array} a shallow copy of the currently known peers
   */
  getPeers () {
    return this._peers.slice(0)
  }

  /**
   * @param {*} peer - compared with known peers through `toString()`
   * @returns {boolean} whether the peer is currently known to the room
   */
  hasPeer (peer) {
    const wanted = peer.toString()
    // `some` reports membership directly; the previous Boolean(find(...))
    // would have answered false for a matching falsy entry.
    return this._peers.some((p) => p.toString() === wanted)
  }

  /**
   * Leaves the room: stops polling, stops every direct connection, detaches
   * the direct-message listener and protocol handler, and unsubscribes from
   * the topic.
   *
   * NOTE(review): `unhandle(PROTOCOL, ...)` is called on the libp2p node
   * itself; whether this affects other rooms sharing the node should be
   * confirmed against the libp2p version in use.
   *
   * @returns {Promise<void>} settles once the pubsub unsubscribe has settled
   */
  async leave () {
    clearInterval(this._interval)
    Object.keys(this._connections).forEach((peer) => {
      this._connections[peer].stop()
    })
    directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
    this._libp2p.unhandle(PROTOCOL, directConnection.handler)
    await this._libp2p.pubsub.unsubscribe(this._topic, this._handleMessage)
  }

  /**
   * Publishes a message to every subscriber of the room's topic.
   *
   * @param {*} _message - passed through `encoding()` before publishing
   * @returns {Promise<void>}
   */
  async broadcast (_message) {
    const message = encoding(_message)

    await this._libp2p.pubsub.publish(this._topic, message)
  }

  /**
   * Sends a direct message to one peer, lazily creating (and caching) the
   * connection to that peer on first use.
   *
   * @param {*} peer - recipient; used as the key of the connection cache
   * @param {*} message - payload, anything `Buffer.from` accepts
   */
  sendTo (peer, message) {
    let conn = this._connections[peer]
    if (!conn) {
      conn = new Connection(peer, this._libp2p, this)
      conn.on('error', (err) => this.emit('error', err))
      this._connections[peer] = conn

      // When the connection goes away, forget both it and the peer.
      conn.once('disconnect', () => {
        delete this._connections[peer]
        this._peers = this._peers.filter((p) => p.toString() !== peer.toString())
        this.emit('peer left', peer)
      })
    }

    // We should use the same sequence number generation as js-libp2p-floodsub
    // does (its randomSeqno utility). Until there is a good way to bring that
    // generator in, 0 is used as the sequence number of every private message.
    const seqno = Buffer.from([0])

    const msg = {
      to: peer,
      from: this._libp2p.peerInfo.id.toB58String(),
      data: Buffer.from(message).toString('hex'),
      seqno: seqno.toString('hex'),
      topicIDs: [this._topic],
      topicCIDs: [this._topic]
    }

    conn.push(Buffer.from(JSON.stringify(msg)))
  }

  /**
   * Runs one peer poll and routes a failure to the room's 'error' event
   * instead of leaving the rejection unhandled. Used by the polling timer.
   *
   * @returns {Promise<void>}
   */
  _pollPeersSafely () {
    return this._pollPeers().catch((err) => {
      this.emit('error', err)
    })
  }

  /**
   * Fetches the topic's current subscribers, emits join/leave events for the
   * differences and, when anything changed, stores the new list.
   *
   * @returns {Promise<void>} rejects when fetching the subscribers fails
   */
  async _pollPeers () {
    const subscribers = await this._libp2p.pubsub.getSubscribers(this._topic)
    // Sort a copy: `sort()` works in place and the returned array belongs to
    // the pubsub implementation.
    const newPeers = [...subscribers].sort()

    if (this._emitChanges(newPeers)) {
      this._peers = newPeers
    }
  }

  /**
   * Emits 'peer joined' / 'peer left' for the differences between the known
   * peers and `newPeers`.
   *
   * @param {Array} newPeers - freshly fetched peer list
   * @returns {boolean} true when at least one peer was added or removed
   */
  _emitChanges (newPeers) {
    const differences = diff(this._peers, newPeers)

    differences.added.forEach((peer) => this.emit('peer joined', peer))
    differences.removed.forEach((peer) => this.emit('peer left', peer))

    return differences.added.length > 0 || differences.removed.length > 0
  }

  /**
   * Pubsub subscription handler: re-emits the message as 'message'.
   *
   * @param {*} message - message delivered by the pubsub subscription
   */
  _onMessage (message) {
    this.emit('message', message)
  }

  /**
   * Direct-message handler: re-emits the message as 'message' only when it is
   * addressed to this node. The emitted object is a shallow copy without the
   * `to` field, so the incoming message is left untouched.
   *
   * @param {object} message - direct message carrying a `to` field
   */
  _handleDirectMessage (message) {
    if (message.to.toString() === this._libp2p.peerInfo.id.toB58String()) {
      const m = Object.assign({}, message)
      delete m.to
      this.emit('message', m)
    }
  }
}
134
135module.exports = PubSubRoom