1 | import {CyclonNode, CyclonNodePointer} from 'cyclon.p2p';
|
2 | import {AsyncExecService, Logger, TimeoutError} from 'cyclon.p2p-common';
|
3 | import {Channel} from 'cyclon.p2p-rtc-client';
|
4 |
|
5 | const SHUFFLE_REQUEST_TIMEOUT_MS = 15000;
|
6 | const SHUFFLE_RESPONSE_ACKNOWLEDGEMENT_TIMEOUT_MS = 15000;
|
7 |
|
8 | export class IncomingShuffleState {
|
9 |
|
10 | constructor(private readonly localNode: CyclonNode,
|
11 | private readonly sourcePointer: CyclonNodePointer,
|
12 | private readonly asyncExecService: AsyncExecService,
|
13 | private readonly logger: Logger) {
|
14 | }
|
15 |
|
16 | |
17 |
|
18 |
|
19 |
|
20 |
|
21 | async processShuffleRequest(channel: Channel): Promise<void> {
|
22 | let shuffleRequestMessage = await channel.receive("shuffleRequest", SHUFFLE_REQUEST_TIMEOUT_MS);
|
23 | this.logger.debug("Received shuffle request from " + this.sourcePointer.id + " : " + JSON.stringify(shuffleRequestMessage));
|
24 | const response = this.localNode.handleShuffleRequest(this.sourcePointer, shuffleRequestMessage);
|
25 | channel.send("shuffleResponse", response);
|
26 | this.logger.debug("Sent shuffle response to " + this.sourcePointer.id);
|
27 | }
|
28 |
|
29 | |
30 |
|
31 |
|
32 |
|
33 | async waitForResponseAcknowledgement(channel: Channel): Promise<Channel | null> {
|
34 | try {
|
35 | return await channel.receive("shuffleResponseAcknowledgement", SHUFFLE_RESPONSE_ACKNOWLEDGEMENT_TIMEOUT_MS);
|
36 | } catch (error) {
|
37 | if (error instanceof TimeoutError) {
|
38 | this.logger.warn("Timeout occurred waiting for response acknowledgement, continuing");
|
39 | } else {
|
40 | this.logger.error('An unknown error occurred waiting for response acknowledgement, continuing', error);
|
41 | }
|
42 | return null;
|
43 | }
|
44 | }
|
45 | }
|