1 | ;
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | /// <reference types="@reactivex/ix-es2015-cjs" />
|
4 | const client_common_1 = require("@neo-one/client-common");
|
5 | const utils_1 = require("@neo-one/utils");
|
6 | const asynciterablex_1 = require("@reactivex/ix-es2015-cjs/asynciterable/asynciterablex");
|
7 | const scan_1 = require("@reactivex/ix-es2015-cjs/asynciterable/pipe/scan");
|
8 | const operators_1 = require("rxjs/operators");
|
9 | const common_1 = require("./common");
|
10 | const ConsensusContext_1 = require("./ConsensusContext");
|
11 | const ConsensusQueue_1 = require("./ConsensusQueue");
|
12 | const handleConsensusPayload_1 = require("./handleConsensusPayload");
|
13 | const handlePersistBlock_1 = require("./handlePersistBlock");
|
14 | const handleTransactionReceived_1 = require("./handleTransactionReceived");
|
15 | const runConsensus_1 = require("./runConsensus");
|
/** Milliseconds per second; converts timer delays expressed in seconds for `setTimeout`. */
const MS_IN_SECOND = 1000;
/**
 * Runs the consensus event loop for a node.
 *
 * Events (persisted block, consensus payload, transaction, timer) are written
 * to an internal queue. While a run is active, `start` consumes that queue as
 * an async iterable and folds every event into the current context through the
 * matching handler. A handler result may request a timer, which later enqueues
 * a `timer` event.
 */
class Consensus {
    /**
     * @param {object} args
     * @param args.options$ Observable of raw options; each emission must carry
     *   `privateKey` (string form) and `privateNet`.
     * @param args.node Node handed to the handlers; `node.blockchain` is read too.
     * @param args.monitor Monitor; scoped with `at('node_consensus')`.
     */
    constructor({ options$, node, monitor, }) {
        this.mutableQueue = new ConsensusQueue_1.ConsensusQueue();
        // Derive the key material once per emission so handlers receive ready-to-use values.
        this.options$ = options$.pipe(operators_1.map((options) => {
            const privateKey = client_common_1.common.stringToPrivateKey(options.privateKey);
            const publicKey = client_common_1.crypto.privateKeyToPublicKey(privateKey);
            const feeAddress = client_common_1.crypto.publicKeyToScriptHash(publicKey);
            return {
                privateKey,
                publicKey,
                feeAddress,
                privateNet: options.privateNet,
            };
        }));
        this.node = node;
        this.mutableConsensusContext = new ConsensusContext_1.ConsensusContext();
        this.monitor = monitor.at('node_consensus');
    }
    /**
     * Builds the observable that drives consensus: for every options emission
     * the current run is paused and a new one is started with those options.
     * The `finalize` callback pauses once more.
     *
     * @returns The piped observable; nothing happens until it is subscribed.
     */
    start$() {
        return this.options$.pipe(utils_1.mergeScanLatest(async (_, options) => {
            await this.pause();
            this.doStart(options);
        }), utils_1.finalize(async () => {
            await this.pause();
        }));
    }
    /** Enqueues a `handlePersistBlock` event. */
    onPersistBlock() {
        this.mutableQueue.write({ type: 'handlePersistBlock' });
    }
    /**
     * Enqueues a `handleConsensusPayload` event.
     *
     * @param payload Payload forwarded to `handleConsensusPayload`.
     */
    onConsensusPayloadReceived(payload) {
        this.mutableQueue.write({
            type: 'handleConsensusPayload',
            payload,
        });
    }
    /**
     * Enqueues a `handleTransactionReceived` event.
     *
     * @param transaction Transaction forwarded to `handleTransactionReceived`.
     */
    onTransactionReceived(transaction) {
        this.mutableQueue.write({
            type: 'handleTransactionReceived',
            transaction,
        });
    }
    /**
     * Enqueues a `timer` event immediately instead of waiting for the timer.
     *
     * @throws {Error} When the options do not have `privateNet` set.
     */
    async runConsensusNow() {
        await this.assertPrivateNet('Can only force consensus on a private network.');
        this.mutableQueue.write({ type: 'timer' });
    }
    /** @returns Whatever the consensus context reports from `nowSeconds()`. */
    nowSeconds() {
        return this.mutableConsensusContext.nowSeconds();
    }
    /**
     * Forwards to the consensus context's `fastForwardOffset`.
     *
     * @param seconds Value passed through unchanged.
     * @throws {Error} When the options do not have `privateNet` set.
     */
    async fastForwardOffset(seconds) {
        await this.assertPrivateNet('Can only fast forward on a private network.');
        this.mutableConsensusContext.fastForwardOffset(seconds);
    }
    /**
     * Forwards to the consensus context's `fastForwardToTime`.
     *
     * @param seconds Value passed through unchanged.
     * @throws {Error} When the options do not have `privateNet` set.
     */
    async fastForwardToTime(seconds) {
        await this.assertPrivateNet('Can only fast forward on a private network.');
        this.mutableConsensusContext.fastForwardToTime(seconds);
    }
    /**
     * Stops the active run: clears the timer, calls `done()` on the current
     * queue, swaps in a fresh queue and waits for the in-flight run (if any).
     *
     * If the in-flight run rejected, awaiting it here rethrows that rejection.
     */
    async pause() {
        this.clearTimer();
        this.mutableQueue.done();
        this.mutableQueue = new ConsensusQueue_1.ConsensusQueue();
        if (this.mutableStartPromise !== undefined) {
            await this.mutableStartPromise;
        }
        // A handler that was still executing while we waited may have re-armed
        // the timer through handleResult; without this it would fire into the
        // replacement queue even though consensus is paused.
        this.clearTimer();
    }
    /** Replaces the consensus context with a brand new instance. */
    async reset() {
        this.mutableConsensusContext = new ConsensusContext_1.ConsensusContext();
    }
    /** Starts a new run using the first options value `options$` emits. */
    async resume() {
        const options = await this.takeOptions();
        // tslint:disable-next-line no-floating-promises
        this.doStart(options);
    }
    /**
     * Starts a run in the background and records its promise so `pause` can
     * wait for it.
     *
     * @param options Derived options passed to `start`.
     */
    doStart(options) {
        const startPromise = this.start(options).then(() => {
            // Only the most recent run may clear the field; an older run that
            // finishes late must not erase the promise of a newer one.
            if (this.mutableStartPromise === startPromise) {
                this.mutableStartPromise = undefined;
            }
        });
        this.mutableStartPromise = startPromise;
    }
    /**
     * Executes one run: logs the start, initializes consensus, then processes
     * queued events one after another until the queue's iteration ends, and
     * finally logs the stop.
     *
     * @param options Derived options (`publicKey`, `privateKey`, ...).
     */
    async start(options) {
        this.monitor.log({
            name: 'neo_consensus_start',
            message: 'Consensus started.',
            level: 'verbose',
        });
        const initialResult = await common_1.initializeNewConsensus({
            blockchain: this.node.blockchain,
            publicKey: options.publicKey,
            consensusContext: this.mutableConsensusContext,
        });
        await asynciterablex_1.AsyncIterableX.from(this.mutableQueue)
            .pipe(scan_1.scan(async (context, event) => {
            // `this.mutableConsensusContext` is read per event, so a context
            // swapped in by `reset` is picked up by subsequent events.
            let result;
            switch (event.type) {
                case 'handlePersistBlock':
                    result = await handlePersistBlock_1.handlePersistBlock({
                        blockchain: this.node.blockchain,
                        publicKey: options.publicKey,
                        consensusContext: this.mutableConsensusContext,
                    });
                    break;
                case 'handleConsensusPayload':
                    result = await handleConsensusPayload_1.handleConsensusPayload({
                        context,
                        node: this.node,
                        privateKey: options.privateKey,
                        payload: event.payload,
                        consensusContext: this.mutableConsensusContext,
                    });
                    break;
                case 'handleTransactionReceived':
                    result = await handleTransactionReceived_1.handleTransactionReceived({
                        context,
                        node: this.node,
                        privateKey: options.privateKey,
                        transaction: event.transaction,
                        consensusContext: this.mutableConsensusContext,
                    });
                    break;
                case 'timer':
                    result = await runConsensus_1.runConsensus({
                        context,
                        node: this.node,
                        options,
                        consensusContext: this.mutableConsensusContext,
                    });
                    break;
                default:
                    // Exhaustiveness guard carried over from the TypeScript source.
                    utils_1.utils.assertNever(event);
                    throw new Error('For TS');
            }
            return this.handleResult(result);
        }, this.handleResult(initialResult)))
            .forEach(() => {
            // do nothing
        });
        this.monitor.log({
            name: 'neo_consensus_stop',
            message: 'Consensus stopped.',
            level: 'verbose',
        });
    }
    /**
     * Applies a handler result: arms the timer when `timerSeconds` is defined.
     *
     * @param result Object with `context` and an optional `timerSeconds`.
     * @returns `result.context`, which becomes the accumulator for the next event.
     */
    handleResult(result) {
        if (result.timerSeconds !== undefined) {
            this.handleTimer(result.timerSeconds);
        }
        return result.context;
    }
    /**
     * Replaces any pending timer with one that enqueues a `timer` event.
     *
     * @param mutableTimerSeconds Delay in seconds.
     */
    handleTimer(mutableTimerSeconds) {
        this.clearTimer();
        // The queue field is read when the timer fires, not when it is armed.
        this.mutableTimer = setTimeout(() => this.mutableQueue.write({ type: 'timer' }), mutableTimerSeconds * MS_IN_SECOND);
    }
    /** Cancels the pending timer, if there is one. */
    clearTimer() {
        if (this.mutableTimer !== undefined) {
            clearTimeout(this.mutableTimer);
            this.mutableTimer = undefined;
        }
    }
    /**
     * Subscribes to `options$` and resolves with the first value it emits.
     *
     * @returns Promise of the derived options object.
     */
    takeOptions() {
        return this.options$.pipe(operators_1.take(1)).toPromise();
    }
    /**
     * Rejects unless the options have `privateNet` set.
     *
     * @param message Error message used when the check fails.
     * @throws {Error} With `message` when `privateNet` is falsy.
     */
    async assertPrivateNet(message) {
        const options = await this.takeOptions();
        if (!options.privateNet) {
            throw new Error(message);
        }
    }
}
|
194 | exports.Consensus = Consensus;
|
195 |
|
196 | //# sourceMappingURL=data:application/json;charset=utf8;base64,{"version":3,"sources":["Consensus.ts"],"names":[],"mappings":";;AAAA,kDAAkD;AAClD,0DAAsF;AAGtF,0CAAiF;AACjF,0FAAuF;AACvF,2EAAwE;AAExE,8CAA2C;AAC3C,qCAAkD;AAClD,yDAAsD;AACtD,qDAAkD;AAElD,qEAAkE;AAClE,6DAA0D;AAC1D,2EAAwE;AACxE,iDAA8C;AAc9C,MAAM,YAAY,GAAG,IAAI,CAAC;AAE1B,MAAa,SAAS;IASpB,YAAmB,EACjB,QAAQ,EACR,IAAI,EACJ,OAAO,GAKR;QACC,IAAI,CAAC,YAAY,GAAG,IAAI,+BAAc,EAAE,CAAC;QACzC,IAAI,CAAC,QAAQ,GAAG,QAAQ,CAAC,IAAI,CAC3B,eAAG,CAAC,CAAC,OAAO,EAAE,EAAE;YACd,MAAM,UAAU,GAAG,sBAAM,CAAC,kBAAkB,CAAC,OAAO,CAAC,UAAU,CAAC,CAAC;YACjE,MAAM,SAAS,GAAG,sBAAM,CAAC,qBAAqB,CAAC,UAAU,CAAC,CAAC;YAC3D,MAAM,UAAU,GAAG,sBAAM,CAAC,qBAAqB,CAAC,SAAS,CAAC,CAAC;YAE3D,OAAO;gBACL,UAAU;gBACV,SAAS;gBACT,UAAU;gBACV,UAAU,EAAE,OAAO,CAAC,UAAU;aAC/B,CAAC;QACJ,CAAC,CAAC,CACH,CAAC;QAEF,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC;QACjB,IAAI,CAAC,uBAAuB,GAAG,IAAI,mCAAgB,EAAE,CAAC;QACtD,IAAI,CAAC,OAAO,GAAG,OAAO,CAAC,EAAE,CAAC,gBAAgB,CAAC,CAAC;IAC9C,CAAC;IAEM,MAAM;QACX,OAAO,IAAI,CAAC,QAAQ,CAAC,IAAI,CACvB,uBAAe,CAAwB,KAAK,EAAE,CAAC,EAAE,OAAO,EAAE,EAAE;YAC1D,MAAM,IAAI,CAAC,KAAK,EAAE,CAAC;YACnB,IAAI,CAAC,OAAO,CAAC,OAAO,CAAC,CAAC;QACxB,CAAC,CAAC,EACF,gBAAQ,CAAC,KAAK,IAAI,EAAE;YAClB,MAAM,IAAI,CAAC,KAAK,EAAE,CAAC;QACrB,CAAC,CAAC,CACH,CAAC;IACJ,CAAC;IAEM,cAAc;QACnB,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,oBAAoB,EAAE,CAAC,CAAC;IAC1D,CAAC;IAEM,0BAA0B,CAAC,OAAyB;QACzD,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC;YACtB,IAAI,EAAE,wBAAwB;YAC9B,OAAO;SACR,CAAC,CAAC;IACL,CAAC;IAEM,qBAAqB,CAAC,WAAwB;QACnD,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC;YACtB,IAAI,EAAE,2BAA2B;YACjC,WAAW;SACZ,CAAC,CAAC;IACL,CAAC;IAEM,KAAK,CAAC,eAAe;QAC1B,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,gBAAI,CAAC,CAAC,CAAC,CAAC,CAAC,SAAS,EAAE,CAAC;QAC9D,IAAI,OAAO,CAAC,UAAU,EAAE;YACtB,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,OAAO,EAAE,CAAC,CAAC;SAC5C;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,gDAAgD,CAAC,CAAC;SACnE;IACH,CAAC;IAEM,UAAU;QACf,OAAO,IAAI,CAAC,uBAAuB,CAAC,UAAU,EAAE,CAAC;IACnD,CAAC;IAEM,KAAK,CAAC,iBAAiB,CAAC,OAAe;QAC5C,M
AAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,gBAAI,CAAC,CAAC,CAAC,CAAC,CAAC,SAAS,EAAE,CAAC;QAC9D,IAAI,OAAO,CAAC,UAAU,EAAE;YACtB,IAAI,CAAC,uBAAuB,CAAC,iBAAiB,CAAC,OAAO,CAAC,CAAC;SACzD;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,6CAA6C,CAAC,CAAC;SAChE;IACH,CAAC;IAEM,KAAK,CAAC,iBAAiB,CAAC,OAAe;QAC5C,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,gBAAI,CAAC,CAAC,CAAC,CAAC,CAAC,SAAS,EAAE,CAAC;QAC9D,IAAI,OAAO,CAAC,UAAU,EAAE;YACtB,IAAI,CAAC,uBAAuB,CAAC,iBAAiB,CAAC,OAAO,CAAC,CAAC;SACzD;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,6CAA6C,CAAC,CAAC;SAChE;IACH,CAAC;IAEM,KAAK,CAAC,KAAK;QAChB,IAAI,CAAC,UAAU,EAAE,CAAC;QAClB,IAAI,CAAC,YAAY,CAAC,IAAI,EAAE,CAAC;QACzB,IAAI,CAAC,YAAY,GAAG,IAAI,+BAAc,EAAE,CAAC;QACzC,IAAI,IAAI,CAAC,mBAAmB,KAAK,SAAS,EAAE;YAC1C,MAAM,IAAI,CAAC,mBAAmB,CAAC;SAChC;IACH,CAAC;IAEM,KAAK,CAAC,KAAK;QAChB,IAAI,CAAC,uBAAuB,GAAG,IAAI,mCAAgB,EAAE,CAAC;IACxD,CAAC;IAEM,KAAK,CAAC,MAAM;QACjB,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,gBAAI,CAAC,CAAC,CAAC,CAAC,CAAC,SAAS,EAAE,CAAC;QAC9D,gDAAgD;QAChD,IAAI,CAAC,OAAO,CAAC,OAAO,CAAC,CAAC;IACxB,CAAC;IAEO,OAAO,CAAC,OAAwB;QACtC,IAAI,SAAS,GAAG,KAAK,CAAC;QACtB,MAAM,mBAAmB,GAAG,IAAI,CAAC,KAAK,CAAC,OAAO,CAAC,CAAC,IAAI,CAAC,GAAG,EAAE;YACxD,SAAS,GAAG,IAAI,CAAC;YACjB,IAAI,CAAC,mBAAmB,GAAG,SAAS,CAAC;QACvC,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,SAAS,EAAE;YACd,IAAI,CAAC,mBAAmB,GAAG,mBAAmB,CAAC;SAChD;IACH,CAAC;IAEO,KAAK,CAAC,KAAK,CAAC,OAAwB;QAC1C,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC;YACf,IAAI,EAAE,qBAAqB;YAC3B,OAAO,EAAE,oBAAoB;YAC7B,KAAK,EAAE,SAAS;SACjB,CAAC,CAAC;QAEH,MAAM,aAAa,GAAG,MAAM,+BAAsB,CAAC;YACjD,UAAU,EAAE,IAAI,CAAC,IAAI,CAAC,UAAU;YAChC,SAAS,EAAE,OAAO,CAAC,SAAS;YAC5B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;SAC/C,CAAC,CAAC;QAEH,MAAM,+BAAc,CAAC,IAAI,CAAC,IAAI,CAAC,YAAY,CAAC;aACzC,IAAI,CACH,WAAI,CAAC,KAAK,EAAE,OAAgB,EAAE,KAAY,EAAE,EAAE;YAC5C,IAAI,MAAM,CAAC;YACX,QAAQ,KAAK,CAAC,IAAI,EAAE;gBAClB,KAAK,oBAAoB;oBACvB,MAAM,GAAG,MAAM,uCAAkB,CAAC;wBAChC,UAAU,EAAE,IAAI,CAAC,IAAI,CAAC,UAAU;wBAChC,SAAS,EAAE,OAAO,CAAC,SAAS;wBAC5B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBA
CR,KAAK,wBAAwB;oBAC3B,MAAM,GAAG,MAAM,+CAAsB,CAAC;wBACpC,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,UAAU,EAAE,OAAO,CAAC,UAAU;wBAC9B,OAAO,EAAE,KAAK,CAAC,OAAO;wBACtB,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR,KAAK,2BAA2B;oBAC9B,MAAM,GAAG,MAAM,qDAAyB,CAAC;wBACvC,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,UAAU,EAAE,OAAO,CAAC,UAAU;wBAC9B,WAAW,EAAE,KAAK,CAAC,WAAW;wBAC9B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR,KAAK,OAAO;oBACV,MAAM,GAAG,MAAM,2BAAY,CAAC;wBAC1B,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,OAAO;wBACP,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR;oBACE,aAAW,CAAC,WAAW,CAAC,KAAK,CAAC,CAAC;oBAC/B,MAAM,IAAI,KAAK,CAAC,QAAQ,CAAC,CAAC;aAC7B;YAED,OAAO,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,CAAC;QACnC,CAAC,EAAE,IAAI,CAAC,YAAY,CAAC,aAAa,CAAC,CAAC,CACrC;aACA,OAAO,CAAC,GAAG,EAAE;YACZ,aAAa;QACf,CAAC,CAAC,CAAC;QAEL,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC;YACf,IAAI,EAAE,oBAAoB;YAC1B,OAAO,EAAE,oBAAoB;YAC7B,KAAK,EAAE,SAAS;SACjB,CAAC,CAAC;IACL,CAAC;IAEO,YAAY,CAAC,MAAuB;QAC1C,IAAI,MAAM,CAAC,YAAY,KAAK,SAAS,EAAE;YACrC,IAAI,CAAC,WAAW,CAAC,MAAM,CAAC,YAAY,CAAC,CAAC;SACvC;QAED,OAAO,MAAM,CAAC,OAAO,CAAC;IACxB,CAAC;IAEO,WAAW,CAAC,mBAA2B;QAC7C,IAAI,CAAC,UAAU,EAAE,CAAC;QAClB,IAAI,CAAC,YAAY,GAAG,UAAU,CAC5B,GAAG,EAAE,CAAC,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,OAAO,EAAE,CAAC,EAChD,mBAAmB,GAAG,YAAY,CAE5B,CAAC;IACX,CAAC;IAEO,UAAU;QAChB,IAAI,IAAI,CAAC,YAAY,KAAK,SAAS,EAAE;YACnC,YAAY,CAAC,IAAI,CAAC,YAAY,CAAC,CAAC;YAChC,IAAI,CAAC,YAAY,GAAG,SAAS,CAAC;SAC/B;IACH,CAAC;CACF;AAnOD,8BAmOC","file":"neo-one-node-consensus/src/Consensus.js","sourcesContent":["/// <reference types=\"@reactivex/ix-es2015-cjs\" />\nimport { common, crypto, ECPoint, PrivateKey, UInt160 } from '@neo-one/client-common';\nimport { Monitor } from '@neo-one/monitor';\nimport { ConsensusPayload, Node, Transaction } from '@neo-one/node-core';\nimport { finalize, mergeScanLatest, utils as commonUtils } from '@neo-one/utils';\nimport { AsyncIterableX } from 
'@reactivex/ix-es2015-cjs/asynciterable/asynciterablex';\nimport { scan } from '@reactivex/ix-es2015-cjs/asynciterable/pipe/scan';\nimport { Observable } from 'rxjs';\nimport { map, take } from 'rxjs/operators';\nimport { initializeNewConsensus } from './common';\nimport { ConsensusContext } from './ConsensusContext';\nimport { ConsensusQueue } from './ConsensusQueue';\nimport { Context } from './context';\nimport { handleConsensusPayload } from './handleConsensusPayload';\nimport { handlePersistBlock } from './handlePersistBlock';\nimport { handleTransactionReceived } from './handleTransactionReceived';\nimport { runConsensus } from './runConsensus';\nimport { Event, Result } from './types';\n\nexport interface Options {\n  readonly privateKey: string;\n  readonly privateNet: boolean;\n}\nexport interface InternalOptions {\n  readonly privateKey: PrivateKey;\n  readonly publicKey: ECPoint;\n  readonly feeAddress: UInt160;\n  readonly privateNet: boolean;\n}\n\nconst MS_IN_SECOND = 1000;\n\nexport class Consensus {\n  private mutableQueue: ConsensusQueue;\n  private mutableTimer: number | undefined;\n  private readonly options$: Observable<InternalOptions>;\n  private readonly node: Node;\n  private mutableConsensusContext: ConsensusContext;\n  private readonly monitor: Monitor;\n  private mutableStartPromise: Promise<void> | undefined;\n\n  public constructor({\n    options$,\n    node,\n    monitor,\n  }: {\n    readonly options$: Observable<Options>;\n    readonly node: Node;\n    readonly monitor: Monitor;\n  }) {\n    this.mutableQueue = new ConsensusQueue();\n    this.options$ = options$.pipe(\n      map((options) => {\n        const privateKey = common.stringToPrivateKey(options.privateKey);\n        const publicKey = crypto.privateKeyToPublicKey(privateKey);\n        const feeAddress = crypto.publicKeyToScriptHash(publicKey);\n\n        return {\n          privateKey,\n          publicKey,\n          feeAddress,\n          privateNet: options.privateNet,\n  
      };\n      }),\n    );\n\n    this.node = node;\n    this.mutableConsensusContext = new ConsensusContext();\n    this.monitor = monitor.at('node_consensus');\n  }\n\n  public start$(): Observable<void> {\n    return this.options$.pipe(\n      mergeScanLatest<InternalOptions, void>(async (_, options) => {\n        await this.pause();\n        this.doStart(options);\n      }),\n      finalize(async () => {\n        await this.pause();\n      }),\n    );\n  }\n\n  public onPersistBlock(): void {\n    this.mutableQueue.write({ type: 'handlePersistBlock' });\n  }\n\n  public onConsensusPayloadReceived(payload: ConsensusPayload): void {\n    this.mutableQueue.write({\n      type: 'handleConsensusPayload',\n      payload,\n    });\n  }\n\n  public onTransactionReceived(transaction: Transaction): void {\n    this.mutableQueue.write({\n      type: 'handleTransactionReceived',\n      transaction,\n    });\n  }\n\n  public async runConsensusNow(): Promise<void> {\n    const options = await this.options$.pipe(take(1)).toPromise();\n    if (options.privateNet) {\n      this.mutableQueue.write({ type: 'timer' });\n    } else {\n      throw new Error('Can only force consensus on a private network.');\n    }\n  }\n\n  public nowSeconds(): number {\n    return this.mutableConsensusContext.nowSeconds();\n  }\n\n  public async fastForwardOffset(seconds: number): Promise<void> {\n    const options = await this.options$.pipe(take(1)).toPromise();\n    if (options.privateNet) {\n      this.mutableConsensusContext.fastForwardOffset(seconds);\n    } else {\n      throw new Error('Can only fast forward on a private network.');\n    }\n  }\n\n  public async fastForwardToTime(seconds: number): Promise<void> {\n    const options = await this.options$.pipe(take(1)).toPromise();\n    if (options.privateNet) {\n      this.mutableConsensusContext.fastForwardToTime(seconds);\n    } else {\n      throw new Error('Can only fast forward on a private network.');\n    }\n  }\n\n  public async 
pause(): Promise<void> {\n    this.clearTimer();\n    this.mutableQueue.done();\n    this.mutableQueue = new ConsensusQueue();\n    if (this.mutableStartPromise !== undefined) {\n      await this.mutableStartPromise;\n    }\n  }\n\n  public async reset(): Promise<void> {\n    this.mutableConsensusContext = new ConsensusContext();\n  }\n\n  public async resume(): Promise<void> {\n    const options = await this.options$.pipe(take(1)).toPromise();\n    // tslint:disable-next-line no-floating-promises\n    this.doStart(options);\n  }\n\n  private doStart(options: InternalOptions): void {\n    let completed = false;\n    const mutableStartPromise = this.start(options).then(() => {\n      completed = true;\n      this.mutableStartPromise = undefined;\n    });\n    if (!completed) {\n      this.mutableStartPromise = mutableStartPromise;\n    }\n  }\n\n  private async start(options: InternalOptions): Promise<void> {\n    this.monitor.log({\n      name: 'neo_consensus_start',\n      message: 'Consensus started.',\n      level: 'verbose',\n    });\n\n    const initialResult = await initializeNewConsensus({\n      blockchain: this.node.blockchain,\n      publicKey: options.publicKey,\n      consensusContext: this.mutableConsensusContext,\n    });\n\n    await AsyncIterableX.from(this.mutableQueue)\n      .pipe(\n        scan(async (context: Context, event: Event) => {\n          let result;\n          switch (event.type) {\n            case 'handlePersistBlock':\n              result = await handlePersistBlock({\n                blockchain: this.node.blockchain,\n                publicKey: options.publicKey,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'handleConsensusPayload':\n              result = await handleConsensusPayload({\n                context,\n                node: this.node,\n                privateKey: options.privateKey,\n                payload: event.payload,\n                
consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'handleTransactionReceived':\n              result = await handleTransactionReceived({\n                context,\n                node: this.node,\n                privateKey: options.privateKey,\n                transaction: event.transaction,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'timer':\n              result = await runConsensus({\n                context,\n                node: this.node,\n                options,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            default:\n              commonUtils.assertNever(event);\n              throw new Error('For TS');\n          }\n\n          return this.handleResult(result);\n        }, this.handleResult(initialResult)),\n      )\n      .forEach(() => {\n        // do nothing\n      });\n\n    this.monitor.log({\n      name: 'neo_consensus_stop',\n      message: 'Consensus stopped.',\n      level: 'verbose',\n    });\n  }\n\n  private handleResult(result: Result<Context>): Context {\n    if (result.timerSeconds !== undefined) {\n      this.handleTimer(result.timerSeconds);\n    }\n\n    return result.context;\n  }\n\n  private handleTimer(mutableTimerSeconds: number): void {\n    this.clearTimer();\n    this.mutableTimer = setTimeout(\n      () => this.mutableQueue.write({ type: 'timer' }),\n      mutableTimerSeconds * MS_IN_SECOND,\n      // tslint:disable-next-line no-any\n    ) as any;\n  }\n\n  private clearTimer(): void {\n    if (this.mutableTimer !== undefined) {\n      clearTimeout(this.mutableTimer);\n      this.mutableTimer = undefined;\n    }\n  }\n}\n"]}
|