Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 | 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x | import { HashByteOrder, OutPoint } from '@node-dlc/bitcoin';
import { ShortChannelId } from '@node-dlc/common';
import { ILogger } from '@node-dlc/logger';
import { EventEmitter } from 'events';
import { ChannelAnnouncementMessage } from '../messages/ChannelAnnouncementMessage';
import { ExtendedChannelAnnouncementMessage } from '../messages/ExtendedChannelAnnouncementMessage';
import { IWireMessage } from '../messages/IWireMessage';
import { MessageType } from '../MessageType';
import { Peer } from '../Peer';
import { PeerState } from '../PeerState';
import { WireError, WireErrorCode } from '../WireError';
import { GossipFilter } from './GossipFilter';
import { GossipPeer } from './GossipPeer';
import { GossipRelay, IGossipRelay } from './GossipRelay';
import { IGossipStore } from './GossipStore';
import { IGossipFilterChainClient } from './IGossipFilterChainClient';
export enum SyncState {
Unsynced,
Syncing,
Synced,
}
// tslint:disable-next-line: interface-name
export declare interface GossipManager {
on(event: 'message', fn: (msg: IWireMessage) => void): this;
on(event: 'error', fn: (err: Error) => void): this;
on(event: 'flushed', fn: () => void): this;
off(event: 'restored', fn: (block: number) => void): this;
off(event: 'message', fn: (msg: IWireMessage) => void): this;
off(event: 'error', fn: (err: Error) => void): this;
off(event: 'flushed', fn: () => void): this;
}
/**
* GossipManager provides is a facade for many parts of gossip. It
* orchestrates for validating, storing, and emitting
* routing gossip traffic obtained by peers.
*/
export class GossipManager extends EventEmitter {
public blockHeight: number;
public started: boolean;
public syncState: SyncState;
public isSynchronizing: boolean;
public gossipRelay: IGossipRelay;
public readonly peers: Set<GossipPeer>;
public readonly logger: ILogger;
constructor(
logger: ILogger,
readonly gossipStore: IGossipStore,
readonly pendingStore: IGossipStore,
readonly chainClient?: IGossipFilterChainClient,
) {
super();
this.logger = logger.sub('gspmgr');
this.peers = new Set<GossipPeer>();
this.syncState = SyncState.Unsynced;
this.gossipRelay = new GossipRelay(logger.sub('gsprel'), 60000, 2000);
}
/**
* The number of peers managed by the PeerManager
*/
public get peerCount(): number {
return this.peers.size;
}
/**
* Starts the gossip manager. This method will load information
* from the gossip store, determine when the last information
* was obtained, validate the existing messages (to see if any
* channels have closed), and finally emit all messages that
* exist in the system.
*/
public async start(): Promise<void> {
this.logger.info('starting gossip manager');
// wait for chain sync to complete
if (this.chainClient) {
this.logger.info('waiting for chain sync');
await this.chainClient.waitForSync();
this.logger.info('chain sync complete');
}
await this._restoreState();
// emit all restored messages
for await (const msg of this.allMessages()) {
this.emit('message', msg);
}
// start the gossip relay manager
this.gossipRelay.start();
// flag that the manager has now started
this.started = true;
}
/**
* Adds a new peer to the GossipManager and subscribes to events that will
* allow it to iteract with other sub-systems managed by the GossipManager.
*/
public addPeer(peer: Peer): void {
if (!this.started)
throw new WireError(WireErrorCode.gossipManagerNotStarted);
peer.on('ready', () => this._onPeerReady(peer));
if (peer.state === PeerState.Ready) {
this._onPeerReady(peer);
}
}
/**
* Removes the channel from storage by the gossip manager. This
* will likely be called by a chain-monitoring service.
*/
public async removeChannel(scid: ShortChannelId): Promise<void> {
this.logger.debug('removing channel %s', scid.toString());
await this.gossipStore.deleteChannelAnnouncement(scid);
}
/**
* Removes the channel from storage by the gossip manager. This will
* likely be called by a chain-monitoring service.
* @param outpoint
*/
public async removeChannelByOutpoint(outpoint: OutPoint): Promise<void> {
const chanAnn = await this.gossipStore.findChannelAnnouncementByOutpoint(
outpoint,
);
if (!chanAnn) return;
await this.removeChannel(chanAnn.shortChannelId);
}
/**
* Retrieves the valid chan_ann, chan_update, node_ann messages
* while making sure to not send duplicate node_ann messages.
*
* @remarks
* For now we are going to buffer messages into memory. We could
* return a stream and yield messages as they are streamed from
* the gossip_store.
*/
public async *allMessages(): AsyncGenerator<IWireMessage, void, unknown> {
this.logger.debug('fetching all messages');
// maintain a set of node ids that we have already seen so that
// we do no rebroadcast node announcements. This set stores the
// nodeid pubkey as a hex string, which through testing is the
// fastest way to perfrom set operations.
const seenNodeIds: Set<string> = new Set();
// obtain full list of channel announcements
const chanAnns = await this.gossipStore.findChannelAnnouncemnts();
for (const chanAnn of chanAnns) {
yield chanAnn;
// load and add the node1 channel_update
const update1 = await this.gossipStore.findChannelUpdate(
chanAnn.shortChannelId,
0,
);
if (update1) yield update1;
// load and add the nod2 channel_update
const update2 = await this.gossipStore.findChannelUpdate(
chanAnn.shortChannelId,
1,
);
if (update2) yield update2;
// optionally load node1 announcement
const nodeId1 = chanAnn.nodeId1.toString('hex');
if (!seenNodeIds.has(nodeId1)) {
seenNodeIds.add(nodeId1);
const nodeAnn = await this.gossipStore.findNodeAnnouncement(
chanAnn.nodeId1,
);
if (nodeAnn) yield nodeAnn;
}
// optionally load node2 announcement
const nodeId2 = chanAnn.nodeId2.toString('hex');
if (!seenNodeIds.has(nodeId2)) {
seenNodeIds.add(nodeId2);
const nodeAnn = await this.gossipStore.findNodeAnnouncement(
chanAnn.nodeId2,
);
if (nodeAnn) yield nodeAnn;
}
}
// Broadcast unattached node announcements. These may have been orphaned
// from previously closed channels, or if the node allows node_ann messages
// without channels.
const nodeAnns = await this.gossipStore.findNodeAnnouncements();
for (const nodeAnn of nodeAnns) {
if (!seenNodeIds.has(nodeAnn.nodeId.toString('hex'))) yield nodeAnn;
}
}
/**
* Handles when a peer has been added to the manager and it is finally
* ready and has negotiated the gossip technique.
* @param peer
*/
private _onPeerReady(peer: Peer) {
// Construct a gossip filter for use by the specific GossipPeer. This
// filter will be internally used by the GossipPeer to validate and
// capture gossip messages
const filter = new GossipFilter(
this.gossipStore,
this.pendingStore,
this.chainClient,
);
// Construct the gossip Peer and add it to the collection of Peers
// that are currently being managed by the GossipPeer
const gossipPeer = new GossipPeer(peer, filter, this.logger);
// Attach events from the gossipPeer
gossipPeer.on('readable', this._onPeerReadable.bind(this, gossipPeer));
gossipPeer.on('gossip_error', this._onGossipError.bind(this));
// Add peer to the list of peers
this.peers.add(gossipPeer);
// Add event handler for a beer closing
peer.on('close', this._onPeerClose.bind(this, gossipPeer));
// Add the peer to the relay manager
this.gossipRelay.addPeer(gossipPeer);
// If we have not yet performed a full synchronization then we can
// perform the full gossip state restore from this node
if (this.syncState === SyncState.Unsynced) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this._syncPeer(gossipPeer);
}
// If we've already synced, simply enable gossip receiving for the peer
else {
gossipPeer.enableGossip();
}
}
/**
* Handles when a peer closes
* @param gossipPeer
*/
private _onPeerClose(gossipPeer: GossipPeer) {
if (this.gossipRelay) {
this.gossipRelay.removePeer(gossipPeer);
}
this.peers.delete(gossipPeer);
}
/**
* Handles when a peer becomes readable
* @param peer
*/
private _onPeerReadable(peer: GossipPeer) {
// eslint-disable-next-line no-constant-condition
while (true) {
const msg = peer.read() as IWireMessage;
if (msg) this._onGossipMessage(msg);
else return;
}
}
/**
* Handles receieved gossip messages
* @param msg
*/
private _onGossipMessage(msg: IWireMessage) {
if (msg.type === MessageType.ChannelAnnouncement) {
this.blockHeight = Math.max(
this.blockHeight,
(msg as ChannelAnnouncementMessage).shortChannelId.block,
);
}
// enqueue the message into the relayer
this.gossipRelay.enqueue(msg);
// emit the message generally
this.emit('message', msg);
}
/**
* Handles Gossip Errors
*/
private _onGossipError(err: Error) {
this.emit('error', err);
}
/**
* Synchronize the peer using the peer's synchronization mechanism.
* @param peer
*/
private async _syncPeer(peer: GossipPeer) {
// Disable gossip relay
this.gossipRelay.stop();
this.logger.trace("sync status now 'syncing'");
this.syncState = SyncState.Syncing;
try {
// perform synchronization
await peer.syncRange();
// finally transition to sync complete status
this.logger.trace("sync status now 'synced'");
this.syncState = SyncState.Synced;
// enable gossip for all the peers
this.logger.trace('enabling gossip for all peers');
for (const gossipPeer of this.peers) {
gossipPeer.enableGossip();
}
} catch (ex) {
// TODO select next peer
this.syncState = SyncState.Unsynced;
}
// Enable gossip relay now that sync is complete
this.gossipRelay.start();
}
private async _restoreState() {
this.logger.info('retrieving gossip state from store');
this.blockHeight = 0;
const chanAnns = await this.gossipStore.findChannelAnnouncemnts();
// find best block height
for (const chanAnn of chanAnns) {
this.blockHeight = Math.max(
this.blockHeight,
chanAnn.shortChannelId.block,
);
}
this.logger.info("highest block %d found from %d channels", this.blockHeight, chanAnns.length); // prettier-ignore
// validate all utxos
await this._validateUtxos(chanAnns);
}
private async _validateUtxos(chanAnns: ChannelAnnouncementMessage[]) {
if (!this.chainClient) {
this.logger.info('skipping utxo validation, no chain_client configured');
return;
}
const extChanAnnCount = chanAnns.reduce(
(acc, msg) =>
acc + (msg instanceof ExtendedChannelAnnouncementMessage ? 1 : 0),
0,
);
this.logger.info('validating %d utxos', extChanAnnCount);
if (!extChanAnnCount) return;
const oct = Math.trunc(extChanAnnCount / 8);
for (let i = 0; i < chanAnns.length; i++) {
const chanAnn = chanAnns[i];
if ((i + 1) % oct === 0) {
this.logger.info(
'validating utxos %s% complete',
(((i + 1) / extChanAnnCount) * 100).toFixed(2),
);
}
if (chanAnn instanceof ExtendedChannelAnnouncementMessage) {
const utxo = await this.chainClient.getUtxo(
chanAnn.outpoint.txid.toString(HashByteOrder.RPC),
chanAnn.outpoint.outputIndex,
);
if (!utxo) {
await this.removeChannel(chanAnn.shortChannelId);
}
}
}
}
}
|