UNPKG

1.94 kBPlain TextView Raw
1import {CyclonNode, CyclonNodePointer} from 'cyclon.p2p';
2import {AsyncExecService, Logger, TimeoutError} from 'cyclon.p2p-common';
3import {Channel} from 'cyclon.p2p-rtc-client';
4
// Maximum time, in milliseconds, to wait for the "shuffleRequest" message on an inbound channel.
const SHUFFLE_REQUEST_TIMEOUT_MS = 15 * 1000;
// Maximum time, in milliseconds, to wait for the "shuffleResponseAcknowledgement" message.
const SHUFFLE_RESPONSE_ACKNOWLEDGEMENT_TIMEOUT_MS = 15 * 1000;
7
/**
 * Responder-side handling of a single shuffle with the peer described by
 * `sourcePointer`: reads the request from a channel, lets the local node
 * compute the response, sends it back, and optionally waits for the peer
 * to acknowledge it.
 *
 * `asyncExecService` is accepted by the constructor but is not referenced
 * by any method in this class.
 */
export class IncomingShuffleState {

    constructor(private readonly localNode: CyclonNode,
                private readonly sourcePointer: CyclonNodePointer,
                private readonly asyncExecService: AsyncExecService,
                private readonly logger: Logger) {
    }

    /**
     * Handle an inbound shuffle: wait for the "shuffleRequest" message,
     * pass it to the local node and send the result back as "shuffleResponse".
     *
     * Failures from `channel.receive` (including a timeout) and from the
     * local node are not caught here; they reject the returned promise.
     *
     * @param channel the channel the shuffle request arrives on and the response is sent over
     */
    async processShuffleRequest(channel: Channel): Promise<void> {
        const incomingRequest = await channel.receive("shuffleRequest", SHUFFLE_REQUEST_TIMEOUT_MS);
        this.logger.debug(`Received shuffle request from ${this.sourcePointer.id} : ${JSON.stringify(incomingRequest)}`);

        const shuffleResponse = this.localNode.handleShuffleRequest(this.sourcePointer, incomingRequest);
        channel.send("shuffleResponse", shuffleResponse);
        this.logger.debug(`Sent shuffle response to ${this.sourcePointer.id}`);
    }

    /**
     * Wait for the peer to confirm it received our shuffle response, so the
     * data channel is not closed prematurely.
     *
     * This is best-effort: a timeout is logged as a warning, any other
     * failure is logged as an error, and in both cases the method resolves
     * with `null` instead of rejecting.
     *
     * NOTE(review): the declared return type is `Channel | null`, but the
     * value returned on success is whatever `channel.receive` resolves
     * with — confirm which is intended.
     *
     * @param channel the channel on which the acknowledgement is expected
     * @returns the value resolved by `channel.receive`, or `null` if waiting failed
     */
    async waitForResponseAcknowledgement(channel: Channel): Promise<Channel | null> {
        try {
            const acknowledgement = await channel.receive("shuffleResponseAcknowledgement", SHUFFLE_RESPONSE_ACKNOWLEDGEMENT_TIMEOUT_MS);
            return acknowledgement;
        } catch (failure) {
            // Anything other than a timeout is unexpected, but still must not abort the caller.
            if (!(failure instanceof TimeoutError)) {
                this.logger.error('An unknown error occurred waiting for response acknowledgement, continuing', failure);
                return null;
            }
            this.logger.warn("Timeout occurred waiting for response acknowledgement, continuing");
            return null;
        }
    }
}