UNPKG

3.12 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.batchProcess = void 0;
4const tslib_1 = require("tslib");
5const result_1 = require("../result");
6/**
7 * TODO auto adjust concurrent size to optimize concurrency
8 * currently solely relay on maxConcurrent
9 *
10 * data loading can be in arbitrary order,
11 * data processing must be in order
12 *
13 * this impl is faster than TaskPool for large number of keys
14 *
15 * @param args.maxConcurrent: manually adjust to avoid out of memory
16 * */
17function batchProcess(args) {
18 return tslib_1.__awaiter(this, void 0, void 0, function* () {
19 const { keys, loader, processor } = args;
20 const maxConcurrent = args.maxConcurrent || Number.MAX_SAFE_INTEGER;
21 if (maxConcurrent < 1) {
22 throw new Error('require at least 1 maxConcurrent');
23 }
24 /*
25 // this is bad, because it will hold all the data in memory before consuming them
26 // also, it waste the processor resources by waiting for all IO to finish before processing
27 return Promise.all(keys.map(key => loader(key))).then(data => data.forEach(datum => processor(datum)));
28 */
29 return new Promise((resolve, reject) => {
30 const fail = (e) => {
31 reject(e);
32 };
33 let nextLoadIndex = 0;
34 let nextProcessIndex = 0;
35 const loadedDataBuffer = new Map();
36 const onLoad = (datum, key) => {
37 result_1.then(processor(datum, key), () => {
38 // finished processing
39 nextProcessIndex++;
40 if (nextProcessIndex >= keys.length) {
41 resolve();
42 }
43 else {
44 const record = loadedDataBuffer.get(nextProcessIndex);
45 if (record) {
46 const { key, datum } = record;
47 loadedDataBuffer.delete(nextProcessIndex);
48 onLoad(datum, key);
49 }
50 }
51 }, fail);
52 };
53 const load = () => {
54 if (nextLoadIndex >= keys.length) {
55 return;
56 }
57 const loadingIndex = nextLoadIndex;
58 nextLoadIndex++;
59 const key = keys[loadingIndex];
60 loader(key)
61 .then(datum => {
62 if (loadingIndex === nextProcessIndex) {
63 // can process immediately
64 onLoad(datum, key);
65 }
66 else {
67 // cannot process yet, store to buffer
68 loadedDataBuffer.set(loadingIndex, { key, datum });
69 }
70 load();
71 })
72 .catch(fail);
73 };
74 for (let i = 0; i < maxConcurrent; i++) {
75 load();
76 }
77 });
78 });
79}
80exports.batchProcess = batchProcess;
81//# sourceMappingURL=data-processor.js.map
\No newline at end of file