1 | ;
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const client_common_1 = require("@neo-one/client-common");
|
4 | const logger_1 = require("@neo-one/logger");
|
5 | const utils_1 = require("@neo-one/utils");
|
6 | const asynciterablex_1 = require("@reactivex/ix-es2015-cjs/asynciterable/asynciterablex");
|
7 | const scan_1 = require("@reactivex/ix-es2015-cjs/asynciterable/pipe/scan");
|
8 | const common_1 = require("./common");
|
9 | const ConsensusContext_1 = require("./ConsensusContext");
|
10 | const ConsensusQueue_1 = require("./ConsensusQueue");
|
11 | const handleConsensusPayload_1 = require("./handleConsensusPayload");
|
12 | const handlePersistBlock_1 = require("./handlePersistBlock");
|
13 | const handleTransactionReceived_1 = require("./handleTransactionReceived");
|
14 | const runConsensus_1 = require("./runConsensus");
|
15 | const logger = logger_1.createChild(logger_1.nodeLogger, { component: 'consensus' });
|
16 | const MS_IN_SECOND = 1000;
|
17 | class Consensus {
|
18 | constructor({ options, node }) {
|
19 | this.mutableQueue = new ConsensusQueue_1.ConsensusQueue();
|
20 | const privateKey = client_common_1.common.stringToPrivateKey(options.privateKey);
|
21 | const publicKey = client_common_1.crypto.privateKeyToPublicKey(privateKey);
|
22 | const feeAddress = client_common_1.crypto.publicKeyToScriptHash(publicKey);
|
23 | this.options = {
|
24 | privateKey,
|
25 | publicKey,
|
26 | feeAddress,
|
27 | privateNet: options.privateNet,
|
28 | };
|
29 | this.node = node;
|
30 | this.mutableConsensusContext = new ConsensusContext_1.ConsensusContext();
|
31 | }
|
32 | async start() {
|
33 | let disposable = utils_1.noopDisposable;
|
34 | try {
|
35 | await this.pause();
|
36 | this.doStart(this.options);
|
37 | disposable = utils_1.composeDisposables(disposable, async () => {
|
38 | await this.pause();
|
39 | });
|
40 | return disposable;
|
41 | }
|
42 | catch (err) {
|
43 | await disposable();
|
44 | throw err;
|
45 | }
|
46 | }
|
47 | onPersistBlock() {
|
48 | this.mutableQueue.write({ type: 'handlePersistBlock' });
|
49 | }
|
50 | onConsensusPayloadReceived(payload) {
|
51 | this.mutableQueue.write({
|
52 | type: 'handleConsensusPayload',
|
53 | payload,
|
54 | });
|
55 | }
|
56 | onTransactionReceived(transaction) {
|
57 | this.mutableQueue.write({
|
58 | type: 'handleTransactionReceived',
|
59 | transaction,
|
60 | });
|
61 | }
|
62 | async runConsensusNow() {
|
63 | if (this.options.privateNet) {
|
64 | await new Promise((resolve, reject) => {
|
65 | this.mutableQueue.write({ type: 'timer', promise: { resolve, reject } });
|
66 | });
|
67 | }
|
68 | else {
|
69 | throw new Error('Can only force consensus on a private network.');
|
70 | }
|
71 | }
|
72 | nowSeconds() {
|
73 | return this.mutableConsensusContext.nowSeconds();
|
74 | }
|
75 | async fastForwardOffset(seconds) {
|
76 | if (this.options.privateNet) {
|
77 | this.mutableConsensusContext.fastForwardOffset(seconds);
|
78 | }
|
79 | else {
|
80 | throw new Error('Can only fast forward on a private network.');
|
81 | }
|
82 | }
|
83 | async fastForwardToTime(seconds) {
|
84 | if (this.options.privateNet) {
|
85 | this.mutableConsensusContext.fastForwardToTime(seconds);
|
86 | }
|
87 | else {
|
88 | throw new Error('Can only fast forward on a private network.');
|
89 | }
|
90 | }
|
91 | async pause() {
|
92 | this.clearTimer();
|
93 | this.mutableQueue.done();
|
94 | this.mutableQueue = new ConsensusQueue_1.ConsensusQueue();
|
95 | if (this.mutableStartPromise !== undefined) {
|
96 | await this.mutableStartPromise;
|
97 | }
|
98 | }
|
99 | async reset() {
|
100 | this.mutableConsensusContext = new ConsensusContext_1.ConsensusContext();
|
101 | }
|
102 | async resume() {
|
103 | this.doStart(this.options);
|
104 | }
|
105 | doStart(options) {
|
106 | let completed = false;
|
107 | const mutableStartPromise = this.startInternal(options).then(() => {
|
108 | completed = true;
|
109 | this.mutableStartPromise = undefined;
|
110 | });
|
111 | if (!completed) {
|
112 | this.mutableStartPromise = mutableStartPromise;
|
113 | }
|
114 | }
|
115 | async startInternal(options) {
|
116 | logger.info({ name: 'neo_consensus_start' }, 'Consensus started.');
|
117 | const initialResult = await common_1.initializeNewConsensus({
|
118 | blockchain: this.node.blockchain,
|
119 | publicKey: options.publicKey,
|
120 | consensusContext: this.mutableConsensusContext,
|
121 | });
|
122 | await asynciterablex_1.AsyncIterableX.from(this.mutableQueue)
|
123 | .pipe(scan_1.scan(async (context, event) => {
|
124 | let result;
|
125 | switch (event.type) {
|
126 | case 'handlePersistBlock':
|
127 | result = await handlePersistBlock_1.handlePersistBlock({
|
128 | blockchain: this.node.blockchain,
|
129 | publicKey: options.publicKey,
|
130 | consensusContext: this.mutableConsensusContext,
|
131 | });
|
132 | break;
|
133 | case 'handleConsensusPayload':
|
134 | result = await handleConsensusPayload_1.handleConsensusPayload({
|
135 | context,
|
136 | node: this.node,
|
137 | privateKey: options.privateKey,
|
138 | payload: event.payload,
|
139 | consensusContext: this.mutableConsensusContext,
|
140 | });
|
141 | break;
|
142 | case 'handleTransactionReceived':
|
143 | result = await handleTransactionReceived_1.handleTransactionReceived({
|
144 | context,
|
145 | node: this.node,
|
146 | privateKey: options.privateKey,
|
147 | transaction: event.transaction,
|
148 | consensusContext: this.mutableConsensusContext,
|
149 | });
|
150 | break;
|
151 | case 'timer':
|
152 | result = await runConsensus_1.runConsensus({
|
153 | context,
|
154 | node: this.node,
|
155 | options,
|
156 | consensusContext: this.mutableConsensusContext,
|
157 | }).catch((err) => {
|
158 | if (event.promise !== undefined) {
|
159 | event.promise.reject(err);
|
160 | }
|
161 | throw err;
|
162 | });
|
163 | if (event.promise !== undefined) {
|
164 | event.promise.resolve();
|
165 | }
|
166 | break;
|
167 | default:
|
168 | utils_1.utils.assertNever(event);
|
169 | throw new Error('For TS');
|
170 | }
|
171 | return this.handleResult(result);
|
172 | }, this.handleResult(initialResult)))
|
173 | .forEach(() => {
|
174 | });
|
175 | logger.info({ name: 'neo_consensus_stop' }, 'Consensus stopped.');
|
176 | }
|
177 | handleResult(result) {
|
178 | if (result.timerSeconds !== undefined) {
|
179 | this.handleTimer(result.timerSeconds);
|
180 | }
|
181 | return result.context;
|
182 | }
|
183 | handleTimer(mutableTimerSeconds) {
|
184 | this.clearTimer();
|
185 | this.mutableTimer = setTimeout(() => this.mutableQueue.write({ type: 'timer' }), mutableTimerSeconds * MS_IN_SECOND);
|
186 | }
|
187 | clearTimer() {
|
188 | if (this.mutableTimer !== undefined) {
|
189 | clearTimeout(this.mutableTimer);
|
190 | this.mutableTimer = undefined;
|
191 | }
|
192 | }
|
193 | }
|
194 | exports.Consensus = Consensus;
|
195 |
|
196 | //# sourceMappingURL=data:application/json;charset=utf8;base64,{"version":3,"sources":["Consensus.ts"],"names":[],"mappings":";;AACA,0DAAsF;AACtF,4CAA0D;AAE1D,0CAAsG;AACtG,0FAAuF;AACvF,2EAAwE;AACxE,qCAAkD;AAClD,yDAAsD;AACtD,qDAAkD;AAElD,qEAAkE;AAClE,6DAA0D;AAC1D,2EAAwE;AACxE,iDAA8C;AAG9C,MAAM,MAAM,GAAG,oBAAW,CAAC,mBAAU,EAAE,EAAE,SAAS,EAAE,WAAW,EAAE,CAAC,CAAC;AAanE,MAAM,YAAY,GAAG,IAAI,CAAC;AAE1B,MAAa,SAAS;IAQpB,YAAmB,EAAE,OAAO,EAAE,IAAI,EAAsD;QACtF,IAAI,CAAC,YAAY,GAAG,IAAI,+BAAc,EAAE,CAAC;QAEzC,MAAM,UAAU,GAAG,sBAAM,CAAC,kBAAkB,CAAC,OAAO,CAAC,UAAU,CAAC,CAAC;QACjE,MAAM,SAAS,GAAG,sBAAM,CAAC,qBAAqB,CAAC,UAAU,CAAC,CAAC;QAC3D,MAAM,UAAU,GAAG,sBAAM,CAAC,qBAAqB,CAAC,SAAS,CAAC,CAAC;QAC3D,IAAI,CAAC,OAAO,GAAG;YACb,UAAU;YACV,SAAS;YACT,UAAU;YACV,UAAU,EAAE,OAAO,CAAC,UAAU;SAC/B,CAAC;QAEF,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC;QACjB,IAAI,CAAC,uBAAuB,GAAG,IAAI,mCAAgB,EAAE,CAAC;IACxD,CAAC;IAEM,KAAK,CAAC,KAAK;QAChB,IAAI,UAAU,GAAG,sBAAc,CAAC;QAChC,IAAI;YACF,MAAM,IAAI,CAAC,KAAK,EAAE,CAAC;YACnB,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;YAE3B,UAAU,GAAG,0BAAkB,CAAC,UAAU,EAAE,KAAK,IAAI,EAAE;gBACrD,MAAM,IAAI,CAAC,KAAK,EAAE,CAAC;YACrB,CAAC,CAAC,CAAC;YAEH,OAAO,UAAU,CAAC;SACnB;QAAC,OAAO,GAAG,EAAE;YACZ,MAAM,UAAU,EAAE,CAAC;YACnB,MAAM,GAAG,CAAC;SACX;IACH,CAAC;IAEM,cAAc;QACnB,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,oBAAoB,EAAE,CAAC,CAAC;IAC1D,CAAC;IAEM,0BAA0B,CAAC,OAAyB;QACzD,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC;YACtB,IAAI,EAAE,wBAAwB;YAC9B,OAAO;SACR,CAAC,CAAC;IACL,CAAC;IAEM,qBAAqB,CAAC,WAAwB;QACnD,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC;YACtB,IAAI,EAAE,2BAA2B;YACjC,WAAW;SACZ,CAAC,CAAC;IACL,CAAC;IAEM,KAAK,CAAC,eAAe;QAC1B,IAAI,IAAI,CAAC,OAAO,CAAC,UAAU,EAAE;YAE3B,MAAM,IAAI,OAAO,CAAC,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE;gBACpC,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,OAAO,EAAE,OAAO,EAAE,EAAE,OAAO,EAAE,MAAM,EAAE,EAAE,CAAC,CAAC;YAC3E,CAAC,CAAC,CAAC;SACJ;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,gDAAgD,CAAC,CAAC;SACnE;IACH,CAAC;IAEM,UAAU;QACf,OAAO,IAAI,CAAC,uBAAuB,CAAC,UAAU,EAAE,CAAC;IACnD,CAAC;IAEM,KAAK,CAAC,iBAAiB,CAAC,OAAe;QAC5C,IAAI,IAAI,CAAC,OAAO,CAAC,UAAU,EAAE;YAC3B,IAAI,CAAC,uBAAuB,CAAC,iBAAiB,CAAC,OAAO,CAAC,CAAC;SACzD;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,6CAA6C,CAAC,CAAC;SAChE;IACH,CAAC;IAEM,KAAK,CAAC,iBAAiB,CAAC,OAAe;QAC5C,IAAI,IAAI,CAAC,OAAO,CAAC,UAAU,EAAE;YAC3B,IAAI,CAAC,uBAAuB,CAAC,iBAAiB,CAAC,OAAO,CAAC,CAAC;SACzD;aAAM;YACL,MAAM,IAAI,KAAK,CAAC,6CAA6C,CAAC,CAAC;SAChE;IACH,CAAC;IAEM,KAAK,CAAC,KAAK;QAChB,IAAI,CAAC,UAAU,EAAE,CAAC;QAClB,IAAI,CAAC,YAAY,CAAC,IAAI,EAAE,CAAC;QACzB,IAAI,CAAC,YAAY,GAAG,IAAI,+BAAc,EAAE,CAAC;QACzC,IAAI,IAAI,CAAC,mBAAmB,KAAK,SAAS,EAAE;YAC1C,MAAM,IAAI,CAAC,mBAAmB,CAAC;SAChC;IACH,CAAC;IAEM,KAAK,CAAC,KAAK;QAChB,IAAI,CAAC,uBAAuB,GAAG,IAAI,mCAAgB,EAAE,CAAC;IACxD,CAAC;IAEM,KAAK,CAAC,MAAM;QAEjB,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;IAC7B,CAAC;IAEO,OAAO,CAAC,OAAwB;QACtC,IAAI,SAAS,GAAG,KAAK,CAAC;QACtB,MAAM,mBAAmB,GAAG,IAAI,CAAC,aAAa,CAAC,OAAO,CAAC,CAAC,IAAI,CAAC,GAAG,EAAE;YAChE,SAAS,GAAG,IAAI,CAAC;YACjB,IAAI,CAAC,mBAAmB,GAAG,SAAS,CAAC;QACvC,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,SAAS,EAAE;YACd,IAAI,CAAC,mBAAmB,GAAG,mBAAmB,CAAC;SAChD;IACH,CAAC;IAEO,KAAK,CAAC,aAAa,CAAC,OAAwB;QAClD,MAAM,CAAC,IAAI,CAAC,EAAE,IAAI,EAAE,qBAAqB,EAAE,EAAE,oBAAoB,CAAC,CAAC;QAEnE,MAAM,aAAa,GAAG,MAAM,+BAAsB,CAAC;YACjD,UAAU,EAAE,IAAI,CAAC,IAAI,CAAC,UAAU;YAChC,SAAS,EAAE,OAAO,CAAC,SAAS;YAC5B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;SAC/C,CAAC,CAAC;QAEH,MAAM,+BAAc,CAAC,IAAI,CAAC,IAAI,CAAC,YAAY,CAAC;aACzC,IAAI,CACH,WAAI,CAAC,KAAK,EAAE,OAAgB,EAAE,KAAY,EAAE,EAAE;YAC5C,IAAI,MAAM,CAAC;YACX,QAAQ,KAAK,CAAC,IAAI,EAAE;gBAClB,KAAK,oBAAoB;oBACvB,MAAM,GAAG,MAAM,uCAAkB,CAAC;wBAChC,UAAU,EAAE,IAAI,CAAC,IAAI,CAAC,UAAU;wBAChC,SAAS,EAAE,OAAO,CAAC,SAAS;wBAC5B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR,KAAK,wBAAwB;oBAC3B,MAAM,GAAG,MAAM,+CAAsB,CAAC;wBACpC,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,UAAU,EAAE,OAAO,CAAC,UAAU;wBAC9B,OAAO,EAAE,KAAK,CAAC,OAAO;wBACtB,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR,KAAK,2BAA2B;oBAC9B,MAAM,GAAG,MAAM,qDAAyB,CAAC;wBACvC,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,UAAU,EAAE,OAAO,CAAC,UAAU;wBAC9B,WAAW,EAAE,KAAK,CAAC,WAAW;wBAC9B,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC;oBAEH,MAAM;gBACR,KAAK,OAAO;oBACV,MAAM,GAAG,MAAM,2BAAY,CAAC;wBAC1B,OAAO;wBACP,IAAI,EAAE,IAAI,CAAC,IAAI;wBACf,OAAO;wBACP,gBAAgB,EAAE,IAAI,CAAC,uBAAuB;qBAC/C,CAAC,CAAC,KAAK,CAAC,CAAC,GAAG,EAAE,EAAE;wBACf,IAAI,KAAK,CAAC,OAAO,KAAK,SAAS,EAAE;4BAC/B,KAAK,CAAC,OAAO,CAAC,MAAM,CAAC,GAAG,CAAC,CAAC;yBAC3B;wBACD,MAAM,GAAG,CAAC;oBACZ,CAAC,CAAC,CAAC;oBACH,IAAI,KAAK,CAAC,OAAO,KAAK,SAAS,EAAE;wBAC/B,KAAK,CAAC,OAAO,CAAC,OAAO,EAAE,CAAC;qBACzB;oBACD,MAAM;gBACR;oBACE,aAAW,CAAC,WAAW,CAAC,KAAK,CAAC,CAAC;oBAC/B,MAAM,IAAI,KAAK,CAAC,QAAQ,CAAC,CAAC;aAC7B;YAED,OAAO,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,CAAC;QACnC,CAAC,EAAE,IAAI,CAAC,YAAY,CAAC,aAAa,CAAC,CAAC,CACrC;aACA,OAAO,CAAC,GAAG,EAAE;QAEd,CAAC,CAAC,CAAC;QAEL,MAAM,CAAC,IAAI,CAAC,EAAE,IAAI,EAAE,oBAAoB,EAAE,EAAE,oBAAoB,CAAC,CAAC;IACpE,CAAC;IAEO,YAAY,CAAC,MAAuB;QAC1C,IAAI,MAAM,CAAC,YAAY,KAAK,SAAS,EAAE;YACrC,IAAI,CAAC,WAAW,CAAC,MAAM,CAAC,YAAY,CAAC,CAAC;SACvC;QAED,OAAO,MAAM,CAAC,OAAO,CAAC;IACxB,CAAC;IAEO,WAAW,CAAC,mBAA2B;QAC7C,IAAI,CAAC,UAAU,EAAE,CAAC;QAClB,IAAI,CAAC,YAAY,GAAG,UAAU,CAC5B,GAAG,EAAE,CAAC,IAAI,CAAC,YAAY,CAAC,KAAK,CAAC,EAAE,IAAI,EAAE,OAAO,EAAE,CAAC,EAChD,mBAAmB,GAAG,YAAY,CAE5B,CAAC;IACX,CAAC;IAEO,UAAU;QAChB,IAAI,IAAI,CAAC,YAAY,KAAK,SAAS,EAAE;YACnC,YAAY,CAAC,IAAI,CAAC,YAAY,CAAC,CAAC;YAChC,IAAI,CAAC,YAAY,GAAG,SAAS,CAAC;SAC/B;IACH,CAAC;CACF;AAxND,8BAwNC","file":"neo-one-node-consensus/src/Consensus.js","sourcesContent":["/// <reference types=\"@reactivex/ix-es2015-cjs\" />\nimport { common, crypto, ECPoint, PrivateKey, UInt160 } from '@neo-one/client-common';\nimport { createChild, nodeLogger } from '@neo-one/logger';\nimport { ConsensusPayload, Node, Transaction } from '@neo-one/node-core';\nimport { composeDisposables, Disposable, noopDisposable, utils as commonUtils } from '@neo-one/utils';\nimport { AsyncIterableX } from '@reactivex/ix-es2015-cjs/asynciterable/asynciterablex';\nimport { scan } from '@reactivex/ix-es2015-cjs/asynciterable/pipe/scan';\nimport { initializeNewConsensus } from './common';\nimport { ConsensusContext } from './ConsensusContext';\nimport { ConsensusQueue } from './ConsensusQueue';\nimport { Context } from './context';\nimport { handleConsensusPayload } from './handleConsensusPayload';\nimport { handlePersistBlock } from './handlePersistBlock';\nimport { handleTransactionReceived } from './handleTransactionReceived';\nimport { runConsensus } from './runConsensus';\nimport { Event, Result } from './types';\n\nconst logger = createChild(nodeLogger, { component: 'consensus' });\n\nexport interface Options {\n  readonly privateKey: string;\n  readonly privateNet: boolean;\n}\nexport interface InternalOptions {\n  readonly privateKey: PrivateKey;\n  readonly publicKey: ECPoint;\n  readonly feeAddress: UInt160;\n  readonly privateNet: boolean;\n}\n\nconst MS_IN_SECOND = 1000;\n\nexport class Consensus {\n  private mutableQueue: ConsensusQueue;\n  private mutableTimer: number | undefined;\n  private readonly options: InternalOptions;\n  private readonly node: Node;\n  private mutableConsensusContext: ConsensusContext;\n  private mutableStartPromise: Promise<void> | undefined;\n\n  public constructor({ options, node }: { readonly options: Options; readonly node: Node }) {\n    this.mutableQueue = new ConsensusQueue();\n\n    const privateKey = common.stringToPrivateKey(options.privateKey);\n    const publicKey = crypto.privateKeyToPublicKey(privateKey);\n    const feeAddress = crypto.publicKeyToScriptHash(publicKey);\n    this.options = {\n      privateKey,\n      publicKey,\n      feeAddress,\n      privateNet: options.privateNet,\n    };\n\n    this.node = node;\n    this.mutableConsensusContext = new ConsensusContext();\n  }\n\n  public async start(): Promise<Disposable> {\n    let disposable = noopDisposable;\n    try {\n      await this.pause();\n      this.doStart(this.options);\n\n      disposable = composeDisposables(disposable, async () => {\n        await this.pause();\n      });\n\n      return disposable;\n    } catch (err) {\n      await disposable();\n      throw err;\n    }\n  }\n\n  public onPersistBlock(): void {\n    this.mutableQueue.write({ type: 'handlePersistBlock' });\n  }\n\n  public onConsensusPayloadReceived(payload: ConsensusPayload): void {\n    this.mutableQueue.write({\n      type: 'handleConsensusPayload',\n      payload,\n    });\n  }\n\n  public onTransactionReceived(transaction: Transaction): void {\n    this.mutableQueue.write({\n      type: 'handleTransactionReceived',\n      transaction,\n    });\n  }\n\n  public async runConsensusNow(): Promise<void> {\n    if (this.options.privateNet) {\n      // tslint:disable-next-line promise-must-complete\n      await new Promise((resolve, reject) => {\n        this.mutableQueue.write({ type: 'timer', promise: { resolve, reject } });\n      });\n    } else {\n      throw new Error('Can only force consensus on a private network.');\n    }\n  }\n\n  public nowSeconds(): number {\n    return this.mutableConsensusContext.nowSeconds();\n  }\n\n  public async fastForwardOffset(seconds: number): Promise<void> {\n    if (this.options.privateNet) {\n      this.mutableConsensusContext.fastForwardOffset(seconds);\n    } else {\n      throw new Error('Can only fast forward on a private network.');\n    }\n  }\n\n  public async fastForwardToTime(seconds: number): Promise<void> {\n    if (this.options.privateNet) {\n      this.mutableConsensusContext.fastForwardToTime(seconds);\n    } else {\n      throw new Error('Can only fast forward on a private network.');\n    }\n  }\n\n  public async pause(): Promise<void> {\n    this.clearTimer();\n    this.mutableQueue.done();\n    this.mutableQueue = new ConsensusQueue();\n    if (this.mutableStartPromise !== undefined) {\n      await this.mutableStartPromise;\n    }\n  }\n\n  public async reset(): Promise<void> {\n    this.mutableConsensusContext = new ConsensusContext();\n  }\n\n  public async resume(): Promise<void> {\n    // tslint:disable-next-line no-floating-promises\n    this.doStart(this.options);\n  }\n\n  private doStart(options: InternalOptions): void {\n    let completed = false;\n    const mutableStartPromise = this.startInternal(options).then(() => {\n      completed = true;\n      this.mutableStartPromise = undefined;\n    });\n    if (!completed) {\n      this.mutableStartPromise = mutableStartPromise;\n    }\n  }\n\n  private async startInternal(options: InternalOptions): Promise<void> {\n    logger.info({ name: 'neo_consensus_start' }, 'Consensus started.');\n\n    const initialResult = await initializeNewConsensus({\n      blockchain: this.node.blockchain,\n      publicKey: options.publicKey,\n      consensusContext: this.mutableConsensusContext,\n    });\n\n    await AsyncIterableX.from(this.mutableQueue)\n      .pipe(\n        scan(async (context: Context, event: Event) => {\n          let result;\n          switch (event.type) {\n            case 'handlePersistBlock':\n              result = await handlePersistBlock({\n                blockchain: this.node.blockchain,\n                publicKey: options.publicKey,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'handleConsensusPayload':\n              result = await handleConsensusPayload({\n                context,\n                node: this.node,\n                privateKey: options.privateKey,\n                payload: event.payload,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'handleTransactionReceived':\n              result = await handleTransactionReceived({\n                context,\n                node: this.node,\n                privateKey: options.privateKey,\n                transaction: event.transaction,\n                consensusContext: this.mutableConsensusContext,\n              });\n\n              break;\n            case 'timer':\n              result = await runConsensus({\n                context,\n                node: this.node,\n                options,\n                consensusContext: this.mutableConsensusContext,\n              }).catch((err) => {\n                if (event.promise !== undefined) {\n                  event.promise.reject(err);\n                }\n                throw err;\n              });\n              if (event.promise !== undefined) {\n                event.promise.resolve();\n              }\n              break;\n            default:\n              commonUtils.assertNever(event);\n              throw new Error('For TS');\n          }\n\n          return this.handleResult(result);\n        }, this.handleResult(initialResult)),\n      )\n      .forEach(() => {\n        // do nothing\n      });\n\n    logger.info({ name: 'neo_consensus_stop' }, 'Consensus stopped.');\n  }\n\n  private handleResult(result: Result<Context>): Context {\n    if (result.timerSeconds !== undefined) {\n      this.handleTimer(result.timerSeconds);\n    }\n\n    return result.context;\n  }\n\n  private handleTimer(mutableTimerSeconds: number): void {\n    this.clearTimer();\n    this.mutableTimer = setTimeout(\n      () => this.mutableQueue.write({ type: 'timer' }),\n      mutableTimerSeconds * MS_IN_SECOND,\n      // tslint:disable-next-line no-any\n    ) as any;\n  }\n\n  private clearTimer(): void {\n    if (this.mutableTimer !== undefined) {\n      clearTimeout(this.mutableTimer);\n      this.mutableTimer = undefined;\n    }\n  }\n}\n"]}
|