UNPKG

21.3 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const tslib_1 = require("tslib");
4const lodash_1 = tslib_1.__importDefault(require("lodash"));
5const FETCH_TIMEOUT_MS = 20000;
6const QUEUE_SIZE = 1000;
7const BATCH_SIZE = 10;
8class AsyncBlockIterator {
9 constructor({ client, options: { indexStart, indexStop, monitor }, fetchTimeoutMS = FETCH_TIMEOUT_MS, batchSize = BATCH_SIZE, }) {
10 this.client = client;
11 this.mutableItems = [];
12 this.mutableResolvers = [];
13 this.mutableDone = false;
14 this.mutableCurrentIndex = indexStart;
15 this.mutableFetching = false;
16 this.indexStop = indexStop;
17 this.fetchTimeoutMS = fetchTimeoutMS;
18 this.batchSize = batchSize;
19 this.monitor = monitor === undefined ? undefined : monitor.at('async_block_iterator');
20 }
21 [Symbol.asyncIterator]() {
22 return this;
23 }
24 async next() {
25 if (!this.mutableDone) {
26 this.fetch();
27 }
28 const item = this.mutableItems.shift();
29 if (item !== undefined) {
30 if (item.type === 'error') {
31 return Promise.reject(item.error);
32 }
33 return Promise.resolve({ done: false, value: item.value });
34 }
35 if (this.mutableDone) {
36 // tslint:disable-next-line no-any
37 return Promise.resolve({ done: true });
38 }
39 // tslint:disable-next-line promise-must-complete
40 return new Promise((resolve, reject) => {
41 this.mutableResolvers.push({ resolve, reject });
42 });
43 }
44 write(value) {
45 this.push({ type: 'value', value });
46 }
47 error(error) {
48 this.push({ type: 'error', error });
49 }
50 push(item) {
51 if (this.mutableDone) {
52 /* istanbul ignore next */
53 throw new Error('AsyncBlockIterator already ended');
54 }
55 const resolver = this.mutableResolvers.shift();
56 if (resolver !== undefined) {
57 const { resolve, reject } = resolver;
58 if (item.type === 'error') {
59 reject(item.error);
60 }
61 else {
62 resolve({ done: false, value: item.value });
63 }
64 }
65 else {
66 this.mutableItems.push(item);
67 }
68 }
69 done() {
70 // tslint:disable-next-line no-any
71 this.mutableResolvers.forEach(({ resolve }) => resolve({ done: true }));
72 // tslint:disable-next-line no-any
73 this.mutableResolvers = [];
74 this.mutableDone = true;
75 }
76 fetch() {
77 if (this.mutableFetching) {
78 return;
79 }
80 this.mutableFetching = true;
81 this.asyncFetch()
82 .then(() => {
83 this.mutableFetching = false;
84 })
85 .catch((error) => {
86 this.mutableFetching = false;
87 this.error(error);
88 });
89 }
90 async asyncFetch() {
91 let startHeight = this.mutableStartHeight;
92 let indexIn = this.mutableCurrentIndex;
93 if (startHeight === undefined || indexIn === undefined) {
94 const blockCount = await this.client.getBlockCount(this.monitor);
95 if (startHeight === undefined) {
96 startHeight = blockCount - 1;
97 this.mutableStartHeight = startHeight;
98 }
99 if (indexIn === undefined) {
100 indexIn = blockCount - 1;
101 this.mutableCurrentIndex = indexIn;
102 }
103 }
104 const index = indexIn;
105 const incIndex = (value) => {
106 if (this.mutableCurrentIndex === undefined) {
107 throw new Error('Something went wrong!');
108 }
109 this.mutableCurrentIndex += value;
110 };
111 if (this.indexStop !== undefined && index >= this.indexStop) {
112 this.done();
113 }
114 else if (index >= startHeight) {
115 const [block, newStartHeight] = await Promise.all([
116 this.fetchOne(index),
117 // Refresh the block count in case we got behind somehow
118 this.client.getBlockCount(this.monitor),
119 ]);
120 incIndex(1);
121 this.write(block);
122 this.mutableStartHeight = newStartHeight;
123 }
124 else {
125 let toFetch = Math.min(QUEUE_SIZE - this.mutableItems.length, startHeight - index);
126 if (this.indexStop !== undefined) {
127 toFetch = Math.min(toFetch, this.indexStop - index);
128 }
129 // tslint:disable-next-line no-loop-statement
130 for (const chunk of lodash_1.default.chunk(lodash_1.default.range(0, toFetch), this.batchSize)) {
131 const blocks = await Promise.all(chunk.map(async (offset) => this.fetchOne(index + offset, true)));
132 incIndex(chunk.length);
133 blocks.forEach((block) => this.write(block));
134 }
135 }
136 }
137 async fetchOne(index, isBatch = false) {
138 try {
139 // tslint:disable-next-line no-unnecessary-local-variable prefer-immediate-return
140 const block = await this.client.getBlock(index, isBatch
141 ? {
142 monitor: this.monitor,
143 }
144 : {
145 timeoutMS: this.fetchTimeoutMS,
146 monitor: this.monitor,
147 });
148 // tslint:disable-next-line no-var-before-return
149 return block;
150 }
151 catch (error) {
152 if (error.code === 'UNKNOWN_BLOCK') {
153 return this.fetchOne(index, isBatch);
154 }
155 throw error;
156 }
157 }
158}
159exports.AsyncBlockIterator = AsyncBlockIterator;
160
161//# sourceMappingURL=data:application/json;charset=utf8;base64,