UNPKG

2.86 kBJavaScriptView Raw
1var assert = require('assert');
2var util = require('util');
3var net = require('net');
4var EventEmitter = require('events').EventEmitter;
5var zetta = require('../');
6var EventSocket = require('../lib/event_socket');
7var EventBroker = require('../lib/event_broker');
8var PeerRegistry = require('./fixture/scout_test_mocks').MockPeerRegistry;
9var PeerSocket = require('../lib/peer_socket');
10var Registry = require('./fixture/scout_test_mocks').MockRegistry;
11
/**
 * Minimal in-memory stand-in for a WebSocket, used by the tests in this file.
 *
 * Nothing is written to a network connection: `send` re-emits its arguments
 * as an 'onsend' event so a test can observe what would have been sent.
 */
var Ws = function() {
  EventEmitter.call(this);
  // A fresh, unconnected socket object.
  // NOTE(review): presumably read by the socket wrappers under test - confirm.
  this._socket = new net.Socket();
  // Fixed upgrade request: a path under /peers/ carrying a connectionId query
  // parameter.
  this.upgradeReq = { url: '/peers/0ac7e9c2-f03f-478c-95f5-2028fc9c2b6e?connectionId=46f466b0-1017-430b-8993-d7a8c896e014'};
};
util.inherits(Ws, EventEmitter);

/**
 * Report an outgoing message by emitting 'onsend' with the same arguments.
 *
 * @param {*} data - payload that would have been sent.
 * @param {*} options - send options, forwarded untouched to listeners.
 * @param {*} cb - completion callback; forwarded to listeners, never invoked here.
 * @returns {undefined}
 */
Ws.prototype.send = function(data, options, cb) {
  this.emit('onsend', data, options, cb);
};

/**
 * No-op: there is no connection to tear down.
 *
 * @returns {undefined}
 */
Ws.prototype.close = function() {};
22
23
// Mocha suite for EventBroker: registering peers, and relaying events
// published on the app's pubsub to connected client sockets.
describe('EventBroker', function() {
  // Single topic used by every subscription and publish in this suite.
  var TOPIC = '_peer/connect';

  // Event payload, serialized once to a JSON string when the suite is defined.
  var serializedEvent = JSON.stringify({
    topic: TOPIC,
    data: {somedata: 1},
    timestamp: new Date().getTime()
  });

  // Per-test fixtures, rebuilt in beforeEach.
  var peerRegistry = null;
  var app = null;
  var query = null;
  var broker = null;

  beforeEach(function() {
    var registry = new Registry();
    peerRegistry = new PeerRegistry();
    var options = { registry: registry, peerRegistry: peerRegistry };
    app = zetta(options).silent();
    query = { topic: TOPIC, name: app.id };
    broker = new EventBroker(app);
  });

  it('it should add peer by server name', function() {
    var peerSocket = new PeerSocket(new Ws(), 'some-peer', peerRegistry);
    // Rename after construction; the broker is expected to key on this name.
    peerSocket.name = 'some-peer2';
    broker.peer(peerSocket);

    assert.equal(peerSocket, broker.peers['some-peer2']);
  });

  it('it should pass data from local pubsub to clients', function(done) {
    var socket = new Ws();
    broker.client(new EventSocket(socket, query));

    // The mock socket emits 'onsend' for every send() call it receives.
    socket.on('onsend', function(raw) {
      var delivered = JSON.parse(raw);
      assert.equal(delivered.topic, '_peer/connect');
      assert(delivered.timestamp);
      assert.deepEqual(delivered.data, { somedata: 1 });
      done();
    });

    // Publishes the JSON string built at the top of the suite.
    app.pubsub.publish(TOPIC, serializedEvent);
  });

  it('should keep local pubsub subscription open when more than one client is active', function(done) {
    var first = new EventSocket(new Ws(), query);
    var second = new EventSocket(new Ws(), query);
    broker.client(first);
    broker.client(second);

    // Number of send() calls observed on each client's socket.
    var sends = { first: 0, second: 0 };
    first.ws.on('onsend', function() {
      sends.first += 1;
    });
    second.ws.on('onsend', function() {
      sends.second += 1;
    });

    // Step 2: after the first client closed, only the second one should have
    // received the second publish.
    function verifyAfterClose() {
      assert.equal(sends.first, 1);
      assert.equal(sends.second, 2);
      done();
    }

    // Step 1: both clients got the first publish; close the first client and
    // publish again.
    function verifyBothThenCloseFirst() {
      assert.equal(sends.first, 1);
      assert.equal(sends.second, 1);

      first.emit('close');

      setTimeout(verifyAfterClose, 2);

      app.pubsub.publish(TOPIC, {});
    }

    setTimeout(verifyBothThenCloseFirst, 2);

    app.pubsub.publish(TOPIC, {});
  });

});