1 | import { Socket, createSocket } from 'dgram';
|
2 | import { EventEmitter } from 'events';
|
3 | import { AssertionError } from 'assert';
|
4 |
|
5 | import { Packet } from './packet';
|
6 | import { multicastGroup } from './util';
|
7 |
|
8 | interface Props {
|
9 | universes?: number[];
|
10 | port?: number;
|
11 | iface?: string;
|
12 | reuseAddr?: boolean;
|
13 | }
|
14 |
|
15 | export declare interface Receiver {
|
16 | on(event: 'packet', listener: (packet: Packet) => void): this;
|
17 | on(event: 'PacketCorruption', listener: (err: AssertionError) => void): this;
|
18 | on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this;
|
19 | on(event: 'error', listener: (err: Error) => void): this;
|
20 | }
|
21 |
|
22 | export class Receiver extends EventEmitter {
|
23 | private socket: Socket;
|
24 |
|
25 | private lastSequence: Record<string, number>;
|
26 |
|
27 | private readonly port: Props['port'];
|
28 |
|
29 | public universes: Props['universes'];
|
30 |
|
31 | private readonly iface: Props['iface'];
|
32 |
|
33 | constructor({
|
34 | universes = [1],
|
35 | port = 5568,
|
36 | iface = undefined,
|
37 | reuseAddr = false,
|
38 | }: Props) {
|
39 | super();
|
40 | this.universes = universes;
|
41 | this.port = port;
|
42 | this.iface = iface;
|
43 |
|
44 | this.socket = createSocket({ type: 'udp4', reuseAddr });
|
45 | this.lastSequence = {};
|
46 |
|
47 | this.socket.on('message', (msg, rinfo) => {
|
48 | try {
|
49 | const packet = new Packet(msg, rinfo.address);
|
50 |
|
51 |
|
52 |
|
53 | if (!this.universes.includes(packet.universe)) return;
|
54 |
|
55 | if (
|
56 | this.lastSequence[packet.universe] &&
|
57 | Math.abs(this.lastSequence[packet.universe] - packet.sequence) > 20
|
58 | ) {
|
59 | throw new Error(
|
60 | `Packet significantly out of order in universe ${
|
61 | packet.universe
|
62 | } (${this.lastSequence[packet.universe]} -> ${packet.sequence})`,
|
63 | );
|
64 | }
|
65 | this.lastSequence[packet.universe] =
|
66 | packet.sequence === 255 ? -1 : packet.sequence;
|
67 | this.emit('packet', packet);
|
68 | } catch (err) {
|
69 | const event =
|
70 | err instanceof AssertionError
|
71 | ? 'PacketCorruption'
|
72 | : 'PacketOutOfOrder';
|
73 | this.emit(event, err);
|
74 | }
|
75 | });
|
76 | this.socket.bind(this.port, () => {
|
77 | for (const uni of this.universes) {
|
78 | try {
|
79 | this.socket.addMembership(multicastGroup(uni), this.iface);
|
80 | } catch (err) {
|
81 | this.emit('error', err);
|
82 | }
|
83 | }
|
84 | });
|
85 | }
|
86 |
|
87 | public addUniverse(universe: number): this {
|
88 |
|
89 | if (this.universes.includes(universe)) return this;
|
90 |
|
91 | this.socket.addMembership(multicastGroup(universe), this.iface);
|
92 | this.universes.push(universe);
|
93 | return this;
|
94 | }
|
95 |
|
96 | public removeUniverse(universe: number): this {
|
97 |
|
98 | if (!this.universes.includes(universe)) return this;
|
99 |
|
100 | this.socket.dropMembership(multicastGroup(universe), this.iface);
|
101 | this.universes = this.universes.filter((n) => n !== universe);
|
102 | return this;
|
103 | }
|
104 |
|
105 | public close(callback?: () => void): this {
|
106 | this.socket.close(callback);
|
107 | return this;
|
108 | }
|
109 | }
|