1 | 'use strict'
|
2 |
|
3 | const diff = require('hyperdiff')
|
4 | const EventEmitter = require('events')
|
5 | const clone = require('lodash.clonedeep')
|
6 |
|
7 | const PROTOCOL = require('./protocol')
|
8 | const Connection = require('./connection')
|
9 | const encoding = require('./encoding')
|
10 | const directConnection = require('./direct-connection-handler')
|
11 |
|
// Defaults that the PubSubRoom constructor merges underneath caller-supplied
// options. Frozen so the shared object cannot be modified by accident; the
// constructor only ever reads it through a deep clone.
const DEFAULT_OPTIONS = Object.freeze({
  // Delay handed to setInterval for the periodic peer poll.
  pollInterval: 1000
})

// Module-wide counter: each new room stores the current value as its `_idx`
// and then increments it.
let index = 0
|
17 |
|
/**
 * Room abstraction on top of a libp2p pubsub topic.
 *
 * A room subscribes to one topic, periodically polls the topic's subscriber
 * list to track membership, and can send messages either to every subscriber
 * (`broadcast`) or to a single peer over a direct connection (`sendTo`).
 *
 * Events emitted:
 *  - 'peer joined' (peer): a peer appeared in the subscriber list.
 *  - 'peer left' (peer): a peer disappeared from the subscriber list, or its
 *    direct connection emitted 'disconnect'.
 *  - 'message' (message): a pubsub message, or a direct message addressed to
 *    this node, was received.
 *  - 'error' (err): a direct connection reported an error, or polling the
 *    subscriber list failed.
 */
class PubSubRoom extends EventEmitter {
  /**
   * Joins the room: starts polling for peers, registers the direct-connection
   * protocol handler and subscribes to the topic.
   *
   * @param {object} libp2p - libp2p node; `pubsub`, `handle`, `unhandle` and
   *   `peerInfo.id.toB58String()` are used.
   * @param {string} topic - pubsub topic backing the room (presumably a
   *   string; it is also used as an event name — confirm against callers).
   * @param {object} [options] - overrides for DEFAULT_OPTIONS.
   * @param {number} [options.pollInterval] - delay passed to setInterval
   *   between two peer polls.
   * @throws {Error} if `libp2p.pubsub` is not set.
   */
  constructor (libp2p, topic, options) {
    super()
    this._libp2p = libp2p
    this._topic = topic
    // Both sides are deep-cloned so later mutation of the caller's object (or
    // of the defaults) cannot affect this room.
    this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
    this._peers = []
    this._connections = {}

    // Bound once so the very same function references can be unregistered in
    // leave().
    this._handleDirectMessage = this._handleDirectMessage.bind(this)
    this._handleMessage = this._onMessage.bind(this)

    // Checked before any timer or handler is installed, so a failed
    // construction leaves nothing behind.
    if (!this._libp2p.pubsub) {
      throw new Error('pubsub has not been configured')
    }

    this._interval = setInterval(
      this._pollPeers.bind(this),
      this._options.pollInterval
    )

    this._libp2p.handle(PROTOCOL, directConnection.handler)
    directConnection.emitter.on(this._topic, this._handleDirectMessage)

    this._libp2p.pubsub.subscribe(this._topic, this._handleMessage)

    this._idx = index++
  }

  /**
   * @returns {Array} a shallow copy of the currently known peers, so callers
   *   cannot mutate the room's internal list.
   */
  getPeers () {
    return this._peers.slice(0)
  }

  /**
   * @param {*} peer - peer to look up; compared through `toString()`.
   * @returns {boolean} true when a known peer has the same string form.
   */
  hasPeer (peer) {
    return this._peers.some((p) => p.toString() === peer.toString())
  }

  /**
   * Leaves the room: stops polling, stops every direct connection, removes
   * the direct-message listener and protocol handler, and unsubscribes from
   * the topic.
   *
   * NOTE(review): `unhandle(PROTOCOL, ...)` is called on the libp2p node
   * itself; if several rooms share one node this presumably removes the
   * handler for all of them — confirm.
   *
   * @returns {Promise<void>} settles once the pubsub unsubscribe has settled.
   */
  async leave () {
    clearInterval(this._interval)
    Object.keys(this._connections).forEach((peer) => {
      this._connections[peer].stop()
    })
    directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
    this._libp2p.unhandle(PROTOCOL, directConnection.handler)
    await this._libp2p.pubsub.unsubscribe(this._topic, this._handleMessage)
  }

  /**
   * Publishes a message to every subscriber of the topic.
   *
   * @param {*} _message - payload; it is passed through the local `encoding`
   *   helper before publishing (presumably converting it to a Buffer —
   *   confirm in ./encoding).
   * @returns {Promise<void>} settles once the publish call has settled.
   */
  async broadcast (_message) {
    const message = encoding(_message)

    await this._libp2p.pubsub.publish(this._topic, message)
  }

  /**
   * Sends a message to a single peer over a direct connection, creating and
   * caching the connection on first use.
   *
   * @param {*} peer - recipient; used as the key of the connection cache and
   *   as the `to` field (presumably a base58 peer id string — confirm).
   * @param {*} message - payload accepted by `Buffer.from`.
   */
  sendTo (peer, message) {
    let conn = this._connections[peer]
    if (!conn) {
      conn = new Connection(peer, this._libp2p, this)
      conn.on('error', (err) => this.emit('error', err))
      this._connections[peer] = conn

      conn.once('disconnect', () => {
        // Forget the connection and the peer before notifying listeners, so
        // they observe the updated room state.
        delete this._connections[peer]
        this._peers = this._peers.filter((p) => p.toString() !== peer.toString())
        this.emit('peer left', peer)
      })
    }

    // Every direct message carries the same constant sequence number (0).
    const seqno = Buffer.from([0])

    // Binary fields are hex-encoded so the envelope can be serialized as JSON.
    const msg = {
      to: peer,
      from: this._libp2p.peerInfo.id.toB58String(),
      data: Buffer.from(message).toString('hex'),
      seqno: seqno.toString('hex'),
      topicIDs: [this._topic],
      topicCIDs: [this._topic]
    }

    conn.push(Buffer.from(JSON.stringify(msg)))
  }

  /**
   * Fetches the current subscribers of the topic and reconciles the peer
   * list. Runs on every tick of the polling interval.
   *
   * A failure to fetch the subscribers is reported as an 'error' event
   * instead of being left as a floating promise rejection on the timer. When
   * no 'error' listener is registered, `emit` throws and the returned promise
   * still rejects with the same error.
   *
   * @returns {Promise<void>}
   */
  async _pollPeers () {
    let newPeers
    try {
      // Sorted so that successive polls yield a stable order.
      newPeers = (await this._libp2p.pubsub.getSubscribers(this._topic)).sort()
    } catch (err) {
      this.emit('error', err)
      return
    }

    this._emitChanges(newPeers)
  }

  /**
   * Compares the known peers with `newPeers`, adopts `newPeers` as the peer
   * list when they differ, and emits 'peer joined' / 'peer left' for every
   * difference.
   *
   * The list is replaced before the events fire, so `getPeers()` and
   * `hasPeer()` called from a listener already reflect the change, matching
   * the 'disconnect' path in `sendTo`.
   *
   * @param {Array} newPeers - latest subscriber list.
   * @returns {boolean} true when at least one peer was added or removed.
   */
  _emitChanges (newPeers) {
    const differences = diff(this._peers, newPeers)
    const changed = differences.added.length > 0 || differences.removed.length > 0

    if (changed) {
      this._peers = newPeers
    }

    differences.added.forEach((peer) => this.emit('peer joined', peer))
    differences.removed.forEach((peer) => this.emit('peer left', peer))

    return changed
  }

  /**
   * Pubsub subscription handler: re-emits the message as a 'message' event.
   *
   * @param {object} message - message delivered by the pubsub subscription.
   */
  _onMessage (message) {
    this.emit('message', message)
  }

  /**
   * Direct-connection handler: emits a 'message' event for messages addressed
   * to this node and ignores all others.
   *
   * @param {object} message - direct message carrying a `to` field.
   */
  _handleDirectMessage (message) {
    if (message.to.toString() === this._libp2p.peerInfo.id.toB58String()) {
      // Emit a shallow copy without `to`, leaving the received object intact.
      const m = Object.assign({}, message)
      delete m.to
      this.emit('message', m)
    }
  }
}
|
134 |
|
135 | module.exports = PubSubRoom
|