UNPKG

4.41 kBJavaScriptView Raw
1'use strict';
2
3const {TimeoutError} = require("cyclon.p2p-common");
4const {OutgoingShuffleState} = require("../lib/OutgoingShuffleState");
5const ClientMocks = require("./ClientMocks");
6
7describe("The Outgoing ShuffleState", function () {
8
9 const TIMEOUT_ID = 12345;
10 const SHUFFLE_SET = ["a", "b", "c"];
11 const DESTINATION_NODE_POINTER = {
12 id: "OTHER_NODE_ID"
13 };
14 const RESPONSE_PAYLOAD = "RESPONSE_PAYLOAD";
15
16 let localCyclonNode,
17 asyncExecService,
18 loggingService,
19 channel;
20
21 let successCallback, failureCallback;
22
23 let outgoingShuffleState;
24
25 beforeEach(function () {
26 successCallback = ClientMocks.createSuccessCallback();
27 failureCallback = ClientMocks.createFailureCallback();
28
29 localCyclonNode = ClientMocks.mockCyclonNode();
30 asyncExecService = ClientMocks.mockAsyncExecService();
31 loggingService = ClientMocks.mockLoggingService();
32 channel = ClientMocks.mockChannel();
33
34 //
35 // Mock behaviours
36 //
37 asyncExecService.setTimeout.and.returnValue(TIMEOUT_ID);
38
39 outgoingShuffleState = new OutgoingShuffleState(localCyclonNode, DESTINATION_NODE_POINTER, SHUFFLE_SET, asyncExecService, loggingService);
40 });
41
42 describe("after channel establishment", function() {
43
44 beforeEach(function() {
45 outgoingShuffleState.storeChannel(channel);
46 });
47
48 describe("when sending a shuffle request", function () {
49
50 beforeEach(function () {
51 outgoingShuffleState.sendShuffleRequest();
52 });
53
54 it("should send the message over the data channel", function () {
55 expect(channel.send).toHaveBeenCalledWith("shuffleRequest", SHUFFLE_SET);
56 });
57 });
58
59 describe("when processing a shuffle response", function () {
60
61 describe("and a response is not received before the timeout", function () {
62
63 beforeEach(async () => {
64 let timeoutError = new TimeoutError('timeout');
65 channel.receive.and.returnValue(Promise.reject(timeoutError));
66 try {
67 await outgoingShuffleState.processShuffleResponse();
68 fail();
69 } catch (e) {
70 expect(e).toBe(timeoutError);
71 }
72 });
73
74 it("should not attempt to handle a response", function () {
75 expect(localCyclonNode.handleShuffleResponse).not.toHaveBeenCalled();
76 });
77 });
78
79 describe("and a response is received before timeout", function () {
80
81 beforeEach(async () => {
82 channel.receive.and.returnValue(Promise.resolve(RESPONSE_PAYLOAD));
83 await outgoingShuffleState.processShuffleResponse();
84 });
85
86 it("should delegate to the local node to handle the response", function () {
87 expect(localCyclonNode.handleShuffleResponse).toHaveBeenCalledWith(DESTINATION_NODE_POINTER, RESPONSE_PAYLOAD);
88 });
89 });
90 });
91
92 describe("when sending a response acknowledgement", function () {
93
94 describe("and everything succeeds", function () {
95 beforeEach(async () => {
96 asyncExecService.setTimeout.and.callFake(function(callback) {
97 callback();
98 });
99 await outgoingShuffleState.sendResponseAcknowledgement();
100 });
101
102 it("sends the acknowledgement over the channel", function () {
103 expect(channel.send).toHaveBeenCalledWith("shuffleResponseAcknowledgement");
104 });
105 });
106 });
107
108 describe("when closing", function() {
109
110 let channelClosingTimeoutId = "channelClosingTimeoutId";
111
112 beforeEach(function() {
113 asyncExecService.setTimeout.and.returnValue(channelClosingTimeoutId);
114 outgoingShuffleState.sendResponseAcknowledgement();
115
116 outgoingShuffleState.close();
117 });
118
119 it("clears the channel closing timeout", function() {
120 expect(asyncExecService.clearTimeout).toHaveBeenCalledWith(channelClosingTimeoutId);
121 });
122 });
123 });
124});