All files / lib/gossip GossipQueriesSync.ts

34.78% Statements 16/46
18.18% Branches 2/11
16.67% Functions 1/6
34.78% Lines 16/46

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99    1x 1x   1x 1x 1x   1x 1x 1x   1x 1x 1x 1x 1x 1x 1x     1x                                                                                                                                                        
import { ILogger } from '@node-dlc/logger';
 
import { ChannelAnnouncementMessage } from '../messages/ChannelAnnouncementMessage';
import { ChannelUpdateMessage } from '../messages/ChannelUpdateMessage';
import { IWireMessage } from '../messages/IWireMessage';
import { NodeAnnouncementMessage } from '../messages/NodeAnnouncementMessage';
import { ReplyChannelRangeMessage } from '../messages/ReplyChannelRangeMessage';
import { ReplyShortChannelIdsEndMessage } from '../messages/ReplyShortChannelIdsEndMessage';
import { IMessageSender } from '../Peer';
import { ChannelRangeQuery } from './ChannelRangeQuery';
import { ChannelsQuery } from './ChannelsQuery';
import { GossipSyncWatcher } from './GossipSyncWatcher';
 
/**
 * Phases that a `GossipQueriesSync` run moves through. The numeric values
 * are spelled out explicitly and match the implicit zero-based ordering.
 */
export enum GossipQueriesSyncState {
  /** State assigned when a `GossipQueriesSync` is constructed. */
  Idle = 0,
  /** Set just before the channel range query is awaited. */
  AwaitingChannelRange = 1,
  /** Set once the range query resolved and the channels query begins. */
  AwaitingChannels = 2,
  /** Set once the channels query resolved and the sync watcher is awaited. */
  AwaitingMessages = 3,
  /** Set after every phase finished without throwing. */
  Complete = 4,
  /** Set when any phase threw or rejected. */
  Failed = 5,
}
 
/**
 * Drives one gossip synchronization against a single peer in three
 * sequential phases: a channel range query, a channels query for the values
 * the range query resolved with, and a wait on the sync watcher. Progress is
 * exposed through `state`; the most recent failure through `error`.
 *
 * Incoming wire messages must be fed in through `handleWireMessage` so the
 * phase that is currently awaiting a reply can make progress.
 */
export class GossipQueriesSync {
  private _state: GossipQueriesSyncState;
  private _error: Error;
  private _rangeQuery: ChannelRangeQuery;
  private _channelsQuery: ChannelsQuery;
  private _syncWatcher: GossipSyncWatcher;

  /**
   * Creates the three collaborators used by the sync and starts in `Idle`.
   *
   * @param chainHash passed to both the range query and the channels query
   * @param peer passed to both queries as their message sender
   * @param logger used by this class and handed to every collaborator
   */
  constructor(
    readonly chainHash: Buffer,
    readonly peer: IMessageSender,
    readonly logger: ILogger,
  ) {
    this._state = GossipQueriesSyncState.Idle;
    this._rangeQuery = new ChannelRangeQuery(
      this.chainHash,
      this.peer,
      this.logger,
    );
    this._channelsQuery = new ChannelsQuery(
      this.chainHash,
      this.peer,
      this.logger,
    );
    this._syncWatcher = new GossipSyncWatcher(this.logger);
  }

  /** Current phase of the synchronization. */
  public get state(): GossipQueriesSyncState {
    return this._state;
  }

  /**
   * Error recorded by the most recent failed `queryRange` call. It is never
   * assigned anywhere else, so it is unset until a run fails and is not
   * cleared by a later successful run.
   */
  public get error(): Error {
    return this._error;
  }

  /**
   * Runs the full synchronization, updating `state` before each phase.
   *
   * @param firstBlock forwarded unchanged to `ChannelRangeQuery.queryRange`
   * @param numBlocks forwarded unchanged to `ChannelRangeQuery.queryRange`
   * @returns a promise that resolves once all three phases have completed
   * @throws rethrows whatever a phase threw, after logging it, switching
   *   `state` to `Failed` and recording the failure in `error`
   */
  public async queryRange(
    firstBlock?: number,
    numBlocks?: number,
  ): Promise<void> {
    try {
      this.logger.info('synchronization starting');
      this._state = GossipQueriesSyncState.AwaitingChannelRange;
      const scids = await this._rangeQuery.queryRange(firstBlock, numBlocks);

      this._state = GossipQueriesSyncState.AwaitingChannels;
      await this._channelsQuery.query(...scids);

      this._state = GossipQueriesSyncState.AwaitingMessages;
      await this._syncWatcher.watch();

      this.logger.info('synchronization complete!');
      this._state = GossipQueriesSyncState.Complete;
    } catch (ex) {
      // Anything can be thrown in JavaScript. Normalize before touching
      // `.message` so the log line is meaningful and the `error` getter
      // always yields a real Error instance.
      const error = ex instanceof Error ? ex : new Error(String(ex));
      this.logger.error('synchronization failed with error', error.message);
      this._state = GossipQueriesSyncState.Failed;
      this._error = error;
      // Rethrow the original value so callers observe exactly what was thrown.
      throw ex;
    }
  }

  /**
   * Routes an incoming wire message to the collaborator that handles it.
   * Range replies go to the range query, short-channel-ids-end replies go to
   * the channels query, and channel announcement / channel update / node
   * announcement messages go to the sync watcher. Every other message type
   * is ignored.
   *
   * @param msg message received from the peer
   */
  public handleWireMessage(msg: IWireMessage): void {
    if (msg instanceof ReplyChannelRangeMessage) {
      this._rangeQuery.handleReplyChannelRange(msg);
    } else if (msg instanceof ReplyShortChannelIdsEndMessage) {
      this._channelsQuery.handleReplyShortChannelIdsEnd(msg);
    } else if (
      msg instanceof ChannelAnnouncementMessage ||
      msg instanceof ChannelUpdateMessage ||
      msg instanceof NodeAnnouncementMessage
    ) {
      this._syncWatcher.onGossipMessage(msg);
    }
  }
}