UNPKG

4.56 kBJavaScriptView Raw
1'use strict'
2
3const diff = require('hyperdiff')
4const EventEmitter = require('events')
5const timers = require('timers')
6const clone = require('lodash.clonedeep')
7
8const PROTOCOL = require('./protocol')
9const Connection = require('./connection')
10const encoding = require('./encoding')
11const directConnection = require('./direct-connection-handler')
12const libp2p = require('./libp2p')
13
14const DEFAULT_OPTIONS = {
15 pollInterval: 1000
16}
17
18module.exports = (ipfs, topic, options) => {
19 return new PubSubRoom(ipfs, topic, options)
20}
21
22class PubSubRoom extends EventEmitter {
23 constructor (ipfs, topic, options) {
24 super()
25 this._ipfs = ipfs
26 this._topic = topic
27 this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
28 this._peers = []
29 this._connections = {}
30
31 this._handleDirectMessage = this._handleDirectMessage.bind(this)
32
33 if (!this._ipfs.pubsub) {
34 throw new Error('This IPFS node does not have pubsub.')
35 }
36
37 if (this._ipfs.isOnline()) {
38 this._start()
39 } else {
40 this._ipfs.on('ready', this._start.bind(this))
41 }
42
43 this._ipfs.on('stop', this.leave.bind(this))
44 }
45
46 getPeers () {
47 return this._peers.slice(0)
48 }
49
50 hasPeer (peer) {
51 return this._peers.indexOf(peer) >= 0
52 }
53
54 leave () {
55 return new Promise((resolve, reject) => {
56 timers.clearInterval(this._interval)
57 Object.keys(this._connections).forEach((peer) => {
58 this._connections[peer].stop()
59 })
60 directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
61 this.once('stopped', () => resolve())
62 this.emit('stopping')
63 })
64 }
65
66 broadcast (_message) {
67 let message = encoding(_message)
68
69 this._ipfs.pubsub.publish(this._topic, message, (err) => {
70 if (err) {
71 this.emit('error', err)
72 }
73 })
74 }
75
76 sendTo (peer, message) {
77 let conn = this._connections[peer]
78 if (!conn) {
79 conn = new Connection(peer, this._ipfs, this)
80 conn.on('error', (err) => this.emit('error', err))
81 this._connections[peer] = conn
82
83 conn.once('disconnect', () => {
84 delete this._connections[peer]
85 this._peers = this._peers.filter((p) => p !== peer)
86 this.emit('peer left', peer)
87 })
88 }
89
90 // We should use the same sequence number generation as js-libp2p-floosub does:
91 // const seqno = Buffer.from(utils.randomSeqno())
92
93 // Until we figure out a good way to bring in the js-libp2p-floosub's randomSeqno
94 // generator, let's use 0 as the sequence number for all private messages
95 // const seqno = Buffer.from([0])
96 const seqno = Buffer.from([0])
97
98 const msg = {
99 to: peer,
100 from: this._ipfs._peerInfo.id.toB58String(),
101 data: Buffer.from(message).toString('hex'),
102 seqno: seqno.toString('hex'),
103 topicIDs: [ this._topic ],
104 topicCIDs: [ this._topic ]
105 }
106
107 conn.push(Buffer.from(JSON.stringify(msg)))
108 }
109
110 _start () {
111 this._interval = timers.setInterval(
112 this._pollPeers.bind(this),
113 this._options.pollInterval)
114
115 const listener = this._onMessage.bind(this)
116 this._ipfs.pubsub.subscribe(this._topic, listener, {}, (err) => {
117 if (err) {
118 this.emit('error', err)
119 } else {
120 this.emit('subscribed', this._topic)
121 }
122 })
123
124 this.once('stopping', () => {
125 this._ipfs.pubsub.unsubscribe(this._topic, listener, (err) => {
126 if (err) {
127 this.emit('error', err)
128 } else {
129 this.emit('stopped')
130 }
131 })
132 })
133
134 libp2p(this._ipfs).handle(PROTOCOL, directConnection.handler)
135
136 directConnection.emitter.on(this._topic, this._handleDirectMessage)
137 }
138
139 _pollPeers () {
140 this._ipfs.pubsub.peers(this._topic, (err, _newPeers) => {
141 if (err) {
142 this.emit('error', err)
143 return // early
144 }
145
146 const newPeers = _newPeers.sort()
147
148 if (this._emitChanges(newPeers)) {
149 this._peers = newPeers
150 }
151 })
152 }
153
154 _emitChanges (newPeers) {
155 const differences = diff(this._peers, newPeers)
156
157 differences.added.forEach((addedPeer) => this.emit('peer joined', addedPeer))
158 differences.removed.forEach((removedPeer) => this.emit('peer left', removedPeer))
159
160 return differences.added.length > 0 || differences.removed.length > 0
161 }
162
163 _onMessage (message) {
164 this.emit('message', message)
165 }
166
167 _handleDirectMessage (message) {
168 if (message.to === this._ipfs._peerInfo.id.toB58String()) {
169 const m = Object.assign({}, message)
170 delete m.to
171 this.emit('message', m)
172 }
173 }
174}