1 | const lodash = require('lodash');
|
2 | const { loop } = require('../utils');
|
3 | const LazyPromise = require('./LazyPromise');
|
4 |
|
5 | class LogIterator extends LazyPromise {
|
6 | constructor(cfx, func, [filter]) {
|
7 | super(func, [filter]);
|
8 |
|
9 | this.cfx = cfx;
|
10 | this.filter = filter;
|
11 |
|
12 | this._epoch = lodash.get(filter, 'fromEpoch', 0);
|
13 | this._count = 0;
|
14 | this._queue = [];
|
15 | }
|
16 |
|
17 | async _isConfirmed(epochNumber, threshold) {
|
18 | if (epochNumber === undefined) {
|
19 | return false;
|
20 | }
|
21 |
|
22 | const risk = await this.cfx.getRiskCoefficient(epochNumber);
|
23 | return risk < threshold;
|
24 | }
|
25 |
|
26 | async _popUnconfirmed(logs, threshold) {
|
27 | const unconfirmedSet = new Set();
|
28 |
|
29 | while (logs.length) {
|
30 | const { epochNumber } = lodash.last(logs);
|
31 | if (unconfirmedSet.has(epochNumber) || !await this._isConfirmed(epochNumber, threshold)) {
|
32 | logs.pop();
|
33 | } else {
|
34 | break;
|
35 | }
|
36 | }
|
37 |
|
38 | return logs;
|
39 | }
|
40 |
|
41 | async _readConfirmed({ threshold = 0.01, delta = 1000, timeout = 30 * 60 * 1000 } = {}) {
|
42 | if (this._epoch > this.filter.toEpoch || this._count >= this.filter.limit) {
|
43 | return [];
|
44 | }
|
45 |
|
46 | return loop({ delta, timeout }, async () => {
|
47 | const logs = await this.cfx.getLogs({
|
48 | ...this.filter,
|
49 | fromEpoch: this._epoch,
|
50 | limit: this.filter.limit === undefined ? undefined : this.filter.limit - this._count,
|
51 | });
|
52 |
|
53 | if (await this._isConfirmed(this.filter.toEpoch, threshold)) {
|
54 | this._epoch = Infinity;
|
55 | return logs;
|
56 | }
|
57 |
|
58 | await this._popUnconfirmed(logs, threshold);
|
59 |
|
60 | if (logs.length) {
|
61 | this._epoch = lodash.last(logs).epochNumber + 1;
|
62 | return logs;
|
63 | }
|
64 |
|
65 | return undefined;
|
66 | });
|
67 | }
|
68 |
|
69 | async next(options) {
|
70 | if (!this._queue.length) {
|
71 | const logs = await this._readConfirmed(options);
|
72 | this._queue.push(...logs);
|
73 | }
|
74 |
|
75 | return this._queue.shift();
|
76 | }
|
77 |
|
78 | [Symbol.asyncIterator]() {
|
79 | return {
|
80 | next: async () => {
|
81 | const value = await this.next();
|
82 | return { value, done: value === undefined };
|
83 | },
|
84 | };
|
85 | }
|
86 | }
|
87 |
|
88 | module.exports = LogIterator;
|