UNPKG

3.72 kBJavaScriptView Raw
1var assert = require('assert');
2var util = require('util');
3var net = require('net');
4var EventEmitter = require('events').EventEmitter;
5var zetta = require('../');
6var EventSocket = require('../lib/event_socket');
7var EventBroker = require('../lib/event_broker');
8var PeerRegistry = require('./fixture/scout_test_mocks').MockPeerRegistry;
9var PeerSocket = require('../lib/peer_socket');
10var Registry = require('./fixture/scout_test_mocks').MockRegistry;
11
12var Ws = function() {
13 EventEmitter.call(this)
14 this._socket = new net.Socket();
15 this.upgradeReq = { url: '/peers/0ac7e9c2-f03f-478c-95f5-2028fc9c2b6e?connectionId=46f466b0-1017-430b-8993-d7a8c896e014'};
16};
17util.inherits(Ws, EventEmitter);
18Ws.prototype.send = function(data, options, cb) {
19 var r = this.emit('onsend', data, options, cb);
20};
21Ws.prototype.close = function() {};
22
23
24describe('EventBroker', function() {
25 var msg = JSON.stringify({topic: 'some-topic', data: {somedata: 1}, timestamp: new Date().getTime()});
26 var query = null;
27 var app = null;
28 var broker = null;
29 var peerRegistry = null;
30 beforeEach(function() {
31 var reg = new Registry();
32 peerRegistry = new PeerRegistry();
33 app = zetta({ registry: reg, peerRegistry: peerRegistry }).silent();
34 query = { topic: 'some-topic', name: app.id };
35 broker = new EventBroker(app);
36 });
37
38
39 it('it should add peer by server name', function() {
40 var ws = new Ws();
41 var peer = new PeerSocket(ws, 'some-peer', peerRegistry);
42 peer.name = 'some-peer2';
43 broker.peer(peer);
44 assert.equal(peer, broker.peers['some-peer2']);
45 });
46
47
48 it('it should add client and subscribe to topic', function() {
49 var ws = new Ws();
50 var client = new EventSocket(ws, query);
51 broker.client(client);
52 assert.equal(broker.clients.length, 1);
53 assert.equal(broker.subscriptions['some-topic'].count, 1);
54 });
55
56 it('it should remove subscription when client closes', function(done) {
57 var ws = new Ws();
58 var client = new EventSocket(ws, query);
59 broker.client(client);
60 assert.equal(broker.clients.length, 1);
61 assert.equal(broker.subscriptions['some-topic'].count, 1);
62
63 client.emit('close');
64
65 setTimeout(function() {
66 assert.equal(broker.clients.length, 0);
67 assert(!broker.subscriptions['some-topic']);
68 done();
69 }, 1);
70 });
71
72 it('it should pass data from local pubsub to clients', function(done) {
73 var ws = new Ws();
74 var client = new EventSocket(ws, query);
75 broker.client(client);
76
77 var recieved = 0;
78 ws.on('onsend', function(buf) {
79 recieved++;
80 var msg = JSON.parse(buf);
81 assert.equal(msg.topic, 'some-topic');
82 assert(msg.timestamp);
83 assert.deepEqual(msg.data, {somedata: 1});
84 });
85
86 setTimeout(function() {
87 assert.equal(recieved, 1);
88 done();
89 }, 2);
90
91 app.pubsub.publish('some-topic', msg);
92 });
93
94 it('should keep local pubsub subscription open when more than one client is active', function(done) {
95 var clientA = new EventSocket(new Ws(), query);
96 var clientB = new EventSocket(new Ws(), query);
97 broker.client(clientA);
98 broker.client(clientB);
99
100 var recievedA = 0;
101 var recievedB = 0;
102 clientA.ws.on('onsend', function(buf) {
103 recievedA++;
104 });
105 clientB.ws.on('onsend', function(buf) {
106 recievedB++;
107 });
108
109 setTimeout(function() {
110 assert.equal(recievedA, 1);
111 assert.equal(recievedB, 1);
112
113 clientA.emit('close');
114
115 done();
116 return;
117
118 setTimeout(function() {
119 assert.equal(recievedA, 1);
120 assert.equal(recievedB, 2);
121 done();
122 }, 2);
123
124 app.pubsub.publish('some-topic', msg);
125 }, 2);
126
127 app.pubsub.publish('some-topic', msg);
128 });
129
130});