UNPKG

3.58 kBJavaScriptView Raw
1/* eslint-env mocha */
2/* eslint max-nested-callbacks: ["error", 5] */
3'use strict'
4
5const chai = require('chai')
6chai.use(require('dirty-chai'))
7const expect = chai.expect
8
9const delay = require('delay')
10
11const PubSubRoom = require('../')
12const createLibp2p = require('./utils/create-libp2p')
13
14const topic = 'pubsub-room-concurrency-test-' + Date.now() + '-' + Math.random()
15
16describe('concurrent rooms', function () {
17 this.timeout(30000)
18 let node1, node2
19 let id1, id2
20 let room1A, room1B, room2A, room2B
21 const topicA = topic + '-A'
22 const topicB = topic + '-B'
23
24 before(async () => {
25 node1 = await createLibp2p()
26 id1 = node1.peerInfo.id.toB58String()
27 })
28
29 before(async () => {
30 node2 = await createLibp2p(node1)
31 id2 = node2.peerInfo.id.toB58String()
32 })
33
34 after(() => {
35 return Promise.all([
36 room1A.leave(),
37 room1B.leave(),
38 room2A.leave(),
39 room2B.leave()
40 ])
41 })
42
43 after(() => {
44 return Promise.all([
45 node1.stop(),
46 node2.stop()
47 ])
48 })
49
50 it('can create a room, and they find each other', async () => {
51 room1A = new PubSubRoom(node1, topicA)
52 room2A = new PubSubRoom(node2, topicA)
53 room1B = new PubSubRoom(node1, topicB)
54 room2B = new PubSubRoom(node2, topicB)
55
56 const roomNodes = [
57 [room1A, id2],
58 [room2A, id1],
59 [room1B, id2],
60 [room2A, id1]
61 ]
62
63 await Promise.all(
64 roomNodes.map(async (roomNode) => {
65 const room = roomNode[0]
66 const waitingFor = roomNode[1]
67
68 await new Promise((resolve) => {
69 room.once('peer joined', (peer) => {
70 expect(peer).to.equal(waitingFor)
71 resolve()
72 })
73 })
74 })
75 )
76 })
77
78 it('has peer', (done) => {
79 expect(room1A.getPeers()).to.deep.equal([id2])
80 expect(room1B.getPeers()).to.deep.equal([id2])
81 expect(room2A.getPeers()).to.deep.equal([id1])
82 expect(room2B.getPeers()).to.deep.equal([id1])
83 done()
84 })
85
86 it('can broadcast', (done) => {
87 let gotMessage = false
88 const crash = Crash('no broadcast message should leak to room B')
89 room1B.on('message', crash)
90 room1A.once('message', (message) => {
91 if (gotMessage) {
92 throw new Error('double message')
93 }
94 gotMessage = true
95 expect(message.from.toString()).to.equal(id2.toString())
96 expect(message.data.toString()).to.equal('message 1')
97
98 room1B.removeListener('message', crash)
99 done()
100 })
101 room2A.broadcast('message 1')
102 })
103
104 it('can send private message', (done) => {
105 const crash = Crash('no private message should leak to room B')
106
107 room2B.on('message', crash)
108 room2A.once('message', (message) => {
109 expect(message.from.toString()).to.equal(id1.toString())
110 expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString())
111 expect(message.topicIDs).to.deep.equal([topicA])
112 expect(message.topicCIDs).to.deep.equal([topicA])
113 expect(message.data.toString()).to.equal('message 2')
114 room2B.removeListener('message', crash)
115 done()
116 })
117 room1A.sendTo(id2, 'message 2')
118 })
119
120 it('can leave room', (done) => {
121 room1A.once('peer left', (peer) => {
122 expect(peer.toString()).to.equal(id2.toString())
123 done()
124 })
125 room2A.leave()
126 })
127
128 it('after leaving, it does not receive more messages', async () => {
129 room2A.on('message', Crash('should not receive this'))
130 await room2A.leave()
131 room1A.broadcast('message 3')
132 await delay(3000)
133 })
134})
135
136function Crash (message) {
137 return function () {
138 throw new Error(message)
139 }
140}