1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
/** Phases of the incremental frame parser used by {@link StreamDecoder}. */
enum ReadState {
  /** Between frames: the next input byte is taken as the compression flag. */
  NO_DATA,
  /** Accumulating the 4-byte big-endian payload length. */
  READING_SIZE,
  /** Accumulating the payload bytes announced by the length field. */
  READING_MESSAGE,
}

/**
 * Incrementally splits a byte stream into length-prefixed frames.
 *
 * Each frame is laid out as: 1 flag byte, a 4-byte unsigned big-endian
 * payload length, then that many payload bytes. (This looks like gRPC's
 * length-prefixed message framing; confirm against the caller.)
 *
 * Input may be cut at arbitrary positions: parser state is kept on the
 * instance between calls to {@link StreamDecoder.write}. The decoder never
 * keeps a reference to a caller-supplied buffer after `write` returns, so the
 * caller is free to reuse or overwrite it.
 */
export class StreamDecoder {
  private readState: ReadState = ReadState.NO_DATA;
  private readCompressFlag: Buffer = Buffer.alloc(1);
  private readPartialSize: Buffer = Buffer.alloc(4);
  private readSizeRemaining = 4;
  private readMessageSize = 0;
  private readPartialMessage: Buffer[] = [];
  private readMessageRemaining = 0;

  /**
   * Feeds the next chunk of the stream into the decoder.
   *
   * @param data Bytes to consume; may hold a fragment of a frame, exactly one
   *   frame, or several frames back to back. An empty buffer is a no-op.
   * @returns Every frame completed by this chunk, in stream order. Each entry
   *   is a freshly allocated buffer holding the whole frame (flag byte,
   *   length field and payload). Empty when no frame was completed.
   * @throws Error if the internal state holds a value outside `ReadState`.
   */
  write(data: Buffer): Buffer[] {
    const result: Buffer[] = [];
    let readHead = 0;

    while (readHead < data.length) {
      switch (this.readState) {
        case ReadState.NO_DATA:
          // Copy the flag into our own 1-byte buffer rather than keeping a
          // view of `data`, which the caller may overwrite before the frame
          // is finished.
          data.copy(this.readCompressFlag, 0, readHead, readHead + 1);
          readHead += 1;
          this.readState = ReadState.READING_SIZE;
          this.readPartialSize.fill(0);
          this.readSizeRemaining = 4;
          this.readMessageSize = 0;
          this.readMessageRemaining = 0;
          this.readPartialMessage = [];
          break;

        case ReadState.READING_SIZE: {
          const toRead = Math.min(
            data.length - readHead,
            this.readSizeRemaining
          );
          // The length field may arrive split over several chunks; append at
          // the offset of the bytes already collected.
          data.copy(
            this.readPartialSize,
            4 - this.readSizeRemaining,
            readHead,
            readHead + toRead
          );
          this.readSizeRemaining -= toRead;
          readHead += toRead;

          if (this.readSizeRemaining === 0) {
            this.readMessageSize = this.readPartialSize.readUInt32BE(0);
            this.readMessageRemaining = this.readMessageSize;
            if (this.readMessageRemaining > 0) {
              this.readState = ReadState.READING_MESSAGE;
            } else {
              // Zero-length payload: the frame is just the 5 header bytes.
              result.push(
                Buffer.concat([this.readCompressFlag, this.readPartialSize], 5)
              );
              this.readState = ReadState.NO_DATA;
            }
          }
          break;
        }

        case ReadState.READING_MESSAGE: {
          const toRead = Math.min(
            data.length - readHead,
            this.readMessageRemaining
          );
          const chunk = data.subarray(readHead, readHead + toRead);
          this.readMessageRemaining -= toRead;
          readHead += toRead;

          if (this.readMessageRemaining === 0) {
            // Frame finishes inside this call: the view is consumed by
            // Buffer.concat (which copies) before control returns.
            this.readPartialMessage.push(chunk);
            const framedMessage = Buffer.concat(
              [
                this.readCompressFlag,
                this.readPartialSize,
                ...this.readPartialMessage,
              ],
              this.readMessageSize + 5
            );
            // Drop the chunk references now instead of holding them until
            // the next frame starts.
            this.readPartialMessage = [];
            this.readState = ReadState.NO_DATA;
            result.push(framedMessage);
          } else {
            // Frame continues in a later write(): keep a private copy so the
            // caller can safely reuse `data`.
            this.readPartialMessage.push(Buffer.from(chunk));
          }
          break;
        }

        default:
          throw new Error('Unexpected read state');
      }
    }

    return result;
  }
}
|