1 | var assert = require('assert');
|
2 | var util = require('util');
|
3 | var net = require('net');
|
4 | var EventEmitter = require('events').EventEmitter;
|
5 | var zetta = require('../');
|
6 | var EventSocket = require('../lib/event_socket');
|
7 | var EventBroker = require('../lib/event_broker');
|
8 | var PeerRegistry = require('./fixture/scout_test_mocks').MockPeerRegistry;
|
9 | var PeerSocket = require('../lib/peer_socket');
|
10 | var Registry = require('./fixture/scout_test_mocks').MockRegistry;
|
11 |
|
12 | var Ws = function() {
|
13 | EventEmitter.call(this)
|
14 | this._socket = new net.Socket();
|
15 | this.upgradeReq = { url: '/peers/0ac7e9c2-f03f-478c-95f5-2028fc9c2b6e?connectionId=46f466b0-1017-430b-8993-d7a8c896e014'};
|
16 | };
|
17 | util.inherits(Ws, EventEmitter);
|
18 | Ws.prototype.send = function(data, options, cb) {
|
19 | var r = this.emit('onsend', data, options, cb);
|
20 | };
|
21 | Ws.prototype.close = function() {};
|
22 |
|
23 |
|
24 | describe('EventBroker', function() {
|
25 | var msg = JSON.stringify({topic: '_peer/connect', data: {somedata: 1}, timestamp: new Date().getTime()});
|
26 | var query = null;
|
27 | var app = null;
|
28 | var broker = null;
|
29 | var peerRegistry = null;
|
30 | beforeEach(function() {
|
31 | var reg = new Registry();
|
32 | peerRegistry = new PeerRegistry();
|
33 | app = zetta({ registry: reg, peerRegistry: peerRegistry }).silent();
|
34 | query = { topic: '_peer/connect', name: app.id };
|
35 | broker = new EventBroker(app);
|
36 | });
|
37 |
|
38 |
|
39 | it('it should add peer by server name', function() {
|
40 | var ws = new Ws();
|
41 | var peer = new PeerSocket(ws, 'some-peer', peerRegistry);
|
42 | peer.name = 'some-peer2';
|
43 | broker.peer(peer);
|
44 | assert.equal(peer, broker.peers['some-peer2']);
|
45 | });
|
46 |
|
47 | it('it should pass data from local pubsub to clients', function(done) {
|
48 | var ws = new Ws();
|
49 | var client = new EventSocket(ws, query);
|
50 | broker.client(client);
|
51 |
|
52 | ws.on('onsend', function(buf) {
|
53 | var msg = JSON.parse(buf);
|
54 | assert.equal(msg.topic, '_peer/connect');
|
55 | assert(msg.timestamp);
|
56 | assert.deepEqual(msg.data, { somedata: 1 });
|
57 | done();
|
58 | });
|
59 |
|
60 | app.pubsub.publish('_peer/connect', msg);
|
61 | });
|
62 |
|
63 | it('should keep local pubsub subscription open when more than one client is active', function(done) {
|
64 | var clientA = new EventSocket(new Ws(), query);
|
65 | var clientB = new EventSocket(new Ws(), query);
|
66 | broker.client(clientA);
|
67 | broker.client(clientB);
|
68 |
|
69 | var recievedA = 0;
|
70 | var recievedB = 0;
|
71 | clientA.ws.on('onsend', function(buf) {
|
72 | recievedA++;
|
73 | });
|
74 | clientB.ws.on('onsend', function(buf) {
|
75 | recievedB++;
|
76 | });
|
77 |
|
78 | setTimeout(function() {
|
79 | assert.equal(recievedA, 1);
|
80 | assert.equal(recievedB, 1);
|
81 |
|
82 | clientA.emit('close');
|
83 |
|
84 | setTimeout(function() {
|
85 | assert.equal(recievedA, 1);
|
86 | assert.equal(recievedB, 2);
|
87 | done();
|
88 | }, 2);
|
89 |
|
90 | app.pubsub.publish('_peer/connect', {});
|
91 | }, 2);
|
92 |
|
93 | app.pubsub.publish('_peer/connect', {});
|
94 | });
|
95 |
|
96 | });
|