UNPKG

2.98 kBPlain TextView Raw
1import { Socket, createSocket } from 'dgram';
2import { EventEmitter } from 'events';
3import { AssertionError } from 'assert';
4
5import { Packet } from './packet';
6import { multicastGroup } from './util';
7
8interface Props {
9 universes?: number[];
10 port?: number;
11 iface?: string; // local ip address of network inteface to use
12 reuseAddr?: boolean;
13}
14
15export declare interface Receiver {
16 on(event: 'packet', listener: (packet: Packet) => void): this;
17 on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this;
18 on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this;
19 on(event: 'error', listener: (err: Error) => void): this;
20}
21
22export class Receiver extends EventEmitter {
23 private socket: Socket;
24
25 private lastSequence: Record<string, number>;
26
27 private port: Props['port'];
28
29 public universes: Props['universes'];
30
31 private iface: Props['iface'];
32
33 constructor({
34 universes = [1],
35 port = 5568,
36 iface = undefined,
37 reuseAddr = false,
38 }: Props) {
39 super();
40 this.universes = universes;
41 this.port = port;
42 this.iface = iface;
43
44 this.socket = createSocket({ type: 'udp4', reuseAddr });
45 this.lastSequence = {};
46
47 this.socket.on('message', (msg, rinfo) => {
48 try {
49 const packet = new Packet(msg, rinfo.address);
50 if (
51 this.lastSequence[packet.universe] &&
52 Math.abs(this.lastSequence[packet.universe] - packet.sequence) > 20
53 ) {
54 throw new Error(
55 `Packet significantly out of order in universe ${
56 packet.universe
57 } (${this.lastSequence[packet.universe]} -> ${packet.sequence})`,
58 );
59 }
60 this.lastSequence[packet.universe] =
61 packet.sequence === 255 ? -1 : packet.sequence;
62 this.emit('packet', packet);
63 } catch (err) {
64 const event =
65 err instanceof AssertionError
66 ? 'PacketCorruption'
67 : 'PacketOutOfOrder';
68 this.emit(event, err);
69 }
70 });
71 this.socket.bind(this.port, () => {
72 for (const uni of this.universes) {
73 try {
74 this.socket.addMembership(multicastGroup(uni), this.iface);
75 } catch (err) {
76 this.emit('error', err); // emit errors from socket.addMembership
77 }
78 }
79 });
80 }
81
82 public addUniverse(universe: number): this {
83 // already listening to this one; do nothing
84 if (this.universes.includes(universe)) return this;
85
86 this.socket.addMembership(multicastGroup(universe), this.iface);
87 this.universes.push(universe);
88 return this;
89 }
90
91 public removeUniverse(universe: number): this {
92 // not listening to this one; do nothing
93 if (!this.universes.includes(universe)) return this;
94
95 this.socket.dropMembership(multicastGroup(universe), this.iface);
96 this.universes = this.universes.filter((n) => n !== universe);
97 return this;
98 }
99
100 public close(): this {
101 this.socket.close();
102 return this;
103 }
104}