UNPKG

3.21 kBPlain TextView Raw
1import { Socket, createSocket } from 'dgram';
2import { EventEmitter } from 'events';
3import { AssertionError } from 'assert';
4
5import { Packet } from './packet';
6import { multicastGroup } from './util';
7
/** Options accepted by the `Receiver` constructor; every field is optional. */
interface Props {
  /** Universe numbers whose multicast groups should be joined. */
  universes?: number[];

  /** UDP port the socket is bound to. */
  port?: number;

  /** Local IP address of the network interface to use. */
  iface?: string;

  /** Forwarded as the `reuseAddr` option when the UDP socket is created. */
  reuseAddr?: boolean;
}
14
/**
 * Typed `on` overloads for the events a `Receiver` emits. This interface is
 * merged with the class of the same name, so listeners get a correctly typed
 * argument for each event name.
 */
export declare interface Receiver {
  /** A datagram was parsed successfully and belongs to a subscribed universe. */
  on(event: 'packet', listener: (packet: Packet) => void): this;

  /** Handling a datagram raised an `AssertionError`. */
  on(
    event: 'PacketCorruption',
    listener: (err: AssertionError) => void,
  ): this;

  /**
   * Handling a datagram raised any other error, e.g. the sequence number
   * jumped too far from the previous one for that universe.
   */
  on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this;

  /** An operational failure, e.g. joining a multicast group failed. */
  on(event: 'error', listener: (err: Error) => void): this;
}
21
/**
 * UDP receiver that joins one multicast group per universe and re-emits every
 * datagram it can parse as a `Packet`.
 *
 * Events:
 * - `packet`: a parsed packet for a universe in `universes`.
 * - `PacketCorruption`: handling a datagram raised an `AssertionError`.
 * - `PacketOutOfOrder`: handling a datagram raised any other error, most
 *   notably a sequence number too far from the previous one.
 * - `error`: the socket reported an error, or joining a multicast group
 *   failed after the socket was bound.
 */
export class Receiver extends EventEmitter {
  /**
   * Size of the sequence-number space. Sequence numbers are assumed to be
   * 8-bit counters that wrap from 255 back to 0.
   */
  private static readonly SEQUENCE_MODULUS = 256;

  /** Largest distance between consecutive sequence numbers still accepted. */
  private static readonly MAX_SEQUENCE_JUMP = 20;

  private socket: Socket;

  /** Most recent sequence number seen, keyed by universe. */
  private lastSequence: Record<string, number>;

  private readonly port: Props['port'];

  /** Universes currently subscribed to. */
  public universes: number[];

  private readonly iface: Props['iface'];

  /**
   * Creates the UDP socket, starts binding it and, once bound, joins the
   * multicast group of every requested universe.
   *
   * @param options - Receiver options; may be omitted entirely.
   */
  constructor({
    universes = [1],
    port = 5568,
    iface = undefined,
    reuseAddr = false,
  }: Props = {}) {
    super();
    // Copy so that addUniverse() never mutates the caller's array.
    this.universes = [...universes];
    this.port = port;
    this.iface = iface;

    this.socket = createSocket({ type: 'udp4', reuseAddr });
    this.lastSequence = {};

    // Surface socket-level failures (e.g. a failed bind) through the
    // receiver's own `error` event instead of leaving them unhandled on the
    // private socket, where callers cannot attach a listener.
    this.socket.on('error', (err) => {
      this.emit('error', err);
    });

    this.socket.on('message', (msg, rinfo) => {
      try {
        const packet = new Packet(msg, rinfo.address);

        // somehow we received a packet for a universe we're not listening to
        // silently drop this packet
        if (!this.universes.includes(packet.universe)) return;

        const previous = this.lastSequence[packet.universe];

        // Record the new sequence BEFORE validating it. Otherwise a single
        // large jump (e.g. the sender restarting) would leave the stale value
        // in place and every following packet would be rejected too.
        this.lastSequence[packet.universe] = packet.sequence;

        if (
          previous !== undefined &&
          Receiver.sequenceDistance(previous, packet.sequence) >
            Receiver.MAX_SEQUENCE_JUMP
        ) {
          throw new Error(
            `Packet significantly out of order in universe ${
              packet.universe
            } (${previous} -> ${packet.sequence})`,
          );
        }

        // Listeners run synchronously inside this try block, so an exception
        // thrown by a `packet` listener is also caught below.
        this.emit('packet', packet);
      } catch (err) {
        const event =
          err instanceof AssertionError
            ? 'PacketCorruption'
            : 'PacketOutOfOrder';
        this.emit(event, err);
      }
    });

    this.socket.bind(this.port, () => {
      for (const uni of this.universes) {
        try {
          this.socket.addMembership(multicastGroup(uni), this.iface);
        } catch (err) {
          this.emit('error', err); // emit errors from socket.addMembership
        }
      }
    });
  }

  /**
   * Distance between two sequence numbers on the wrapping counter, i.e. the
   * shorter way around the circle. 255 -> 0 therefore counts as a step of 1.
   */
  private static sequenceDistance(from: number, to: number): number {
    const direct = Math.abs(from - to) % Receiver.SEQUENCE_MODULUS;
    return Math.min(direct, Receiver.SEQUENCE_MODULUS - direct);
  }

  /**
   * Starts listening to an additional universe. Does nothing if the universe
   * is already subscribed.
   *
   * @param universe - Universe number to subscribe to.
   * @returns This receiver, for chaining.
   * @throws Whatever `socket.addMembership` throws; the universe is not
   *   recorded in that case.
   */
  public addUniverse(universe: number): this {
    // already listening to this one; do nothing
    if (this.universes.includes(universe)) return this;

    this.socket.addMembership(multicastGroup(universe), this.iface);
    this.universes.push(universe);
    return this;
  }

  /**
   * Stops listening to a universe. Does nothing if the universe is not
   * subscribed.
   *
   * @param universe - Universe number to unsubscribe from.
   * @returns This receiver, for chaining.
   * @throws Whatever `socket.dropMembership` throws; the universe stays
   *   subscribed in that case.
   */
  public removeUniverse(universe: number): this {
    // not listening to this one; do nothing
    if (!this.universes.includes(universe)) return this;

    this.socket.dropMembership(multicastGroup(universe), this.iface);
    this.universes = this.universes.filter((n) => n !== universe);
    // Forget the sequence state so that re-adding the universe later does not
    // compare the first new packet against a stale sequence number.
    delete this.lastSequence[universe];
    return this;
  }

  /**
   * Closes the underlying socket.
   *
   * @param callback - Optional function passed straight to `socket.close`.
   * @returns This receiver, for chaining.
   */
  public close(callback?: () => void): this {
    this.socket.close(callback);
    return this;
  }
}