1 | 'use strict'
|
2 |
|
3 | const diff = require('hyperdiff')
|
4 | const EventEmitter = require('events')
|
5 | const timers = require('timers')
|
6 | const clone = require('lodash.clonedeep')
|
7 |
|
8 | const PROTOCOL = require('./protocol')
|
9 | const Connection = require('./connection')
|
10 | const encoding = require('./encoding')
|
11 | const directConnection = require('./direct-connection-handler')
|
12 | const libp2p = require('./libp2p')
|
13 |
|
/**
 * Default room options; caller-supplied options are layered on top of a copy
 * of these in the PubSubRoom constructor.
 *
 * - pollInterval: delay, in the units accepted by `timers.setInterval`
 *   (milliseconds), between two polls of the topic's peer list.
 *
 * Frozen so that no consumer can accidentally change the defaults for every
 * room created afterwards.
 */
const DEFAULT_OPTIONS = Object.freeze({
  pollInterval: 1000
})
|
17 |
|
/**
 * Module entry point: builds a room bound to the given IPFS node and topic.
 *
 * @param {object} ipfs - IPFS node; forwarded untouched to PubSubRoom.
 * @param {string} topic - pubsub topic the room is bound to.
 * @param {object} [options] - overrides for the default room options.
 * @returns {PubSubRoom} a freshly constructed room.
 */
module.exports = (ipfs, topic, options) => new PubSubRoom(ipfs, topic, options)
|
21 |
|
/**
 * A room of peers built on top of an IPFS pubsub topic.
 *
 * Events emitted by this class:
 *  - 'subscribed' (topic)   the pubsub subscription succeeded
 *  - 'peer joined' (peer)   a peer appeared in the polled peer list
 *  - 'peer left' (peer)     a peer vanished from the peer list, or its direct
 *                           connection emitted 'disconnect'
 *  - 'message' (message)    a pubsub message, or a direct message addressed
 *                           to this node
 *  - 'stopping' / 'stopped' emitted around the unsubscription done by leave()
 *  - 'error' (err)          any error reported by the pubsub API or by a
 *                           direct connection
 */
class PubSubRoom extends EventEmitter {
  /**
   * @param {object} ipfs - IPFS node. Must expose `pubsub`, `isOnline()` and
   *   `on()`; `_peerInfo.id.toB58String()` is read when sending or receiving
   *   direct messages.
   * @param {string} topic - pubsub topic backing the room.
   * @param {object} [options] - overrides for DEFAULT_OPTIONS.
   * @throws {Error} when the node has no `pubsub` property.
   */
  constructor (ipfs, topic, options) {
    super()
    this._ipfs = ipfs
    this._topic = topic
    this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
    this._peers = []
    this._connections = {}
    // Lifecycle bookkeeping used by _start() and leave().
    this._started = false
    this._leaving = null

    // Bound once so the very same function can be removed again in leave().
    this._handleDirectMessage = this._handleDirectMessage.bind(this)

    if (!this._ipfs.pubsub) {
      throw new Error('This IPFS node does not have pubsub.')
    }

    if (this._ipfs.isOnline()) {
      this._start()
    } else {
      this._ipfs.on('ready', this._start.bind(this))
    }

    this._ipfs.on('stop', this.leave.bind(this))
  }

  /**
   * @returns {Array} a shallow copy of the currently known peers.
   */
  getPeers () {
    return this._peers.slice(0)
  }

  /**
   * @param {*} peer - peer identifier as stored in the peer list.
   * @returns {boolean} whether the peer is in the currently known peer list.
   */
  hasPeer (peer) {
    return this._peers.indexOf(peer) >= 0
  }

  /**
   * Leaves the room: stops polling, stops every direct connection, detaches
   * the direct-message listener and unsubscribes from the topic.
   *
   * Calling leave() again while (or after) the room is being left returns the
   * same promise. When the room was never started there is nothing to
   * unsubscribe from, so 'stopped' is emitted right after 'stopping'.
   *
   * NOTE(review): if unsubscribing fails, the error is emitted as 'error' and
   * the returned promise stays pending, as before — confirm whether callers
   * would rather get a rejection.
   *
   * @returns {Promise<void>} resolves once 'stopped' has been emitted.
   */
  leave () {
    if (this._leaving) {
      return this._leaving
    }

    // Store the promise before emitting anything so that a re-entrant leave()
    // from an event listener gets the same promise back.
    let settle
    this._leaving = new Promise((resolve) => {
      settle = resolve
    })

    timers.clearInterval(this._interval)
    Object.keys(this._connections).forEach((peer) => {
      this._connections[peer].stop()
    })
    directConnection.emitter.removeListener(this._topic, this._handleDirectMessage)
    this.once('stopped', () => settle())

    const wasStarted = this._started
    this.emit('stopping')
    if (!wasStarted) {
      // No 'stopping' listener was registered by _start(), so nobody else
      // will ever emit 'stopped'.
      this.emit('stopped')
    }

    return this._leaving
  }

  /**
   * Publishes a message to the room's topic. Publish failures are emitted as
   * 'error' events.
   *
   * @param {*} _message - payload; passed through `encoding` before publishing.
   */
  broadcast (_message) {
    const message = encoding(_message)

    this._ipfs.pubsub.publish(this._topic, message, (err) => {
      if (err) {
        this.emit('error', err)
      }
    })
  }

  /**
   * Sends a message to a single peer over a direct connection, creating and
   * caching the connection on first use.
   *
   * @param {string} peer - identifier of the receiving peer.
   * @param {string|Buffer} message - payload; anything `Buffer.from` accepts.
   */
  sendTo (peer, message) {
    let conn = this._connections[peer]
    if (!conn) {
      conn = new Connection(peer, this._ipfs, this)
      conn.on('error', (err) => this.emit('error', err))
      this._connections[peer] = conn

      conn.once('disconnect', () => {
        delete this._connections[peer]
        this._peers = this._peers.filter((p) => p !== peer)
        this.emit('peer left', peer)
      })
    }

    // Direct messages always carry the same single zero byte as seqno.
    const seqno = Buffer.from([0])

    // Envelope with hex-encoded payload and seqno, serialised as JSON.
    const msg = {
      to: peer,
      from: this._ipfs._peerInfo.id.toB58String(),
      data: Buffer.from(message).toString('hex'),
      seqno: seqno.toString('hex'),
      topicIDs: [ this._topic ],
      topicCIDs: [ this._topic ]
    }

    conn.push(Buffer.from(JSON.stringify(msg)))
  }

  /**
   * Starts the room: begins polling peers, subscribes to the topic, registers
   * the direct-connection protocol handler and listens for direct messages.
   * Does nothing when the room is already started.
   */
  _start () {
    if (this._started) {
      // A second start would leak the previous interval and subscribe twice.
      return
    }
    this._started = true
    this._leaving = null

    this._interval = timers.setInterval(
      this._pollPeers.bind(this),
      this._options.pollInterval)

    const listener = this._onMessage.bind(this)
    this._ipfs.pubsub.subscribe(this._topic, listener, {}, (err) => {
      if (err) {
        this.emit('error', err)
      } else {
        this.emit('subscribed', this._topic)
      }
    })

    // leave() emits 'stopping'; answer with 'stopped' once unsubscribed.
    this.once('stopping', () => {
      this._ipfs.pubsub.unsubscribe(this._topic, listener, (err) => {
        if (err) {
          this.emit('error', err)
        } else {
          this._started = false
          this.emit('stopped')
        }
      })
    })

    libp2p(this._ipfs).handle(PROTOCOL, directConnection.handler)

    directConnection.emitter.on(this._topic, this._handleDirectMessage)
  }

  /**
   * Fetches the topic's current peers and, when the list changed, emits the
   * corresponding join/leave events and stores the new list.
   */
  _pollPeers () {
    this._ipfs.pubsub.peers(this._topic, (err, _newPeers) => {
      if (err) {
        this.emit('error', err)
        return
      }

      // Sort a copy so the array handed over by the pubsub API is untouched.
      const newPeers = _newPeers.slice().sort()

      if (this._emitChanges(newPeers)) {
        this._peers = newPeers
      }
    })
  }

  /**
   * Emits 'peer joined' / 'peer left' for the differences between the known
   * peer list and `newPeers`.
   *
   * @param {Array} newPeers - freshly polled peer list.
   * @returns {boolean} true when at least one peer was added or removed.
   */
  _emitChanges (newPeers) {
    const differences = diff(this._peers, newPeers)

    differences.added.forEach((addedPeer) => this.emit('peer joined', addedPeer))
    differences.removed.forEach((removedPeer) => this.emit('peer left', removedPeer))

    return differences.added.length > 0 || differences.removed.length > 0
  }

  /**
   * Pubsub subscription callback: re-emits the message as 'message'.
   *
   * @param {object} message - message delivered by the pubsub subscription.
   */
  _onMessage (message) {
    this.emit('message', message)
  }

  /**
   * Handles a direct message for this topic. Messages whose `to` field is not
   * this node's id are ignored; otherwise a shallow copy without `to` is
   * emitted as 'message'.
   *
   * @param {object} message - direct message envelope.
   */
  _handleDirectMessage (message) {
    if (message.to === this._ipfs._peerInfo.id.toB58String()) {
      const m = Object.assign({}, message)
      delete m.to
      this.emit('message', m)
    }
  }
}
|