UNPKG

4.31 kBJavaScriptView Raw
1'use strict'
2
3const diff = require('hyperdiff')
4const EventEmitter = require('events')
5const timers = require('timers')
6const clone = require('lodash.clonedeep')
7const Buffer = require('safe-buffer').Buffer
8
9const PROTOCOL = require('./protocol')
10const Connection = require('./connection')
11const encoding = require('./encoding')
12const directConnection = require('./direct-connection-handler')
13
14const DEFAULT_OPTIONS = {
15 pollInterval: 1000
16}
17
18module.exports = (ipfs, topic, options) => {
19 return new PubSubRoom(ipfs, topic, options)
20}
21
22class PubSubRoom extends EventEmitter {
23 constructor (ipfs, topic, options) {
24 super()
25 this._ipfs = ipfs
26 this._topic = topic
27 this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
28 this._peers = []
29 this._connections = {}
30
31 this._handleDirectMessage = this._handleDirectMessage.bind(this)
32
33 if (!this._ipfs.pubsub) {
34 throw new Error('This IPFS node does not have pubsub.')
35 }
36
37 if (this._ipfs.isOnline()) {
38 this._start()
39 } else {
40 this._ipfs.on('ready', this._start.bind(this))
41 }
42
43 this._ipfs.on('stop', this.leave.bind(this))
44 }
45
46 getPeers () {
47 return this._peers.slice(0)
48 }
49
50 hasPeer (peer) {
51 return this._peers.indexOf(peer) >= 0
52 }
53
54 leave () {
55 timers.clearInterval()
56 Object.keys(this._connections).forEach((peer) => {
57 this._connections[peer].stop()
58 })
59 directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
60 this.emit('stop')
61 }
62
63 broadcast (_message) {
64 let message = encoding(_message)
65 this._ipfs.pubsub.publish(this._topic, message, (err) => {
66 if (err) {
67 this.emit('error', err)
68 }
69 })
70 }
71
72 sendTo (peer, message) {
73 let conn = this._connections[peer]
74 if (!conn) {
75 conn = new Connection(peer, this._ipfs, this)
76 conn.on('error', (err) => this.emit('error', err))
77 this._connections[peer] = conn
78
79 conn.once('disconnect', () => {
80 delete this._connections[peer]
81 this._peers = this._peers.filter((p) => p !== peer)
82 this.emit('peer left', peer)
83 })
84 }
85
86 // We should use the same sequence number generation as js-libp2p-floosub does:
87 // const seqno = Buffer.from(utils.randomSeqno())
88
89 // Until we figure out a good way to bring in the js-libp2p-floosub's randomSeqno
90 // generator, let's use 0 as the sequence number for all private messages
91 // const seqno = Buffer.from([0])
92 const seqno = Buffer.from([0])
93
94 const msg = {
95 to: peer,
96 from: this._ipfs._peerInfo.id.toB58String(),
97 data: Buffer.from(message).toString('hex'),
98 seqno: seqno.toString('hex'),
99 topicIDs: [ this._topic ],
100 topicCIDs: [ this._topic ]
101 }
102
103 conn.push(Buffer.from(JSON.stringify(msg)))
104 }
105
106 _start () {
107 this._interval = timers.setInterval(
108 this._pollPeers.bind(this),
109 this._options.pollInterval)
110
111 const listener = this._onMessage.bind(this)
112 this._ipfs.pubsub.subscribe(this._topic, listener, (err) => {
113 if (err) {
114 this.emit('error', err)
115 } else {
116 this.emit('subscribed', this._topic)
117 }
118 })
119
120 this.once('stop', () => {
121 this._ipfs.pubsub.unsubscribe(this._topic, listener)
122 })
123
124 this._ipfs._libp2pNode.handle(PROTOCOL, directConnection.handler)
125
126 directConnection.emitter.on(this._topic, this._handleDirectMessage)
127 }
128
129 _pollPeers () {
130 this._ipfs.pubsub.peers(this._topic, (err, _newPeers) => {
131 if (err) {
132 this.emit('error', err)
133 return // early
134 }
135
136 const newPeers = _newPeers.sort()
137
138 if (this._emitChanges(newPeers)) {
139 this._peers = newPeers
140 }
141 })
142 }
143
144 _emitChanges (newPeers) {
145 const differences = diff(this._peers, newPeers)
146
147 differences.added.forEach((addedPeer) => this.emit('peer joined', addedPeer))
148 differences.removed.forEach((removedPeer) => this.emit('peer left', removedPeer))
149
150 return differences.added.length > 0 || differences.removed.length > 0
151 }
152
153 _onMessage (message) {
154 this.emit('message', message)
155 }
156
157 _handleDirectMessage (message) {
158 if (message.to === this._ipfs._peerInfo.id.toB58String()) {
159 const m = Object.assign({}, message)
160 delete m.to
161 this.emit('message', m)
162 }
163 }
164}