UNPKG

3.77 kBPlain TextView Raw
1import url from 'url';
2import {randomSample} from 'cyclon.p2p-common';
3import {SignallingSocket} from 'cyclon.p2p-rtc-client/lib/SignallingSocket';
4import {HttpRequestService} from 'cyclon.p2p-rtc-client';
5import {CyclonNode, CyclonNodePointer} from 'cyclon.p2p';
6import {SignallingServerSpec} from 'cyclon.p2p-rtc-client/lib/SignallingServerSpec';
7import {allSettled, SettledPromise} from './PromiseTools';
8
/** Relative path of the peer-listing endpoint; resolved against each signalling server's API base URL. */
const API_PATH = './api/peers';
10
/**
 * Bootstraps a Cyclon node by asking the signalling servers it is currently
 * connected to for peers registered in one of a configured set of rooms.
 */
export class SignallingServerBootstrap {

    /**
     * @param signallingSocket queried for the signalling server specs currently in use
     * @param httpRequestService used to issue the GET request to each signalling server
     * @param roomsToBootstrapFrom non-empty list of room names; one is picked at random per bootstrap attempt
     * @throws Error when `roomsToBootstrapFrom` is missing or empty
     */
    constructor(private readonly signallingSocket: SignallingSocket,
                private readonly httpRequestService: HttpRequestService,
                private readonly roomsToBootstrapFrom: string[]) {
        if (!(roomsToBootstrapFrom && roomsToBootstrapFrom.length > 0)) {
            throw new Error('Must specify at least one room to bootstrap from');
        }
    }

    /**
     * Fetch a list of registered peers from the signalling servers.
     *
     * One room is chosen at random and every current server is queried for it in
     * parallel. Servers whose request fails are ignored; the successful responses
     * are merged, de-duplicated by peer id, and passed through `randomSample`
     * together with `limit`.
     *
     * @param cyclonNode the local node; its own id is filtered out of every response
     * @param limit forwarded to each server as the `limit` query parameter and to `randomSample`
     * @returns a promise for the sampled peers; rejected with an Error when there
     *          are no current signalling servers
     */
    getInitialPeerSet(cyclonNode: CyclonNode, limit: number): Promise<any> {
        const serverSpecs = this.signallingSocket.getCurrentServerSpecs();
        if (serverSpecs.length > 0) {
            // The same room is used for every server within a single bootstrap attempt.
            const roomIndex = Math.floor(Math.random() * this.roomsToBootstrapFrom.length);
            const roomToSampleFrom = this.roomsToBootstrapFrom[roomIndex];

            const specPromises: Promise<CyclonNodePointer[]>[] = serverSpecs.map((serverSpec) => {
                return this.getInitialPeerSetFromServer(cyclonNode, serverSpec, limit, roomToSampleFrom);
            });

            return allSettled(specPromises).then((results) => {
                const allResults = SignallingServerBootstrap.collateSuccessfulResults(results);
                return randomSample(SignallingServerBootstrap.deDuplicatePeerList(allResults), limit);
            });
        }

        return Promise.reject(new Error('Not connected to any signalling servers, can\'t bootstrap'));
    }

    /**
     * Concatenate the values of all fulfilled results, in order, skipping every
     * result whose status is not 'fulfilled'.
     */
    private static collateSuccessfulResults(arrayOfPromises: SettledPromise<CyclonNodePointer[]>[]): CyclonNodePointer[] {
        return arrayOfPromises.reduce((current: CyclonNodePointer[], next: SettledPromise<CyclonNodePointer[]>) => {
            // NOTE(review): suppression inherited from the original code; SettledPromise is
            // declared elsewhere, so whether it is still needed could not be confirmed here.
            // @ts-ignore
            if (next.status === 'fulfilled') {
                return current.concat(next.value as CyclonNodePointer[]);
            } else {
                return current;
            }
        }, []);
    }

    /**
     * Keep one pointer per peer id: the one with the highest `seq`, or the first
     * one seen when several share the same `seq`.
     *
     * A Map is used rather than a plain object because peer ids come from the
     * server; ids such as "hasOwnProperty" or "__proto__" would otherwise collide
     * with Object.prototype members. The result is in first-seen order of each id.
     */
    private static deDuplicatePeerList(arrayOfPeers: CyclonNodePointer[]): CyclonNodePointer[] {
        const newestById = new Map<string, CyclonNodePointer>();

        arrayOfPeers.forEach((peer) => {
            const known = newestById.get(peer.id);
            if (known === undefined || known.seq < peer.seq) {
                newestById.set(peer.id, peer);
            }
        });

        const uniquePeers: CyclonNodePointer[] = [];
        newestById.forEach((peer) => {
            uniquePeers.push(peer);
        });
        return uniquePeers;
    }

    /**
     * Request peers for `roomToSampleFrom` from a single signalling server.
     *
     * The response is treated as an object keyed by peer id; the values are
     * returned, excluding the entry whose key equals the local node's id.
     */
    private async getInitialPeerSetFromServer(cyclonNode: CyclonNode, serverSpec: SignallingServerSpec, limit: number, roomToSampleFrom: string): Promise<CyclonNodePointer[]> {
        const response = await this.httpRequestService.get(SignallingServerBootstrap.generateUrl(serverSpec.signallingApiBase, limit, roomToSampleFrom));
        const localId = cyclonNode.getId();
        return Object.keys(response).filter((peerId) => {
            return peerId !== localId;
        }).map((peerId) => {
            return response[peerId];
        });
    }

    /**
     * Build the peer-listing URL for one server.
     *
     * The room name is percent-encoded so that reserved characters ('&', '=',
     * '#', spaces, ...) cannot corrupt the query string. The `nocache` value is
     * the current time in milliseconds, making every request URL unique
     * (presumably to bypass HTTP caches).
     */
    private static generateUrl(apiBase: string, limit: number, room: string): string {
        //noinspection JSCheckFunctionSignatures
        return url.resolve(apiBase, API_PATH) + `?room=${encodeURIComponent(room)}&limit=${limit}&nocache=${new Date().getTime()}`;
    }
}