1 | import diff from 'hyperdiff'
|
2 | import EventEmitter from 'events'
|
3 | import clone from 'lodash.clonedeep'
|
4 | import Connection from './connection.js'
|
5 | import encoding from './encoding.js'
|
6 | import * as directConnection from './direct-connection-handler.js'
|
7 | import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
8 | import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
9 |
|
// Baseline room options; caller-supplied options are merged over a copy of these.
// `pollInterval` is the delay handed to setInterval for the subscriber poll.
const DEFAULT_OPTIONS = { pollInterval: 1000 }

// Module-wide counter; each PubSubRoom instance records the value it saw at
// construction time and then bumps it.
let index = 0
|
15 |
|
/**
 * A "room" built on top of one libp2p pubsub topic.
 *
 * The room subscribes to the topic, periodically polls the topic's subscriber
 * list, and re-emits what it observes as events:
 *
 * - `'peer joined'` / `'peer left'` — emitted for each entry `diff` reports as
 *   added / removed between two polls of the stringified subscriber list.
 *   `'peer left'` is also emitted (with the `peer` value given to `sendTo`)
 *   when a direct connection reports `'disconnect'`.
 * - `'message'` — a pubsub message on this topic, or a direct message
 *   addressed to this node for this topic.
 * - `'error'` — forwarded from direct connections and from failed polls.
 */
export default class PubSubRoom extends EventEmitter {
  /**
   * @param {object} libp2p - a libp2p node, or an object exposing one as `.libp2p`
   * @param {string} topic - pubsub topic this room is bound to
   * @param {object} [options] - overrides merged over DEFAULT_OPTIONS
   * @param {number} [options.pollInterval] - delay passed to setInterval for subscriber polling
   * @throws {Error} when the resolved node has no `pubsub` property
   */
  constructor (libp2p, topic, options) {
    super()
    this._libp2p = libp2p.libp2p || libp2p
    this._topic = topic
    this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
    this._peers = []
    this._connections = {}

    // Bound once so the exact same function references can be removed in leave().
    this._handleDirectMessage = this._handleDirectMessage.bind(this)
    this._handleMessage = this._onMessage.bind(this)

    if (!this._libp2p.pubsub) {
      throw new Error('pubsub has not been configured')
    }

    this._interval = setInterval(
      this._pollTick.bind(this),
      this._options.pollInterval
    )

    // NOTE(review): this receives the raw constructor argument, not the
    // unwrapped `this._libp2p` — confirm handle() copes with wrapper objects.
    directConnection.handle(libp2p)
    directConnection.emitter.on(this._topic, this._handleDirectMessage)

    this._libp2p.pubsub.subscribe(this._topic)
    this._libp2p.pubsub.addEventListener('message', this._handleMessage)

    this._idx = index++
  }

  /**
   * @returns {Array} a shallow copy of the peers recorded by the last poll
   */
  getPeers () {
    return this._peers.slice(0)
  }

  /**
   * @param {{toString: Function}} peer - peer id (or string) to look up
   * @returns {boolean} true when a recorded peer has the same string form
   */
  hasPeer (peer) {
    const wanted = peer.toString()
    return this._peers.some((p) => p.toString() === wanted)
  }

  /**
   * Stops polling, stops every direct connection, and detaches from the topic.
   *
   * @returns {Promise<void>} rejects if `pubsub.unsubscribe` rejects
   */
  async leave () {
    clearInterval(this._interval)
    for (const conn of Object.values(this._connections)) {
      conn.stop()
    }
    directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)

    try {
      await this._libp2p.pubsub.unsubscribe(this._topic)
    } finally {
      // Detach the handler even when unsubscribe fails, so it is not leaked.
      this._libp2p.pubsub.removeEventListener('message', this._handleMessage)
    }
  }

  /**
   * Publishes a message on the room's topic after passing it through the
   * `encoding` helper.
   *
   * @param {*} _message - payload handed to `encoding`
   * @returns {Promise<void>}
   */
  async broadcast (_message) {
    const message = encoding(_message)
    await this._libp2p.pubsub.publish(this._topic, message)
  }

  /**
   * Sends a message to one peer over a direct connection, creating (and
   * caching) the connection on first use.
   *
   * @param {*} peer - destination peer; also used as the connection cache key
   * @param {string|Uint8Array} message - payload; strings are converted with
   *   `uint8ArrayFromString`, byte arrays are sent as-is
   */
  sendTo (peer, message) {
    let conn = this._connections[peer]
    if (!conn) {
      conn = new Connection(peer, this._libp2p, this)
      conn.on('error', (err) => this.emit('error', err))
      this._connections[peer] = conn

      conn.once('disconnect', () => {
        delete this._connections[peer]
        this._peers = this._peers.filter((p) => p.toString() !== peer.toString())
        this.emit('peer left', peer)
      })
    }

    // Every direct message carries the fixed sequence number 0.
    const seqno = 0n

    // Bytes are passed through untouched; only strings need converting.
    const payload = message instanceof Uint8Array
      ? message
      : uint8ArrayFromString(message)

    const msg = {
      to: peer,
      from: this._libp2p.peerId.toString(),
      data: uint8ArrayToString(payload, 'hex'),
      seqno: seqno.toString(),
      topic: this._topic
    }

    conn.push(uint8ArrayFromString(JSON.stringify(msg)))
  }

  /**
   * Interval callback: runs one poll and reports a failed poll as an
   * `'error'` event instead of leaving the rejection unhandled.
   *
   * @returns {Promise<void>}
   */
  _pollTick () {
    return this._pollPeers().catch((err) => {
      this.emit('error', err)
    })
  }

  /**
   * Fetches the topic's current subscribers, emits join/leave events for the
   * differences, and records the new list when anything changed.
   *
   * @returns {Promise<void>} rejects if `pubsub.getSubscribers` fails
   */
  async _pollPeers () {
    const subscribers = await this._libp2p.pubsub.getSubscribers(this._topic)
    // Sort a copy so the array handed back by pubsub is never mutated.
    const newPeers = [...subscribers].sort()

    if (this._emitChanges(newPeers)) {
      this._peers = newPeers
    }
  }

  /**
   * Emits `'peer joined'` / `'peer left'` for the difference between the
   * recorded peers and `newPeers`, compared by string form.
   *
   * @param {Array} newPeers - freshly polled subscriber list
   * @returns {boolean} true when at least one peer was added or removed
   */
  _emitChanges (newPeers) {
    const before = this._peers.map((p) => p.toString())
    const after = newPeers.map((p) => p.toString())
    const differences = diff(before, after)

    differences.added.forEach((peer) => this.emit('peer joined', peer))
    differences.removed.forEach((peer) => this.emit('peer left', peer))

    return differences.added.length > 0 || differences.removed.length > 0
  }

  /**
   * Pubsub `'message'` listener: re-emits the event's `detail` when it
   * belongs to this room's topic.
   *
   * @param {{detail: {topic: string}}} event
   */
  _onMessage (event) {
    const message = event.detail

    if (message.topic === this._topic) {
      this.emit('message', message)
    }
  }

  /**
   * Direct-connection listener: re-emits messages addressed to this node for
   * this topic, with the `to` field stripped. Messages without a `to` field
   * are ignored rather than throwing inside the emitter.
   *
   * @param {{to: *, topic: string}} message
   */
  _handleDirectMessage (message) {
    if (message?.to == null) {
      return
    }

    if (message.to.toString() !== this._libp2p.peerId.toString()) {
      return
    }

    if (message.topic === this._topic) {
      const m = { ...message }
      delete m.to
      this.emit('message', m)
    }
  }
}
|