UNPKG

4.79 kBPlain TextView Raw
1import {CyclonNode, CyclonNodePointer, MetadataProvider} from 'cyclon.p2p';
2import {Channel, RTC, WebRTCCyclonNodePointer} from 'cyclon.p2p-rtc-client';
3import {Logger, TimeoutError} from 'cyclon.p2p-common';
4import {ShuffleStateFactory} from './ShuffleStateFactory';
5import {OutgoingShuffleState} from './OutgoingShuffleState';
6
7const CYCLON_SHUFFLE_CHANNEL_TYPE = 'cyclonShuffle';
8
9export class WebRTCComms {
10
11 private localNode?: CyclonNode;
12 private lastShuffleNode?: CyclonNodePointer;
13
14 constructor(private readonly rtc: RTC,
15 private readonly shuffleStateFactory: ShuffleStateFactory,
16 private readonly logger: Logger,
17 private readonly roomsToJoin: string[]) {
18 if (!(roomsToJoin && roomsToJoin.length > 0)) {
19 throw new Error('Must specify at least one room to join');
20 }
21 }
22
23 /**
24 * Initialize the Comms object
25 *
26 * @param localNode The local Cyclon node
27 * @param metadataProviders
28 */
29 initialize(localNode: CyclonNode, metadataProviders: { [key: string]: MetadataProvider }) {
30 this.localNode = localNode;
31 this.rtc.connect(metadataProviders, this.roomsToJoin);
32 this.rtc.onChannel('cyclonShuffle', (channel) => this.handleIncomingShuffle(channel));
33 this.rtc.on('incomingTimeout', (channelType: string, sourcePointer: CyclonNodePointer) => {
34 if (channelType === CYCLON_SHUFFLE_CHANNEL_TYPE) {
35 this.requireLocalNode().emit('shuffleTimeout', 'incoming', sourcePointer);
36 }
37 });
38 this.rtc.on('incomingError', (channelType, sourcePointer, error) => {
39 if (channelType === CYCLON_SHUFFLE_CHANNEL_TYPE) {
40 this.logger.error('An error occurred on an incoming shuffle', error);
41 this.requireLocalNode().emit('shuffleError', 'incoming', sourcePointer, error);
42 }
43 });
44 this.rtc.on('offerReceived', (channelType, sourcePointer) => {
45 if (channelType === CYCLON_SHUFFLE_CHANNEL_TYPE) {
46 this.logger.debug(`Incoming shuffle starting with ${sourcePointer.id}`);
47 this.requireLocalNode().emit('shuffleStarted', 'incoming', sourcePointer);
48 }
49 });
50 }
51
52 /**
53 * Send a shuffle request to another node
54 *
55 * @param destinationNodePointer
56 * @param shuffleSet
57 */
58 async sendShuffleRequest(destinationNodePointer: WebRTCCyclonNodePointer, shuffleSet: WebRTCCyclonNodePointer[]): Promise<void> {
59 this.lastShuffleNode = destinationNodePointer;
60 return await this.createOutgoingShuffle(
61 this.shuffleStateFactory.createOutgoingShuffleState(this.requireLocalNode(), destinationNodePointer, shuffleSet),
62 destinationNodePointer);
63 }
64
65 private async createOutgoingShuffle(outgoingState: OutgoingShuffleState, destinationNodePointer: WebRTCCyclonNodePointer): Promise<void> {
66 try {
67 const channel = await this.rtc.openChannel(CYCLON_SHUFFLE_CHANNEL_TYPE, destinationNodePointer);
68 outgoingState.storeChannel(channel);
69 outgoingState.sendShuffleRequest();
70 await outgoingState.processShuffleResponse();
71 await outgoingState.sendResponseAcknowledgement();
72 } finally {
73 outgoingState.close();
74 }
75 }
76
77 createNewPointer(): CyclonNodePointer {
78 return this.rtc.createNewPointer();
79 }
80
81 getLocalId() {
82 return this.rtc.getLocalId();
83 }
84
85 /**
86 * Handle an incoming shuffle
87 */
88 async handleIncomingShuffle(channel: Channel): Promise<void> {
89 const remotePeer = channel.getRemotePeer();
90
91 const incomingShuffleState = this.shuffleStateFactory.createIncomingShuffleState(this.requireLocalNode(), remotePeer);
92
93 try {
94 await incomingShuffleState.processShuffleRequest(channel);
95 await incomingShuffleState.waitForResponseAcknowledgement(channel);
96 this.requireLocalNode().emit('shuffleCompleted', 'incoming', remotePeer);
97 } catch (e) {
98 if (e instanceof TimeoutError) {
99 this.logger.warn(e.message);
100 this.requireLocalNode().emit('shuffleTimeout', 'incoming', remotePeer);
101 } else {
102 this.logger.error('An unknown error occurred on an incoming shuffle', e);
103 this.requireLocalNode().emit('shuffleError', 'incoming', remotePeer, 'unknown');
104 }
105 } finally {
106 channel.close();
107 }
108 }
109
110 private requireLocalNode(): CyclonNode {
111 if (this.localNode) {
112 return this.localNode;
113 } else {
114 throw new Error('Comms not yet initialized (localNode is not defined)');
115 }
116 }
117}