1 | import {
|
2 | BinaryWriter,
|
3 | common,
|
4 | createSerializeWire,
|
5 | crypto,
|
6 | InvalidFormatError,
|
7 | IOHelper,
|
8 | SerializableWire,
|
9 | SerializeWire,
|
10 | } from '@neo-one/client-common-esnext-esm';
|
11 | import {
|
12 | BinaryReader,
|
13 | Block,
|
14 | ConsensusPayload,
|
15 | deserializeTransactionWire,
|
16 | DeserializeWireBaseOptions,
|
17 | DeserializeWireContext,
|
18 | DeserializeWireOptions,
|
19 | Transaction,
|
20 | } from '@neo-one/node-core-esnext-esm';
|
21 | import { makeErrorWithCode, utils } from '@neo-one/utils-esnext-esm';
|
22 | import { Transform } from 'stream';
|
23 | import { assertCommand, Command } from './Command';
|
24 | import {
|
25 | AddrPayload,
|
26 | FilterAddPayload,
|
27 | FilterLoadPayload,
|
28 | GetBlocksPayload,
|
29 | HeadersPayload,
|
30 | InvPayload,
|
31 | MerkleBlockPayload,
|
32 | VersionPayload,
|
33 | } from './payload';
|
/** Shape of a message value whose command is accompanied by a payload. */
interface PayloadMessageValue<TCommand extends Command, TPayload> {
  readonly command: TCommand;
  readonly payload: TPayload;
}

/** Shape of a message value that consists of the command alone. */
interface EmptyMessageValue<TCommand extends Command> {
  readonly command: TCommand;
}

/**
 * Discriminated union (tagged by `command`) of everything a `Message` can hold.
 *
 * Switching on `command` narrows the value so that `payload` is only accessible
 * for the commands that declare one.
 */
export type MessageValue =
  // Commands that carry a payload.
  | PayloadMessageValue<Command.addr, AddrPayload>
  | PayloadMessageValue<Command.block, Block>
  | PayloadMessageValue<Command.consensus, ConsensusPayload>
  | PayloadMessageValue<Command.filteradd, FilterAddPayload>
  | PayloadMessageValue<Command.filterload, FilterLoadPayload>
  | PayloadMessageValue<Command.getblocks, GetBlocksPayload>
  | PayloadMessageValue<Command.getdata, InvPayload>
  | PayloadMessageValue<Command.getheaders, GetBlocksPayload>
  | PayloadMessageValue<Command.headers, HeadersPayload>
  | PayloadMessageValue<Command.inv, InvPayload>
  | PayloadMessageValue<Command.tx, Transaction>
  | PayloadMessageValue<Command.version, VersionPayload>
  | PayloadMessageValue<Command.merkleblock, MerkleBlockPayload>
  // Commands without a payload.
  | EmptyMessageValue<Command.filterclear>
  | EmptyMessageValue<Command.getaddr>
  | EmptyMessageValue<Command.mempool>
  | EmptyMessageValue<Command.verack>
  | EmptyMessageValue<Command.alert>
  | EmptyMessageValue<Command.notfound>
  | EmptyMessageValue<Command.ping>
  | EmptyMessageValue<Command.pong>
  | EmptyMessageValue<Command.reject>;
|
/**
 * Arguments accepted by the `Message` constructor.
 */
export interface MessageAdd {
  /** Magic number written as the first field of the serialized message. */
  readonly magic: number;
  /** The command and, for commands that have one, its payload. */
  readonly value: MessageValue;
}
|
61 |
|
/** Fixed length used when reading and writing the command-name field of a message header. */
export const COMMAND_LENGTH = 12;

/** Upper bound on the payload length declared in a message header; larger values are rejected. */
export const PAYLOAD_MAX_SIZE = 0x02000000;
|
64 |
|
/**
 * Computes the checksum value stored in a message header for the given payload bytes.
 *
 * The payload is run through `crypto.hash256` and the result is converted with
 * `common.toUInt32LE`.
 */
const calculateChecksum = (buffer: Buffer) => {
  const digest = crypto.hash256(buffer);

  return common.toUInt32LE(digest);
};
|
66 |
|
/**
 * Reads one message header from `reader`, in wire order: magic, command name,
 * payload length, checksum.
 *
 * @returns the parsed command, the declared payload length and the declared checksum.
 * @throws InvalidFormatError when the magic differs from `context.messageMagic` or the
 *   declared payload length exceeds `PAYLOAD_MAX_SIZE`. `assertCommand` validates the
 *   command string (its failure mode is defined in `./Command`).
 */
const deserializeMessageHeader = (
  options: DeserializeWireBaseOptions,
): {
  readonly command: Command;
  readonly length: number;
  readonly checksum: number;
} => {
  const { context, reader } = options;

  const magic = reader.readUInt32LE();
  if (magic !== context.messageMagic) {
    throw new InvalidFormatError();
  }

  const commandName = reader.readFixedString(COMMAND_LENGTH);
  const command = assertCommand(commandName);

  const length = reader.readUInt32LE();
  if (length > PAYLOAD_MAX_SIZE) {
    throw new InvalidFormatError();
  }

  // The checksum is only read here; it is verified by the caller once the payload is available.
  const checksum = reader.readUInt32LE();

  return { command, length, checksum };
};
|
87 |
|
/**
 * Builds the typed message value for `command`, deserializing `payloadOptions.buffer`
 * with the payload class that belongs to that command. Commands without a payload
 * ignore the buffer.
 */
const deserializeMessageValue = (command: Command, payloadOptions: DeserializeWireOptions): MessageValue => {
  switch (command) {
    case Command.addr:
      return { command: Command.addr, payload: AddrPayload.deserializeWire(payloadOptions) };
    case Command.block:
      return { command: Command.block, payload: Block.deserializeWire(payloadOptions) };
    case Command.consensus:
      return { command: Command.consensus, payload: ConsensusPayload.deserializeWire(payloadOptions) };
    case Command.filteradd:
      return { command: Command.filteradd, payload: FilterAddPayload.deserializeWire(payloadOptions) };
    case Command.filterclear:
      return { command: Command.filterclear };
    case Command.filterload:
      return { command: Command.filterload, payload: FilterLoadPayload.deserializeWire(payloadOptions) };
    case Command.getaddr:
      return { command: Command.getaddr };
    case Command.getblocks:
      return { command: Command.getblocks, payload: GetBlocksPayload.deserializeWire(payloadOptions) };
    case Command.getdata:
      return { command: Command.getdata, payload: InvPayload.deserializeWire(payloadOptions) };
    case Command.getheaders:
      return { command: Command.getheaders, payload: GetBlocksPayload.deserializeWire(payloadOptions) };
    case Command.headers:
      return { command: Command.headers, payload: HeadersPayload.deserializeWire(payloadOptions) };
    case Command.inv:
      return { command: Command.inv, payload: InvPayload.deserializeWire(payloadOptions) };
    case Command.mempool:
      return { command: Command.mempool };
    case Command.tx:
      return { command: Command.tx, payload: deserializeTransactionWire(payloadOptions) };
    case Command.verack:
      return { command: Command.verack };
    case Command.version:
      return { command: Command.version, payload: VersionPayload.deserializeWire(payloadOptions) };
    case Command.alert:
      return { command: Command.alert };
    case Command.merkleblock:
      return { command: Command.merkleblock, payload: MerkleBlockPayload.deserializeWire(payloadOptions) };
    case Command.notfound:
      return { command: Command.notfound };
    case Command.ping:
      return { command: Command.ping };
    case Command.pong:
      return { command: Command.pong };
    case Command.reject:
      return { command: Command.reject };
    default:
      // Compile-time exhaustiveness check; the throw covers values outside the enum at runtime.
      utils.assertNever(command);
      throw new InvalidFormatError();
  }
};

/**
 * Produces the payload bytes for `value`: the serialized payload for commands that
 * carry one, an empty buffer for all others.
 */
const serializeMessagePayload = (value: MessageValue): Buffer => {
  switch (value.command) {
    case Command.addr:
      return value.payload.serializeWire();
    case Command.block:
      return value.payload.serializeWire();
    case Command.consensus:
      return value.payload.serializeWire();
    case Command.filteradd:
      return value.payload.serializeWire();
    case Command.filterload:
      return value.payload.serializeWire();
    case Command.getblocks:
      return value.payload.serializeWire();
    case Command.getdata:
      return value.payload.serializeWire();
    case Command.getheaders:
      return value.payload.serializeWire();
    case Command.headers:
      return value.payload.serializeWire();
    case Command.inv:
      return value.payload.serializeWire();
    case Command.tx:
      return value.payload.serializeWire();
    case Command.version:
      return value.payload.serializeWire();
    case Command.merkleblock:
      return value.payload.serializeWire();
    case Command.filterclear:
    case Command.getaddr:
    case Command.mempool:
    case Command.verack:
    case Command.alert:
    case Command.notfound:
    case Command.ping:
    case Command.pong:
    case Command.reject:
      return Buffer.alloc(0);
    default:
      // Compile-time exhaustiveness check; the throw covers values outside the union at runtime.
      utils.assertNever(value);
      throw new InvalidFormatError();
  }
};

/**
 * A single wire message: a magic number plus a command-tagged value.
 *
 * Serialized layout, in order: magic (uint32 LE), command (fixed string of
 * `COMMAND_LENGTH`), payload length (uint32 LE), payload checksum (uint32 LE),
 * payload bytes.
 */
export class Message implements SerializableWire<Message> {
  /**
   * Reads one complete message (header and payload) from `options.reader`.
   *
   * @throws InvalidFormatError when the header is rejected by `deserializeMessageHeader`
   *   or when the checksum of the payload bytes does not match the header's checksum.
   */
  public static deserializeWireBase(options: DeserializeWireBaseOptions): Message {
    const { reader, context } = options;
    const header = deserializeMessageHeader(options);

    const payloadBuffer = reader.readBytes(header.length);
    if (calculateChecksum(payloadBuffer) !== header.checksum) {
      throw new InvalidFormatError();
    }

    const value = deserializeMessageValue(header.command, { context, buffer: payloadBuffer });

    return new this({ magic: context.messageMagic, value });
  }

  /** Convenience wrapper that wraps `options.buffer` in a `BinaryReader` and delegates to `deserializeWireBase`. */
  public static deserializeWire(options: DeserializeWireOptions): Message {
    const { context, buffer } = options;

    return this.deserializeWireBase({ context, reader: new BinaryReader(buffer) });
  }

  public readonly magic: number;
  public readonly value: MessageValue;
  public readonly serializeWire: SerializeWire = createSerializeWire(this.serializeWireBase.bind(this));

  public constructor({ magic, value }: MessageAdd) {
    this.magic = magic;
    this.value = value;
  }

  /**
   * Writes this message to `writer`.
   *
   * The magic and command are written before the payload is serialized, followed by
   * the payload's length, its checksum and finally the payload bytes.
   */
  public serializeWireBase(writer: BinaryWriter): void {
    const { magic, value } = this;

    writer.writeUInt32LE(magic);
    writer.writeFixedString(value.command, COMMAND_LENGTH);

    const payload = serializeMessagePayload(value);

    writer.writeUInt32LE(payload.length);
    writer.writeUInt32LE(calculateChecksum(payload));
    writer.writeBytes(payload);
  }
}
|
321 |
|
/**
 * Error raised by `MessageTransform` when it is handed a chunk or encoding it cannot process.
 *
 * Created through `makeErrorWithCode` with the code string `INVALID_MESSAGE_TRANSFORM_ENCODING`;
 * the message formatter returns the caller-supplied text unchanged.
 */
export const InvalidMessageTransformEncodingError = makeErrorWithCode(
  'INVALID_MESSAGE_TRANSFORM_ENCODING',
  (reason: string) => reason,
);
|
326 |
|
/**
 * Size of a message header as computed by `IOHelper`: three uint32 fields
 * (magic, payload length, checksum) plus the fixed-length command string.
 */
const SIZE_OF_MESSAGE_HEADER = 3 * IOHelper.sizeOfUInt32LE + IOHelper.sizeOfFixedString(COMMAND_LENGTH);
|
332 |
|
/**
 * Transform stream that turns a byte stream into `Message` objects.
 *
 * Incoming buffers are accumulated in `mutableBuffer`; every complete message found
 * at the front of that buffer is deserialized and pushed to the readable (object-mode)
 * side in the order it appeared on the wire. Bytes belonging to an incomplete trailing
 * message are retained until more data arrives.
 */
export class MessageTransform extends Transform {
  public readonly context: DeserializeWireContext;
  public mutableBuffer: Buffer;

  public constructor(context: DeserializeWireContext) {
    super({ readableObjectMode: true });
    this.context = context;
    this.mutableBuffer = Buffer.from([]);
  }

  /**
   * Appends `chunk` to the pending bytes and emits every message that is now complete.
   *
   * @throws InvalidMessageTransformEncodingError synchronously when `chunk` is a string or
   *   `encoding` is not `'buffer'`.
   *
   * Errors raised while parsing are reported through `callback` instead of being thrown;
   * in that case no message from this call is pushed and `mutableBuffer` keeps the
   * unparsed bytes.
   */
  public _transform(
    chunk: Buffer | string,
    encoding: string,
    callback: (error: Error | undefined, data?: Buffer | string) => void,
  ): void {
    if (typeof chunk === 'string') {
      throw new InvalidMessageTransformEncodingError(
        `Invalid Message Transform Chunk Type. Expected chunk type to be 'Buffer', found: ${typeof chunk}`,
      );
    }
    if (encoding !== 'buffer') {
      throw new InvalidMessageTransformEncodingError(
        `Invalid Message Transform Encoding. Expected: 'buffer', found: ${encoding}`,
      );
    }

    this.mutableBuffer = Buffer.concat([this.mutableBuffer, chunk]);
    try {
      const { remainingBuffer, mutableMessages } = this.processBuffer(new BinaryReader(this.mutableBuffer));

      this.mutableBuffer = remainingBuffer;
      for (const message of mutableMessages) {
        this.push(message);
      }
      callback(undefined);
    } catch (error) {
      // The callback contract requires an Error instance; wrap anything else that was thrown.
      callback(error instanceof Error ? error : new Error(String(error)));
    }
  }

  /**
   * Deserializes as many complete messages as `reader` currently holds.
   *
   * @returns the messages in wire order, plus the bytes left over after the last
   *   complete message (a partial header or a header whose payload has not fully arrived).
   */
  private processBuffer(
    reader: BinaryReader,
  ): {
    readonly remainingBuffer: Buffer;
    readonly mutableMessages: Message[];
  } {
    const mutableMessages: Message[] = [];

    // Iterate rather than recurse: keeps messages in arrival order and keeps stack
    // depth independent of how many messages are buffered.
    while (reader.remaining >= SIZE_OF_MESSAGE_HEADER) {
      // Peek at the header on a clone so the real reader is not advanced until the
      // whole message (header + payload) is known to be available.
      const { length } = deserializeMessageHeader({
        context: this.context,
        reader: reader.clone(),
      });

      if (reader.remaining < SIZE_OF_MESSAGE_HEADER + length) {
        break;
      }

      mutableMessages.push(
        Message.deserializeWireBase({
          context: this.context,
          reader,
        }),
      );
    }

    return { remainingBuffer: reader.remainingBuffer, mutableMessages };
  }
}
|