1 | import { Socket, createSocket } from 'dgram';
|
2 | import { EventEmitter } from 'events';
|
3 | import { AssertionError } from 'assert';
|
4 |
|
5 | import { Packet } from './packet';
|
6 | import { multicastGroup } from './util';
|
7 |
|
/** Options accepted by the `Receiver` constructor; every field is optional. */
interface Props {
  /** Universes whose multicast groups are joined once the socket is bound. Defaults to `[1]`. */
  universes?: number[];

  /** UDP port the receiving socket binds to. Defaults to `5568`. */
  port?: number;

  /** Interface address handed to `addMembership`/`dropMembership`. Defaults to `undefined`. */
  iface?: string;

  /** Forwarded to `createSocket` as its `reuseAddr` option. Defaults to `false`. */
  reuseAddr?: boolean;
}
|
14 |
|
/**
 * Typed `on` overloads for the events a `Receiver` emits.
 * This declaration merges with the `Receiver` class of the same name.
 */
export declare interface Receiver {
  /** Fired for every datagram that was parsed into a `Packet` and accepted. */
  on(event: 'packet', listener: (packet: Packet) => void): this;

  /** Fired when handling a datagram raised an `AssertionError`. */
  on(
    event: 'PacketCorruption',
    listener: (err: AssertionError) => void,
  ): this;

  /**
   * Fired when handling a datagram raised any other error, notably when its
   * sequence number is far away from the last one accepted for that universe.
   */
  on(event: 'PacketOutOfOrder', listener: (err: Error) => void): this;

  /** Fired on operational failures, such as being unable to join a multicast group. */
  on(event: 'error', listener: (err: Error) => void): this;
}
|
21 |
|
/**
 * UDP (udp4) multicast receiver that parses every incoming datagram into a
 * `Packet` and re-emits it as an event.
 *
 * Events:
 * - `packet`            – a datagram was parsed and accepted.
 * - `PacketCorruption`  – handling a datagram raised an `AssertionError`.
 * - `PacketOutOfOrder`  – handling a datagram raised any other error, notably
 *                         a sequence number too far from the previous one.
 * - `error`             – a socket-level failure, or a multicast group could
 *                         not be joined after binding.
 */
export class Receiver extends EventEmitter {
  /** Sequence numbers wrap from 255 back to 0. */
  private static readonly SEQUENCE_MODULUS = 256;

  /**
   * A packet whose sequence number is more than this many steps away (in
   * either direction, modulo 256) from the last accepted one is rejected.
   */
  private static readonly MAX_SEQUENCE_GAP = 20;

  private socket: Socket;

  /** Last accepted sequence number, keyed by universe. */
  private lastSequence: Record<string, number>;

  private port: Props['port'];

  /** Universes this receiver is currently subscribed to. */
  public universes: NonNullable<Props['universes']>;

  private iface: Props['iface'];

  /**
   * Creates the socket, starts binding it and joins the multicast group of
   * every configured universe once the bind has completed.
   *
   * @param props - Optional settings; omitted fields fall back to
   *   `universes = [1]`, `port = 5568`, `iface = undefined`, `reuseAddr = false`.
   */
  constructor({
    universes = [1],
    port = 5568,
    iface = undefined,
    reuseAddr = false,
  }: Props = {}) {
    super();
    // Copy so that addUniverse() never mutates the caller's array.
    this.universes = [...universes];
    this.port = port;
    this.iface = iface;

    this.socket = createSocket({ type: 'udp4', reuseAddr });
    this.lastSequence = {};

    // Surface socket-level failures (e.g. a failed bind) through the
    // receiver's own 'error' event instead of leaving them on the socket.
    this.socket.on('error', (err) => {
      this.emit('error', err);
    });

    this.socket.on('message', (msg, rinfo) => {
      this.handleMessage(msg, rinfo.address);
    });

    this.socket.bind(this.port, () => {
      for (const uni of this.universes) {
        try {
          this.socket.addMembership(multicastGroup(uni), this.iface);
        } catch (err) {
          // One failing group must not prevent joining the remaining ones.
          this.emit('error', err);
        }
      }
    });
  }

  /**
   * Shortest distance between two sequence numbers on the 0..255 ring, so
   * that e.g. 250 -> 5 counts as 11 steps rather than 245.
   */
  private static sequenceDistance(from: number, to: number): number {
    const size = Receiver.SEQUENCE_MODULUS;
    const forward = (((to - from) % size) + size) % size;
    return Math.min(forward, size - forward);
  }

  /**
   * Parses one datagram, validates its sequence number against the last one
   * accepted for the same universe and emits the matching event.
   */
  private handleMessage(msg: Buffer, sourceAddress: string): void {
    try {
      const packet = new Packet(msg, sourceAddress);
      const previous: number | undefined = this.lastSequence[packet.universe];

      // Compare against `undefined` explicitly: 0 is a valid sequence number.
      if (
        previous !== undefined &&
        Receiver.sequenceDistance(previous, packet.sequence) >
          Receiver.MAX_SEQUENCE_GAP
      ) {
        throw new Error(
          `Packet significantly out of order in universe ${
            packet.universe
          } (${previous} -> ${packet.sequence})`,
        );
      }

      this.lastSequence[packet.universe] = packet.sequence;
      // NOTE: emitted inside the try block, so an exception thrown by a
      // 'packet' listener is also routed to the events below.
      this.emit('packet', packet);
    } catch (err) {
      const event =
        err instanceof AssertionError ? 'PacketCorruption' : 'PacketOutOfOrder';
      this.emit(event, err);
    }
  }

  /**
   * Joins the multicast group of `universe` unless it is already subscribed.
   *
   * @param universe - Universe number to start listening to.
   * @returns This receiver, for chaining.
   * @throws Whatever `socket.addMembership` throws; the universe is then not recorded.
   */
  public addUniverse(universe: number): this {
    if (this.universes.includes(universe)) return this;

    this.socket.addMembership(multicastGroup(universe), this.iface);
    this.universes.push(universe);
    return this;
  }

  /**
   * Leaves the multicast group of `universe` if it is currently subscribed.
   *
   * @param universe - Universe number to stop listening to.
   * @returns This receiver, for chaining.
   * @throws Whatever `socket.dropMembership` throws; the universe then stays recorded.
   */
  public removeUniverse(universe: number): this {
    if (!this.universes.includes(universe)) return this;

    this.socket.dropMembership(multicastGroup(universe), this.iface);
    this.universes = this.universes.filter((n) => n !== universe);
    // Forget the sequence state so a later re-subscription starts fresh.
    delete this.lastSequence[universe];
    return this;
  }

  /**
   * Closes the underlying socket.
   *
   * @returns This receiver, for chaining.
   */
  public close(): this {
    this.socket.close();
    return this;
  }
}
|