// UNPKG file view: 7.74 kB, JavaScript.
// Node.js core modules.
var assert = require('assert');
var http = require('http');
var util = require('util');
var net = require('net');
var EventEmitter = require('events').EventEmitter;

// Library under test and the registry fixtures from ./fixture.
var zetta = require('../zetta');
var MemRegistry = require('./fixture/mem_registry');
var MemPeerRegistry = require('./fixture/mem_peer_registry');
var PeerSocket = require('../lib/peer_socket');
var PeerClient = require('../lib/peer_client');

/**
 * Event-emitting stub that the tests hand to PeerSocket in place of a real
 * websocket. It carries only the members set up here: an unconnected
 * `net.Socket` as `_socket` and a fixed `upgradeReq.url` containing a peer
 * path and a `connectionId` query parameter.
 */
var Ws = function() {
  EventEmitter.call(this);
  this._socket = new net.Socket();
  this.upgradeReq = { url: '/peers/0ac7e9c2-f03f-478c-95f5-2028fc9c2b6e?connectionId=46f466b0-1017-430b-8993-d7a8c896e014'};
};
util.inherits(Ws, EventEmitter);

/** No-op; the stub performs no teardown. */
Ws.prototype.close = function() {};

/**
 * Forwards the call to any 'onsend' listeners.
 *
 * @param {*} data - Payload passed through to listeners unchanged.
 * @param {*} options - Options passed through to listeners unchanged.
 * @param {Function} [cb] - Passed through to listeners; never invoked here.
 */
Ws.prototype.send = function(data, options, cb) {
  this.emit('onsend', data, options, cb);
};

/**
 * Mocha suite covering how zetta instances link to a "cloud" instance:
 * linking before/after listen, peer request/response extensions, PeerSocket
 * error handling, and PeerSocket reuse across reconnects.
 *
 * A fresh cloud instance is started on an ephemeral port before every test
 * and its address is exposed to the tests through `cloudUrl`.
 */
describe('Peer Connection Logic', function() {
  var cloud = null;
  var cloudUrl = null;

  // Builds a zetta instance backed by newly constructed registry fixtures.
  function createServer() {
    return zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
  }

  // Subscribes to `server`'s '_peer/connect' topic and finishes the test:
  // success when the connected peer's url starts with `cloudUrl`, failure
  // otherwise.
  function expectConnectToCloud(server, done) {
    server.pubsub.subscribe('_peer/connect', function(topic, data) {
      if (data.peer.url.indexOf(cloudUrl) === 0) {
        done();
      } else {
        done(new Error('Peer connected to another url then expected'));
      }
    });
  }

  beforeEach(function(done) {
    cloud = createServer();
    cloud.silent();
    // Port 0 is passed to listen(); the bound port is read back from the
    // underlying server's address() to build the url peers link to.
    cloud.listen(0, function(err) {
      if (err) {
        return done(err);
      }

      cloudUrl = 'ws://localhost:' + cloud.httpServer.server.address().port;
      done();
    });
  });

  afterEach(function(done) {
    // close() is not awaited: done() is signalled immediately.
    // NOTE(review): presumably because linked peers hold connections open
    // and a close callback would not fire promptly — confirm.
    cloud.httpServer.server.close();
    done();
  });

  describe('#link', function() {
    it('should work before .listen is ran', function(done) {
      var z = createServer()
        .silent()
        .link(cloudUrl)
        .listen(0);

      expectConnectToCloud(z, done);
    });

    it('should work after .listen is ran', function(done) {
      // `z` is only read inside the listen callback, which the original test
      // relies on running after this assignment has completed.
      var z = createServer()
        .silent()
        .listen(0, function() {
          z.link(cloudUrl);
        });

      expectConnectToCloud(z, done);
    });

    it('should wire up request extensions', function(done) {
      // The pipeline callback may run for more than one item; only the first
      // one is allowed to complete the test.
      var called = false;
      createServer()
        .silent()
        .use(function(server) {
          server.onPeerRequest(function(client) {
            client
              .use(function(handle) {
                handle('request', function(pipeline) {
                  return pipeline.map(function(env) {
                    assert(env.request);
                    if (!called) {
                      called = true;
                      done();
                    }
                    return env;
                  });
                });
              });
          });
        })
        .link(cloudUrl)
        .listen(0);
    });

    it('should wire up response extensions', function(done) {
      // Guarded the same way as the request-extension test so that a second
      // item flowing through the pipeline cannot call done() twice.
      var called = false;
      createServer()
        .silent()
        .use(function(server) {
          server.onPeerResponse(function(request) {
            return request
              .map(function(env) {
                assert(env.request);
                assert(env.response);
                assert(env.upgrade);
                if (!called) {
                  called = true;
                  done();
                }
                return env;
              });
          });
        })
        .link(cloudUrl)
        .listen(0);
    });
  });

  describe('Handle spdy agent errors', function() {
    it('should catch error event', function(done) {
      var ws = new Ws();
      var socket = new PeerSocket(ws, 'some-peer', new MemPeerRegistry());
      // An 'error' emitted on socket.agent is expected to surface as an
      // 'error' on the PeerSocket; any other message leaves the test to
      // time out.
      socket.on('error', function(err) {
        if (err.message === 'spdy-error') {
          done();
        }
      });
      socket.agent.emit('error', new Error('spdy-error'));
    });
  });

  describe('Peer_socket error events', function() {

    // Links a peer named 'test-peer' to the cloud and, once the cloud
    // publishes '_peer/connect', invokes `emitEvent(peer)` twice on the
    // cloud-side peer object before completing the test. A '_peer/disconnect'
    // subscriber asserts that the peer is in the DISCONNECTED state.
    function linkAndEmitTwice(emitEvent, done) {
      createServer()
        .name('test-peer')
        .silent()
        .link(cloudUrl)
        .listen(0);

      cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
        assert(cloud.httpServer.peers['test-peer']);
        var peer = cloud.httpServer.peers['test-peer'];

        cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) {
          assert.equal(cloud.httpServer.peers['test-peer'].state, PeerSocket.DISCONNECTED);
        });

        emitEvent(peer);
        emitEvent(peer);
        done();
      });
    }

    it('http-server should handle multiple error events', function(done) {
      linkAndEmitTwice(function(peer) {
        peer.emit('error', new Error('some error'));
      }, done);
    });

    it('http-server should handle multiple end events', function(done) {
      linkAndEmitTwice(function(peer) {
        peer.emit('end');
      }, done);
    });

  });

  describe('Handle timings with ws connects vs actual peer connects', function() {
    var z = null;
    beforeEach(function(done) {
      z = createServer()
        .name('peer-1')
        .silent()
        .listen(0, done);
    });

    it('peer connects should be the same peer object on the cloud', function(done) {
      var client = new PeerClient(cloudUrl, z);

      cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
        assert(data.peer === cloud.httpServer.peers['peer-1']);
        done();
      });

      client.start();
    });

    it('peer connects should be the same peer object on the cloud with reconnect', function(done) {
      var client = new PeerClient(cloudUrl, z);

      var count = 0;
      cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
        count++;
        assert(data.peer === cloud.httpServer.peers['peer-1']);
        if (count === 2) {
          return done();
        }
        // First connect: close the peer from the cloud side and wait for
        // the second '_peer/connect'.
        cloud.httpServer.peers['peer-1'].close();
      });
      client.start();
    });

    it('peer connects should be the same peer object on the cloud with reconnect with timing issue', function(done) {
      this.timeout(5000);
      var client = new PeerClient(cloudUrl, z);

      var lastPeer = null;
      var count = 0;
      cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
        count++;
        assert(data.peer === cloud.httpServer.peers['peer-1']);
        if (count === 1) {
          lastPeer = data.peer;
          cloud.httpServer.peers['peer-1'].close();

          // When the client next reports 'connecting', swap its request
          // handler for one that closes the websocket on the first request.
          client.once('connecting', function() {
            client.server.removeListener('request', client.onRequest);
            client.onRequest = function(req, res) {
              client.ws.close();
            };
            client.server.once('request', client.onRequest.bind(client));
          });

        } else if (count === 2) {
          assert(data.peer === lastPeer, 'a new PeerSocket on the cloud was created instead of a reuse');
          done();
        }
      });
      client.start();
    });

  });

});