UNPKG

11.8 kBJavaScriptView Raw
1'use strict';
2
3const {TimeoutError} = require("cyclon.p2p-common");
4const {WebRTCComms} = require("../lib/WebRTCComms");
5const ClientMocks = require("./ClientMocks");
6const events = require("events");
7
8describe("The WebRTC Comms layer", function () {
9
10 const WAIT_FOR_CHANNEL_TO_OPEN_RESULT = "WAIT_FOR_CHANNEL_TO_OPEN_RESULT",
11 SEND_SHUFFLE_REQUEST_RESULT = "SEND_SHUFFLE_REQUEST_RESULT",
12 PROCESS_SHUFFLE_RESPONSE_RESULT = "PROCESS_SHUFFLE_RESULT_RESULT",
13 SEND_RESPONSE_ACKNOWLEDGEMENT_RESULT = "SEND_RESPONSE_ACKNOWLEDGEMENT_RESULT",
14 PROCESS_SHUFFLE_REQUEST_RESULT = "PROCESS_SHUFFLE_REQUEST_RESULT",
15 WAIT_FOR_RESPONSE_ACKNOWLEDGEMENT_RESULT = "WAIT_FOR_RESPONSE_ACKNOWLEDGEMENT_RESULT",
16 CREATE_NEW_POINTER_RESULT = "CREATE_NEW+POINTER_RESULT",
17 LOCAL_ID = "LOCAL_ID";
18
19 const CYCLON_SHUFFLE_CHANNEL_TYPE = "cyclonShuffle";
20 const REMOTE_POINTER = "REMOTE_POINTER";
21 const INCOMING_ERROR = "INCOMING_ERROR";
22 const ROOMS_TO_JOIN = ['aaa', 'bbb', 'ccc'];
23
24 let comms,
25 channel,
26 rtc,
27 shuffleStateFactory,
28 outgoingShuffleState,
29 localCyclonNode,
30 destinationNodePointer,
31 shuffleSet,
32 incomingShuffleState,
33 logger,
34 successCallback,
35 failureCallback,
36 metadataProviders;
37
38 beforeEach(() => {
39 successCallback = ClientMocks.createSuccessCallback();
40 failureCallback = ClientMocks.createFailureCallback();
41
42 // Create mocks
43 rtc = ClientMocks.mockRtc();
44 channel = ClientMocks.mockChannel();
45 shuffleStateFactory = ClientMocks.mockShuffleStateFactory();
46 localCyclonNode = ClientMocks.mockCyclonNode();
47 outgoingShuffleState = createSucceedingOutgoingShuffleState();
48 incomingShuffleState = createSucceedingIncomingShuffleState();
49 logger = ClientMocks.mockLoggingService();
50 metadataProviders = {
51 something: () => ""
52 };
53
54 destinationNodePointer = createCacheEntry("destinationNodePointer", 12);
55 shuffleSet = [createCacheEntry("a", 456), createCacheEntry("b", 123), createCacheEntry("c", 222)];
56
57 //
58 // Mock behaviour
59 //
60 rtc.openChannel.and.returnValue(Promise.resolve(WAIT_FOR_CHANNEL_TO_OPEN_RESULT));
61 rtc.createNewPointer.and.returnValue(CREATE_NEW_POINTER_RESULT);
62 rtc.getLocalId.and.returnValue(LOCAL_ID);
63 channel.getRemotePeer.and.returnValue(destinationNodePointer);
64 shuffleStateFactory.createOutgoingShuffleState.and.returnValue(outgoingShuffleState);
65 shuffleStateFactory.createIncomingShuffleState.and.returnValue(incomingShuffleState);
66
67 comms = new WebRTCComms(rtc, shuffleStateFactory, logger, ROOMS_TO_JOIN);
68 });
69
70 describe('constructor', () => {
71
72 it('throws an error when no rooms to join are specified', () => {
73 try {
74 new WebRTCComms(localCyclonNode, metadataProviders, []);
75 fail('Constructor allowed empty rooms list');
76 } catch (e) {
77 expect(e).toEqual(new Error('Must specify at least one room to join'));
78 }
79 });
80 });
81
82 describe("when initializing", () => {
83
84 beforeEach(() => {
85 comms.initialize(localCyclonNode, metadataProviders);
86 });
87
88 it("should initialize the RTC layer", () => {
89 expect(rtc.connect).toHaveBeenCalledWith(metadataProviders, ROOMS_TO_JOIN);
90 });
91
92 it("should add a listener for incoming shuffle channels", () => {
93 expect(rtc.onChannel).toHaveBeenCalledWith("cyclonShuffle", jasmine.any(Function));
94 });
95 });
96
97 describe("when cyclon shuffle errors occur on the RTC service", () => {
98
99 beforeEach(() => {
100 rtc = new events.EventEmitter();
101 rtc.onChannel = jasmine.createSpy();
102 rtc.connect = jasmine.createSpy();
103 comms = new WebRTCComms(rtc, shuffleStateFactory, logger, ROOMS_TO_JOIN);
104 comms.initialize(localCyclonNode, metadataProviders);
105 });
106
107 describe("on incomingTimeout", () => {
108
109 it("emits an incoming shuffle timeout event for cyclon channels", () => {
110 rtc.emit("incomingTimeout", CYCLON_SHUFFLE_CHANNEL_TYPE, REMOTE_POINTER);
111 expect(localCyclonNode.emit).toHaveBeenCalledWith("shuffleTimeout", "incoming", REMOTE_POINTER);
112 });
113
114 it("emits no incoming shuffle timeout event for non-cyclon channels", () => {
115 rtc.emit("incomingTimeout", "otherChannelType", REMOTE_POINTER);
116 expect(localCyclonNode.emit).not.toHaveBeenCalled();
117 });
118 });
119
120 describe("on incomingError", () => {
121
122 it("emits an incoming shuffleError event for cyclon channels", () => {
123 rtc.emit("incomingError", CYCLON_SHUFFLE_CHANNEL_TYPE, REMOTE_POINTER, INCOMING_ERROR);
124 expect(localCyclonNode.emit).toHaveBeenCalledWith("shuffleError", "incoming", REMOTE_POINTER, INCOMING_ERROR);
125 });
126
127 it("emits no incoming shuffleError event for non-cyclon channels", () => {
128 rtc.emit("incomingError", "otherChannelType", REMOTE_POINTER, INCOMING_ERROR);
129 expect(localCyclonNode.emit).not.toHaveBeenCalled();
130 });
131 });
132
133 describe("on offerReceived", () => {
134
135 it("emits an incoming shuffleStarted event for cyclon channels", () => {
136 rtc.emit("offerReceived", CYCLON_SHUFFLE_CHANNEL_TYPE, REMOTE_POINTER);
137 expect(localCyclonNode.emit).toHaveBeenCalledWith("shuffleStarted", "incoming", REMOTE_POINTER);
138 });
139
140 it("emits no incoming shuffleStarted event for non-cyclon channels", () => {
141 rtc.emit("offerReceived", "otherChannelType", REMOTE_POINTER);
142 expect(localCyclonNode.emit).not.toHaveBeenCalled();
143 });
144 });
145 });
146
147 describe("before sending a shuffle request", () => {
148
149 beforeEach(() => {
150 comms.initialize(localCyclonNode, metadataProviders);
151 comms.sendShuffleRequest(destinationNodePointer, shuffleSet);
152 });
153
154 it("should create a new outgoing shuffle state", () => {
155 expect(shuffleStateFactory.createOutgoingShuffleState).toHaveBeenCalledWith(localCyclonNode, destinationNodePointer, shuffleSet);
156 });
157 });
158
159 describe("when sending a shuffle request", () => {
160
161 beforeEach(() => {
162 comms.initialize(localCyclonNode, metadataProviders);
163 });
164
165 describe("and everything succeeds", () => {
166 it("should perform the peer exchange then cleanup resources when the offer is created successfully", async () => {
167 await comms.sendShuffleRequest(destinationNodePointer, shuffleSet);
168
169 // The exchange occurred
170 expect(rtc.openChannel).toHaveBeenCalledWith("cyclonShuffle", destinationNodePointer);
171 expect(outgoingShuffleState.storeChannel).toHaveBeenCalledWith(WAIT_FOR_CHANNEL_TO_OPEN_RESULT);
172 expect(outgoingShuffleState.sendShuffleRequest).toHaveBeenCalledWith();
173 expect(outgoingShuffleState.processShuffleResponse).toHaveBeenCalledWith();
174 expect(outgoingShuffleState.sendResponseAcknowledgement).toHaveBeenCalledWith();
175
176 // Clean up occurred
177 expect(outgoingShuffleState.close).toHaveBeenCalled();
178 });
179 });
180
181 it("should not send the request when the channel does not open successfully", (done) => {
182
183 rtc.openChannel.and.returnValue(Promise.reject(new Error("bad")));
184 comms.sendShuffleRequest(localCyclonNode, destinationNodePointer, shuffleSet)
185 .catch(() => {
186 expect(outgoingShuffleState.sendShuffleRequest).not.toHaveBeenCalled();
187 // Clean up occurred
188 expect(outgoingShuffleState.close).toHaveBeenCalled();
189 done();
190 });
191 });
192 });
193
194 describe("when creating a new pointer", () => {
195
196 it("delegates to the RTC service", () => {
197 expect(comms.createNewPointer()).toBe(CREATE_NEW_POINTER_RESULT);
198 });
199 });
200
201 describe("when getting the local ID", () => {
202
203 it("delegates to the RTC service", () => {
204 expect(comms.getLocalId()).toBe(LOCAL_ID);
205 });
206 });
207
208 describe("when handling an incoming shuffle", () => {
209
210 beforeEach(() => {
211 comms.initialize(localCyclonNode, metadataProviders);
212 });
213
214 describe("before processing the shuffle request", () => {
215 beforeEach(() => {
216 comms.handleIncomingShuffle(channel).then(successCallback).catch(failureCallback);
217 });
218
219 it("should create a new incoming shuffle state", () => {
220 expect(shuffleStateFactory.createIncomingShuffleState).toHaveBeenCalledWith(localCyclonNode, destinationNodePointer);
221 });
222 });
223
224 describe("and everything succeeds", () => {
225
226 beforeEach(async () => {
227 await comms.handleIncomingShuffle(channel);
228 });
229
230 it("should perform the exchange with the source peer then clean up when an answer is created successfully", async () => {
231 expect(incomingShuffleState.processShuffleRequest).toHaveBeenCalledWith(channel);
232 expect(incomingShuffleState.waitForResponseAcknowledgement).toHaveBeenCalledWith(channel);
233
234 // and cleanup
235 expect(channel.close).toHaveBeenCalled();
236 });
237 });
238
239 describe("and a timeout occurs waiting for the shuffle request", () => {
240
241 let incomingShufflePromise;
242
243 beforeEach(() => {
244 incomingShuffleState.processShuffleRequest.and.returnValue(Promise.reject(new TimeoutError('timeout')));
245 incomingShufflePromise = comms.handleIncomingShuffle(channel);
246 });
247
248 it("should clean up it state and not wait for the acknowledgement", (done) => {
249 incomingShufflePromise.then(() => {
250 expect(incomingShuffleState.waitForResponseAcknowledgement).not.toHaveBeenCalled();
251
252 // Close should still be called
253 expect(channel.close).toHaveBeenCalled();
254 done();
255 });
256 });
257 });
258 });
259
260 function createSucceedingOutgoingShuffleState(name) {
261 const outgoingShuffleState = ClientMocks.mockOutgoingShuffleState(name);
262 outgoingShuffleState.sendShuffleRequest.and.returnValue(Promise.resolve(SEND_SHUFFLE_REQUEST_RESULT));
263 outgoingShuffleState.processShuffleResponse.and.returnValue(Promise.resolve(PROCESS_SHUFFLE_RESPONSE_RESULT));
264 outgoingShuffleState.sendResponseAcknowledgement.and.returnValue(Promise.resolve(SEND_RESPONSE_ACKNOWLEDGEMENT_RESULT));
265 return outgoingShuffleState;
266 }
267
268 function createSucceedingIncomingShuffleState() {
269 const incomingShuffleState = ClientMocks.mockIncomingShuffleState();
270 incomingShuffleState.processShuffleRequest.and.returnValue(Promise.resolve(PROCESS_SHUFFLE_REQUEST_RESULT));
271 incomingShuffleState.waitForResponseAcknowledgement.and.returnValue(Promise.resolve(WAIT_FOR_RESPONSE_ACKNOWLEDGEMENT_RESULT));
272 return incomingShuffleState;
273 }
274
275 /**
276 * Create a cache entry
277 *
278 * @param id
279 * @param age
280 * @returns {{id: *, age: *}}
281 */
282 function createCacheEntry(id, age) {
283 return {
284 id: id,
285 age: age
286 };
287 }
288});