Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 | 1x 1x 1x 1x 1x 1x 1x 1x | import { ShortChannelId } from '@node-dlc/common';
import { ILogger } from '@node-dlc/logger';
import { ChannelUpdateChannelFlags } from '../flags/ChanneUpdateChannelFlags';
import { ChannelAnnouncementMessage } from '../messages/ChannelAnnouncementMessage';
import { ChannelUpdateMessage } from '../messages/ChannelUpdateMessage';
import { IWireMessage } from '../messages/IWireMessage';
import { NodeAnnouncementMessage } from '../messages/NodeAnnouncementMessage';
import { IPeer } from '../Peer';
/**
 * Contract for the sub-system that relays gossip messages, a technique
 * also known as rumor mongering. An implementation periodically flushes
 * messages to connected peers and makes a best effort not to send a
 * message to a peer that has already received it.
 *
 * The idea behind rumor mongering is that a piece of information is
 * "hot": a node tries to infect its connected peers with it by sending
 * it to them. Once a peer has been sent the information there is no
 * need to infect that peer with it again.
 */
export interface IGossipRelay {
  /** Current state of the gossip relay. */
  state: GossipRelayState;

  /** Starts the gossip relay. */
  start(): void;

  /** Stops the gossip relay. */
  stop(): void;

  /**
   * Registers a peer that messages should be relayed to.
   * @param peer peer to start relaying to
   */
  addPeer(peer: IPeer): void;

  /**
   * Unregisters a peer so that it no longer takes part in the relay.
   * @param peer peer to stop relaying to
   */
  removePeer(peer: IPeer): void;

  /**
   * Queues a message so that it gets broadcast to peers.
   * @param msg message to broadcast
   */
  enqueue(msg: IWireMessage): void;
}
/**
 * Possible states of an IGossipRelay rumor mongerer.
 */
export enum GossipRelayState {
  /** Rumor mongering is not running. */
  Inactive = 0,

  /** Rumor mongering is running. */
  Active = 1,
}
/**
 * Basic gossip relay that enqueues all messages and maintains, for each
 * peer, an index into the queue. The index is the number of entries at
 * the front of the queue that the peer does not need to be sent any
 * more. When messages are flushed, only the messages at or after a
 * peer's index are sent and the index is moved to the end of the queue.
 * When the queue grows beyond its maximum length the oldest messages
 * are pruned and every peer index is shifted accordingly.
 */
export class GossipRelay {
  private _queue: IWireMessage[];
  private _peers: Map<IPeer, number>;
  private _timer: NodeJS.Timeout | undefined;
  private _state: GossipRelayState;

  /**
   * @param logger logger used for info, debug and trace output
   * @param relayPeriodMs interval between periodic flushes, in
   * milliseconds
   * @param maxQueueLen maximum number of messages retained in the
   * queue after a periodic flush
   */
  constructor(
    readonly logger: ILogger,
    readonly relayPeriodMs = 60000,
    readonly maxQueueLen = 10000,
  ) {
    this._peers = new Map<IPeer, number>();
    this._queue = [];
    this._state = GossipRelayState.Inactive;
  }

  /**
   * Gets the current state of gossip relay
   */
  public get state(): GossipRelayState {
    return this._state;
  }

  /**
   * Starts relay to peers. This enables messages to be enqueued and
   * periodically sent to the peers. Does nothing when the relay is
   * already active.
   */
  public start(): void {
    if (this._state === GossipRelayState.Active) return;
    this.logger.info('starting gossip relay for all peers');
    this._state = GossipRelayState.Active;
    this._timer = setInterval(() => this._onTimer(), this.relayPeriodMs);
  }

  /**
   * Stops relay to peers. Does nothing when the relay is already
   * inactive.
   */
  public stop(): void {
    if (this._state === GossipRelayState.Inactive) return;
    this.logger.info('stopping gossip relay for all peers');
    // the handle was created with setInterval, so release it with the
    // matching clearInterval
    clearInterval(this._timer);
    this._timer = undefined;
    this._state = GossipRelayState.Inactive;
  }

  /**
   * Adds a new peer to relay messages to. The peer starts at the
   * current end of the queue, so it is only sent messages that are
   * enqueued after it was added.
   * @param peer
   */
  public addPeer(peer: IPeer): void {
    this._peers.set(peer, this._queue.length);
  }

  /**
   * Removes the peer from relay
   * @param peer
   */
  public removePeer(peer: IPeer): void {
    this._peers.delete(peer);
  }

  /**
   * Enqueues a message to be broadcast to peers. Messages are ignored
   * while the relay is not active, as are message types other than
   * channel_announcement, channel_update and node_announcement.
   * @param msg
   */
  public enqueue(msg: IWireMessage): void {
    if (this._state !== GossipRelayState.Active) return;

    // For chan_ann messages there is never an update so we only
    // need to check if the chan_ann exists and add it if it doesn't
    if (msg instanceof ChannelAnnouncementMessage) {
      if (!this._findChanAnn(msg.shortChannelId)) {
        this.logger.trace(
          'adding channel_announcement',
          msg.shortChannelId.toString(),
        );
        this._queue.push(msg);
      }
      return;
    }

    // For chan_update messages we will add messages that don't exist
    // or replace an existing update message when the new one is newer
    if (msg instanceof ChannelUpdateMessage) {
      const direction = msg.channelFlags.isSet(
        ChannelUpdateChannelFlags.direction,
      );
      const existing = this._findChanUpd(msg.shortChannelId, direction);
      if (!existing) {
        this.logger.trace(
          'adding channel_update',
          msg.shortChannelId.toString(),
          direction,
        );
        this._queue.push(msg);
      } else if (existing.timestamp < msg.timestamp) {
        this.logger.trace(
          'updating channel_update',
          msg.shortChannelId.toString(),
          direction,
        );
        this._replaceInQueue(existing, msg);
      }
      return;
    }

    // For node_ann messages we look for the existing message and
    // ignore the new msg unless it is newer than the node_ann we
    // currently have in the message queue
    if (msg instanceof NodeAnnouncementMessage) {
      const existing = this._findNodeAnn(msg.nodeId);
      if (!existing) {
        this.logger.trace('adding node_announcement', msg.nodeId.toString('hex'));
        this._queue.push(msg);
      } else if (existing.timestamp < msg.timestamp) {
        this.logger.trace(
          'updating node_announcement',
          msg.nodeId.toString('hex'),
        );
        this._replaceInQueue(existing, msg);
      }
    }
  }

  /**
   * Removes an existing message from the queue and appends its
   * replacement to the back of the queue so that it gets sent to every
   * peer on the next flush.
   * @param existing message currently in the queue
   * @param replacement newer message that supersedes it
   */
  private _replaceInQueue(
    existing: IWireMessage,
    replacement: IWireMessage,
  ): void {
    const idx = this._queue.indexOf(existing);
    if (idx !== -1) {
      this._queue.splice(idx, 1);
      // Removing an entry shifts everything behind it one slot to the
      // left. A peer that had already moved past the removed entry
      // must shift as well, otherwise it would skip one message (the
      // replacement, for a peer that was fully up to date).
      for (const [peer, position] of this._peers) {
        if (position > idx) this._peers.set(peer, position - 1);
      }
    }
    this._queue.push(replacement);
  }

  /**
   * Finds a channel_announcement message based on the short_channel_id
   * @param scid
   * @returns the queued message or undefined when there is none
   */
  private _findChanAnn(
    scid: ShortChannelId,
  ): ChannelAnnouncementMessage | undefined {
    const wanted = scid.toNumber();
    for (const queued of this._queue) {
      if (
        queued instanceof ChannelAnnouncementMessage &&
        queued.shortChannelId.toNumber() === wanted
      ) {
        return queued;
      }
    }
    return undefined;
  }

  /**
   * Finds a channel_update message based on the short_channel_id and
   * direction. The found message can then be compared to an inbound
   * message to determine if the new message is newer.
   * @param scid
   * @param direction
   * @returns the queued message or undefined when there is none
   */
  private _findChanUpd(
    scid: ShortChannelId,
    direction: boolean,
  ): ChannelUpdateMessage | undefined {
    const wanted = scid.toNumber();
    for (const queued of this._queue) {
      if (
        queued instanceof ChannelUpdateMessage &&
        queued.shortChannelId.toNumber() === wanted &&
        queued.channelFlags.isSet(ChannelUpdateChannelFlags.direction) ===
          direction
      ) {
        return queued;
      }
    }
    return undefined;
  }

  /**
   * Finds a node_announcement message based on the node_id. The
   * returned message can be compared to newer messages using the
   * timestamp.
   * @param nodeId
   * @returns the queued message or undefined when there is none
   */
  private _findNodeAnn(nodeId: Buffer): NodeAnnouncementMessage | undefined {
    for (const queued of this._queue) {
      if (queued instanceof NodeAnnouncementMessage && queued.nodeId.equals(nodeId)) {
        return queued;
      }
    }
    return undefined;
  }

  /**
   * Fires when the timer ticks and will flush messages to peers and
   * prune the queue
   */
  private _onTimer(): void {
    this.logger.debug(`periodic flush, ${this._peers.size} peers, ${this._queue.length} hot messages`); // prettier-ignore
    for (const peer of this._peers.keys()) {
      this._flushToPeer(peer);
    }
    this._pruneQueue();
  }

  /**
   * Sends a peer every queued message from its current index onwards
   * and then moves its index to the end of the queue. Peers that are
   * not registered are ignored.
   * @param peer
   */
  private _flushToPeer(peer: IPeer): void {
    const start = this._peers.get(peer);
    if (start === undefined) return;
    for (let i = start; i < this._queue.length; i++) {
      peer.sendMessage(this._queue[i]);
    }
    // record the new position once, after everything has been sent
    this._peers.set(peer, this._queue.length);
  }

  /**
   * Prunes the oldest messages when the queue is longer than the
   * maximum allowed length
   */
  private _pruneQueue(): void {
    // calculate the delete count based on the current queue length
    // and the max allowed queue length
    const deleteCount = Math.max(0, this._queue.length - this.maxQueueLen);

    // do nothing if we don't need to prune any items
    if (deleteCount === 0) return;

    // pruning the excess items from the start of the queue
    this._queue.splice(0, deleteCount);

    // adjust all of the peers by reducing their index position by
    // the delete count; clamp at zero as a defensive measure so an
    // index can never point before the start of the queue
    for (const [peer, index] of this._peers) {
      this._peers.set(peer, Math.max(0, index - deleteCount));
    }
    this.logger.debug(`pruned ${deleteCount} old messages`);
  }
}
|