UNPKG

25.9 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3/// <reference types="@reactivex/ix-es2015-cjs" />
4const client_common_1 = require("@neo-one/client-common");
5const utils_1 = require("@neo-one/utils");
6const asynciterablex_1 = require("@reactivex/ix-es2015-cjs/asynciterable/asynciterablex");
7const scan_1 = require("@reactivex/ix-es2015-cjs/asynciterable/pipe/scan");
8const operators_1 = require("rxjs/operators");
9const common_1 = require("./common");
10const ConsensusContext_1 = require("./ConsensusContext");
11const ConsensusQueue_1 = require("./ConsensusQueue");
12const handleConsensusPayload_1 = require("./handleConsensusPayload");
13const handlePersistBlock_1 = require("./handlePersistBlock");
14const handleTransactionReceived_1 = require("./handleTransactionReceived");
15const runConsensus_1 = require("./runConsensus");
16const MS_IN_SECOND = 1000;
17class Consensus {
18 constructor({ options$, node, monitor, }) {
19 this.mutableQueue = new ConsensusQueue_1.ConsensusQueue();
20 this.options$ = options$.pipe(operators_1.map((options) => {
21 const privateKey = client_common_1.common.stringToPrivateKey(options.privateKey);
22 const publicKey = client_common_1.crypto.privateKeyToPublicKey(privateKey);
23 const feeAddress = client_common_1.crypto.publicKeyToScriptHash(publicKey);
24 return {
25 privateKey,
26 publicKey,
27 feeAddress,
28 privateNet: options.privateNet,
29 };
30 }));
31 this.node = node;
32 this.mutableConsensusContext = new ConsensusContext_1.ConsensusContext();
33 this.monitor = monitor.at('node_consensus');
34 }
35 start$() {
36 return this.options$.pipe(utils_1.mergeScanLatest(async (_, options) => {
37 await this.pause();
38 this.doStart(options);
39 }), utils_1.finalize(async () => {
40 await this.pause();
41 }));
42 }
43 onPersistBlock() {
44 this.mutableQueue.write({ type: 'handlePersistBlock' });
45 }
46 onConsensusPayloadReceived(payload) {
47 this.mutableQueue.write({
48 type: 'handleConsensusPayload',
49 payload,
50 });
51 }
52 onTransactionReceived(transaction) {
53 this.mutableQueue.write({
54 type: 'handleTransactionReceived',
55 transaction,
56 });
57 }
58 async runConsensusNow() {
59 const options = await this.options$.pipe(operators_1.take(1)).toPromise();
60 if (options.privateNet) {
61 this.mutableQueue.write({ type: 'timer' });
62 }
63 else {
64 throw new Error('Can only force consensus on a private network.');
65 }
66 }
67 nowSeconds() {
68 return this.mutableConsensusContext.nowSeconds();
69 }
70 async fastForwardOffset(seconds) {
71 const options = await this.options$.pipe(operators_1.take(1)).toPromise();
72 if (options.privateNet) {
73 this.mutableConsensusContext.fastForwardOffset(seconds);
74 }
75 else {
76 throw new Error('Can only fast forward on a private network.');
77 }
78 }
79 async fastForwardToTime(seconds) {
80 const options = await this.options$.pipe(operators_1.take(1)).toPromise();
81 if (options.privateNet) {
82 this.mutableConsensusContext.fastForwardToTime(seconds);
83 }
84 else {
85 throw new Error('Can only fast forward on a private network.');
86 }
87 }
88 async pause() {
89 this.clearTimer();
90 this.mutableQueue.done();
91 this.mutableQueue = new ConsensusQueue_1.ConsensusQueue();
92 if (this.mutableStartPromise !== undefined) {
93 await this.mutableStartPromise;
94 }
95 }
96 async reset() {
97 this.mutableConsensusContext = new ConsensusContext_1.ConsensusContext();
98 }
99 async resume() {
100 const options = await this.options$.pipe(operators_1.take(1)).toPromise();
101 // tslint:disable-next-line no-floating-promises
102 this.doStart(options);
103 }
104 doStart(options) {
105 let completed = false;
106 const mutableStartPromise = this.start(options).then(() => {
107 completed = true;
108 this.mutableStartPromise = undefined;
109 });
110 if (!completed) {
111 this.mutableStartPromise = mutableStartPromise;
112 }
113 }
114 async start(options) {
115 this.monitor.log({
116 name: 'neo_consensus_start',
117 message: 'Consensus started.',
118 level: 'verbose',
119 });
120 const initialResult = await common_1.initializeNewConsensus({
121 blockchain: this.node.blockchain,
122 publicKey: options.publicKey,
123 consensusContext: this.mutableConsensusContext,
124 });
125 await asynciterablex_1.AsyncIterableX.from(this.mutableQueue)
126 .pipe(scan_1.scan(async (context, event) => {
127 let result;
128 switch (event.type) {
129 case 'handlePersistBlock':
130 result = await handlePersistBlock_1.handlePersistBlock({
131 blockchain: this.node.blockchain,
132 publicKey: options.publicKey,
133 consensusContext: this.mutableConsensusContext,
134 });
135 break;
136 case 'handleConsensusPayload':
137 result = await handleConsensusPayload_1.handleConsensusPayload({
138 context,
139 node: this.node,
140 privateKey: options.privateKey,
141 payload: event.payload,
142 consensusContext: this.mutableConsensusContext,
143 });
144 break;
145 case 'handleTransactionReceived':
146 result = await handleTransactionReceived_1.handleTransactionReceived({
147 context,
148 node: this.node,
149 privateKey: options.privateKey,
150 transaction: event.transaction,
151 consensusContext: this.mutableConsensusContext,
152 });
153 break;
154 case 'timer':
155 result = await runConsensus_1.runConsensus({
156 context,
157 node: this.node,
158 options,
159 consensusContext: this.mutableConsensusContext,
160 });
161 break;
162 default:
163 utils_1.utils.assertNever(event);
164 throw new Error('For TS');
165 }
166 return this.handleResult(result);
167 }, this.handleResult(initialResult)))
168 .forEach(() => {
169 // do nothing
170 });
171 this.monitor.log({
172 name: 'neo_consensus_stop',
173 message: 'Consensus stopped.',
174 level: 'verbose',
175 });
176 }
177 handleResult(result) {
178 if (result.timerSeconds !== undefined) {
179 this.handleTimer(result.timerSeconds);
180 }
181 return result.context;
182 }
183 handleTimer(mutableTimerSeconds) {
184 this.clearTimer();
185 this.mutableTimer = setTimeout(() => this.mutableQueue.write({ type: 'timer' }), mutableTimerSeconds * MS_IN_SECOND);
186 }
187 clearTimer() {
188 if (this.mutableTimer !== undefined) {
189 clearTimeout(this.mutableTimer);
190 this.mutableTimer = undefined;
191 }
192 }
193}
194exports.Consensus = Consensus;
195
196//# sourceMappingURL=data:application/json;charset=utf8;base64,