UNPKG

11.7 kBPlain TextView Raw
1import {
2 BinaryWriter,
3 common,
4 createSerializeWire,
5 crypto,
6 InvalidFormatError,
7 IOHelper,
8 SerializableWire,
9 SerializeWire,
10} from '@neo-one/client-common-esnext-esm';
11import {
12 BinaryReader,
13 Block,
14 ConsensusPayload,
15 deserializeTransactionWire,
16 DeserializeWireBaseOptions,
17 DeserializeWireContext,
18 DeserializeWireOptions,
19 Transaction,
20} from '@neo-one/node-core-esnext-esm';
21import { makeErrorWithCode, utils } from '@neo-one/utils-esnext-esm';
22import { Transform } from 'stream';
23import { assertCommand, Command } from './Command';
24import {
25 AddrPayload,
26 FilterAddPayload,
27 FilterLoadPayload,
28 GetBlocksPayload,
29 HeadersPayload,
30 InvPayload,
31 MerkleBlockPayload,
32 VersionPayload,
33} from './payload';
34export type MessageValue =
35 | { readonly command: Command.addr; readonly payload: AddrPayload }
36 | { readonly command: Command.block; readonly payload: Block }
37 | { readonly command: Command.consensus; readonly payload: ConsensusPayload }
38 | { readonly command: Command.filteradd; readonly payload: FilterAddPayload }
39 | { readonly command: Command.filterclear }
40 | { readonly command: Command.filterload; readonly payload: FilterLoadPayload }
41 | { readonly command: Command.getaddr }
42 | { readonly command: Command.getblocks; readonly payload: GetBlocksPayload }
43 | { readonly command: Command.getdata; readonly payload: InvPayload }
44 | { readonly command: Command.getheaders; readonly payload: GetBlocksPayload }
45 | { readonly command: Command.headers; readonly payload: HeadersPayload }
46 | { readonly command: Command.inv; readonly payload: InvPayload }
47 | { readonly command: Command.mempool }
48 | { readonly command: Command.tx; readonly payload: Transaction }
49 | { readonly command: Command.verack }
50 | { readonly command: Command.version; readonly payload: VersionPayload }
51 | { readonly command: Command.alert }
52 | { readonly command: Command.merkleblock; readonly payload: MerkleBlockPayload }
53 | { readonly command: Command.notfound }
54 | { readonly command: Command.ping }
55 | { readonly command: Command.pong }
56 | { readonly command: Command.reject };
57export interface MessageAdd {
58 readonly magic: number;
59 readonly value: MessageValue;
60}
61
62export const COMMAND_LENGTH = 12;
63export const PAYLOAD_MAX_SIZE = 0x02000000;
64
65const calculateChecksum = (buffer: Buffer) => common.toUInt32LE(crypto.hash256(buffer));
66
67const deserializeMessageHeader = ({
68 context,
69 reader,
70}: DeserializeWireBaseOptions): {
71 readonly command: Command;
72 readonly length: number;
73 readonly checksum: number;
74} => {
75 if (reader.readUInt32LE() !== context.messageMagic) {
76 throw new InvalidFormatError();
77 }
78 const command = assertCommand(reader.readFixedString(COMMAND_LENGTH));
79 const length = reader.readUInt32LE();
80 if (length > PAYLOAD_MAX_SIZE) {
81 throw new InvalidFormatError();
82 }
83 const checksum = reader.readUInt32LE();
84
85 return { command, length, checksum };
86};
87
88export class Message implements SerializableWire<Message> {
89 public static deserializeWireBase(options: DeserializeWireBaseOptions): Message {
90 const { reader, context } = options;
91 const { command, length, checksum } = deserializeMessageHeader(options);
92 const payloadBuffer = reader.readBytes(length);
93 if (calculateChecksum(payloadBuffer) !== checksum) {
94 throw new InvalidFormatError();
95 }
96
97 const payloadOptions = {
98 context: options.context,
99 buffer: payloadBuffer,
100 };
101
102 let value: MessageValue;
103 switch (command) {
104 case Command.addr:
105 value = {
106 command: Command.addr,
107 payload: AddrPayload.deserializeWire(payloadOptions),
108 };
109
110 break;
111 case Command.block:
112 value = {
113 command: Command.block,
114 payload: Block.deserializeWire(payloadOptions),
115 };
116
117 break;
118 case Command.consensus:
119 value = {
120 command: Command.consensus,
121 payload: ConsensusPayload.deserializeWire(payloadOptions),
122 };
123
124 break;
125 case Command.filteradd:
126 value = {
127 command: Command.filteradd,
128 payload: FilterAddPayload.deserializeWire(payloadOptions),
129 };
130
131 break;
132 case Command.filterclear:
133 value = { command: Command.filterclear };
134 break;
135 case Command.filterload:
136 value = {
137 command: Command.filterload,
138 payload: FilterLoadPayload.deserializeWire(payloadOptions),
139 };
140
141 break;
142 case Command.getaddr:
143 value = { command: Command.getaddr };
144 break;
145 case Command.getblocks:
146 value = {
147 command: Command.getblocks,
148 payload: GetBlocksPayload.deserializeWire(payloadOptions),
149 };
150
151 break;
152 case Command.getdata:
153 value = {
154 command: Command.getdata,
155 payload: InvPayload.deserializeWire(payloadOptions),
156 };
157
158 break;
159 case Command.getheaders:
160 value = {
161 command: Command.getheaders,
162 payload: GetBlocksPayload.deserializeWire(payloadOptions),
163 };
164
165 break;
166 case Command.headers:
167 value = {
168 command: Command.headers,
169 payload: HeadersPayload.deserializeWire(payloadOptions),
170 };
171
172 break;
173 case Command.inv:
174 value = {
175 command: Command.inv,
176 payload: InvPayload.deserializeWire(payloadOptions),
177 };
178
179 break;
180 case Command.mempool:
181 value = { command: Command.mempool };
182 break;
183 case Command.tx:
184 value = {
185 command: Command.tx,
186 payload: deserializeTransactionWire(payloadOptions),
187 };
188
189 break;
190 case Command.verack:
191 value = { command: Command.verack };
192 break;
193 case Command.version:
194 value = {
195 command: Command.version,
196 payload: VersionPayload.deserializeWire(payloadOptions),
197 };
198
199 break;
200 case Command.alert:
201 value = { command: Command.alert };
202 break;
203 case Command.merkleblock:
204 value = {
205 command: Command.merkleblock,
206 payload: MerkleBlockPayload.deserializeWire(payloadOptions),
207 };
208
209 break;
210 case Command.notfound:
211 value = { command: Command.notfound };
212 break;
213 case Command.ping:
214 value = { command: Command.ping };
215 break;
216 case Command.pong:
217 value = { command: Command.pong };
218 break;
219 case Command.reject:
220 value = { command: Command.reject };
221 break;
222 default:
223 utils.assertNever(command);
224 throw new InvalidFormatError();
225 }
226
227 return new this({ magic: context.messageMagic, value });
228 }
229
230 public static deserializeWire(options: DeserializeWireOptions): Message {
231 return this.deserializeWireBase({
232 context: options.context,
233 reader: new BinaryReader(options.buffer),
234 });
235 }
236
237 public readonly magic: number;
238 public readonly value: MessageValue;
239 public readonly serializeWire: SerializeWire = createSerializeWire(this.serializeWireBase.bind(this));
240
241 public constructor({ magic, value }: MessageAdd) {
242 this.magic = magic;
243 this.value = value;
244 }
245
246 public serializeWireBase(writer: BinaryWriter): void {
247 const { value } = this;
248
249 writer.writeUInt32LE(this.magic);
250 writer.writeFixedString(value.command, COMMAND_LENGTH);
251
252 let payload = Buffer.alloc(0);
253 switch (value.command) {
254 case Command.addr:
255 payload = value.payload.serializeWire();
256 break;
257 case Command.block:
258 payload = value.payload.serializeWire();
259 break;
260 case Command.consensus:
261 payload = value.payload.serializeWire();
262 break;
263 case Command.filteradd:
264 payload = value.payload.serializeWire();
265 break;
266 case Command.filterclear:
267 break;
268 case Command.filterload:
269 payload = value.payload.serializeWire();
270 break;
271 case Command.getaddr:
272 break;
273 case Command.getblocks:
274 payload = value.payload.serializeWire();
275 break;
276 case Command.getdata:
277 payload = value.payload.serializeWire();
278 break;
279 case Command.getheaders:
280 payload = value.payload.serializeWire();
281 break;
282 case Command.headers:
283 payload = value.payload.serializeWire();
284 break;
285 case Command.inv:
286 payload = value.payload.serializeWire();
287 break;
288 case Command.mempool:
289 break;
290 case Command.tx:
291 payload = value.payload.serializeWire();
292 break;
293 case Command.verack:
294 break;
295 case Command.version:
296 payload = value.payload.serializeWire();
297 break;
298 case Command.alert:
299 break;
300 case Command.merkleblock:
301 payload = value.payload.serializeWire();
302 break;
303 case Command.notfound:
304 break;
305 case Command.ping:
306 break;
307 case Command.pong:
308 break;
309 case Command.reject:
310 break;
311 default:
312 utils.assertNever(value);
313 throw new InvalidFormatError();
314 }
315
316 writer.writeUInt32LE(payload.length);
317 writer.writeUInt32LE(calculateChecksum(payload));
318 writer.writeBytes(payload);
319 }
320}
321
322export const InvalidMessageTransformEncodingError = makeErrorWithCode(
323 'INVALID_MESSAGE_TRANSFORM_ENCODING',
324 (message: string) => message,
325);
326
327const SIZE_OF_MESSAGE_HEADER =
328 IOHelper.sizeOfUInt32LE +
329 IOHelper.sizeOfFixedString(COMMAND_LENGTH) +
330 IOHelper.sizeOfUInt32LE +
331 IOHelper.sizeOfUInt32LE;
332
333export class MessageTransform extends Transform {
334 public readonly context: DeserializeWireContext;
335 public mutableBuffer: Buffer;
336
337 public constructor(context: DeserializeWireContext) {
338 super({ readableObjectMode: true });
339 this.context = context;
340 this.mutableBuffer = Buffer.from([]);
341 }
342
343 public _transform(
344 chunk: Buffer | string,
345 encoding: string,
346 callback: (error: Error | undefined, data?: Buffer | string) => void,
347 ): void {
348 if (typeof chunk === 'string') {
349 throw new InvalidMessageTransformEncodingError(
350 `Invalid Message Transform Chunk Type. Expected chunk type to be 'string', found: ${typeof chunk}`,
351 );
352 }
353 if (encoding !== 'buffer') {
354 throw new InvalidMessageTransformEncodingError(
355 `Invalid Message Transform Encoding. Expected: 'buffer', found: ${encoding}`,
356 );
357 }
358
359 this.mutableBuffer = Buffer.concat([this.mutableBuffer, chunk]);
360 try {
361 const { remainingBuffer, mutableMessages } = this.processBuffer(new BinaryReader(this.mutableBuffer));
362
363 this.mutableBuffer = remainingBuffer;
364 mutableMessages.forEach((message) => this.push(message));
365 callback(undefined);
366 } catch (error) {
367 callback(error);
368 }
369 }
370
371 private processBuffer(
372 reader: BinaryReader,
373 ): {
374 readonly remainingBuffer: Buffer;
375 readonly mutableMessages: Message[];
376 } {
377 if (reader.remaining < SIZE_OF_MESSAGE_HEADER) {
378 return { remainingBuffer: reader.remainingBuffer, mutableMessages: [] };
379 }
380
381 const { length } = deserializeMessageHeader({
382 context: this.context,
383 reader: reader.clone(),
384 });
385
386 if (reader.remaining < SIZE_OF_MESSAGE_HEADER + length) {
387 return { remainingBuffer: reader.remainingBuffer, mutableMessages: [] };
388 }
389
390 const message = Message.deserializeWireBase({
391 context: this.context,
392 reader,
393 });
394
395 const { remainingBuffer, mutableMessages } = this.processBuffer(reader);
396 mutableMessages.push(message);
397
398 return { remainingBuffer, mutableMessages };
399 }
400}