1 | 'use strict'
|
2 |
|
3 | const diff = require('hyperdiff')
|
4 | const EventEmitter = require('events')
|
5 | const timers = require('timers')
|
6 | const clone = require('lodash.clonedeep')
|
7 | const Buffer = require('safe-buffer').Buffer
|
8 |
|
9 | const PROTOCOL = require('./protocol')
|
10 | const Connection = require('./connection')
|
11 | const encoding = require('./encoding')
|
12 | const directConnection = require('./direct-connection-handler')
|
13 |
|
14 | const DEFAULT_OPTIONS = {
|
15 | pollInterval: 1000
|
16 | }
|
17 |
|
18 | module.exports = (ipfs, topic, options) => {
|
19 | return new PubSubRoom(ipfs, topic, options)
|
20 | }
|
21 |
|
22 | class PubSubRoom extends EventEmitter {
|
23 | constructor (ipfs, topic, options) {
|
24 | super()
|
25 | this._ipfs = ipfs
|
26 | this._topic = topic
|
27 | this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
|
28 | this._peers = []
|
29 | this._connections = {}
|
30 |
|
31 | this._handleDirectMessage = this._handleDirectMessage.bind(this)
|
32 |
|
33 | if (!this._ipfs.pubsub) {
|
34 | throw new Error('This IPFS node does not have pubsub.')
|
35 | }
|
36 |
|
37 | if (this._ipfs.isOnline()) {
|
38 | this._start()
|
39 | } else {
|
40 | this._ipfs.on('ready', this._start.bind(this))
|
41 | }
|
42 |
|
43 | this._ipfs.on('stop', this.leave.bind(this))
|
44 | }
|
45 |
|
46 | getPeers () {
|
47 | return this._peers.slice(0)
|
48 | }
|
49 |
|
50 | hasPeer (peer) {
|
51 | return this._peers.indexOf(peer) >= 0
|
52 | }
|
53 |
|
54 | leave () {
|
55 | timers.clearInterval()
|
56 | Object.keys(this._connections).forEach((peer) => {
|
57 | this._connections[peer].stop()
|
58 | })
|
59 | directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
|
60 | this.emit('stop')
|
61 | }
|
62 |
|
63 | broadcast (_message) {
|
64 | let message = encoding(_message)
|
65 | this._ipfs.pubsub.publish(this._topic, message, (err) => {
|
66 | if (err) {
|
67 | this.emit('error', err)
|
68 | }
|
69 | })
|
70 | }
|
71 |
|
72 | sendTo (peer, message) {
|
73 | let conn = this._connections[peer]
|
74 | if (!conn) {
|
75 | conn = new Connection(peer, this._ipfs, this)
|
76 | conn.on('error', (err) => this.emit('error', err))
|
77 | this._connections[peer] = conn
|
78 |
|
79 | conn.once('disconnect', () => {
|
80 | delete this._connections[peer]
|
81 | this._peers = this._peers.filter((p) => p !== peer)
|
82 | this.emit('peer left', peer)
|
83 | })
|
84 | }
|
85 |
|
86 |
|
87 |
|
88 |
|
89 |
|
90 |
|
91 |
|
92 | const seqno = Buffer.from([0])
|
93 |
|
94 | const msg = {
|
95 | to: peer,
|
96 | from: this._ipfs._peerInfo.id.toB58String(),
|
97 | data: Buffer.from(message).toString('hex'),
|
98 | seqno: seqno.toString('hex'),
|
99 | topicIDs: [ this._topic ],
|
100 | topicCIDs: [ this._topic ]
|
101 | }
|
102 |
|
103 | conn.push(Buffer.from(JSON.stringify(msg)))
|
104 | }
|
105 |
|
106 | _start () {
|
107 | this._interval = timers.setInterval(
|
108 | this._pollPeers.bind(this),
|
109 | this._options.pollInterval)
|
110 |
|
111 | const listener = this._onMessage.bind(this)
|
112 | this._ipfs.pubsub.subscribe(this._topic, listener, (err) => {
|
113 | if (err) {
|
114 | this.emit('error', err)
|
115 | } else {
|
116 | this.emit('subscribed', this._topic)
|
117 | }
|
118 | })
|
119 |
|
120 | this.once('stop', () => {
|
121 | this._ipfs.pubsub.unsubscribe(this._topic, listener)
|
122 | })
|
123 |
|
124 | this._ipfs._libp2pNode.handle(PROTOCOL, directConnection.handler)
|
125 |
|
126 | directConnection.emitter.on(this._topic, this._handleDirectMessage)
|
127 | }
|
128 |
|
129 | _pollPeers () {
|
130 | this._ipfs.pubsub.peers(this._topic, (err, _newPeers) => {
|
131 | if (err) {
|
132 | this.emit('error', err)
|
133 | return
|
134 | }
|
135 |
|
136 | const newPeers = _newPeers.sort()
|
137 |
|
138 | if (this._emitChanges(newPeers)) {
|
139 | this._peers = newPeers
|
140 | }
|
141 | })
|
142 | }
|
143 |
|
144 | _emitChanges (newPeers) {
|
145 | const differences = diff(this._peers, newPeers)
|
146 |
|
147 | differences.added.forEach((addedPeer) => this.emit('peer joined', addedPeer))
|
148 | differences.removed.forEach((removedPeer) => this.emit('peer left', removedPeer))
|
149 |
|
150 | return differences.added.length > 0 || differences.removed.length > 0
|
151 | }
|
152 |
|
153 | _onMessage (message) {
|
154 | this.emit('message', message)
|
155 | }
|
156 |
|
157 | _handleDirectMessage (message) {
|
158 | if (message.to === this._ipfs._peerInfo.id.toB58String()) {
|
159 | const m = Object.assign({}, message)
|
160 | delete m.to
|
161 | this.emit('message', m)
|
162 | }
|
163 | }
|
164 | }
|