UNPKG

144 kBJavaScriptView Raw
1import { common, crypto, utils } from '@neo-one/client-common-esnext-esm';
2import { metrics } from '@neo-one/monitor-esnext-esm';
3import { Consensus } from '@neo-one/node-consensus-esnext-esm';
4import { createEndpoint, getEndpointConfig, MerkleTree, RegisterTransaction, TransactionType, } from '@neo-one/node-core-esnext-esm';
5import { finalize, labels, neverComplete, utils as commonUtils } from '@neo-one/utils-esnext-esm';
6import { ScalingBloem } from 'bloem';
7// tslint:disable-next-line:match-default-export-name
8import BloomFilter from 'bloom-filter';
9import fetch from 'cross-fetch';
10import { Address6 } from 'ip-address';
11import _ from 'lodash';
12import LRU from 'lru-cache';
13import { combineLatest, defer, of as _of } from 'rxjs';
14import { distinctUntilChanged, map, switchMap, take } from 'rxjs/operators';
15import { Command } from './Command';
16import { AlreadyConnectedError, NegotiationError } from './errors';
17import { Message, MessageTransform } from './Message';
18import { AddrPayload, GetBlocksPayload, HeadersPayload, InventoryType, InvPayload, MerkleBlockPayload, NetworkAddress, SERVICES, VersionPayload, } from './payload';
19const messageReceivedLabelNames = [labels.COMMAND_NAME];
20const messageReceivedLabels = Object.keys(Command).map((command) => ({
21 [labels.COMMAND_NAME]: command,
22}));
23const NEO_PROTOCOL_MESSAGES_RECEIVED_TOTAL = metrics.createCounter({
24 name: 'neo_protocol_messages_received_total',
25 labelNames: messageReceivedLabelNames,
26 labels: messageReceivedLabels,
27});
28const NEO_PROTOCOL_MESSAGES_FAILURES_TOTAL = metrics.createCounter({
29 name: 'neo_protocol_messages_failures_total',
30 labelNames: messageReceivedLabelNames,
31 labels: messageReceivedLabels,
32});
33const NEO_PROTOCOL_MEMPOOL_SIZE = metrics.createGauge({
34 name: 'neo_protocol_mempool_size',
35});
36const createPeerBloomFilter = ({ filter, k, tweak, }) => new BloomFilter({
37 vData: Buffer.from(filter),
38 nHashFuncs: k,
39 nTweak: tweak,
40});
41const createScalingBloomFilter = () => new ScalingBloem(0.05, {
42 initial_capacity: 100000,
43 scaling: 4,
44});
45const compareTransactionAndFees = (val1, val2) => {
46 const a = val1.networkFee.divn(val1.transaction.size);
47 const b = val2.networkFee.divn(val2.transaction.size);
48 if (a.lt(b)) {
49 return -1;
50 }
51 if (b.lt(a)) {
52 return 1;
53 }
54 return val1.transaction.hash.compare(val2.transaction.hash);
55};
56const MEM_POOL_SIZE = 5000;
57const GET_ADDR_PEER_COUNT = 200;
58const GET_BLOCKS_COUNT = 500;
59// Assume that we get 500 back, but if not, at least request every 10 seconds
60const GET_BLOCKS_BUFFER = GET_BLOCKS_COUNT / 3;
61const GET_BLOCKS_TIME_MS = 10000;
62const GET_BLOCKS_THROTTLE_MS = 1000;
63const TRIM_MEMPOOL_THROTTLE = 5000;
64const GET_BLOCKS_CLOSE_COUNT = 2;
65const UNHEALTHY_PEER_SECONDS = 300;
66const LOCAL_HOST_ADDRESSES = new Set(['', '0.0.0.0', 'localhost', '127.0.0.1', '::', '::1']);
67export class Node {
68 constructor({ monitor, blockchain, createNetwork, environment = {}, options$, }) {
69 this.mutableUnhealthyPeerSeconds = UNHEALTHY_PEER_SECONDS;
70 this.requestBlocks = _.debounce(() => {
71 const peer = this.mutableBestPeer;
72 const previousBlock = this.blockchain.previousBlock;
73 const block = previousBlock === undefined ? this.blockchain.currentBlock : previousBlock;
74 if (peer !== undefined && block.index < peer.data.startHeight) {
75 if (this.mutableGetBlocksRequestsCount > GET_BLOCKS_CLOSE_COUNT) {
76 this.mutableBestPeer = this.findBestPeer(peer);
77 this.network.blacklistAndClose(peer);
78 this.mutableGetBlocksRequestsCount = 0;
79 }
80 else if (this.shouldRequestBlocks()) {
81 if (this.mutableGetBlocksRequestsIndex === block.index) {
82 this.mutableGetBlocksRequestsCount += 1;
83 }
84 else {
85 this.mutableGetBlocksRequestsCount = 1;
86 this.mutableGetBlocksRequestsIndex = block.index;
87 }
88 this.mutableGetBlocksRequestTime = Date.now();
89 this.sendMessage(peer, this.createMessage({
90 command: Command.getblocks,
91 payload: new GetBlocksPayload({
92 hashStart: [block.hash],
93 }),
94 }));
95 }
96 this.requestBlocks();
97 }
98 }, GET_BLOCKS_THROTTLE_MS);
99 this.onRequestEndpoints = _.throttle(() => {
100 this.relay(this.createMessage({ command: Command.getaddr }));
101 // tslint:disable-next-line no-floating-promises
102 this.fetchEndpointsFromRPC();
103 }, 5000);
104 // tslint:disable-next-line no-unnecessary-type-annotation
105 this.trimMemPool = _.throttle(async (monitor) => {
106 const memPool = Object.values(this.mutableMemPool);
107 if (memPool.length > MEM_POOL_SIZE) {
108 await monitor.captureSpan(async () => {
109 const transactionAndFees = await Promise.all(memPool.map(async (transaction) => {
110 const networkFee = await transaction.getNetworkFee({
111 getOutput: this.blockchain.output.get,
112 governingToken: this.blockchain.settings.governingToken,
113 utilityToken: this.blockchain.settings.utilityToken,
114 fees: this.blockchain.settings.fees,
115 registerValidatorFee: this.blockchain.settings.registerValidatorFee,
116 });
117 return { transaction, networkFee };
118 }));
119 const hashesToRemove = _.take(
120 // tslint:disable-next-line no-array-mutation
121 transactionAndFees.slice().sort(compareTransactionAndFees), this.blockchain.settings.memPoolSize).map((transactionAndFee) => transactionAndFee.transaction.hashHex);
122 hashesToRemove.forEach((hash) => {
123 // tslint:disable-next-line no-dynamic-delete
124 delete this.mutableMemPool[hash];
125 });
126 NEO_PROTOCOL_MEMPOOL_SIZE.set(Object.keys(this.mutableMemPool).length);
127 }, {
128 name: 'neo_protocol_trim_mempool',
129 });
130 }
131 }, TRIM_MEMPOOL_THROTTLE);
132 this.negotiate = async (peer) => {
133 this.sendMessage(peer, this.createMessage({
134 command: Command.version,
135 payload: new VersionPayload({
136 protocolVersion: 0,
137 services: SERVICES.NODE_NETWORK,
138 timestamp: Math.round(Date.now() / 1000),
139 port: this.externalPort,
140 nonce: this.nonce,
141 userAgent: this.userAgent,
142 startHeight: this.blockchain.currentBlockIndex,
143 relay: true,
144 }),
145 }));
146 const message = await peer.receiveMessage(30000);
147 let versionPayload;
148 if (message.value.command === Command.version) {
149 versionPayload = message.value.payload;
150 }
151 else {
152 throw new NegotiationError(message);
153 }
154 this.checkVersion(peer, message, versionPayload);
155 const { host } = getEndpointConfig(peer.endpoint);
156 let address;
157 if (NetworkAddress.isValid(host)) {
158 address = new NetworkAddress({
159 host,
160 port: versionPayload.port,
161 timestamp: versionPayload.timestamp,
162 services: versionPayload.services,
163 });
164 }
165 this.sendMessage(peer, this.createMessage({ command: Command.verack }));
166 const nextMessage = await peer.receiveMessage(30000);
167 if (nextMessage.value.command !== Command.verack) {
168 throw new NegotiationError(nextMessage);
169 }
170 return {
171 data: {
172 nonce: versionPayload.nonce,
173 startHeight: versionPayload.startHeight,
174 mutableBloomFilter: undefined,
175 address,
176 },
177 relay: versionPayload.relay,
178 };
179 };
180 this.checkPeerHealth = (peer, prevHealth) => {
181 const checkTimeSeconds = commonUtils.nowSeconds();
182 const blockIndex = this.mutableBlockIndex[peer.endpoint];
183 // If first check -> healthy
184 if (prevHealth === undefined) {
185 return { healthy: true, checkTimeSeconds, blockIndex };
186 }
187 // If seen new block -> healthy + update check time
188 if (prevHealth.blockIndex !== undefined && blockIndex !== undefined && prevHealth.blockIndex < blockIndex) {
189 return { healthy: true, checkTimeSeconds, blockIndex };
190 }
191 // If not seen a block or a new block BUT it has NOT been a long
192 // time -> healthy
193 if (prevHealth.blockIndex === blockIndex &&
194 commonUtils.nowSeconds() - prevHealth.checkTimeSeconds < this.mutableUnhealthyPeerSeconds) {
195 return {
196 healthy: true,
197 checkTimeSeconds: prevHealth.checkTimeSeconds,
198 blockIndex: prevHealth.blockIndex,
199 };
200 }
201 return { healthy: false, checkTimeSeconds, blockIndex };
202 };
203 this.onEvent = (event) => {
204 if (event.event === 'PEER_CONNECT_SUCCESS') {
205 const { connectedPeer } = event;
206 if (this.mutableBestPeer === undefined ||
207 // Only change best peer at most every 100 blocks
208 this.mutableBestPeer.data.startHeight + 100 < connectedPeer.data.startHeight) {
209 this.mutableBestPeer = connectedPeer;
210 this.resetRequestBlocks();
211 this.requestBlocks();
212 }
213 }
214 else if (event.event === 'PEER_CLOSED' &&
215 this.mutableBestPeer !== undefined &&
216 this.mutableBestPeer.endpoint === event.peer.endpoint) {
217 this.mutableBestPeer = this.findBestPeer();
218 this.resetRequestBlocks();
219 this.requestBlocks();
220 }
221 };
222 this.blockchain = blockchain;
223 this.monitor = monitor.at('node_protocol');
224 this.network = createNetwork({
225 negotiate: this.negotiate,
226 checkPeerHealth: this.checkPeerHealth,
227 createMessageTransform: () => new MessageTransform(this.blockchain.deserializeWireContext),
228 onMessageReceived: (peer, message) => {
229 this.onMessageReceived(peer, message);
230 },
231 onRequestEndpoints: this.onRequestEndpoints.bind(this),
232 onEvent: this.onEvent,
233 });
234 this.options$ = options$;
235 const { externalPort = 0 } = environment;
236 this.externalPort = externalPort;
237 this.nonce = Math.floor(Math.random() * utils.UINT_MAX_NUMBER);
238 this.userAgent = `NEO:neo-one-js:1.0.0-preview`;
239 this.mutableMemPool = {};
240 this.mutableKnownBlockHashes = createScalingBloomFilter();
241 this.tempKnownBlockHashes = new Set();
242 this.mutableKnownTransactionHashes = createScalingBloomFilter();
243 this.tempKnownTransactionHashes = new Set();
244 this.mutableKnownHeaderHashes = createScalingBloomFilter();
245 this.tempKnownHeaderHashes = new Set();
246 this.mutableGetBlocksRequestsCount = 1;
247 this.consensusCache = LRU(10000);
248 this.mutableBlockIndex = {};
249 }
250 get consensus() {
251 return this.mutableConsensus;
252 }
253 get connectedPeers() {
254 return this.network.connectedPeers.map((peer) => peer.endpoint);
255 }
256 get memPool() {
257 return this.mutableMemPool;
258 }
259 async reset() {
260 this.mutableMemPool = {};
261 this.mutableKnownBlockHashes = createScalingBloomFilter();
262 this.tempKnownBlockHashes.clear();
263 this.mutableKnownTransactionHashes = createScalingBloomFilter();
264 this.tempKnownTransactionHashes.clear();
265 this.mutableKnownHeaderHashes = createScalingBloomFilter();
266 this.tempKnownHeaderHashes.clear();
267 this.mutableGetBlocksRequestsCount = 1;
268 this.consensusCache.reset();
269 this.mutableBlockIndex = {};
270 }
271 // tslint:disable-next-line no-any
272 start$() {
273 const network$ = defer(async () => {
274 this.network.start();
275 this.monitor.log({
276 name: 'neo_protocol_start',
277 message: 'Protocol started.',
278 level: 'verbose',
279 });
280 }).pipe(neverComplete(), finalize(() => {
281 this.network.stop();
282 this.monitor.log({
283 name: 'neo_protocol_stop',
284 message: 'Protocol stopped.',
285 level: 'verbose',
286 });
287 }));
288 const defaultOptions = {
289 enabled: false,
290 options: { privateKey: 'unused', privateNet: false },
291 };
292 const consensus$ = this.options$.pipe(map(({ consensus = defaultOptions }) => consensus.enabled), distinctUntilChanged(), switchMap((enabled) => {
293 if (enabled) {
294 const mutableConsensus = new Consensus({
295 monitor: this.monitor,
296 options$: this.options$.pipe(map(({ consensus = defaultOptions }) => consensus.options), distinctUntilChanged()),
297 node: this,
298 });
299 this.mutableConsensus = mutableConsensus;
300 return mutableConsensus.start$();
301 }
302 return _of(undefined);
303 }));
304 const options$ = this.options$.pipe(map(({ unhealthyPeerSeconds = UNHEALTHY_PEER_SECONDS }) => {
305 this.mutableUnhealthyPeerSeconds = unhealthyPeerSeconds;
306 }));
307 return combineLatest(network$, consensus$, options$);
308 }
309 async relayTransaction(transaction, { throwVerifyError = false, forceAdd = false, } = {
310 throwVerifyError: false,
311 forceAdd: false,
312 }) {
313 const result = {};
314 if (transaction.type === TransactionType.Miner ||
315 this.mutableMemPool[transaction.hashHex] !== undefined ||
316 this.tempKnownTransactionHashes.has(transaction.hashHex)) {
317 return result;
318 }
319 if (!this.mutableKnownTransactionHashes.has(transaction.hash)) {
320 this.tempKnownTransactionHashes.add(transaction.hashHex);
321 try {
322 const memPool = Object.values(this.mutableMemPool);
323 if (memPool.length > MEM_POOL_SIZE / 2 && !forceAdd) {
324 this.mutableKnownTransactionHashes.add(transaction.hash);
325 return result;
326 }
327 // tslint:disable-next-line prefer-immediate-return
328 const finalResult = await this.monitor
329 .withData({ [labels.NEO_TRANSACTION_HASH]: transaction.hashHex })
330 .captureSpanLog(async (span) => {
331 let foundTransaction;
332 try {
333 foundTransaction = await this.blockchain.transaction.tryGet({
334 hash: transaction.hash,
335 });
336 }
337 finally {
338 span.setLabels({
339 [labels.NEO_TRANSACTION_FOUND]: foundTransaction !== undefined,
340 });
341 }
342 let verifyResult;
343 if (foundTransaction === undefined) {
344 verifyResult = await this.blockchain.verifyTransaction({
345 monitor: span,
346 transaction,
347 memPool: Object.values(this.mutableMemPool),
348 });
349 const verified = verifyResult.verifications.every(({ failureMessage }) => failureMessage === undefined);
350 if (verified) {
351 this.mutableMemPool[transaction.hashHex] = transaction;
352 NEO_PROTOCOL_MEMPOOL_SIZE.inc();
353 if (this.mutableConsensus !== undefined) {
354 this.mutableConsensus.onTransactionReceived(transaction);
355 }
356 this.relayTransactionInternal(transaction);
357 await this.trimMemPool(span);
358 }
359 }
360 this.mutableKnownTransactionHashes.add(transaction.hash);
361 return { verifyResult };
362 }, {
363 name: 'neo_relay_transaction',
364 level: { log: 'verbose', span: 'info' },
365 trace: true,
366 });
367 // tslint:disable-next-line no-var-before-return
368 return finalResult;
369 }
370 catch (error) {
371 if (error.code === undefined ||
372 typeof error.code !== 'string' ||
373 !error.code.includes('VERIFY') ||
374 throwVerifyError) {
375 throw error;
376 }
377 }
378 finally {
379 this.tempKnownTransactionHashes.delete(transaction.hashHex);
380 }
381 }
382 return result;
383 }
384 async relayBlock(block, monitor) {
385 await this.persistBlock(block, monitor);
386 }
387 relayConsensusPayload(payload) {
388 const message = this.createMessage({
389 command: Command.inv,
390 payload: new InvPayload({
391 type: InventoryType.Consensus,
392 hashes: [payload.hash],
393 }),
394 });
395 this.consensusCache.set(payload.hashHex, payload);
396 this.relay(message);
397 }
398 syncMemPool() {
399 this.relay(this.createMessage({ command: Command.mempool }));
400 }
401 relay(message) {
402 this.network.relay(message.serializeWire());
403 }
404 relayTransactionInternal(transaction) {
405 const message = this.createMessage({
406 command: Command.inv,
407 payload: new InvPayload({
408 type: InventoryType.Transaction,
409 hashes: [transaction.hash],
410 }),
411 });
412 const messagePayload = message.serializeWire();
413 this.network.connectedPeers.forEach((peer) => {
414 if (peer.relay && this.testFilter(peer.data.mutableBloomFilter, transaction)) {
415 peer.write(messagePayload);
416 }
417 });
418 }
419 sendMessage(peer, message) {
420 peer.write(message.serializeWire());
421 }
422 findBestPeer(bestPeer) {
423 let peers = this.network.connectedPeers;
424 if (bestPeer !== undefined) {
425 peers = peers.filter((peer) => peer.endpoint !== bestPeer.endpoint);
426 }
427 const result = _.maxBy(peers, (peer) => peer.data.startHeight);
428 if (result === undefined) {
429 return undefined;
430 }
431 return _.shuffle(peers.filter((peer) => peer.data.startHeight === result.data.startHeight))[0];
432 }
433 resetRequestBlocks() {
434 this.mutableGetBlocksRequestsIndex = undefined;
435 this.mutableGetBlocksRequestsCount = 0;
436 }
437 shouldRequestBlocks() {
438 const block = this.blockchain.currentBlock;
439 const getBlocksRequestTime = this.mutableGetBlocksRequestTime;
440 return (this.mutableGetBlocksRequestsIndex === undefined ||
441 block.index - this.mutableGetBlocksRequestsIndex > GET_BLOCKS_BUFFER ||
442 getBlocksRequestTime === undefined ||
443 Date.now() - getBlocksRequestTime > GET_BLOCKS_TIME_MS);
444 }
445 checkVersion(peer, message, version) {
446 if (version.nonce === this.nonce) {
447 this.network.permanentlyBlacklist(peer.endpoint);
448 throw new NegotiationError(message, 'Nonce equals my nonce.');
449 }
450 const connectedPeer = this.network.connectedPeers.find((otherPeer) => version.nonce === otherPeer.data.nonce);
451 if (connectedPeer !== undefined) {
452 throw new AlreadyConnectedError('Already connected to nonce.');
453 }
454 }
455 ready() {
456 const peer = this.mutableBestPeer;
457 const block = this.blockchain.currentBlock;
458 return peer !== undefined && block.index >= peer.data.startHeight;
459 }
460 async fetchEndpointsFromRPC() {
461 try {
462 await this.doFetchEndpointsFromRPC();
463 }
464 catch {
465 // ignore, logged deeper in the stack
466 }
467 }
468 async doFetchEndpointsFromRPC() {
469 const { rpcURLs = [] } = await this.options$.pipe(take(1)).toPromise();
470 await Promise.all(rpcURLs.map(async (rpcURL) => this.fetchEndpointsFromRPCURL(rpcURL)));
471 }
472 async fetchEndpointsFromRPCURL(rpcURL) {
473 try {
474 const response = await fetch(rpcURL, {
475 method: 'POST',
476 headers: {
477 'Content-Type': 'application/json',
478 },
479 body: JSON.stringify({
480 jsonrpc: '2.0',
481 id: 1,
482 method: 'getpeers',
483 params: [],
484 }),
485 });
486 if (!response.ok) {
487 throw new Error(`Failed to fetch peers from ${rpcURL}: ${response.status} ${response.statusText}`);
488 }
489 const result = await response.json();
490 if (typeof result === 'object' &&
491 result.error !== undefined &&
492 typeof result.error === 'object' &&
493 typeof result.error.code === 'number' &&
494 typeof result.error.message === 'string') {
495 throw new Error(result.error);
496 }
497 const connected = result.result.connected;
498 connected
499 .map((peer) => {
500 const { address, port } = peer;
501 const host = new Address6(address);
502 const canonicalForm = host.canonicalForm();
503 return { host: canonicalForm == undefined ? '' : canonicalForm, port };
504 })
505 .filter((endpoint) => !LOCAL_HOST_ADDRESSES.has(endpoint.host))
506 .map((endpoint) => createEndpoint({
507 type: 'tcp',
508 host: endpoint.host,
509 port: endpoint.port,
510 }))
511 .forEach((endpoint) => this.network.addEndpoint(endpoint));
512 }
513 catch (error) {
514 this.monitor.withData({ [this.monitor.labels.HTTP_URL]: rpcURL }).logError({
515 name: 'neo_protocol_fetch_endpoints_error',
516 message: `Failed to fetch endpoints from ${rpcURL}`,
517 error,
518 });
519 }
520 }
521 onMessageReceived(peer, message) {
522 this.monitor
523 .withLabels({ [labels.COMMAND_NAME]: message.value.command })
524 .withData({ [this.monitor.labels.PEER_ADDRESS]: peer.endpoint })
525 .captureLog(async (monitor) => {
526 switch (message.value.command) {
527 case Command.addr:
528 this.onAddrMessageReceived(monitor, message.value.payload);
529 break;
530 case Command.block:
531 await this.onBlockMessageReceived(monitor, peer, message.value.payload);
532 break;
533 case Command.consensus:
534 await this.onConsensusMessageReceived(monitor, message.value.payload);
535 break;
536 case Command.filteradd:
537 this.onFilterAddMessageReceived(monitor, peer, message.value.payload);
538 break;
539 case Command.filterclear:
540 this.onFilterClearMessageReceived(monitor, peer);
541 break;
542 case Command.filterload:
543 this.onFilterLoadMessageReceived(monitor, peer, message.value.payload);
544 break;
545 case Command.getaddr:
546 this.onGetAddrMessageReceived(monitor, peer);
547 break;
548 case Command.getblocks:
549 await this.onGetBlocksMessageReceived(monitor, peer, message.value.payload);
550 break;
551 case Command.getdata:
552 await this.onGetDataMessageReceived(monitor, peer, message.value.payload);
553 break;
554 case Command.getheaders:
555 await this.onGetHeadersMessageReceived(monitor, peer, message.value.payload);
556 break;
557 case Command.headers:
558 await this.onHeadersMessageReceived(monitor, peer, message.value.payload);
559 break;
560 case Command.inv:
561 this.onInvMessageReceived(monitor, peer, message.value.payload);
562 break;
563 case Command.mempool:
564 this.onMemPoolMessageReceived(monitor, peer);
565 break;
566 case Command.tx:
567 await this.onTransactionReceived(monitor, message.value.payload);
568 break;
569 case Command.verack:
570 this.onVerackMessageReceived(monitor, peer);
571 break;
572 case Command.version:
573 this.onVersionMessageReceived(monitor, peer);
574 break;
575 case Command.alert:
576 break;
577 case Command.merkleblock:
578 break;
579 case Command.notfound:
580 break;
581 case Command.ping:
582 break;
583 case Command.pong:
584 break;
585 case Command.reject:
586 break;
587 default:
588 commonUtils.assertNever(message.value);
589 }
590 }, {
591 name: 'neo_protocol_message_received',
592 level: 'debug',
593 message: `Received ${message.value.command} from ${peer.endpoint}`,
594 metric: NEO_PROTOCOL_MESSAGES_RECEIVED_TOTAL,
595 error: {
596 metric: NEO_PROTOCOL_MESSAGES_FAILURES_TOTAL,
597 message: `Failed to process message ${message.value.command} from ${peer.endpoint}`,
598 },
599 })
600 .catch(() => {
601 // do nothing
602 });
603 }
604 onAddrMessageReceived(_monitor, addr) {
605 addr.addresses
606 .filter((address) => !LOCAL_HOST_ADDRESSES.has(address.host))
607 .map((address) => createEndpoint({
608 type: 'tcp',
609 host: address.host,
610 port: address.port,
611 }))
612 .forEach((endpoint) => this.network.addEndpoint(endpoint));
613 }
614 async onBlockMessageReceived(monitor, peer, block) {
615 const blockIndex = this.mutableBlockIndex[peer.endpoint];
616 this.mutableBlockIndex[peer.endpoint] = Math.max(block.index, blockIndex === undefined ? 0 : blockIndex);
617 await this.relayBlock(block, monitor);
618 }
619 async persistBlock(block, monitor = this.monitor) {
620 if (this.blockchain.currentBlockIndex > block.index || this.tempKnownBlockHashes.has(block.hashHex)) {
621 return;
622 }
623 if (!this.mutableKnownBlockHashes.has(block.hash)) {
624 this.tempKnownBlockHashes.add(block.hashHex);
625 try {
626 const foundBlock = await this.blockchain.block.tryGet({
627 hashOrIndex: block.hash,
628 });
629 if (foundBlock === undefined) {
630 await monitor.withData({ [labels.NEO_BLOCK_INDEX]: block.index }).captureSpanLog(async (span) => {
631 await this.blockchain.persistBlock({ monitor: span, block });
632 if (this.mutableConsensus !== undefined) {
633 this.mutableConsensus.onPersistBlock();
634 }
635 const peer = this.mutableBestPeer;
636 if (peer !== undefined && block.index > peer.data.startHeight) {
637 this.relay(this.createMessage({
638 command: Command.inv,
639 payload: new InvPayload({
640 type: InventoryType.Block,
641 hashes: [block.hash],
642 }),
643 }));
644 }
645 }, {
646 name: 'neo_relay_block',
647 level: { log: 'verbose', span: 'info' },
648 trace: true,
649 });
650 }
651 this.mutableKnownBlockHashes.add(block.hash);
652 this.mutableKnownHeaderHashes.add(block.hash);
653 block.transactions.forEach((transaction) => {
654 // tslint:disable-next-line no-dynamic-delete
655 delete this.mutableMemPool[transaction.hashHex];
656 this.mutableKnownTransactionHashes.add(transaction.hash);
657 });
658 NEO_PROTOCOL_MEMPOOL_SIZE.set(Object.keys(this.mutableMemPool).length);
659 }
660 finally {
661 this.tempKnownBlockHashes.delete(block.hashHex);
662 }
663 }
664 }
665 async onConsensusMessageReceived(monitor, payload) {
666 const { consensus } = this;
667 if (consensus !== undefined) {
668 await this.blockchain.verifyConsensusPayload(payload, monitor);
669 consensus.onConsensusPayloadReceived(payload);
670 }
671 }
672 onFilterAddMessageReceived(_monitor, peer, filterAdd) {
673 if (peer.data.mutableBloomFilter !== undefined) {
674 peer.data.mutableBloomFilter.insert(filterAdd.data);
675 }
676 }
677 onFilterClearMessageReceived(_monitor, peer) {
678 // tslint:disable-next-line no-object-mutation
679 peer.data.mutableBloomFilter = undefined;
680 }
681 onFilterLoadMessageReceived(_monitor, peer, filterLoad) {
682 // tslint:disable-next-line no-object-mutation
683 peer.data.mutableBloomFilter = createPeerBloomFilter(filterLoad);
684 }
685 onGetAddrMessageReceived(_monitor, peer) {
686 const addresses = _.take(_.shuffle(this.network.connectedPeers.map((connectedPeer) => connectedPeer.data.address).filter(commonUtils.notNull)), GET_ADDR_PEER_COUNT);
687 if (addresses.length > 0) {
688 this.sendMessage(peer, this.createMessage({
689 command: Command.addr,
690 payload: new AddrPayload({ addresses }),
691 }));
692 }
693 }
694 async onGetBlocksMessageReceived(_monitor, peer, getBlocks) {
695 const headers = await this.getHeaders(getBlocks, this.blockchain.currentBlockIndex);
696 this.sendMessage(peer, this.createMessage({
697 command: Command.inv,
698 payload: new InvPayload({
699 type: InventoryType.Block,
700 hashes: headers.map((header) => header.hash),
701 }),
702 }));
703 }
704 async onGetDataMessageReceived(_monitor, peer, getData) {
705 switch (getData.type) {
706 case InventoryType.Transaction:
707 await Promise.all(getData.hashes.map(async (hash) => {
708 let transaction = this.mutableMemPool[common.uInt256ToHex(hash)];
709 if (transaction === undefined) {
710 transaction = await this.blockchain.transaction.tryGet({ hash });
711 }
712 if (transaction !== undefined) {
713 this.sendMessage(peer, this.createMessage({
714 command: Command.tx,
715 payload: transaction,
716 }));
717 }
718 }));
719 break;
720 case InventoryType.Block: // Block
721 await Promise.all(getData.hashes.map(async (hash) => {
722 const block = await this.blockchain.block.tryGet({
723 hashOrIndex: hash,
724 });
725 if (block !== undefined) {
726 if (peer.data.mutableBloomFilter === undefined) {
727 this.sendMessage(peer, this.createMessage({
728 command: Command.block,
729 payload: block,
730 }));
731 }
732 else {
733 this.sendMessage(peer, this.createMessage({
734 command: Command.merkleblock,
735 payload: this.createMerkleBlockPayload({
736 block,
737 flags: block.transactions.map((transaction) => this.testFilter(peer.data.mutableBloomFilter, transaction)),
738 }),
739 }));
740 }
741 }
742 }));
743 break;
744 case InventoryType.Consensus: // Consensus
745 getData.hashes.forEach((hash) => {
746 const payload = this.consensusCache.get(common.uInt256ToHex(hash));
747 if (payload !== undefined) {
748 this.sendMessage(peer, this.createMessage({
749 command: Command.consensus,
750 payload,
751 }));
752 }
753 });
754 break;
755 default:
756 commonUtils.assertNever(getData.type);
757 }
758 }
759 async onGetHeadersMessageReceived(_monitor, peer, getBlocks) {
760 const headers = await this.getHeaders(getBlocks, this.blockchain.currentHeader.index);
761 this.sendMessage(peer, this.createMessage({
762 command: Command.headers,
763 payload: new HeadersPayload({ headers }),
764 }));
765 }
766 async onHeadersMessageReceived(_monitor, peer, headersPayload) {
767 const headers = headersPayload.headers.filter((header) => !this.mutableKnownHeaderHashes.has(header.hash) && !this.tempKnownHeaderHashes.has(header.hashHex));
768 if (headers.length > 0) {
769 headers.forEach((header) => {
770 this.tempKnownHeaderHashes.add(header.hashHex);
771 });
772 try {
773 await this.blockchain.persistHeaders(headers);
774 headers.forEach((header) => {
775 this.mutableKnownHeaderHashes.add(header.hash);
776 });
777 }
778 finally {
779 headers.forEach((header) => {
780 this.tempKnownHeaderHashes.delete(header.hashHex);
781 });
782 }
783 }
784 if (this.blockchain.currentHeader.index < peer.data.startHeight) {
785 this.sendMessage(peer, this.createMessage({
786 command: Command.getheaders,
787 payload: new GetBlocksPayload({
788 hashStart: [this.blockchain.currentHeader.hash],
789 }),
790 }));
791 }
792 }
793 onInvMessageReceived(_monitor, peer, inv) {
794 let hashes;
795 switch (inv.type) {
796 case InventoryType.Transaction: // Transaction
797 hashes = inv.hashes.filter((hash) => !this.mutableKnownTransactionHashes.has(hash) &&
798 !this.tempKnownTransactionHashes.has(common.uInt256ToHex(hash)));
799 break;
800 case InventoryType.Block: // Block
801 hashes = inv.hashes.filter((hash) => !this.mutableKnownBlockHashes.has(hash) && !this.tempKnownBlockHashes.has(common.uInt256ToHex(hash)));
802 break;
803 case InventoryType.Consensus: // Consensus
804 hashes = inv.hashes;
805 break;
806 default:
807 commonUtils.assertNever(inv.type);
808 hashes = [];
809 }
810 if (hashes.length > 0) {
811 this.sendMessage(peer, this.createMessage({
812 command: Command.getdata,
813 payload: new InvPayload({ type: inv.type, hashes }),
814 }));
815 }
816 }
817 onMemPoolMessageReceived(_monitor, peer) {
818 this.sendMessage(peer, this.createMessage({
819 command: Command.inv,
820 payload: new InvPayload({
821 type: InventoryType.Transaction,
822 hashes: Object.values(this.mutableMemPool).map((transaction) => transaction.hash),
823 }),
824 }));
825 }
826 async onTransactionReceived(_monitor, transaction) {
827 if (this.ready()) {
828 if (transaction.type === TransactionType.Miner) {
829 if (this.mutableConsensus !== undefined) {
830 this.mutableConsensus.onTransactionReceived(transaction);
831 }
832 }
833 else {
834 await this.relayTransaction(transaction);
835 }
836 }
837 }
838 onVerackMessageReceived(_monitor, peer) {
839 peer.close();
840 }
841 onVersionMessageReceived(_monitor, peer) {
842 peer.close();
843 }
844 async getHeaders(getBlocks, maxHeight) {
845 let hashStopIndexPromise = Promise.resolve(maxHeight);
846 if (!getBlocks.hashStop.equals(common.ZERO_UINT256)) {
847 hashStopIndexPromise = this.blockchain.header
848 .tryGet({ hashOrIndex: getBlocks.hashStop })
849 .then((hashStopHeader) => hashStopHeader === undefined ? maxHeight : Math.min(hashStopHeader.index, maxHeight));
850 }
851 const [hashStartHeaders, hashEnd] = await Promise.all([
852 Promise.all(getBlocks.hashStart.map(async (hash) => this.blockchain.header.tryGet({ hashOrIndex: hash }))),
853 hashStopIndexPromise,
854 ]);
855 const hashStartHeader = _.head(_.orderBy(hashStartHeaders.filter(commonUtils.notNull), [(header) => header.index]));
856 if (hashStartHeader === undefined) {
857 return [];
858 }
859 const hashStart = hashStartHeader.index + 1;
860 if (hashStart > maxHeight) {
861 return [];
862 }
863 return Promise.all(_.range(hashStart, Math.min(hashStart + GET_BLOCKS_COUNT, hashEnd)).map(async (index) => this.blockchain.header.get({ hashOrIndex: index })));
864 }
865 testFilter(bloomFilterIn, transaction) {
866 const bloomFilter = bloomFilterIn;
867 if (bloomFilter === undefined) {
868 return true;
869 }
870 return (bloomFilter.contains(transaction.hash) ||
871 transaction.outputs.some((output) => bloomFilter.contains(output.address)) ||
872 transaction.inputs.some((input) => bloomFilter.contains(input.serializeWire())) ||
873 transaction.scripts.some((script) => bloomFilter.contains(crypto.toScriptHash(script.verification))) ||
874 (transaction.type === TransactionType.Register &&
875 transaction instanceof RegisterTransaction &&
876 bloomFilter.contains(transaction.asset.admin)));
877 }
878 createMerkleBlockPayload({ block, flags, }) {
879 const tree = new MerkleTree(block.transactions.map((transaction) => transaction.hash)).trim(flags);
880 const mutableBuffer = Buffer.allocUnsafe(Math.floor((flags.length + 7) / 8));
881 // tslint:disable-next-line no-loop-statement
882 for (let i = 0; i < flags.length; i += 1) {
883 if (flags[i]) {
884 // tslint:disable-next-line no-bitwise
885 mutableBuffer[Math.floor(i / 8)] |= 1 << i % 8;
886 }
887 }
888 return new MerkleBlockPayload({
889 version: block.version,
890 previousHash: block.previousHash,
891 merkleRoot: block.merkleRoot,
892 timestamp: block.timestamp,
893 index: block.index,
894 consensusData: block.consensusData,
895 nextConsensus: block.nextConsensus,
896 script: block.script,
897 transactionCount: block.transactions.length,
898 hashes: tree.toHashArray(),
899 flags: mutableBuffer,
900 });
901 }
902 createMessage(value) {
903 return new Message({
904 magic: this.blockchain.settings.messageMagic,
905 value,
906 });
907 }
908}
909
910//# sourceMappingURL=data:application/json;charset=utf8;base64,