UNPKG

4.05 kBJavaScriptView Raw
1import diff from 'hyperdiff'
2import EventEmitter from 'events'
3import clone from 'lodash.clonedeep'
4import Connection from './connection.js'
5import encoding from './encoding.js'
6import * as directConnection from './direct-connection-handler.js'
7import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
8import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
9
10const DEFAULT_OPTIONS = {
11 pollInterval: 1000
12}
13
14let index = 0
15
16export default class PubSubRoom extends EventEmitter {
17 constructor (libp2p, topic, options) {
18 super()
19 this._libp2p = libp2p.libp2p || libp2p
20 this._topic = topic
21 this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
22 this._peers = []
23 this._connections = {}
24
25 this._handleDirectMessage = this._handleDirectMessage.bind(this)
26 this._handleMessage = this._onMessage.bind(this)
27
28 if (!this._libp2p.pubsub) {
29 throw new Error('pubsub has not been configured')
30 }
31
32 this._interval = setInterval(
33 this._pollPeers.bind(this),
34 this._options.pollInterval
35 )
36
37 directConnection.handle(libp2p)
38 directConnection.emitter.on(this._topic, this._handleDirectMessage)
39
40 this._libp2p.pubsub.subscribe(this._topic)
41 this._libp2p.pubsub.addEventListener('message', this._handleMessage)
42
43 this._idx = index++
44 }
45
46 getPeers () {
47 return this._peers.slice(0)
48 }
49
50 hasPeer (peer) {
51 return Boolean(this._peers.find(p => p.toString() === peer.toString()))
52 }
53
54 async leave () {
55 clearInterval(this._interval)
56 Object.keys(this._connections).forEach((peer) => {
57 this._connections[peer].stop()
58 })
59 directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
60 // directConnection.unhandle(this._libp2p)
61 await this._libp2p.pubsub.unsubscribe(this._topic)
62 this._libp2p.pubsub.removeEventListener('message', this._handleMessage)
63 }
64
65 async broadcast (_message) {
66 const message = encoding(_message)
67 await this._libp2p.pubsub.publish(this._topic, message)
68 }
69
70 sendTo (peer, message) {
71 let conn = this._connections[peer]
72 if (!conn) {
73 conn = new Connection(peer, this._libp2p, this)
74 conn.on('error', (err) => this.emit('error', err))
75 this._connections[peer] = conn
76
77 conn.once('disconnect', () => {
78 delete this._connections[peer]
79 this._peers = this._peers.filter((p) => p.toString() !== peer.toString())
80 this.emit('peer left', peer)
81 })
82 }
83
84 // We should use the same sequence number generation as js-libp2p-floosub does:
85 // const seqno = Uint8Array.from(utils.randomSeqno())
86
87 // Until we figure out a good way to bring in the js-libp2p-floosub's randomSeqno
88 // generator, let's use 0 as the sequence number for all private messages
89 const seqno = 0n
90
91 const msg = {
92 to: peer,
93 from: this._libp2p.peerId.toString(),
94 data: uint8ArrayToString(uint8ArrayFromString(message), 'hex'),
95 seqno: seqno.toString(),
96 topic: this._topic
97 }
98
99 conn.push(uint8ArrayFromString(JSON.stringify(msg)))
100 }
101
102 async _pollPeers () {
103 const newPeers = (await this._libp2p.pubsub.getSubscribers(this._topic)).sort()
104
105 if (this._emitChanges(newPeers)) {
106 this._peers = newPeers
107 }
108 }
109
110 _emitChanges (newPeers) {
111 const differences = diff(this._peers.map(p => p.toString()), newPeers.map(p => p.toString()))
112
113 differences.added.forEach((peer) => this.emit('peer joined', peer))
114 differences.removed.forEach((peer) => this.emit('peer left', peer))
115
116 return differences.added.length > 0 || differences.removed.length > 0
117 }
118
119 _onMessage (event) {
120 const message = event.detail
121
122 if (message.topic === this._topic) {
123 this.emit('message', message)
124 }
125 }
126
127 _handleDirectMessage (message) {
128 if (message.to.toString() !== this._libp2p.peerId.toString()) {
129 return
130 }
131
132 if (message.topic === this._topic) {
133 const m = Object.assign({}, message)
134 delete m.to
135 this.emit('message', m)
136 }
137 }
138}