// Retrieved via the UNPKG file viewer (3.75 kB, JavaScript).

'use strict';

const {TimeoutError} = require("cyclon.p2p-common");

const {IncomingShuffleState} = require("../lib/IncomingShuffleState.js");
const ClientMocks = require("./ClientMocks");

/**
 * Jasmine spec for IncomingShuffleState.
 *
 * Every collaborator (cyclon node, async exec service, logging service and
 * channel) is a mock obtained from ClientMocks, and a fresh
 * IncomingShuffleState is constructed before each spec.
 */
describe("The Incoming ShuffleState", function () {

    const SOURCE_POINTER = {id: "SOURCE_ID", age: 10};

    const REQUEST_PAYLOAD = "REQUEST_PAYLOAD";
    const RESPONSE_PAYLOAD = "RESPONSE_PAYLOAD";

    let localCyclonNode;
    let asyncExecService;
    let loggingService;
    let channel;

    let incomingShuffleState;

    beforeEach(function () {
        localCyclonNode = ClientMocks.mockCyclonNode();
        asyncExecService = ClientMocks.mockAsyncExecService();
        loggingService = ClientMocks.mockLoggingService();
        channel = ClientMocks.mockChannel();

        //
        // Mock behaviour
        //
        localCyclonNode.handleShuffleRequest.and.returnValue(RESPONSE_PAYLOAD);

        incomingShuffleState = new IncomingShuffleState(localCyclonNode, SOURCE_POINTER, asyncExecService, loggingService);
    });

    describe("when processing a shuffle request", function () {

        describe("and everything succeeds", function () {
            beforeEach(function () {
                channel.receive.and.returnValue(Promise.resolve(REQUEST_PAYLOAD));
                // Invoke scheduled callbacks immediately instead of waiting on a timer.
                asyncExecService.setTimeout.and.callFake(function (callback) {
                    callback();
                });
            });

            it("delegates to the node to handle the request, then sends the response via the data channel", async () => {
                await incomingShuffleState.processShuffleRequest(channel);
                expect(localCyclonNode.handleShuffleRequest).toHaveBeenCalledWith(SOURCE_POINTER, REQUEST_PAYLOAD);
                expect(channel.send).toHaveBeenCalledWith("shuffleResponse", RESPONSE_PAYLOAD);
            });
        });

        describe("and a timeout occurs waiting for the request", function () {
            let timeoutError;

            // The act step lives in the setup: the rejection must surface to the
            // caller as the very same error instance the channel rejected with.
            beforeEach(async () => {
                timeoutError = new TimeoutError('timeout');
                channel.receive.and.returnValue(Promise.reject(timeoutError));
                try {
                    await incomingShuffleState.processShuffleRequest(channel);
                    fail("processShuffleRequest should have rejected with the timeout error");
                } catch (e) {
                    expect(e).toBe(timeoutError);
                }
            });

            it("does not attempt to handle the request", function () {
                expect(localCyclonNode.handleShuffleRequest).not.toHaveBeenCalled();
            });
        });
    });

    describe("when waiting for the response acknowledgement", function () {

        describe("and everything succeeds", function () {
            beforeEach(async () => {
                channel.receive.and.returnValue(Promise.resolve(null));
                await incomingShuffleState.waitForResponseAcknowledgement(channel);
            });

            it("delegates to the messaging utilities to receive the acknowledgement", function () {
                expect(channel.receive).toHaveBeenCalledWith("shuffleResponseAcknowledgement", jasmine.any(Number));
            });
        });

        describe("and a timeout occurs", function () {

            // A timed-out acknowledgement must not reject: awaiting the call
            // completes normally and a warning is logged.
            it("logs a warning and resolves", async () => {
                channel.receive.and.returnValue(Promise.reject(new TimeoutError('timeout')));
                await incomingShuffleState.waitForResponseAcknowledgement(channel);
                expect(loggingService.warn).toHaveBeenCalled();
            });
        });
    });
});