UNPKG

25.9 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3/// <reference types="@reactivex/ix-es2015-cjs" />
4const client_common_1 = require("@neo-one/client-common");
5const utils_1 = require("@neo-one/utils");
6const asynciterablex_1 = require("@reactivex/ix-es2015-cjs/asynciterable/asynciterablex");
7const scan_1 = require("@reactivex/ix-es2015-cjs/asynciterable/pipe/scan");
8const operators_1 = require("rxjs/operators");
9const common_1 = require("./common");
10const ConsensusContext_1 = require("./ConsensusContext");
11const ConsensusQueue_1 = require("./ConsensusQueue");
12const handleConsensusPayload_1 = require("./handleConsensusPayload");
13const handlePersistBlock_1 = require("./handlePersistBlock");
14const handleTransactionReceived_1 = require("./handleTransactionReceived");
15const runConsensus_1 = require("./runConsensus");
16const MS_IN_SECOND = 1000;
17class Consensus {
18 constructor({ options$, node, monitor, }) {
19 this.mutableQueue = new ConsensusQueue_1.ConsensusQueue();
20 this.options$ = options$.pipe(operators_1.map((options) => {
21 const privateKey = client_common_1.common.stringToPrivateKey(options.privateKey);
22 const publicKey = client_common_1.crypto.privateKeyToPublicKey(privateKey);
23 const feeAddress = client_common_1.crypto.publicKeyToScriptHash(publicKey);
24 return {
25 privateKey,
26 publicKey,
27 feeAddress,
28 privateNet: options.privateNet,
29 };
30 }));
31 this.node = node;
32 this.mutableConsensusContext = new ConsensusContext_1.ConsensusContext();
33 this.monitor = monitor.at('node_consensus');
34 }
35 start$() {
36 return this.options$.pipe(utils_1.mergeScanLatest(async (_, options) => {
37 await this.pause();
38 this.doStart(options);
39 }), utils_1.finalize(async () => {
40 await this.pause();
41 }));
42 }
43 onPersistBlock() {
44 this.mutableQueue.write({ type: 'handlePersistBlock' });
45 }
46 onConsensusPayloadReceived(payload) {
47 this.mutableQueue.write({
48 type: 'handleConsensusPayload',
49 payload,
50 });
51 }
52 onTransactionReceived(transaction) {
53 this.mutableQueue.write({
54 type: 'handleTransactionReceived',
55 transaction,
56 });
57 }
58 async runConsensusNow() {
59 const options = await this.options$.pipe(operators_1.take(1)).toPromise();
60 if (options.privateNet) {
61 this.mutableQueue.write({ type: 'timer' });
62 }
63 else {
64 throw new Error('Can only force consensus on a private network.');
65 }
66 }
67 nowSeconds() {
68 return this.mutableConsensusContext.nowSeconds();
69 }
70 async fastForwardOffset(seconds) {
71 const options = await this.options$.pipe(operators_1.take(1)).toPromise();
72 if (options.privateNet) {
73 this.mutableConsensusContext.fastForwardOffset(seconds);
74 }
75 else {
76 throw new Error('Can only fast forward on a private network.');
77 }
78 }
79 async fastForwardToTime(seconds) {
80 const options = await this.options$.pipe(operators_1.take(1)).toPromise();
81 if (options.privateNet) {
82 this.mutableConsensusContext.fastForwardToTime(seconds);
83 }
84 else {
85 throw new Error('Can only fast forward on a private network.');
86 }
87 }
88 async pause() {
89 this.clearTimer();
90 this.mutableQueue.done();
91 this.mutableQueue = new ConsensusQueue_1.ConsensusQueue();
92 if (this.mutableStartPromise !== undefined) {
93 await this.mutableStartPromise;
94 }
95 }
96 async reset() {
97 this.mutableConsensusContext = new ConsensusContext_1.ConsensusContext();
98 }
99 async resume() {
100 const options = await this.options$.pipe(operators_1.take(1)).toPromise();
101 // tslint:disable-next-line no-floating-promises
102 this.doStart(options);
103 }
104 doStart(options) {
105 let completed = false;
106 const mutableStartPromise = this.start(options).then(() => {
107 completed = true;
108 this.mutableStartPromise = undefined;
109 });
110 if (!completed) {
111 this.mutableStartPromise = mutableStartPromise;
112 }
113 }
114 async start(options) {
115 this.monitor.log({
116 name: 'neo_consensus_start',
117 message: 'Consensus started.',
118 level: 'verbose',
119 });
120 const initialResult = await common_1.initializeNewConsensus({
121 blockchain: this.node.blockchain,
122 publicKey: options.publicKey,
123 consensusContext: this.mutableConsensusContext,
124 });
125 await asynciterablex_1.AsyncIterableX.from(this.mutableQueue)
126 .pipe(scan_1.scan(async (context, event) => {
127 let result;
128 switch (event.type) {
129 case 'handlePersistBlock':
130 result = await handlePersistBlock_1.handlePersistBlock({
131 blockchain: this.node.blockchain,
132 publicKey: options.publicKey,
133 consensusContext: this.mutableConsensusContext,
134 });
135 break;
136 case 'handleConsensusPayload':
137 result = await handleConsensusPayload_1.handleConsensusPayload({
138 context,
139 node: this.node,
140 privateKey: options.privateKey,
141 payload: event.payload,
142 consensusContext: this.mutableConsensusContext,
143 });
144 break;
145 case 'handleTransactionReceived':
146 result = await handleTransactionReceived_1.handleTransactionReceived({
147 context,
148 node: this.node,
149 privateKey: options.privateKey,
150 transaction: event.transaction,
151 consensusContext: this.mutableConsensusContext,
152 });
153 break;
154 case 'timer':
155 result = await runConsensus_1.runConsensus({
156 context,
157 node: this.node,
158 options,
159 consensusContext: this.mutableConsensusContext,
160 });
161 break;
162 default:
163 utils_1.utils.assertNever(event);
164 throw new Error('For TS');
165 }
166 return this.handleResult(result);
167 }, this.handleResult(initialResult)))
168 .forEach(() => {
169 // do nothing
170 });
171 this.monitor.log({
172 name: 'neo_consensus_stop',
173 message: 'Consensus stopped.',
174 level: 'verbose',
175 });
176 }
177 handleResult(result) {
178 if (result.timerSeconds !== undefined) {
179 this.handleTimer(result.timerSeconds);
180 }
181 return result.context;
182 }
183 handleTimer(mutableTimerSeconds) {
184 this.clearTimer();
185 this.mutableTimer = setTimeout(() => this.mutableQueue.write({ type: 'timer' }), mutableTimerSeconds * MS_IN_SECOND);
186 }
187 clearTimer() {
188 if (this.mutableTimer !== undefined) {
189 clearTimeout(this.mutableTimer);
190 this.mutableTimer = undefined;
191 }
192 }
193}
194exports.Consensus = Consensus;
195
196//# sourceMappingURL=data:application/json;charset=utf8;base64,{"version":3,"sources":["Consensus.ts"],"names":[],"mappings":";;AAAA,kDAAkD;AAClD,0DAAsF;AAGtF,0CAAiF;AACjF,0FAAuF;AACvF,2EAAwE;AAExE,8CAA2C;AAC3C,qCAAkD;AAClD,yDAAsD;AACtD,qDAAkD;AAElD,qEAAkE;AAClE,6DAA0D;AAC1D,2EAAwE;AACxE,iDAA8C;AAc9C,MAAM,YAAY,GAAG,IAAI,CAAC;AAE1B,MAAa,SAAS;IASpB,YAAmB,EACjB,QAAQ,EACR,IAAI,EACJ,OAAO,GAKR;QACC,IAAI,CAAC,YAAY,GAAG,IAAI,+BAAc,EAAE,CAAC;QACzC,IAAI,CAAC,QAAQ,GAAG,QAAQ,CAAC,IAAI,CAC3B,eAAG,CAAC,CAAC,OAAO,EAAE,EAAE;YACd,MAAM,UAAU,GAAG,sBAAM,CAAC,kBAAkB,CAAC,OAAO,CAAC,UAAU,CAAC,CAAC;YACjE,MAAM,SAAS,GAAG,sBAAM,CAAC,qBAAqB,CAAC,UAAU,CAAC,CAAC;YAC3D,MAAM,UAAU,GAAG,sBAAM,CAAC,qBAAqB,CAAC,SAAS,CAAC,CAAC;YAE3D,OAAO;gBACL,UAAU;gBACV,SAAS;gBACT,UAAU;gBACV,UAAU,EAAE,OAAO,CAAC,UAAU;aAC/B,CAAC;QACJ,CAAC,CAAC,CACH,CAAC;QAEF,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC;QACjB,IAAI,CAAC,uBAAuB,GAAG,IAAI,mCAAgB,EAAE,CAAC;QACtD,IAAI,CAAC,OAAO,GAAG,OAAO,CAAC,EAAE,CAAC,gBAAgB,CAAC,CAAC;IAC9C,CAAC;IAEM,MAAM;QACX,OAAO,IAAI,CAAC,QAAQ,CAAC,IAAI,CACvB,uBAAe,CAAwB,KAAK,EAAE,CAAC,EAAE,OAAO,EAAE,EAAE;YAC1D,MAAM,IAAI,CAAC,KAAK,EAAE,CAAC;YACnB,IAAI,CAAC,OAAO,CAAC,OAAO,CAAC,CAAC;QACxB,CAAC,CAAC,EACF,gBAAQ,CAAC,KAAK,IAAI,EAAE;YAClB,MAAM,IAAI,CAAC,KAAK,EAAE,CAAC;QACrB,CAAC,CAAC,CACH,CAAC;IACJ,CAAC;IAEM,cAAc;QACnB,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,oBAAoB,EAAE,CAAC,CAAC;IAC1D,CAAC;IAEM,0BAA0B,CAAC,OAAyB;QACzD,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC;YACtB,IAAI,EAAE,wBAAwB;YAC9B,OAAO;SACR,CAAC,CAAC;IACL,CAAC;IAEM,qBAAqB,CAAC,WAAwB;QACnD,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC;YACtB,IAAI,EAAE,2BAA2B;YACjC,WAAW;SACZ,CAAC,CAAC;IACL,CAAC;IAEM,KAAK,CAAC,eAAe;QAC1B,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,gBAAI,CAAC,CAAC,CAAC,CAAC,CAAC,SAAS,EAAE,CAAC;QAC9D,IAAI,OAAO,CAAC,UAAU,EAAE;YACtB,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,OAAO,EAAE,CAAC,CAAC;SAC5C;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,gDAAgD,CAAC,CAAC;SACnE;IACH,CAAC;IAEM,UAAU;QACf,OAAO,IAAI,CAAC,uBAAuB,CAAC,UAAU,EAAE,CAAC;IACnD,CAAC;IAEM,KAAK,CAAC,iBAAiB,CAAC,OAAe;QAC5C,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,gBAAI,CAAC,CAAC,CAAC,CAAC,CAAC,SAAS,EAAE,CAAC;QAC9D,IAAI,OAAO,CAAC,UAAU,EAAE;YACtB,IAAI,CAAC,uBAAuB,CAAC,iBAAiB,CAAC,OAAO,CAAC,CAAC;SACzD;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,6CAA6C,CAAC,CAAC;SAChE;IACH,CAAC;IAEM,KAAK,CAAC,iBAAiB,CAAC,OAAe;QAC5C,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,gBAAI,CAAC,CAAC,CAAC,CAAC,CAAC,SAAS,EAAE,CAAC;QAC9D,IAAI,OAAO,CAAC,UAAU,EAAE;YACtB,IAAI,CAAC,uBAAuB,CAAC,iBAAiB,CAAC,OAAO,CAAC,CAAC;SACzD;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,6CAA6C,CAAC,CAAC;SAChE;IACH,CAAC;IAEM,KAAK,CAAC,KAAK;QAChB,IAAI,CAAC,UAAU,EAAE,CAAC;QAClB,IAAI,CAAC,YAAY,CAAC,IAAI,EAAE,CAAC;QACzB,IAAI,CAAC,YAAY,GAAG,IAAI,+BAAc,EAAE,CAAC;QACzC,IAAI,IAAI,CAAC,mBAAmB,KAAK,SAAS,EAAE;YAC1C,MAAM,IAAI,CAAC,mBAAmB,CAAC;SAChC;IACH,CAAC;IAEM,KAAK,CAAC,KAAK;QAChB,IAAI,CAAC,uBAAuB,GAAG,IAAI,mCAAgB,EAAE,CAAC;IACxD,CAAC;IAEM,KAAK,CAAC,MAAM;QACjB,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,gBAAI,CAAC,CAAC,CAAC,CAAC,CAAC,SAAS,EAAE,CAAC;QAC9D,gDAAgD;QAChD,IAAI,CAAC,OAAO,CAAC,OAAO,CAAC,CAAC;IACxB,CAAC;IAEO,OAAO,CAAC,OAAwB;QACtC,IAAI,SAAS,GAAG,KAAK,CAAC;QACtB,MAAM,mBAAmB,GAAG,IAAI,CAAC,KAAK,CAAC,OAAO,CAAC,CAAC,IAAI,CAAC,GAAG,EAAE;YACxD,SAAS,GAAG,IAAI,CAAC;YACjB,IAAI,CAAC,mBAAmB,GAAG,SAAS,CAAC;QACvC,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,SAAS,EAAE;YACd,IAAI,CAAC,mBAAmB,GAAG,mBAAmB,CAAC;SAChD;IACH,CAAC;IAEO,KAAK,CAAC,KAAK,CAAC,OAAwB;QAC1C,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC;YACf,IAAI,EAAE,qBAAqB;YAC3B,OAAO,EAAE,oBAAoB;YAC7B,KAAK,EAAE,SAAS;SACjB,CAAC,CAAC;QAEH,MAAM,aAAa,GAAG,MAAM,+BAAsB,CAAC;YACjD,UAAU,EAAE,IAAI,CAAC,IAAI,CAAC,UAAU;YAChC,SAAS,EAAE,OAAO,CAAC,SAAS;YAC5B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;SAC/C,CAAC,CAAC;QAEH,MAAM,+BAAc,CAAC,IAAI,CAAC,IAAI,CAAC,YAAY,CAAC;aACzC,IAAI,CACH,WAAI,CAAC,KAAK,EAAE,OAAgB,EAAE,KAAY,EAAE,EAAE;YAC5C,IAAI,MAAM,CAAC;YACX,QAAQ,KAAK,CAAC,IAAI,EAAE;gBAClB,KAAK,oBAAoB;oBACvB,MAAM,GAAG,MAAM,uCAAkB,CAAC;wBAChC,UAAU,EAAE,IAAI,CAAC,IAAI,CAAC,UAAU;wBAChC,SAAS,EAAE,OAAO,CAAC,SAAS;wBAC5B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR,KAAK,wBAAwB;oBAC3B,MAAM,GAAG,MAAM,+CAAsB,CAAC;wBACpC,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,UAAU,EAAE,OAAO,CAAC,UAAU;wBAC9B,OAAO,EAAE,KAAK,CAAC,OAAO;wBACtB,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR,KAAK,2BAA2B;oBAC9B,MAAM,GAAG,MAAM,qDAAyB,CAAC;wBACvC,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,UAAU,EAAE,OAAO,CAAC,UAAU;wBAC9B,WAAW,EAAE,KAAK,CAAC,WAAW;wBAC9B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR,KAAK,OAAO;oBACV,MAAM,GAAG,MAAM,2BAAY,CAAC;wBAC1B,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,OAAO;wBACP,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR;oBACE,aAAW,CAAC,WAAW,CAAC,KAAK,CAAC,CAAC;oBAC/B,MAAM,IAAI,KAAK,CAAC,QAAQ,CAAC,CAAC;aAC7B;YAED,OAAO,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,CAAC;QACnC,CAAC,EAAE,IAAI,CAAC,YAAY,CAAC,aAAa,CAAC,CAAC,CACrC;aACA,OAAO,CAAC,GAAG,EAAE;YACZ,aAAa;QACf,CAAC,CAAC,CAAC;QAEL,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC;YACf,IAAI,EAAE,oBAAoB;YAC1B,OAAO,EAAE,oBAAoB;YAC7B,KAAK,EAAE,SAAS;SACjB,CAAC,CAAC;IACL,CAAC;IAEO,YAAY,CAAC,MAAuB;QAC1C,IAAI,MAAM,CAAC,YAAY,KAAK,SAAS,EAAE;YACrC,IAAI,CAAC,WAAW,CAAC,MAAM,CAAC,YAAY,CAAC,CAAC;SACvC;QAED,OAAO,MAAM,CAAC,OAAO,CAAC;IACxB,CAAC;IAEO,WAAW,CAAC,mBAA2B;QAC7C,IAAI,CAAC,UAAU,EAAE,CAAC;QAClB,IAAI,CAAC,YAAY,GAAG,UAAU,CAC5B,GAAG,EAAE,CAAC,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,OAAO,EAAE,CAAC,EAChD,mBAAmB,GAAG,YAAY,CAE5B,CAAC;IACX,CAAC;IAEO,UAAU;QAChB,IAAI,IAAI,CAAC,YAAY,KAAK,SAAS,EAAE;YACnC,YAAY,CAAC,IAAI,CAAC,YAAY,CAAC,CAAC;YAChC,IAAI,CAAC,YAAY,GAAG,SAAS,CAAC;SAC/B;IACH,CAAC;CACF;AAnOD,8BAmOC","file":"neo-one-node-consensus/src/Consensus.js","sourcesContent":["/// <reference types=\"@reactivex/ix-es2015-cjs\" />\nimport { common, crypto, ECPoint, PrivateKey, UInt160 } from '@neo-one/client-common';\nimport { Monitor } from '@neo-one/monitor';\nimport { ConsensusPayload, Node, Transaction } from '@neo-one/node-core';\nimport { finalize, mergeScanLatest, utils as commonUtils } from '@neo-one/utils';\nimport { AsyncIterableX } from '@reactivex/ix-es2015-cjs/asynciterable/asynciterablex';\nimport { scan } from '@reactivex/ix-es2015-cjs/asynciterable/pipe/scan';\nimport { Observable } from 'rxjs';\nimport { map, take } from 'rxjs/operators';\nimport { initializeNewConsensus } from './common';\nimport { ConsensusContext } from './ConsensusContext';\nimport { ConsensusQueue } from './ConsensusQueue';\nimport { Context } from './context';\nimport { handleConsensusPayload } from './handleConsensusPayload';\nimport { handlePersistBlock } from './handlePersistBlock';\nimport { handleTransactionReceived } from './handleTransactionReceived';\nimport { runConsensus } from './runConsensus';\nimport { Event, Result } from './types';\n\nexport interface Options {\n  readonly privateKey: string;\n  readonly privateNet: boolean;\n}\nexport interface InternalOptions {\n  readonly privateKey: PrivateKey;\n  readonly publicKey: ECPoint;\n  readonly feeAddress: UInt160;\n  readonly privateNet: boolean;\n}\n\nconst MS_IN_SECOND = 1000;\n\nexport class Consensus {\n  private mutableQueue: ConsensusQueue;\n  private mutableTimer: number | undefined;\n  private readonly options$: Observable<InternalOptions>;\n  private readonly node: Node;\n  private mutableConsensusContext: ConsensusContext;\n  private readonly monitor: Monitor;\n  private mutableStartPromise: Promise<void> | undefined;\n\n  public constructor({\n    options$,\n    node,\n    monitor,\n  }: {\n    readonly options$: Observable<Options>;\n    readonly node: Node;\n    readonly monitor: Monitor;\n  }) {\n    this.mutableQueue = new ConsensusQueue();\n    this.options$ = options$.pipe(\n      map((options) => {\n        const privateKey = common.stringToPrivateKey(options.privateKey);\n        const publicKey = crypto.privateKeyToPublicKey(privateKey);\n        const feeAddress = crypto.publicKeyToScriptHash(publicKey);\n\n        return {\n          privateKey,\n          publicKey,\n          feeAddress,\n          privateNet: options.privateNet,\n        };\n      }),\n    );\n\n    this.node = node;\n    this.mutableConsensusContext = new ConsensusContext();\n    this.monitor = monitor.at('node_consensus');\n  }\n\n  public start$(): Observable<void> {\n    return this.options$.pipe(\n      mergeScanLatest<InternalOptions, void>(async (_, options) => {\n        await this.pause();\n        this.doStart(options);\n      }),\n      finalize(async () => {\n        await this.pause();\n      }),\n    );\n  }\n\n  public onPersistBlock(): void {\n    this.mutableQueue.write({ type: 'handlePersistBlock' });\n  }\n\n  public onConsensusPayloadReceived(payload: ConsensusPayload): void {\n    this.mutableQueue.write({\n      type: 'handleConsensusPayload',\n      payload,\n    });\n  }\n\n  public onTransactionReceived(transaction: Transaction): void {\n    this.mutableQueue.write({\n      type: 'handleTransactionReceived',\n      transaction,\n    });\n  }\n\n  public async runConsensusNow(): Promise<void> {\n    const options = await this.options$.pipe(take(1)).toPromise();\n    if (options.privateNet) {\n      this.mutableQueue.write({ type: 'timer' });\n    } else {\n      throw new Error('Can only force consensus on a private network.');\n    }\n  }\n\n  public nowSeconds(): number {\n    return this.mutableConsensusContext.nowSeconds();\n  }\n\n  public async fastForwardOffset(seconds: number): Promise<void> {\n    const options = await this.options$.pipe(take(1)).toPromise();\n    if (options.privateNet) {\n      this.mutableConsensusContext.fastForwardOffset(seconds);\n    } else {\n      throw new Error('Can only fast forward on a private network.');\n    }\n  }\n\n  public async fastForwardToTime(seconds: number): Promise<void> {\n    const options = await this.options$.pipe(take(1)).toPromise();\n    if (options.privateNet) {\n      this.mutableConsensusContext.fastForwardToTime(seconds);\n    } else {\n      throw new Error('Can only fast forward on a private network.');\n    }\n  }\n\n  public async pause(): Promise<void> {\n    this.clearTimer();\n    this.mutableQueue.done();\n    this.mutableQueue = new ConsensusQueue();\n    if (this.mutableStartPromise !== undefined) {\n      await this.mutableStartPromise;\n    }\n  }\n\n  public async reset(): Promise<void> {\n    this.mutableConsensusContext = new ConsensusContext();\n  }\n\n  public async resume(): Promise<void> {\n    const options = await this.options$.pipe(take(1)).toPromise();\n    // tslint:disable-next-line no-floating-promises\n    this.doStart(options);\n  }\n\n  private doStart(options: InternalOptions): void {\n    let completed = false;\n    const mutableStartPromise = this.start(options).then(() => {\n      completed = true;\n      this.mutableStartPromise = undefined;\n    });\n    if (!completed) {\n      this.mutableStartPromise = mutableStartPromise;\n    }\n  }\n\n  private async start(options: InternalOptions): Promise<void> {\n    this.monitor.log({\n      name: 'neo_consensus_start',\n      message: 'Consensus started.',\n      level: 'verbose',\n    });\n\n    const initialResult = await initializeNewConsensus({\n      blockchain: this.node.blockchain,\n      publicKey: options.publicKey,\n      consensusContext: this.mutableConsensusContext,\n    });\n\n    await AsyncIterableX.from(this.mutableQueue)\n      .pipe(\n        scan(async (context: Context, event: Event) => {\n          let result;\n          switch (event.type) {\n            case 'handlePersistBlock':\n              result = await handlePersistBlock({\n                blockchain: this.node.blockchain,\n                publicKey: options.publicKey,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'handleConsensusPayload':\n              result = await handleConsensusPayload({\n                context,\n                node: this.node,\n                privateKey: options.privateKey,\n                payload: event.payload,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'handleTransactionReceived':\n              result = await handleTransactionReceived({\n                context,\n                node: this.node,\n                privateKey: options.privateKey,\n                transaction: event.transaction,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'timer':\n              result = await runConsensus({\n                context,\n                node: this.node,\n                options,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            default:\n              commonUtils.assertNever(event);\n              throw new Error('For TS');\n          }\n\n          return this.handleResult(result);\n        }, this.handleResult(initialResult)),\n      )\n      .forEach(() => {\n        // do nothing\n      });\n\n    this.monitor.log({\n      name: 'neo_consensus_stop',\n      message: 'Consensus stopped.',\n      level: 'verbose',\n    });\n  }\n\n  private handleResult(result: Result<Context>): Context {\n    if (result.timerSeconds !== undefined) {\n      this.handleTimer(result.timerSeconds);\n    }\n\n    return result.context;\n  }\n\n  private handleTimer(mutableTimerSeconds: number): void {\n    this.clearTimer();\n    this.mutableTimer = setTimeout(\n      () => this.mutableQueue.write({ type: 'timer' }),\n      mutableTimerSeconds * MS_IN_SECOND,\n      // tslint:disable-next-line no-any\n    ) as any;\n  }\n\n  private clearTimer(): void {\n    if (this.mutableTimer !== undefined) {\n      clearTimeout(this.mutableTimer);\n      this.mutableTimer = undefined;\n    }\n  }\n}\n"]}