UNPKG

21.3 kBJavaScriptView Raw
"use strict";
// CommonJS interop flag emitted by the TypeScript compiler.
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
// lodash is reached through `lodash_1.default` (used below for `chunk` and `range`).
const lodash_1 = tslib_1.__importDefault(require("lodash"));
// Default `timeoutMS` handed to `client.getBlock` for non-batch (single block) fetches.
const FETCH_TIMEOUT_MS = 20000;
// Upper bound on the number of buffered items a batch fetch round will fill up to.
const QUEUE_SIZE = 1000;
// Default number of `client.getBlock` calls issued concurrently per batch chunk.
const BATCH_SIZE = 10;
/**
 * Async iterator that yields blocks obtained from a `client`.
 *
 * Blocks are fetched lazily: every call to `next()` kicks off a fetch round
 * (unless one is already running). Fetched blocks and fetch errors are either
 * handed straight to a waiting `next()` caller or buffered in `mutableItems`
 * until the next `next()` call picks them up.
 *
 * The `client` must provide:
 *  - `getBlockCount(monitor)` resolving to a number, and
 *  - `getBlock(index, options)` resolving to a block.
 */
class AsyncBlockIterator {
    /**
     * @param {object} args
     * @param {object} args.client Object exposing `getBlockCount` and `getBlock`.
     * @param {object} args.options Iteration options.
     * @param {number} [args.options.indexStart] First block index to yield; when
     *   omitted it is set to `blockCount - 1` on the first fetch.
     * @param {number} [args.options.indexStop] Exclusive upper bound; the iterator
     *   ends once the current index reaches it. When omitted the iterator never
     *   ends on its own.
     * @param {object} [args.options.monitor] Optional monitor; scoped with
     *   `monitor.at('async_block_iterator')` and forwarded to the client.
     * @param {number} [args.fetchTimeoutMS] `timeoutMS` used for single block fetches.
     * @param {number} [args.batchSize] Number of concurrent `getBlock` calls per chunk
     *   when batch fetching.
     */
    constructor({ client, options: { indexStart, indexStop, monitor }, fetchTimeoutMS = FETCH_TIMEOUT_MS, batchSize = BATCH_SIZE, }) {
        this.client = client;
        // Buffered results ({ type: 'value', value } | { type: 'error', error }).
        this.mutableItems = [];
        // `{ resolve, reject }` pairs of `next()` calls waiting for a result.
        this.mutableResolvers = [];
        this.mutableDone = false;
        this.mutableCurrentIndex = indexStart;
        this.mutableFetching = false;
        this.indexStop = indexStop;
        this.fetchTimeoutMS = fetchTimeoutMS;
        this.batchSize = batchSize;
        this.monitor = monitor === undefined ? undefined : monitor.at('async_block_iterator');
    }

    /** The iterator is its own async iterable. */
    [Symbol.asyncIterator]() {
        return this;
    }

    /**
     * Returns the next iteration result.
     *
     * Resolves with a buffered block if one is available, rejects with a buffered
     * error, resolves with `{ done: true }` once the iterator has ended, and
     * otherwise waits until the running fetch delivers a result.
     *
     * @returns {Promise<{done: boolean, value?: *}>}
     */
    async next() {
        if (!this.mutableDone) {
            this.fetch();
        }
        const item = this.mutableItems.shift();
        if (item !== undefined) {
            if (item.type === 'error') {
                return Promise.reject(item.error);
            }
            return Promise.resolve({ done: false, value: item.value });
        }
        if (this.mutableDone) {
            return Promise.resolve({ done: true });
        }
        // Nothing buffered yet: park the caller until `push` or `done` settles it.
        return new Promise((resolve, reject) => {
            this.mutableResolvers.push({ resolve, reject });
        });
    }

    /** Delivers a fetched block to the consumer. */
    write(value) {
        this.push({ type: 'value', value });
    }

    /** Delivers a fetch error to the consumer. */
    error(error) {
        this.push({ type: 'error', error });
    }

    /**
     * Hands `item` to the oldest waiting `next()` caller, or buffers it when
     * nobody is waiting.
     *
     * @throws {Error} If the iterator has already ended.
     */
    push(item) {
        if (this.mutableDone) {
            /* istanbul ignore next */
            throw new Error('AsyncBlockIterator already ended');
        }
        const resolver = this.mutableResolvers.shift();
        if (resolver === undefined) {
            this.mutableItems.push(item);
            return;
        }
        if (item.type === 'error') {
            resolver.reject(item.error);
        }
        else {
            resolver.resolve({ done: false, value: item.value });
        }
    }

    /** Ends the iterator and resolves every waiting `next()` call with `{ done: true }`. */
    done() {
        this.mutableResolvers.forEach(({ resolve }) => resolve({ done: true }));
        this.mutableResolvers = [];
        this.mutableDone = true;
    }

    /**
     * Starts a fetch round unless one is already in flight.
     *
     * A round may settle fewer `next()` calls than are currently waiting (a
     * single block fetch settles exactly one). Because rounds are otherwise only
     * started from `next()`, the remaining waiters would never be served, so
     * after each round another one is started while the iterator is not done and
     * waiters remain. Every round that finds waiters settles at least one of
     * them (with a block, an error, or `done`), so this cannot spin forever.
     */
    fetch() {
        if (this.mutableFetching) {
            return;
        }
        this.mutableFetching = true;
        this.asyncFetch()
            .then(() => {
                this.mutableFetching = false;
            })
            .catch((error) => {
                this.mutableFetching = false;
                this.error(error);
            })
            .then(() => {
                if (!this.mutableDone && this.mutableResolvers.length > 0) {
                    this.fetch();
                }
            });
    }

    /**
     * Performs one fetch round.
     *
     * On first use the start height and (if no `indexStart` was given) the
     * current index are initialised to `blockCount - 1`. Then exactly one of the
     * following happens:
     *  - the current index has reached `indexStop`: the iterator ends;
     *  - the current index is at or beyond the known start height: a single
     *    block is fetched (with the fetch timeout) while the block count is
     *    refreshed in parallel;
     *  - otherwise: the blocks up to the known start height are fetched in
     *    chunks of `batchSize`, limited by the free buffer space and `indexStop`.
     */
    async asyncFetch() {
        let startHeight = this.mutableStartHeight;
        let indexIn = this.mutableCurrentIndex;
        if (startHeight === undefined || indexIn === undefined) {
            const blockCount = await this.client.getBlockCount(this.monitor);
            if (startHeight === undefined) {
                startHeight = blockCount - 1;
                this.mutableStartHeight = startHeight;
            }
            if (indexIn === undefined) {
                indexIn = blockCount - 1;
                this.mutableCurrentIndex = indexIn;
            }
        }
        const index = indexIn;
        const incIndex = (value) => {
            if (this.mutableCurrentIndex === undefined) {
                throw new Error('Something went wrong!');
            }
            this.mutableCurrentIndex += value;
        };
        if (this.indexStop !== undefined && index >= this.indexStop) {
            this.done();
        }
        else if (index >= startHeight) {
            const [block, newStartHeight] = await Promise.all([
                this.fetchOne(index),
                // Refresh the block count in case we got behind somehow
                this.client.getBlockCount(this.monitor),
            ]);
            incIndex(1);
            this.write(block);
            this.mutableStartHeight = newStartHeight;
        }
        else {
            let toFetch = Math.min(QUEUE_SIZE - this.mutableItems.length, startHeight - index);
            if (this.indexStop !== undefined) {
                toFetch = Math.min(toFetch, this.indexStop - index);
            }
            for (const chunk of lodash_1.default.chunk(lodash_1.default.range(0, toFetch), this.batchSize)) {
                // Blocks of one chunk are requested concurrently and written in index order.
                const blocks = await Promise.all(chunk.map(async (offset) => this.fetchOne(index + offset, true)));
                incIndex(chunk.length);
                blocks.forEach((block) => this.write(block));
            }
        }
    }

    /**
     * Fetches the block at `index`, retrying for as long as the client fails
     * with an error whose `code` is `'UNKNOWN_BLOCK'`. Any other error is
     * rethrown.
     *
     * @param {number} index Block index to fetch.
     * @param {boolean} [isBatch=false] When true the request is made without
     *   `timeoutMS`; otherwise `fetchTimeoutMS` is passed along.
     * @returns {Promise<*>} The block returned by `client.getBlock`.
     */
    async fetchOne(index, isBatch = false) {
        for (;;) {
            try {
                return await this.client.getBlock(index, isBatch
                    ? {
                        monitor: this.monitor,
                    }
                    : {
                        timeoutMS: this.fetchTimeoutMS,
                        monitor: this.monitor,
                    });
            }
            catch (error) {
                if (error.code !== 'UNKNOWN_BLOCK') {
                    throw error;
                }
            }
        }
    }
}
// Public CommonJS export of the iterator class.
exports.AsyncBlockIterator = AsyncBlockIterator;
160
161//# sourceMappingURL=data:application/json;charset=utf8;base64,{"version":3,"sources":["AsyncBlockIterator.ts"],"names":[],"mappings":";;;AAEA,4DAAuB;AAoBvB,MAAM,gBAAgB,GAAG,KAAK,CAAC;AAC/B,MAAM,UAAU,GAAG,IAAI,CAAC;AACxB,MAAM,UAAU,GAAG,EAAE,CAAC;AAEtB,MAAa,kBAAkB;IAa7B,YAAmB,EACjB,MAAM,EACN,OAAO,EAAE,EAAE,UAAU,EAAE,SAAS,EAAE,OAAO,EAAE,EAC3C,cAAc,GAAG,gBAAgB,EACjC,SAAS,GAAG,UAAU,GACI;QAC1B,IAAI,CAAC,MAAM,GAAG,MAAM,CAAC;QACrB,IAAI,CAAC,YAAY,GAAG,EAAE,CAAC;QACvB,IAAI,CAAC,gBAAgB,GAAG,EAAE,CAAC;QAC3B,IAAI,CAAC,WAAW,GAAG,KAAK,CAAC;QACzB,IAAI,CAAC,mBAAmB,GAAG,UAAU,CAAC;QACtC,IAAI,CAAC,eAAe,GAAG,KAAK,CAAC;QAC7B,IAAI,CAAC,SAAS,GAAG,SAAS,CAAC;QAC3B,IAAI,CAAC,cAAc,GAAG,cAAc,CAAC;QACrC,IAAI,CAAC,SAAS,GAAG,SAAS,CAAC;QAC3B,IAAI,CAAC,OAAO,GAAG,OAAO,KAAK,SAAS,CAAC,CAAC,CAAC,SAAS,CAAC,CAAC,CAAC,OAAO,CAAC,EAAE,CAAC,sBAAsB,CAAC,CAAC;IACxF,CAAC;IAEM,CAAC,MAAM,CAAC,aAAa,CAAC;QAC3B,OAAO,IAAI,CAAC;IACd,CAAC;IAEM,KAAK,CAAC,IAAI;QACf,IAAI,CAAC,IAAI,CAAC,WAAW,EAAE;YACrB,IAAI,CAAC,KAAK,EAAE,CAAC;SACd;QAED,MAAM,IAAI,GAAG,IAAI,CAAC,YAAY,CAAC,KAAK,EAAE,CAAC;QACvC,IAAI,IAAI,KAAK,SAAS,EAAE;YACtB,IAAI,IAAI,CAAC,IAAI,KAAK,OAAO,EAAE;gBACzB,OAAO,OAAO,CAAC,MAAM,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;aACnC;YAED,OAAO,OAAO,CAAC,OAAO,CAAC,EAAE,IAAI,EAAE,KAAK,EAAE,KAAK,EAAE,IAAI,CAAC,KAAK,EAAE,CAAC,CAAC;SAC5D;QAED,IAAI,IAAI,CAAC,WAAW,EAAE;YACpB,kCAAkC;YAClC,OAAO,OAAO,CAAC,OAAO,CAAC,EAAE,IAAI,EAAE,IAAI,EAAS,CAAC,CAAC;SAC/C;QAED,iDAAiD;QACjD,OAAO,IAAI,OAAO,CAAwB,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE;YAC5D,IAAI,CAAC,gBAAgB,CAAC,IAAI,CAAC,EAAE,OAAO,EAAE,MAAM,EAAE,CAAC,CAAC;QAClD,CAAC,CAAC,CAAC;IACL,CAAC;IAEO,KAAK,CAAC,KAAY;QACxB,IAAI,CAAC,IAAI,CAAC,EAAE,IAAI,EAAE,OAAO,EAAE,KAAK,EAAE,CAAC,CAAC;IACtC,CAAC;IAEO,KAAK,CAAC,KAAY;QACxB,IAAI,CAAC,IAAI,CAAC,EAAE,IAAI,EAAE,OAAO,EAAE,KAAK,EAAE,CAAC,CAAC;IACtC,CAAC;IAEO,IAAI,CAAC,IAAU;QACrB,IAAI,IAAI,CAAC,WAAW,EAAE;YACpB,0BAA0B;YAC1B,MAAM,IAAI,KAAK,CAAC,kCAAkC,CAAC,CAAC;SACrD;QAED,MAAM,QAAQ,GAAG,IAAI,CAAC,gBAAgB,CAAC,KAAK,EAAE,CAAC;QAC/C,IAAI,QAAQ,KAAK,SAAS,EAAE;YAC1B,MAAM,EAAE,OAAO,EA
AE,MAAM,EAAE,GAAG,QAAQ,CAAC;YACrC,IAAI,IAAI,CAAC,IAAI,KAAK,OAAO,EAAE;gBACzB,MAAM,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;aACpB;iBAAM;gBACL,OAAO,CAAC,EAAE,IAAI,EAAE,KAAK,EAAE,KAAK,EAAE,IAAI,CAAC,KAAK,EAAE,CAAC,CAAC;aAC7C;SACF;aAAM;YACL,IAAI,CAAC,YAAY,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;SAC9B;IACH,CAAC;IAEO,IAAI;QACV,kCAAkC;QAClC,IAAI,CAAC,gBAAgB,CAAC,OAAO,CAAC,CAAC,EAAE,OAAO,EAAE,EAAE,EAAE,CAAC,OAAO,CAAC,EAAE,IAAI,EAAE,IAAI,EAAS,CAAC,CAAC,CAAC;QAC/E,kCAAkC;QAClC,IAAI,CAAC,gBAAgB,GAAG,EAAE,CAAC;QAC3B,IAAI,CAAC,WAAW,GAAG,IAAI,CAAC;IAC1B,CAAC;IAEO,KAAK;QACX,IAAI,IAAI,CAAC,eAAe,EAAE;YACxB,OAAO;SACR;QACD,IAAI,CAAC,eAAe,GAAG,IAAI,CAAC;QAC5B,IAAI,CAAC,UAAU,EAAE;aACd,IAAI,CAAC,GAAG,EAAE;YACT,IAAI,CAAC,eAAe,GAAG,KAAK,CAAC;QAC/B,CAAC,CAAC;aACD,KAAK,CAAC,CAAC,KAAK,EAAE,EAAE;YACf,IAAI,CAAC,eAAe,GAAG,KAAK,CAAC;YAC7B,IAAI,CAAC,KAAK,CAAC,KAAK,CAAC,CAAC;QACpB,CAAC,CAAC,CAAC;IACP,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,IAAI,WAAW,GAAG,IAAI,CAAC,kBAAkB,CAAC;QAC1C,IAAI,OAAO,GAAG,IAAI,CAAC,mBAAmB,CAAC;QACvC,IAAI,WAAW,KAAK,SAAS,IAAI,OAAO,KAAK,SAAS,EAAE;YACtD,MAAM,UAAU,GAAG,MAAM,IAAI,CAAC,MAAM,CAAC,aAAa,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;YACjE,IAAI,WAAW,KAAK,SAAS,EAAE;gBAC7B,WAAW,GAAG,UAAU,GAAG,CAAC,CAAC;gBAC7B,IAAI,CAAC,kBAAkB,GAAG,WAAW,CAAC;aACvC;YACD,IAAI,OAAO,KAAK,SAAS,EAAE;gBACzB,OAAO,GAAG,UAAU,GAAG,CAAC,CAAC;gBACzB,IAAI,CAAC,mBAAmB,GAAG,OAAO,CAAC;aACpC;SACF;QACD,MAAM,KAAK,GAAG,OAAO,CAAC;QAEtB,MAAM,QAAQ,GAAG,CAAC,KAAa,EAAE,EAAE;YACjC,IAAI,IAAI,CAAC,mBAAmB,KAAK,SAAS,EAAE;gBAC1C,MAAM,IAAI,KAAK,CAAC,uBAAuB,CAAC,CAAC;aAC1C;YACD,IAAI,CAAC,mBAAmB,IAAI,KAAK,CAAC;QACpC,CAAC,CAAC;QAEF,IAAI,IAAI,CAAC,SAAS,KAAK,SAAS,IAAI,KAAK,IAAI,IAAI,CAAC,SAAS,EAAE;YAC3D,IAAI,CAAC,IAAI,EAAE,CAAC;SACb;aAAM,IAAI,KAAK,IAAI,WAAW,EAAE;YAC/B,MAAM,CAAC,KAAK,EAAE,cAAc,CAAC,GAAG,MAAM,OAAO,CAAC,GAAG,CAAC;gBAChD,IAAI,CAAC,QAAQ,CAAC,KAAK,CAAC;gBACpB,wDAAwD;gBACxD,IAAI,CAAC,MAAM,CAAC,aAAa,CAAC,IAAI,CAAC,OAAO,CAAC;aACxC,CAAC,CAAC;YAEH,QAAQ,CAAC,CAAC,CAAC,CAAC;YACZ,IAAI,CAAC,KAAK,CAAC,KAAK,CAAC,CAAC;YAClB,IAAI,CAAC,kBAAkB,GAAG,cAAc,CAAC;SAC1C;aAAM;YACL,I
AAI,OAAO,GAAG,IAAI,CAAC,GAAG,CAAC,UAAU,GAAG,IAAI,CAAC,YAAY,CAAC,MAAM,EAAE,WAAW,GAAG,KAAK,CAAC,CAAC;YAEnF,IAAI,IAAI,CAAC,SAAS,KAAK,SAAS,EAAE;gBAChC,OAAO,GAAG,IAAI,CAAC,GAAG,CAAC,OAAO,EAAE,IAAI,CAAC,SAAS,GAAG,KAAK,CAAC,CAAC;aACrD;YAED,6CAA6C;YAC7C,KAAK,MAAM,KAAK,IAAI,gBAAC,CAAC,KAAK,CAAC,gBAAC,CAAC,KAAK,CAAC,CAAC,EAAE,OAAO,CAAC,EAAE,IAAI,CAAC,SAAS,CAAC,EAAE;gBAChE,MAAM,MAAM,GAAG,MAAM,OAAO,CAAC,GAAG,CAAC,KAAK,CAAC,GAAG,CAAC,KAAK,EAAE,MAAM,EAAE,EAAE,CAAC,IAAI,CAAC,QAAQ,CAAC,KAAK,GAAG,MAAM,EAAE,IAAI,CAAC,CAAC,CAAC,CAAC;gBAEnG,QAAQ,CAAC,KAAK,CAAC,MAAM,CAAC,CAAC;gBACvB,MAAM,CAAC,OAAO,CAAC,CAAC,KAAK,EAAE,EAAE,CAAC,IAAI,CAAC,KAAK,CAAC,KAAK,CAAC,CAAC,CAAC;aAC9C;SACF;IACH,CAAC;IAEO,KAAK,CAAC,QAAQ,CAAC,KAAa,EAAE,OAAO,GAAG,KAAK;QACnD,IAAI;YACF,iFAAiF;YACjF,MAAM,KAAK,GAAG,MAAM,IAAI,CAAC,MAAM,CAAC,QAAQ,CACtC,KAAK,EACL,OAAO;gBACL,CAAC,CAAC;oBACE,OAAO,EAAE,IAAI,CAAC,OAAO;iBACtB;gBACH,CAAC,CAAC;oBACE,SAAS,EAAE,IAAI,CAAC,cAAc;oBAC9B,OAAO,EAAE,IAAI,CAAC,OAAO;iBACtB,CACN,CAAC;YAEF,gDAAgD;YAChD,OAAO,KAAK,CAAC;SACd;QAAC,OAAO,KAAK,EAAE;YACd,IAAI,KAAK,CAAC,IAAI,KAAK,eAAe,EAAE;gBAClC,OAAO,IAAI,CAAC,QAAQ,CAAC,KAAK,EAAE,OAAO,CAAC,CAAC;aACtC;YAED,MAAM,KAAc,CAAC;SACtB;IACH,CAAC;CACF;AA3LD,gDA2LC","file":"neo-one-client-core/src/AsyncBlockIterator.js","sourcesContent":["import { Block, GetOptions, IterOptions } from '@neo-one/client-common';\nimport { Monitor } from '@neo-one/monitor';\nimport _ from 'lodash';\n\ntype Item = { readonly type: 'value'; readonly value: Block } | { readonly type: 'error'; readonly error: Error };\ninterface Resolver {\n  readonly resolve: (value: IteratorResult<Block>) => void;\n  readonly reject: (reason: Error) => void;\n}\n\ninterface Client {\n  readonly getBlockCount: (monitor?: Monitor) => Promise<number>;\n  readonly getBlock: (index: number, options?: GetOptions) => Promise<Block>;\n}\n\ninterface AsyncBlockIteratorOptions {\n  readonly client: Client;\n  readonly options: IterOptions;\n  readonly fetchTimeoutMS?: number;\n  readonly batchSize?: 
number;\n}\n\nconst FETCH_TIMEOUT_MS = 20000;\nconst QUEUE_SIZE = 1000;\nconst BATCH_SIZE = 10;\n\nexport class AsyncBlockIterator implements AsyncIterator<Block> {\n  private readonly client: Client;\n  private readonly mutableItems: Item[];\n  private mutableResolvers: Resolver[];\n  private mutableDone: boolean;\n  private mutableCurrentIndex: number | undefined;\n  private mutableFetching: boolean;\n  private mutableStartHeight: number | undefined;\n  private readonly indexStop: number | undefined;\n  private readonly fetchTimeoutMS: number;\n  private readonly batchSize: number;\n  private readonly monitor: Monitor | undefined;\n\n  public constructor({\n    client,\n    options: { indexStart, indexStop, monitor },\n    fetchTimeoutMS = FETCH_TIMEOUT_MS,\n    batchSize = BATCH_SIZE,\n  }: AsyncBlockIteratorOptions) {\n    this.client = client;\n    this.mutableItems = [];\n    this.mutableResolvers = [];\n    this.mutableDone = false;\n    this.mutableCurrentIndex = indexStart;\n    this.mutableFetching = false;\n    this.indexStop = indexStop;\n    this.fetchTimeoutMS = fetchTimeoutMS;\n    this.batchSize = batchSize;\n    this.monitor = monitor === undefined ? 
undefined : monitor.at('async_block_iterator');\n  }\n\n  public [Symbol.asyncIterator]() {\n    return this;\n  }\n\n  public async next(): Promise<IteratorResult<Block>> {\n    if (!this.mutableDone) {\n      this.fetch();\n    }\n\n    const item = this.mutableItems.shift();\n    if (item !== undefined) {\n      if (item.type === 'error') {\n        return Promise.reject(item.error);\n      }\n\n      return Promise.resolve({ done: false, value: item.value });\n    }\n\n    if (this.mutableDone) {\n      // tslint:disable-next-line no-any\n      return Promise.resolve({ done: true } as any);\n    }\n\n    // tslint:disable-next-line promise-must-complete\n    return new Promise<IteratorResult<Block>>((resolve, reject) => {\n      this.mutableResolvers.push({ resolve, reject });\n    });\n  }\n\n  private write(value: Block): void {\n    this.push({ type: 'value', value });\n  }\n\n  private error(error: Error): void {\n    this.push({ type: 'error', error });\n  }\n\n  private push(item: Item): void {\n    if (this.mutableDone) {\n      /* istanbul ignore next */\n      throw new Error('AsyncBlockIterator already ended');\n    }\n\n    const resolver = this.mutableResolvers.shift();\n    if (resolver !== undefined) {\n      const { resolve, reject } = resolver;\n      if (item.type === 'error') {\n        reject(item.error);\n      } else {\n        resolve({ done: false, value: item.value });\n      }\n    } else {\n      this.mutableItems.push(item);\n    }\n  }\n\n  private done(): void {\n    // tslint:disable-next-line no-any\n    this.mutableResolvers.forEach(({ resolve }) => resolve({ done: true } as any));\n    // tslint:disable-next-line no-any\n    this.mutableResolvers = [];\n    this.mutableDone = true;\n  }\n\n  private fetch(): void {\n    if (this.mutableFetching) {\n      return;\n    }\n    this.mutableFetching = true;\n    this.asyncFetch()\n      .then(() => {\n        this.mutableFetching = false;\n      })\n      .catch((error) => {\n        
this.mutableFetching = false;\n        this.error(error);\n      });\n  }\n\n  private async asyncFetch(): Promise<void> {\n    let startHeight = this.mutableStartHeight;\n    let indexIn = this.mutableCurrentIndex;\n    if (startHeight === undefined || indexIn === undefined) {\n      const blockCount = await this.client.getBlockCount(this.monitor);\n      if (startHeight === undefined) {\n        startHeight = blockCount - 1;\n        this.mutableStartHeight = startHeight;\n      }\n      if (indexIn === undefined) {\n        indexIn = blockCount - 1;\n        this.mutableCurrentIndex = indexIn;\n      }\n    }\n    const index = indexIn;\n\n    const incIndex = (value: number) => {\n      if (this.mutableCurrentIndex === undefined) {\n        throw new Error('Something went wrong!');\n      }\n      this.mutableCurrentIndex += value;\n    };\n\n    if (this.indexStop !== undefined && index >= this.indexStop) {\n      this.done();\n    } else if (index >= startHeight) {\n      const [block, newStartHeight] = await Promise.all([\n        this.fetchOne(index),\n        // Refresh the block count in case we got behind somehow\n        this.client.getBlockCount(this.monitor),\n      ]);\n\n      incIndex(1);\n      this.write(block);\n      this.mutableStartHeight = newStartHeight;\n    } else {\n      let toFetch = Math.min(QUEUE_SIZE - this.mutableItems.length, startHeight - index);\n\n      if (this.indexStop !== undefined) {\n        toFetch = Math.min(toFetch, this.indexStop - index);\n      }\n\n      // tslint:disable-next-line no-loop-statement\n      for (const chunk of _.chunk(_.range(0, toFetch), this.batchSize)) {\n        const blocks = await Promise.all(chunk.map(async (offset) => this.fetchOne(index + offset, true)));\n\n        incIndex(chunk.length);\n        blocks.forEach((block) => this.write(block));\n      }\n    }\n  }\n\n  private async fetchOne(index: number, isBatch = false): Promise<Block> {\n    try {\n      // tslint:disable-next-line 
no-unnecessary-local-variable prefer-immediate-return\n      const block = await this.client.getBlock(\n        index,\n        isBatch\n          ? {\n              monitor: this.monitor,\n            }\n          : {\n              timeoutMS: this.fetchTimeoutMS,\n              monitor: this.monitor,\n            },\n      );\n\n      // tslint:disable-next-line no-var-before-return\n      return block;\n    } catch (error) {\n      if (error.code === 'UNKNOWN_BLOCK') {\n        return this.fetchOne(index, isBatch);\n      }\n\n      throw error as Error;\n    }\n  }\n}\n"]}