UNPKG

41.9 kBPlain TextView Raw
1import { common, crypto, UInt256Hex, utils } from '@neo-one/client-common-esnext-esm';
2import { metrics, Monitor } from '@neo-one/monitor-esnext-esm';
3import { Consensus, ConsensusOptions } from '@neo-one/node-consensus-esnext-esm';
4import {
5 Block,
6 Blockchain,
7 ConnectedPeer,
8 ConsensusPayload,
9 createEndpoint,
10 CreateNetwork,
11 Endpoint,
12 getEndpointConfig,
13 Header,
14 MerkleTree,
15 NegotiateResult,
16 Network,
17 NetworkEventMessage,
18 Node as INode,
19 Peer,
20 RegisterTransaction,
21 RelayTransactionResult,
22 Transaction,
23 TransactionType,
24 VerifyTransactionResult,
25} from '@neo-one/node-core-esnext-esm';
26import { finalize, labels, neverComplete, utils as commonUtils } from '@neo-one/utils-esnext-esm';
27import { ScalingBloem } from 'bloem';
28// tslint:disable-next-line:match-default-export-name
29import BloomFilter from 'bloom-filter';
30import BN from 'bn.js';
31import fetch from 'cross-fetch';
32import { Address6 } from 'ip-address';
33import _ from 'lodash';
34import LRU from 'lru-cache';
35import { combineLatest, defer, Observable, of as _of } from 'rxjs';
36import { distinctUntilChanged, map, switchMap, take } from 'rxjs/operators';
37import { Command } from './Command';
38import { AlreadyConnectedError, NegotiationError } from './errors';
39import { Message, MessageTransform, MessageValue } from './Message';
40import {
41 AddrPayload,
42 FilterAddPayload,
43 FilterLoadPayload,
44 GetBlocksPayload,
45 HeadersPayload,
46 InventoryType,
47 InvPayload,
48 MerkleBlockPayload,
49 NetworkAddress,
50 SERVICES,
51 VersionPayload,
52} from './payload';
53import { PeerData } from './PeerData';
54
// Label name shared by the per-command message counters below.
55const messageReceivedLabelNames: ReadonlyArray<string> = [labels.COMMAND_NAME];
// One label set per key of `Command`; handed to the counters as their `labels` option.
56const messageReceivedLabels = Object.keys(Command).map((command) => ({
57 [labels.COMMAND_NAME]: command,
58}));
59
// Passed as the `metric` option of the log capture that wraps message handling.
60const NEO_PROTOCOL_MESSAGES_RECEIVED_TOTAL = metrics.createCounter({
61 name: 'neo_protocol_messages_received_total',
62 labelNames: messageReceivedLabelNames,
63 labels: messageReceivedLabels,
64});
65
// Passed as the `error.metric` option of the log capture that wraps message handling.
66const NEO_PROTOCOL_MESSAGES_FAILURES_TOTAL = metrics.createCounter({
67 name: 'neo_protocol_messages_failures_total',
68 labelNames: messageReceivedLabelNames,
69 labels: messageReceivedLabels,
70});
71
// Gauge updated whenever transactions are added to or removed from the in-memory mempool.
72const NEO_PROTOCOL_MEMPOOL_SIZE = metrics.createGauge({
73 name: 'neo_protocol_mempool_size',
74});
/** A transaction paired with the network fee computed for it; the unit sorted when trimming the mempool. */
75export interface TransactionAndFee {
76 readonly transaction: Transaction;
77 readonly networkFee: BN;
78}
79
/** Static environment for a `Node`. */
80export interface Environment {
 // Port sent in the `version` payload during negotiation; the constructor defaults it to 0.
81 readonly externalPort?: number;
82}
/** Runtime options delivered to the `Node` through its `options$` observable. */
83export interface Options {
 // When `enabled` is true, `start$` creates a `Consensus` that is fed `options`.
84 readonly consensus?: {
85 readonly enabled: boolean;
86 readonly options: ConsensusOptions;
87 };
 // JSON-RPC URLs that are sent a `getpeers` request to discover more endpoints.
88 readonly rpcURLs?: ReadonlyArray<string>;
 // Seconds a peer may go without reporting a new block before it is considered unhealthy;
 // falls back to UNHEALTHY_PEER_SECONDS.
89 readonly unhealthyPeerSeconds?: number;
90}
91
/**
 * Builds the `BloomFilter` used to decide what to relay to a peer.
 *
 * @param params.filter Raw filter bytes; copied with `Buffer.from` before use.
 * @param params.k Passed to the filter as `nHashFuncs`.
 * @param params.tweak Passed to the filter as `nTweak`.
 */
const createPeerBloomFilter = (params: {
  readonly filter: Buffer;
  readonly k: number;
  readonly tweak: number;
}) => {
  const { filter, k: nHashFuncs, tweak: nTweak } = params;
  const vData = Buffer.from(filter);

  return new BloomFilter({ vData, nHashFuncs, nTweak });
};
106
/**
 * Creates a fresh `ScalingBloem` used to remember already-seen block, header and transaction hashes.
 */
const createScalingBloomFilter = () => {
  const errorRate = 0.05;
  const options = { initial_capacity: 100000, scaling: 4 };

  return new ScalingBloem(errorRate, options);
};
112
/**
 * Sort comparator for mempool entries: orders ascending by `networkFee.divn(transaction.size)`
 * and breaks ties by comparing the transaction hashes.
 */
const compareTransactionAndFees = (val1: TransactionAndFee, val2: TransactionAndFee) => {
  const rate1 = val1.networkFee.divn(val1.transaction.size);
  const rate2 = val2.networkFee.divn(val2.transaction.size);

  if (rate1.lt(rate2)) {
    return -1;
  }

  return rate2.lt(rate1) ? 1 : val1.transaction.hash.compare(val2.transaction.hash);
};
125
// Mempool entry count above which trimming runs; non-forced relays are refused above half of it.
126const MEM_POOL_SIZE = 5000;
// Maximum number of peer addresses returned in reply to a `getaddr` message.
127const GET_ADDR_PEER_COUNT = 200;
128const GET_BLOCKS_COUNT = 500;
129// Assume that we get 500 back, but if not, at least request every 10 seconds
// Blocks are re-requested once the current block index is more than this far past the last requested index.
130const GET_BLOCKS_BUFFER = GET_BLOCKS_COUNT / 3;
// Blocks are re-requested once this many milliseconds have passed since the last request.
131const GET_BLOCKS_TIME_MS = 10000;
// Debounce wait (ms) applied to `requestBlocks`.
132const GET_BLOCKS_THROTTLE_MS = 1000;
// Throttle wait (ms) applied to `trimMemPool`.
133const TRIM_MEMPOOL_THROTTLE = 5000;
// Once more than this many requests were made for the same block index, the best peer is blacklisted and replaced.
134const GET_BLOCKS_CLOSE_COUNT = 2;
// Default for `Options.unhealthyPeerSeconds`.
135const UNHEALTHY_PEER_SECONDS = 300;
// Hosts that are filtered out before discovered endpoints are added to the network.
136const LOCAL_HOST_ADDRESSES = new Set(['', '0.0.0.0', 'localhost', '127.0.0.1', '::', '::1']);
137
/** Result of a peer health check; the previous result is passed back into the next check. */
138interface PeerHealth {
139 readonly healthy: boolean;
 // Highest block index recorded for the peer at check time, if any block was received from it.
140 readonly blockIndex: number | undefined;
 // Timestamp (seconds) the health window is measured from.
141 readonly checkTimeSeconds: number;
142}
143
144export class Node implements INode {
145 public get consensus(): Consensus | undefined {
146 return this.mutableConsensus;
147 }
148
149 public get connectedPeers(): ReadonlyArray<Endpoint> {
150 return this.network.connectedPeers.map((peer) => peer.endpoint);
151 }
152
153 public get memPool(): { readonly [hash: string]: Transaction } {
154 return this.mutableMemPool;
155 }
156 public readonly blockchain: Blockchain;
 // Verified transactions waiting to be included in a block, keyed by hash hex.
157 // tslint:disable-next-line readonly-keyword
158 private mutableMemPool: { [hash: string]: Transaction };
159 private readonly monitor: Monitor;
160 private readonly network: Network<Message, PeerData>;
161 private readonly options$: Observable<Options>;
 // Port, nonce and user agent sent in the `version` payload during negotiation.
162 private readonly externalPort: number;
163 private readonly nonce: number;
164 private readonly userAgent: string;
 // For blocks, transactions and headers: a scaling bloom filter of hashes already processed,
 // plus a `temp` set of hashes that are being processed right now.
165 private mutableKnownBlockHashes: ScalingBloem;
166 private readonly tempKnownBlockHashes: Set<UInt256Hex>;
167 private mutableKnownTransactionHashes: ScalingBloem;
168 private readonly tempKnownTransactionHashes: Set<UInt256Hex>;
169 private mutableKnownHeaderHashes: ScalingBloem;
170 private readonly tempKnownHeaderHashes: Set<UInt256Hex>;
 // Bookkeeping for `getblocks`: block index last requested from, time of the last request,
 // and how many requests were made for that same index.
171 private mutableGetBlocksRequestsIndex: number | undefined;
172 private mutableGetBlocksRequestTime: number | undefined;
173 private mutableGetBlocksRequestsCount: number;
 // Peer that blocks are currently requested from.
174 private mutableBestPeer: ConnectedPeer<Message, PeerData> | undefined;
175 private mutableUnhealthyPeerSeconds = UNHEALTHY_PEER_SECONDS;
 // Consensus payloads relayed by this node, served back on `getdata` requests.
176 private readonly consensusCache: LRU.Cache<string, ConsensusPayload>;
 // Highest block index received from each peer endpoint; read by the peer health check.
177 // tslint:disable-next-line readonly-keyword
178 private mutableBlockIndex: { [endpoint: string]: number };
179 private mutableConsensus: Consensus | undefined;
180 private readonly requestBlocks = _.debounce(() => {
181 const peer = this.mutableBestPeer;
182 const previousBlock = this.blockchain.previousBlock;
183 const block = previousBlock === undefined ? this.blockchain.currentBlock : previousBlock;
184 if (peer !== undefined && block.index < peer.data.startHeight) {
185 if (this.mutableGetBlocksRequestsCount > GET_BLOCKS_CLOSE_COUNT) {
186 this.mutableBestPeer = this.findBestPeer(peer);
187 this.network.blacklistAndClose(peer);
188 this.mutableGetBlocksRequestsCount = 0;
189 } else if (this.shouldRequestBlocks()) {
190 if (this.mutableGetBlocksRequestsIndex === block.index) {
191 this.mutableGetBlocksRequestsCount += 1;
192 } else {
193 this.mutableGetBlocksRequestsCount = 1;
194 this.mutableGetBlocksRequestsIndex = block.index;
195 }
196 this.mutableGetBlocksRequestTime = Date.now();
197 this.sendMessage(
198 peer,
199 this.createMessage({
200 command: Command.getblocks,
201 payload: new GetBlocksPayload({
202 hashStart: [block.hash],
203 }),
204 }),
205 );
206 }
207
208 this.requestBlocks();
209 }
210 }, GET_BLOCKS_THROTTLE_MS);
211 private readonly onRequestEndpoints = _.throttle((): void => {
212 this.relay(this.createMessage({ command: Command.getaddr }));
213 // tslint:disable-next-line no-floating-promises
214 this.fetchEndpointsFromRPC();
215 }, 5000);
216
217 // tslint:disable-next-line no-unnecessary-type-annotation
218 private readonly trimMemPool = _.throttle(async (monitor: Monitor): Promise<void> => {
219 const memPool = Object.values(this.mutableMemPool);
220 if (memPool.length > MEM_POOL_SIZE) {
221 await monitor.captureSpan(
222 async () => {
223 const transactionAndFees = await Promise.all(
224 memPool.map<Promise<TransactionAndFee>>(async (transaction) => {
225 const networkFee = await transaction.getNetworkFee({
226 getOutput: this.blockchain.output.get,
227 governingToken: this.blockchain.settings.governingToken,
228 utilityToken: this.blockchain.settings.utilityToken,
229 fees: this.blockchain.settings.fees,
230 registerValidatorFee: this.blockchain.settings.registerValidatorFee,
231 });
232
233 return { transaction, networkFee };
234 }),
235 );
236
237 const hashesToRemove = _.take<TransactionAndFee>(
238 // tslint:disable-next-line no-array-mutation
239 transactionAndFees.slice().sort(compareTransactionAndFees),
240 this.blockchain.settings.memPoolSize,
241 ).map((transactionAndFee) => transactionAndFee.transaction.hashHex);
242 hashesToRemove.forEach((hash) => {
243 // tslint:disable-next-line no-dynamic-delete
244 delete this.mutableMemPool[hash];
245 });
246 NEO_PROTOCOL_MEMPOOL_SIZE.set(Object.keys(this.mutableMemPool).length);
247 },
248 {
249 name: 'neo_protocol_trim_mempool',
250 },
251 );
252 }
253 }, TRIM_MEMPOOL_THROTTLE);
254
255 public constructor({
256 monitor,
257 blockchain,
258 createNetwork,
259 environment = {},
260 options$,
261 }: {
262 readonly monitor: Monitor;
263 readonly blockchain: Blockchain;
264 readonly createNetwork: CreateNetwork;
265 readonly environment?: Environment;
266 readonly options$: Observable<Options>;
267 }) {
268 this.blockchain = blockchain;
269 this.monitor = monitor.at('node_protocol');
270 this.network = createNetwork({
271 negotiate: this.negotiate,
272 checkPeerHealth: this.checkPeerHealth,
273 createMessageTransform: () => new MessageTransform(this.blockchain.deserializeWireContext),
274 onMessageReceived: (peer, message: Message) => {
275 this.onMessageReceived(peer, message);
276 },
277 onRequestEndpoints: this.onRequestEndpoints.bind(this),
278 onEvent: this.onEvent,
279 });
280
281 this.options$ = options$;
282
283 const { externalPort = 0 } = environment;
284 this.externalPort = externalPort;
285 this.nonce = Math.floor(Math.random() * utils.UINT_MAX_NUMBER);
286 this.userAgent = `NEO:neo-one-js:1.0.0-preview`;
287
288 this.mutableMemPool = {};
289 this.mutableKnownBlockHashes = createScalingBloomFilter();
290 this.tempKnownBlockHashes = new Set();
291 this.mutableKnownTransactionHashes = createScalingBloomFilter();
292 this.tempKnownTransactionHashes = new Set();
293 this.mutableKnownHeaderHashes = createScalingBloomFilter();
294 this.tempKnownHeaderHashes = new Set();
295 this.mutableGetBlocksRequestsCount = 1;
296 this.consensusCache = LRU(10000);
297 this.mutableBlockIndex = {};
298 }
299
300 public async reset(): Promise<void> {
301 this.mutableMemPool = {};
302 this.mutableKnownBlockHashes = createScalingBloomFilter();
303 this.tempKnownBlockHashes.clear();
304 this.mutableKnownTransactionHashes = createScalingBloomFilter();
305 this.tempKnownTransactionHashes.clear();
306 this.mutableKnownHeaderHashes = createScalingBloomFilter();
307 this.tempKnownHeaderHashes.clear();
308 this.mutableGetBlocksRequestsCount = 1;
309 this.consensusCache.reset();
310 this.mutableBlockIndex = {};
311 }
312
313 // tslint:disable-next-line no-any
314 public start$(): Observable<any> {
315 const network$ = defer(async () => {
316 this.network.start();
317 this.monitor.log({
318 name: 'neo_protocol_start',
319 message: 'Protocol started.',
320 level: 'verbose',
321 });
322 }).pipe(
323 neverComplete(),
324 finalize(() => {
325 this.network.stop();
326 this.monitor.log({
327 name: 'neo_protocol_stop',
328 message: 'Protocol stopped.',
329 level: 'verbose',
330 });
331 }),
332 );
333
334 const defaultOptions = {
335 enabled: false,
336 options: { privateKey: 'unused', privateNet: false },
337 };
338
339 const consensus$ = this.options$.pipe(
340 map(({ consensus = defaultOptions }) => consensus.enabled),
341 distinctUntilChanged(),
342 switchMap((enabled) => {
343 if (enabled) {
344 const mutableConsensus = new Consensus({
345 monitor: this.monitor,
346 options$: this.options$.pipe(
347 map(({ consensus = defaultOptions }) => consensus.options),
348 distinctUntilChanged(),
349 ),
350
351 node: this,
352 });
353
354 this.mutableConsensus = mutableConsensus;
355
356 return mutableConsensus.start$();
357 }
358
359 return _of(undefined);
360 }),
361 );
362
363 const options$ = this.options$.pipe(
364 map(({ unhealthyPeerSeconds = UNHEALTHY_PEER_SECONDS }) => {
365 this.mutableUnhealthyPeerSeconds = unhealthyPeerSeconds;
366 }),
367 );
368
369 return combineLatest(network$, consensus$, options$);
370 }
371
 /**
  * Verifies a transaction and, if it verifies, adds it to the mempool, notifies consensus and
  * announces it to peers.
  *
  * Returns an empty result without verifying when the transaction is a Miner transaction, is
  * already in the mempool, is currently being processed, is already in the known-hash filter,
  * or when the mempool is more than half of MEM_POOL_SIZE and `forceAdd` is false.
  *
  * @param transaction Transaction to relay.
  * @param throwVerifyError When true, errors whose `code` contains 'VERIFY' are rethrown
  * instead of being swallowed.
  * @param forceAdd When true, the half-full mempool check is skipped.
  * @returns `{ verifyResult }` when verification ran, otherwise an empty object.
  * @throws Any error that is not a swallowed verify error.
  */
372 public async relayTransaction(
373 transaction: Transaction,
374 {
375 throwVerifyError = false,
376 forceAdd = false,
377 }: { readonly throwVerifyError?: boolean; readonly forceAdd?: boolean } = {
378 throwVerifyError: false,
379 forceAdd: false,
380 },
381 ): Promise<RelayTransactionResult> {
382 const result = {};
383
384 if (
385 transaction.type === TransactionType.Miner ||
386 (this.mutableMemPool[transaction.hashHex] as Transaction | undefined) !== undefined ||
387 this.tempKnownTransactionHashes.has(transaction.hashHex)
388 ) {
389 return result;
390 }
391
392 if (!this.mutableKnownTransactionHashes.has(transaction.hash)) {
 // Mark as in-flight so concurrent relays of the same hash return early; cleared in `finally`.
393 this.tempKnownTransactionHashes.add(transaction.hashHex);
394
395 try {
396 const memPool = Object.values(this.mutableMemPool);
 // Above half capacity only forced adds are accepted; the hash is still remembered as known.
397 if (memPool.length > MEM_POOL_SIZE / 2 && !forceAdd) {
398 this.mutableKnownTransactionHashes.add(transaction.hash);
399
400 return result;
401 }
402
403 // tslint:disable-next-line prefer-immediate-return
404 const finalResult = await this.monitor
405 .withData({ [labels.NEO_TRANSACTION_HASH]: transaction.hashHex })
406 .captureSpanLog(
407 async (span) => {
408 let foundTransaction;
409 try {
410 foundTransaction = await this.blockchain.transaction.tryGet({
411 hash: transaction.hash,
412 });
413 } finally {
 // Label is set even when the lookup throws (then it records `false`).
414 span.setLabels({
415 [labels.NEO_TRANSACTION_FOUND]: foundTransaction !== undefined,
416 });
417 }
418 let verifyResult: VerifyTransactionResult | undefined;
 // Only transactions the blockchain does not already have are verified.
419 if (foundTransaction === undefined) {
420 verifyResult = await this.blockchain.verifyTransaction({
421 monitor: span,
422 transaction,
423 memPool: Object.values(this.mutableMemPool),
424 });
425 const verified = verifyResult.verifications.every(({ failureMessage }) => failureMessage === undefined);
426
427 if (verified) {
428 this.mutableMemPool[transaction.hashHex] = transaction;
429 NEO_PROTOCOL_MEMPOOL_SIZE.inc();
430 if (this.mutableConsensus !== undefined) {
431 this.mutableConsensus.onTransactionReceived(transaction);
432 }
433 this.relayTransactionInternal(transaction);
434 await this.trimMemPool(span);
435 }
436 }
437
438 this.mutableKnownTransactionHashes.add(transaction.hash);
439
440 return { verifyResult };
441 },
442 {
443 name: 'neo_relay_transaction',
444 level: { log: 'verbose', span: 'info' },
445 trace: true,
446 },
447 );
448
449 // tslint:disable-next-line no-var-before-return
450 return finalResult;
451 } catch (error) {
 // Swallow only errors whose string `code` contains 'VERIFY', and only when the caller
 // did not ask for them to be thrown; everything else is rethrown.
452 if (
453 error.code === undefined ||
454 typeof error.code !== 'string' ||
455 !error.code.includes('VERIFY') ||
456 throwVerifyError
457 ) {
458 throw error;
459 }
460 } finally {
461 this.tempKnownTransactionHashes.delete(transaction.hashHex);
462 }
463 }
464
465 return result;
466 }
467
468 public async relayBlock(block: Block, monitor?: Monitor): Promise<void> {
469 await this.persistBlock(block, monitor);
470 }
471
472 public relayConsensusPayload(payload: ConsensusPayload): void {
473 const message = this.createMessage({
474 command: Command.inv,
475 payload: new InvPayload({
476 type: InventoryType.Consensus,
477 hashes: [payload.hash],
478 }),
479 });
480
481 this.consensusCache.set(payload.hashHex, payload);
482 this.relay(message);
483 }
484
485 public syncMemPool(): void {
486 this.relay(this.createMessage({ command: Command.mempool }));
487 }
488
489 private relay(message: Message): void {
490 this.network.relay(message.serializeWire());
491 }
492
493 private relayTransactionInternal(transaction: Transaction): void {
494 const message = this.createMessage({
495 command: Command.inv,
496 payload: new InvPayload({
497 type: InventoryType.Transaction,
498 hashes: [transaction.hash],
499 }),
500 });
501
502 const messagePayload = message.serializeWire();
503 this.network.connectedPeers.forEach((peer) => {
504 if (peer.relay && this.testFilter(peer.data.mutableBloomFilter, transaction)) {
505 peer.write(messagePayload);
506 }
507 });
508 }
509
510 private sendMessage(peer: Peer<Message> | ConnectedPeer<Message, PeerData>, message: Message): void {
511 peer.write(message.serializeWire());
512 }
513 private readonly negotiate = async (peer: Peer<Message>): Promise<NegotiateResult<PeerData>> => {
514 this.sendMessage(
515 peer,
516 this.createMessage({
517 command: Command.version,
518 payload: new VersionPayload({
519 protocolVersion: 0,
520 services: SERVICES.NODE_NETWORK,
521 timestamp: Math.round(Date.now() / 1000),
522 port: this.externalPort,
523 nonce: this.nonce,
524 userAgent: this.userAgent,
525 startHeight: this.blockchain.currentBlockIndex,
526 relay: true,
527 }),
528 }),
529 );
530
531 const message = await peer.receiveMessage(30000);
532 let versionPayload;
533 if (message.value.command === Command.version) {
534 versionPayload = message.value.payload;
535 } else {
536 throw new NegotiationError(message);
537 }
538
539 this.checkVersion(peer, message, versionPayload);
540
541 const { host } = getEndpointConfig(peer.endpoint);
542 let address;
543 if (NetworkAddress.isValid(host)) {
544 address = new NetworkAddress({
545 host,
546 port: versionPayload.port,
547 timestamp: versionPayload.timestamp,
548 services: versionPayload.services,
549 });
550 }
551
552 this.sendMessage(peer, this.createMessage({ command: Command.verack }));
553
554 const nextMessage = await peer.receiveMessage(30000);
555 if (nextMessage.value.command !== Command.verack) {
556 throw new NegotiationError(nextMessage);
557 }
558
559 return {
560 data: {
561 nonce: versionPayload.nonce,
562 startHeight: versionPayload.startHeight,
563 mutableBloomFilter: undefined,
564 address,
565 },
566
567 relay: versionPayload.relay,
568 };
569 };
570 private readonly checkPeerHealth = (peer: ConnectedPeer<Message, PeerData>, prevHealth?: PeerHealth) => {
571 const checkTimeSeconds = commonUtils.nowSeconds();
572 const blockIndex = this.mutableBlockIndex[peer.endpoint] as number | undefined;
573
574 // If first check -> healthy
575 if (prevHealth === undefined) {
576 return { healthy: true, checkTimeSeconds, blockIndex };
577 }
578
579 // If seen new block -> healthy + update check time
580 if (prevHealth.blockIndex !== undefined && blockIndex !== undefined && prevHealth.blockIndex < blockIndex) {
581 return { healthy: true, checkTimeSeconds, blockIndex };
582 }
583
584 // If not seen a block or a new block BUT it has NOT been a long
585 // time -> healthy
586 if (
587 prevHealth.blockIndex === blockIndex &&
588 commonUtils.nowSeconds() - prevHealth.checkTimeSeconds < this.mutableUnhealthyPeerSeconds
589 ) {
590 return {
591 healthy: true,
592 checkTimeSeconds: prevHealth.checkTimeSeconds,
593 blockIndex: prevHealth.blockIndex,
594 };
595 }
596
597 return { healthy: false, checkTimeSeconds, blockIndex };
598 };
599 private readonly onEvent = (event: NetworkEventMessage<Message, PeerData>) => {
600 if (event.event === 'PEER_CONNECT_SUCCESS') {
601 const { connectedPeer } = event;
602 if (
603 this.mutableBestPeer === undefined ||
604 // Only change best peer at most every 100 blocks
605 this.mutableBestPeer.data.startHeight + 100 < connectedPeer.data.startHeight
606 ) {
607 this.mutableBestPeer = connectedPeer;
608 this.resetRequestBlocks();
609 this.requestBlocks();
610 }
611 } else if (
612 event.event === 'PEER_CLOSED' &&
613 this.mutableBestPeer !== undefined &&
614 this.mutableBestPeer.endpoint === event.peer.endpoint
615 ) {
616 this.mutableBestPeer = this.findBestPeer();
617 this.resetRequestBlocks();
618 this.requestBlocks();
619 }
620 };
621
622 private findBestPeer(bestPeer?: ConnectedPeer<Message, PeerData>): ConnectedPeer<Message, PeerData> | undefined {
623 let peers = this.network.connectedPeers;
624 if (bestPeer !== undefined) {
625 peers = peers.filter((peer) => peer.endpoint !== bestPeer.endpoint);
626 }
627 const result = _.maxBy(peers, (peer) => peer.data.startHeight);
628 if (result === undefined) {
629 return undefined;
630 }
631
632 return _.shuffle(peers.filter((peer) => peer.data.startHeight === result.data.startHeight))[0];
633 }
634
635 private resetRequestBlocks(): void {
636 this.mutableGetBlocksRequestsIndex = undefined;
637 this.mutableGetBlocksRequestsCount = 0;
638 }
639
640 private shouldRequestBlocks(): boolean {
641 const block = this.blockchain.currentBlock;
642 const getBlocksRequestTime = this.mutableGetBlocksRequestTime;
643
644 return (
645 this.mutableGetBlocksRequestsIndex === undefined ||
646 block.index - this.mutableGetBlocksRequestsIndex > GET_BLOCKS_BUFFER ||
647 getBlocksRequestTime === undefined ||
648 Date.now() - getBlocksRequestTime > GET_BLOCKS_TIME_MS
649 );
650 }
651
652 private checkVersion(peer: Peer<Message>, message: Message, version: VersionPayload): void {
653 if (version.nonce === this.nonce) {
654 this.network.permanentlyBlacklist(peer.endpoint);
655 throw new NegotiationError(message, 'Nonce equals my nonce.');
656 }
657
658 const connectedPeer = this.network.connectedPeers.find((otherPeer) => version.nonce === otherPeer.data.nonce);
659
660 if (connectedPeer !== undefined) {
661 throw new AlreadyConnectedError('Already connected to nonce.');
662 }
663 }
664
665 private ready(): boolean {
666 const peer = this.mutableBestPeer;
667 const block = this.blockchain.currentBlock;
668
669 return peer !== undefined && block.index >= peer.data.startHeight;
670 }
671
672 private async fetchEndpointsFromRPC(): Promise<void> {
673 try {
674 await this.doFetchEndpointsFromRPC();
675 } catch {
676 // ignore, logged deeper in the stack
677 }
678 }
679
680 private async doFetchEndpointsFromRPC(): Promise<void> {
681 const { rpcURLs = [] } = await this.options$.pipe(take(1)).toPromise();
682 await Promise.all(rpcURLs.map(async (rpcURL) => this.fetchEndpointsFromRPCURL(rpcURL)));
683 }
684
685 private async fetchEndpointsFromRPCURL(rpcURL: string): Promise<void> {
686 try {
687 const response = await fetch(rpcURL, {
688 method: 'POST',
689 headers: {
690 'Content-Type': 'application/json',
691 },
692 body: JSON.stringify({
693 jsonrpc: '2.0',
694 id: 1,
695 method: 'getpeers',
696 params: [],
697 }),
698 });
699
700 if (!response.ok) {
701 throw new Error(`Failed to fetch peers from ${rpcURL}: ${response.status} ${response.statusText}`);
702 }
703
704 const result = await response.json();
705
706 if (
707 typeof result === 'object' &&
708 result.error !== undefined &&
709 typeof result.error === 'object' &&
710 typeof result.error.code === 'number' &&
711 typeof result.error.message === 'string'
712 ) {
713 throw new Error(result.error);
714 }
715
716 const connected: ReadonlyArray<{ readonly address: string; readonly port: number }> = result.result.connected;
717 connected
718 .map((peer) => {
719 const { address, port } = peer;
720 const host = new Address6(address);
721 const canonicalForm = host.canonicalForm() as string | undefined | null;
722
723 return { host: canonicalForm == undefined ? '' : canonicalForm, port };
724 })
725 .filter((endpoint) => !LOCAL_HOST_ADDRESSES.has(endpoint.host))
726 .map((endpoint) =>
727 createEndpoint({
728 type: 'tcp',
729 host: endpoint.host,
730 port: endpoint.port,
731 }),
732 )
733 .forEach((endpoint) => this.network.addEndpoint(endpoint));
734 } catch (error) {
735 this.monitor.withData({ [this.monitor.labels.HTTP_URL]: rpcURL }).logError({
736 name: 'neo_protocol_fetch_endpoints_error',
737 message: `Failed to fetch endpoints from ${rpcURL}`,
738 error,
739 });
740 }
741 }
742
 /**
  * Entry point for every message received from a connected peer. Dispatches on the message
  * command to the matching handler inside a log capture labelled with the command name and
  * the peer endpoint. `alert`, `merkleblock`, `notfound`, `ping`, `pong` and `reject` are
  * accepted but not acted upon.
  *
  * Handler failures are recorded through the capture's error options and then swallowed,
  * so this method never throws or rejects.
  */
743 private onMessageReceived(peer: ConnectedPeer<Message, PeerData>, message: Message): void {
744 this.monitor
745 .withLabels({ [labels.COMMAND_NAME]: message.value.command })
746 .withData({ [this.monitor.labels.PEER_ADDRESS]: peer.endpoint })
747 .captureLog(
748 async (monitor) => {
749 switch (message.value.command) {
750 case Command.addr:
751 this.onAddrMessageReceived(monitor, message.value.payload);
752 break;
753 case Command.block:
754 await this.onBlockMessageReceived(monitor, peer, message.value.payload);
755
756 break;
757 case Command.consensus:
758 await this.onConsensusMessageReceived(monitor, message.value.payload);
759
760 break;
761 case Command.filteradd:
762 this.onFilterAddMessageReceived(monitor, peer, message.value.payload);
763
764 break;
765 case Command.filterclear:
766 this.onFilterClearMessageReceived(monitor, peer);
767 break;
768 case Command.filterload:
769 this.onFilterLoadMessageReceived(monitor, peer, message.value.payload);
770
771 break;
772 case Command.getaddr:
773 this.onGetAddrMessageReceived(monitor, peer);
774 break;
775 case Command.getblocks:
776 await this.onGetBlocksMessageReceived(monitor, peer, message.value.payload);
777
778 break;
779 case Command.getdata:
780 await this.onGetDataMessageReceived(monitor, peer, message.value.payload);
781
782 break;
783 case Command.getheaders:
784 await this.onGetHeadersMessageReceived(monitor, peer, message.value.payload);
785
786 break;
787 case Command.headers:
788 await this.onHeadersMessageReceived(monitor, peer, message.value.payload);
789
790 break;
791 case Command.inv:
792 this.onInvMessageReceived(monitor, peer, message.value.payload);
793 break;
794 case Command.mempool:
795 this.onMemPoolMessageReceived(monitor, peer);
796 break;
797 case Command.tx:
798 await this.onTransactionReceived(monitor, message.value.payload);
799 break;
800 case Command.verack:
801 this.onVerackMessageReceived(monitor, peer);
802 break;
803 case Command.version:
804 this.onVersionMessageReceived(monitor, peer);
805 break;
 // The remaining commands are recognised but intentionally ignored.
806 case Command.alert:
807 break;
808 case Command.merkleblock:
809 break;
810 case Command.notfound:
811 break;
812 case Command.ping:
813 break;
814 case Command.pong:
815 break;
816 case Command.reject:
817 break;
818 default:
 // Exhaustiveness check: every command must have a case above.
819 commonUtils.assertNever(message.value);
820 }
821 },
822 {
823 name: 'neo_protocol_message_received',
824 level: 'debug',
825 message: `Received ${message.value.command} from ${peer.endpoint}`,
826 metric: NEO_PROTOCOL_MESSAGES_RECEIVED_TOTAL,
827 error: {
828 metric: NEO_PROTOCOL_MESSAGES_FAILURES_TOTAL,
829 message: `Failed to process message ${message.value.command} from ${peer.endpoint}`,
830 },
831 },
832 )
833 .catch(() => {
834 // do nothing
835 });
836 }
837
838 private onAddrMessageReceived(_monitor: Monitor, addr: AddrPayload): void {
839 addr.addresses
840 .filter((address) => !LOCAL_HOST_ADDRESSES.has(address.host))
841 .map((address) =>
842 createEndpoint({
843 type: 'tcp',
844 host: address.host,
845 port: address.port,
846 }),
847 )
848 .forEach((endpoint) => this.network.addEndpoint(endpoint));
849 }
850
851 private async onBlockMessageReceived(
852 monitor: Monitor,
853 peer: ConnectedPeer<Message, PeerData>,
854 block: Block,
855 ): Promise<void> {
856 const blockIndex = this.mutableBlockIndex[peer.endpoint] as number | undefined;
857 this.mutableBlockIndex[peer.endpoint] = Math.max(block.index, blockIndex === undefined ? 0 : blockIndex);
858
859 await this.relayBlock(block, monitor);
860 }
861
862 private async persistBlock(block: Block, monitor: Monitor = this.monitor): Promise<void> {
863 if (this.blockchain.currentBlockIndex > block.index || this.tempKnownBlockHashes.has(block.hashHex)) {
864 return;
865 }
866
867 if (!this.mutableKnownBlockHashes.has(block.hash)) {
868 this.tempKnownBlockHashes.add(block.hashHex);
869
870 try {
871 const foundBlock = await this.blockchain.block.tryGet({
872 hashOrIndex: block.hash,
873 });
874
875 if (foundBlock === undefined) {
876 await monitor.withData({ [labels.NEO_BLOCK_INDEX]: block.index }).captureSpanLog(
877 async (span) => {
878 await this.blockchain.persistBlock({ monitor: span, block });
879 if (this.mutableConsensus !== undefined) {
880 this.mutableConsensus.onPersistBlock();
881 }
882
883 const peer = this.mutableBestPeer;
884 if (peer !== undefined && block.index > peer.data.startHeight) {
885 this.relay(
886 this.createMessage({
887 command: Command.inv,
888 payload: new InvPayload({
889 type: InventoryType.Block,
890 hashes: [block.hash],
891 }),
892 }),
893 );
894 }
895 },
896 {
897 name: 'neo_relay_block',
898 level: { log: 'verbose', span: 'info' },
899 trace: true,
900 },
901 );
902 }
903
904 this.mutableKnownBlockHashes.add(block.hash);
905 this.mutableKnownHeaderHashes.add(block.hash);
906 block.transactions.forEach((transaction) => {
907 // tslint:disable-next-line no-dynamic-delete
908 delete this.mutableMemPool[transaction.hashHex];
909 this.mutableKnownTransactionHashes.add(transaction.hash);
910 });
911 NEO_PROTOCOL_MEMPOOL_SIZE.set(Object.keys(this.mutableMemPool).length);
912 } finally {
913 this.tempKnownBlockHashes.delete(block.hashHex);
914 }
915 }
916 }
917
918 private async onConsensusMessageReceived(monitor: Monitor, payload: ConsensusPayload): Promise<void> {
919 const { consensus } = this;
920 if (consensus !== undefined) {
921 await this.blockchain.verifyConsensusPayload(payload, monitor);
922 consensus.onConsensusPayloadReceived(payload);
923 }
924 }
925
926 private onFilterAddMessageReceived(
927 _monitor: Monitor,
928 peer: ConnectedPeer<Message, PeerData>,
929 filterAdd: FilterAddPayload,
930 ): void {
931 if (peer.data.mutableBloomFilter !== undefined) {
932 peer.data.mutableBloomFilter.insert(filterAdd.data);
933 }
934 }
935
936 private onFilterClearMessageReceived(_monitor: Monitor, peer: ConnectedPeer<Message, PeerData>): void {
937 // tslint:disable-next-line no-object-mutation
938 peer.data.mutableBloomFilter = undefined;
939 }
940
941 private onFilterLoadMessageReceived(
942 _monitor: Monitor,
943 peer: ConnectedPeer<Message, PeerData>,
944 filterLoad: FilterLoadPayload,
945 ): void {
946 // tslint:disable-next-line no-object-mutation
947 peer.data.mutableBloomFilter = createPeerBloomFilter(filterLoad);
948 }
949
950 private onGetAddrMessageReceived(_monitor: Monitor, peer: ConnectedPeer<Message, PeerData>): void {
951 const addresses = _.take(
952 _.shuffle(
953 this.network.connectedPeers.map((connectedPeer) => connectedPeer.data.address).filter(commonUtils.notNull),
954 ),
955 GET_ADDR_PEER_COUNT,
956 );
957
958 if (addresses.length > 0) {
959 this.sendMessage(
960 peer,
961 this.createMessage({
962 command: Command.addr,
963 payload: new AddrPayload({ addresses }),
964 }),
965 );
966 }
967 }
968
969 private async onGetBlocksMessageReceived(
970 _monitor: Monitor,
971 peer: ConnectedPeer<Message, PeerData>,
972 getBlocks: GetBlocksPayload,
973 ): Promise<void> {
974 const headers = await this.getHeaders(getBlocks, this.blockchain.currentBlockIndex);
975
976 this.sendMessage(
977 peer,
978 this.createMessage({
979 command: Command.inv,
980 payload: new InvPayload({
981 type: InventoryType.Block,
982 hashes: headers.map((header) => header.hash),
983 }),
984 }),
985 );
986 }
987
 /**
  * Handles `getdata`: sends the peer each requested inventory item that can be found.
  * Hashes that cannot be resolved are silently skipped.
  *
  * - Transaction: looked up in the mempool first, then in the blockchain; sent as `tx`.
  * - Block: looked up in the blockchain; sent as `block`, or as `merkleblock` when the peer
  *   has a bloom filter loaded.
  * - Consensus: looked up in the consensus payload cache; sent as `consensus`.
  */
988 private async onGetDataMessageReceived(
989 _monitor: Monitor,
990 peer: ConnectedPeer<Message, PeerData>,
991 getData: InvPayload,
992 ): Promise<void> {
993 switch (getData.type) {
994 case InventoryType.Transaction:
995 await Promise.all(
996 getData.hashes.map(async (hash) => {
997 let transaction = this.mutableMemPool[common.uInt256ToHex(hash)] as Transaction | undefined;
998 if (transaction === undefined) {
999 transaction = await this.blockchain.transaction.tryGet({ hash });
1000 }
1001
1002 if (transaction !== undefined) {
1003 this.sendMessage(
1004 peer,
1005 this.createMessage({
1006 command: Command.tx,
1007 payload: transaction,
1008 }),
1009 );
1010 }
1011 }),
1012 );
1013
1014 break;
1015 case InventoryType.Block: // Block
1016 await Promise.all(
1017 getData.hashes.map(async (hash) => {
1018 const block = await this.blockchain.block.tryGet({
1019 hashOrIndex: hash,
1020 });
1021
1022 if (block !== undefined) {
1023 if (peer.data.mutableBloomFilter === undefined) {
1024 this.sendMessage(
1025 peer,
1026 this.createMessage({
1027 command: Command.block,
1028 payload: block,
1029 }),
1030 );
1031 } else {
 // Filtered peer: one flag per transaction, set from testing it against the peer's filter.
1032 this.sendMessage(
1033 peer,
1034 this.createMessage({
1035 command: Command.merkleblock,
1036 payload: this.createMerkleBlockPayload({
1037 block,
1038 flags: block.transactions.map((transaction) =>
1039 this.testFilter(peer.data.mutableBloomFilter, transaction),
1040 ),
1041 }),
1042 }),
1043 );
1044 }
1045 }
1046 }),
1047 );
1048
1049 break;
1050 case InventoryType.Consensus: // Consensus
1051 getData.hashes.forEach((hash) => {
1052 const payload = this.consensusCache.get(common.uInt256ToHex(hash));
1053 if (payload !== undefined) {
1054 this.sendMessage(
1055 peer,
1056 this.createMessage({
1057 command: Command.consensus,
1058 payload,
1059 }),
1060 );
1061 }
1062 });
1063 break;
1064 default:
 // Exhaustiveness check: every inventory type must have a case above.
1065 commonUtils.assertNever(getData.type);
1066 }
1067 }
1068
/**
 * Handles a `getheaders` request by replying to the peer with a `headers`
 * message containing the headers returned by `getHeaders`.
 *
 * The lookup is capped at the index of `this.blockchain.currentHeader`.
 *
 * @param _monitor unused
 * @param peer peer that sent the request; the reply is sent back to it
 * @param getBlocks request payload, forwarded unchanged to `getHeaders`
 */
private async onGetHeadersMessageReceived(
  _monitor: Monitor,
  peer: ConnectedPeer<Message, PeerData>,
  getBlocks: GetBlocksPayload,
): Promise<void> {
  const maxHeight = this.blockchain.currentHeader.index;
  const headers = await this.getHeaders(getBlocks, maxHeight);
  const payload = new HeadersPayload({ headers });

  this.sendMessage(peer, this.createMessage({ command: Command.headers, payload }));
}
1084
/**
 * Handles a `headers` message: persists the headers not seen before and, if
 * this node's header chain is still behind the peer's advertised start height,
 * asks the peer for more headers.
 *
 * Headers being persisted are tracked in `tempKnownHeaderHashes` for the
 * duration of the persist call so that a concurrent `headers` message does not
 * persist them a second time. They are recorded in `mutableKnownHeaderHashes`
 * only after persisting succeeds.
 *
 * @param _monitor unused
 * @param peer peer that sent the headers; also the target of the follow-up request
 * @param headersPayload payload carrying the received headers
 */
private async onHeadersMessageReceived(
  _monitor: Monitor,
  peer: ConnectedPeer<Message, PeerData>,
  headersPayload: HeadersPayload,
): Promise<void> {
  const unseenHeaders = headersPayload.headers.filter(
    (candidate) =>
      !(this.mutableKnownHeaderHashes.has(candidate.hash) || this.tempKnownHeaderHashes.has(candidate.hashHex)),
  );

  if (unseenHeaders.length > 0) {
    unseenHeaders.forEach(({ hashHex }) => {
      this.tempKnownHeaderHashes.add(hashHex);
    });
    try {
      await this.blockchain.persistHeaders(unseenHeaders);
      unseenHeaders.forEach(({ hash }) => {
        this.mutableKnownHeaderHashes.add(hash);
      });
    } finally {
      // Always release the in-flight markers, whether or not persisting succeeded.
      unseenHeaders.forEach(({ hashHex }) => {
        this.tempKnownHeaderHashes.delete(hashHex);
      });
    }
  }

  const stillBehindPeer = this.blockchain.currentHeader.index < peer.data.startHeight;
  if (stillBehindPeer) {
    const payload = new GetBlocksPayload({
      hashStart: [this.blockchain.currentHeader.hash],
    });

    this.sendMessage(peer, this.createMessage({ command: Command.getheaders, payload }));
  }
}
1122
/**
 * Handles an `inv` announcement by requesting, via `getdata`, the announced
 * items this node does not already know about.
 *
 * Transaction and block hashes are dropped when they appear in the matching
 * known-hash set or in the matching temporary (in-flight) set. Consensus
 * hashes are always requested. Nothing is sent when no hashes remain.
 *
 * @param _monitor unused
 * @param peer peer that sent the announcement; the `getdata` request is sent to it
 * @param inv inventory payload naming the type and hashes announced
 */
private onInvMessageReceived(_monitor: Monitor, peer: ConnectedPeer<Message, PeerData>, inv: InvPayload): void {
  const selectWantedHashes = () => {
    switch (inv.type) {
      case InventoryType.Transaction:
        return inv.hashes.filter(
          (hash) =>
            !(
              this.mutableKnownTransactionHashes.has(hash) ||
              this.tempKnownTransactionHashes.has(common.uInt256ToHex(hash))
            ),
        );
      case InventoryType.Block:
        return inv.hashes.filter(
          (hash) =>
            !(this.mutableKnownBlockHashes.has(hash) || this.tempKnownBlockHashes.has(common.uInt256ToHex(hash))),
        );
      case InventoryType.Consensus:
        return inv.hashes;
      default:
        commonUtils.assertNever(inv.type);

        return [];
    }
  };

  const wantedHashes = selectWantedHashes();
  if (wantedHashes.length === 0) {
    return;
  }

  const payload = new InvPayload({ type: inv.type, hashes: wantedHashes });
  this.sendMessage(peer, this.createMessage({ command: Command.getdata, payload }));
}
1159
/**
 * Handles a `mempool` request by replying with an `inv` message of type
 * `Transaction` that lists the hash of every transaction currently held in
 * `mutableMemPool`. The reply is sent even when the pool is empty.
 *
 * @param _monitor unused
 * @param peer peer that sent the request; the reply is sent back to it
 */
private onMemPoolMessageReceived(_monitor: Monitor, peer: ConnectedPeer<Message, PeerData>): void {
  const pooledTransactions = Object.values(this.mutableMemPool);
  const hashes = pooledTransactions.map((pooled) => pooled.hash);
  const payload = new InvPayload({ type: InventoryType.Transaction, hashes });

  this.sendMessage(peer, this.createMessage({ command: Command.inv, payload }));
}
1172
/**
 * Handles a transaction received from the network.
 *
 * Nothing happens unless `this.ready()` returns true. Miner transactions are
 * handed to the consensus instance when one is set and are dropped otherwise.
 * Every other transaction type goes through `relayTransaction`.
 *
 * @param _monitor unused
 * @param transaction the received transaction
 */
private async onTransactionReceived(_monitor: Monitor, transaction: Transaction): Promise<void> {
  if (!this.ready()) {
    return;
  }

  if (transaction.type !== TransactionType.Miner) {
    await this.relayTransaction(transaction);

    return;
  }

  if (this.mutableConsensus !== undefined) {
    this.mutableConsensus.onTransactionReceived(transaction);
  }
}
1184
/**
 * Handles a `verack` message that reaches this handler by closing the peer.
 *
 * NOTE(review): presumably `verack` is only expected during connection
 * negotiation, so one arriving here is treated as a protocol violation — confirm.
 *
 * @param _monitor unused
 * @param peer peer that sent the message; it is closed
 */
private onVerackMessageReceived(_monitor: Monitor, peer: ConnectedPeer<Message, PeerData>): void {
  peer.close();
}
1188
/**
 * Handles a `version` message that reaches this handler by closing the peer.
 *
 * NOTE(review): presumably `version` is only expected during connection
 * negotiation, so one arriving here is treated as a protocol violation — confirm.
 *
 * @param _monitor unused
 * @param peer peer that sent the message; it is closed
 */
private onVersionMessageReceived(_monitor: Monitor, peer: ConnectedPeer<Message, PeerData>): void {
  peer.close();
}
1192
/**
 * Resolves the headers requested by a `getblocks`/`getheaders` payload.
 *
 * The range starts one past the lowest-indexed header among the
 * `hashStart` hashes that this node knows. It ends (exclusive) at the smallest of:
 * `start + GET_BLOCKS_COUNT`, the index of the `hashStop` header (when
 * `hashStop` is non-zero and known), and `maxHeight`.
 *
 * NOTE(review): because the upper bound is exclusive, the header at
 * `maxHeight` itself is never returned — confirm this is intended.
 *
 * @param getBlocks request payload carrying `hashStart` and `hashStop`
 * @param maxHeight highest index the caller allows to be served
 * @returns headers in ascending index order; empty when no start hash is
 *   known or the resolved range is empty
 */
private async getHeaders(getBlocks: GetBlocksPayload, maxHeight: number): Promise<ReadonlyArray<Header>> {
  let hashStopIndexPromise = Promise.resolve(maxHeight);
  if (!getBlocks.hashStop.equals(common.ZERO_UINT256)) {
    hashStopIndexPromise = this.blockchain.header
      .tryGet({ hashOrIndex: getBlocks.hashStop })
      .then((hashStopHeader) =>
        hashStopHeader === undefined ? maxHeight : Math.min(hashStopHeader.index, maxHeight),
      );
  }
  const [hashStartHeaders, hashEnd] = await Promise.all([
    Promise.all(getBlocks.hashStart.map(async (hash) => this.blockchain.header.tryGet({ hashOrIndex: hash }))),

    hashStopIndexPromise,
  ]);

  const hashStartHeader = _.head(_.orderBy(hashStartHeaders.filter(commonUtils.notNull), [(header) => header.index]));

  if (hashStartHeader === undefined) {
    return [];
  }
  const hashStart = hashStartHeader.index + 1;
  if (hashStart > maxHeight) {
    return [];
  }

  const hashEndExclusive = Math.min(hashStart + GET_BLOCKS_COUNT, hashEnd);
  // `_.range(start, end)` counts DOWN with step -1 when end < start. A peer
  // sending a `hashStop` below the start header would otherwise receive an
  // unbounded, descending run of headers that ignores GET_BLOCKS_COUNT.
  if (hashEndExclusive <= hashStart) {
    return [];
  }

  return Promise.all(
    // Explicit step of 1 keeps the range ascending regardless of the bounds.
    _.range(hashStart, hashEndExclusive, 1).map(async (index) => this.blockchain.header.get({ hashOrIndex: index })),
  );
}
1224
/**
 * Decides whether a transaction matches a peer's bloom filter.
 *
 * With no filter every transaction matches. Otherwise the transaction matches
 * when the filter contains any of: the transaction hash, an output address, a
 * wire-serialized input, the script hash of a witness verification script, or,
 * for a register transaction, the asset admin.
 *
 * @param bloomFilterIn the peer's bloom filter, or `undefined` when none is set
 * @param transaction transaction to test
 * @returns `true` when the transaction matches or no filter is set
 */
private testFilter(bloomFilterIn: BloomFilter | undefined, transaction: Transaction): boolean {
  // Const alias so the `undefined` narrowing carries into the callbacks below.
  const filter = bloomFilterIn;
  if (filter === undefined) {
    return true;
  }

  if (filter.contains(transaction.hash)) {
    return true;
  }

  if (transaction.outputs.some((output) => filter.contains(output.address))) {
    return true;
  }

  if (transaction.inputs.some((input) => filter.contains(input.serializeWire()))) {
    return true;
  }

  if (transaction.scripts.some((witness) => filter.contains(crypto.toScriptHash(witness.verification)))) {
    return true;
  }

  return (
    transaction.type === TransactionType.Register &&
    transaction instanceof RegisterTransaction &&
    filter.contains(transaction.asset.admin)
  );
}
1241
/**
 * Builds the `merkleblock` payload for a block given per-transaction match flags.
 *
 * The merkle tree over the block's transaction hashes is trimmed with `flags`.
 * The flags are also packed into a byte buffer, eight per byte with the lowest
 * bit first, so flag `i` lives in byte `floor(i / 8)` at bit `i % 8`.
 *
 * @param block block whose header fields and transactions populate the payload
 * @param flags one boolean per transaction in `block.transactions`, in order
 * @returns the assembled merkle block payload
 */
private createMerkleBlockPayload({
  block,
  flags,
}: {
  readonly block: Block;
  readonly flags: ReadonlyArray<boolean>;
}): MerkleBlockPayload {
  const tree = new MerkleTree(block.transactions.map((transaction) => transaction.hash)).trim(flags);

  // Must be zero-filled: the loop below only ORs bits in, so with
  // `Buffer.allocUnsafe` every unset flag bit would hold whatever was in the
  // uninitialised memory, corrupting the flags and leaking memory to the peer.
  const mutableBuffer = Buffer.alloc(Math.floor((flags.length + 7) / 8));
  // tslint:disable-next-line no-loop-statement
  for (let i = 0; i < flags.length; i += 1) {
    if (flags[i]) {
      // tslint:disable-next-line no-bitwise
      mutableBuffer[Math.floor(i / 8)] |= 1 << (i % 8);
    }
  }

  return new MerkleBlockPayload({
    version: block.version,
    previousHash: block.previousHash,
    merkleRoot: block.merkleRoot,
    timestamp: block.timestamp,
    index: block.index,
    consensusData: block.consensusData,
    nextConsensus: block.nextConsensus,
    script: block.script,
    transactionCount: block.transactions.length,
    hashes: tree.toHashArray(),
    flags: mutableBuffer,
  });
}
1274
/**
 * Wraps a message value in a `Message` stamped with the network magic taken
 * from the blockchain settings.
 *
 * @param value command/payload pair to wrap
 * @returns the message ready to be sent
 */
private createMessage(value: MessageValue): Message {
  const { messageMagic } = this.blockchain.settings;

  return new Message({ magic: messageMagic, value });
}
1281}