1 | import {CyclonNode, CyclonNodePointer, MetadataProvider} from 'cyclon.p2p';
|
2 | import {Channel, RTC, WebRTCCyclonNodePointer} from 'cyclon.p2p-rtc-client';
|
3 | import {Logger, TimeoutError} from 'cyclon.p2p-common';
|
4 | import {ShuffleStateFactory} from './ShuffleStateFactory';
|
5 | import {OutgoingShuffleState} from './OutgoingShuffleState';
|
6 |
|
7 | const CYCLON_SHUFFLE_CHANNEL_TYPE = 'cyclonShuffle';
|
8 |
|
9 | export class WebRTCComms {
|
10 |
|
11 | private localNode?: CyclonNode;
|
12 | private lastShuffleNode?: CyclonNodePointer;
|
13 |
|
14 | constructor(private readonly rtc: RTC,
|
15 | private readonly shuffleStateFactory: ShuffleStateFactory,
|
16 | private readonly logger: Logger,
|
17 | private readonly roomsToJoin: string[]) {
|
18 | if (!(roomsToJoin && roomsToJoin.length > 0)) {
|
19 | throw new Error('Must specify at least one room to join');
|
20 | }
|
21 | }
|
22 |
|
23 | |
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 | initialize(localNode: CyclonNode, metadataProviders: { [key: string]: MetadataProvider }) {
|
30 | this.localNode = localNode;
|
31 | this.rtc.connect(metadataProviders, this.roomsToJoin);
|
32 | this.rtc.onChannel('cyclonShuffle', (channel) => this.handleIncomingShuffle(channel));
|
33 | this.rtc.on('incomingTimeout', (channelType: string, sourcePointer: CyclonNodePointer) => {
|
34 | if (channelType === CYCLON_SHUFFLE_CHANNEL_TYPE) {
|
35 | this.requireLocalNode().emit('shuffleTimeout', 'incoming', sourcePointer);
|
36 | }
|
37 | });
|
38 | this.rtc.on('incomingError', (channelType, sourcePointer, error) => {
|
39 | if (channelType === CYCLON_SHUFFLE_CHANNEL_TYPE) {
|
40 | this.logger.error('An error occurred on an incoming shuffle', error);
|
41 | this.requireLocalNode().emit('shuffleError', 'incoming', sourcePointer, error);
|
42 | }
|
43 | });
|
44 | this.rtc.on('offerReceived', (channelType, sourcePointer) => {
|
45 | if (channelType === CYCLON_SHUFFLE_CHANNEL_TYPE) {
|
46 | this.logger.debug(`Incoming shuffle starting with ${sourcePointer.id}`);
|
47 | this.requireLocalNode().emit('shuffleStarted', 'incoming', sourcePointer);
|
48 | }
|
49 | });
|
50 | }
|
51 |
|
52 | |
53 |
|
54 |
|
55 |
|
56 |
|
57 |
|
58 | async sendShuffleRequest(destinationNodePointer: WebRTCCyclonNodePointer, shuffleSet: WebRTCCyclonNodePointer[]): Promise<void> {
|
59 | this.lastShuffleNode = destinationNodePointer;
|
60 | return await this.createOutgoingShuffle(
|
61 | this.shuffleStateFactory.createOutgoingShuffleState(this.requireLocalNode(), destinationNodePointer, shuffleSet),
|
62 | destinationNodePointer);
|
63 | }
|
64 |
|
65 | private async createOutgoingShuffle(outgoingState: OutgoingShuffleState, destinationNodePointer: WebRTCCyclonNodePointer): Promise<void> {
|
66 | try {
|
67 | const channel = await this.rtc.openChannel(CYCLON_SHUFFLE_CHANNEL_TYPE, destinationNodePointer);
|
68 | outgoingState.storeChannel(channel);
|
69 | outgoingState.sendShuffleRequest();
|
70 | await outgoingState.processShuffleResponse();
|
71 | await outgoingState.sendResponseAcknowledgement();
|
72 | } finally {
|
73 | outgoingState.close();
|
74 | }
|
75 | }
|
76 |
|
77 | createNewPointer(): CyclonNodePointer {
|
78 | return this.rtc.createNewPointer();
|
79 | }
|
80 |
|
81 | getLocalId() {
|
82 | return this.rtc.getLocalId();
|
83 | }
|
84 |
|
85 | |
86 |
|
87 |
|
88 | async handleIncomingShuffle(channel: Channel): Promise<void> {
|
89 | const remotePeer = channel.getRemotePeer();
|
90 |
|
91 | const incomingShuffleState = this.shuffleStateFactory.createIncomingShuffleState(this.requireLocalNode(), remotePeer);
|
92 |
|
93 | try {
|
94 | await incomingShuffleState.processShuffleRequest(channel);
|
95 | await incomingShuffleState.waitForResponseAcknowledgement(channel);
|
96 | this.requireLocalNode().emit('shuffleCompleted', 'incoming', remotePeer);
|
97 | } catch (e) {
|
98 | if (e instanceof TimeoutError) {
|
99 | this.logger.warn(e.message);
|
100 | this.requireLocalNode().emit('shuffleTimeout', 'incoming', remotePeer);
|
101 | } else {
|
102 | this.logger.error('An unknown error occurred on an incoming shuffle', e);
|
103 | this.requireLocalNode().emit('shuffleError', 'incoming', remotePeer, 'unknown');
|
104 | }
|
105 | } finally {
|
106 | channel.close();
|
107 | }
|
108 | }
|
109 |
|
110 | private requireLocalNode(): CyclonNode {
|
111 | if (this.localNode) {
|
112 | return this.localNode;
|
113 | } else {
|
114 | throw new Error('Comms not yet initialized (localNode is not defined)');
|
115 | }
|
116 | }
|
117 | }
|