1 | ;
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.batchProcess = void 0;
|
4 | const tslib_1 = require("tslib");
|
5 | const result_1 = require("../result");
|
/**
 * Load data for every key (loads may complete in any order) and feed it to
 * `processor` strictly in key order, one processor call at a time.
 *
 * Up to `maxConcurrent` loads are kept in flight. A datum that finishes
 * loading before its turn is held in a buffer until every earlier key has
 * been processed.
 *
 * Why not `Promise.all(keys.map(loader))` followed by processing? That would
 * hold all the data in memory before consuming any of it. It would also leave
 * the processor idle until every load had finished. Per the original notes,
 * this implementation is faster than TaskPool for a large number of keys.
 *
 * TODO: auto-adjust the concurrency to optimize throughput; it currently
 * relies solely on `maxConcurrent`.
 *
 * NOTE(review): a new load starts as soon as a previous load finishes, not
 * when processing catches up. If processing is slower than loading, the
 * buffer of loaded-but-unprocessed data is therefore NOT bounded by
 * `maxConcurrent`.
 *
 * @param args.keys - keys in processing order (only `.length` and index
 *   access are used).
 * @param args.loader - called with one key; must return a thenable, because
 *   `.then(...).catch(...)` is chained on its result.
 * @param args.processor - called as `processor(datum, key)`. Its return value
 *   is passed to `then` from '../result'. Presumably it may be a plain value
 *   or a promise (TODO confirm against '../result').
 * @param args.maxConcurrent - maximum number of loads in flight; lower it to
 *   limit memory use. A falsy value (omitted or 0) means unlimited.
 * @returns a promise that resolves once every key has been processed. It
 *   resolves immediately for an empty `keys`. It rejects with the first
 *   loader or processor error, and no new loads or processing start after
 *   that.
 * @throws (as a rejection) Error('require at least 1 maxConcurrent') when the
 *   effective `maxConcurrent` is below 1.
 */
function batchProcess(args) {
    return tslib_1.__awaiter(this, void 0, void 0, function* () {
        const { keys, loader, processor } = args;
        // A falsy maxConcurrent (undefined, 0) is treated as "no limit".
        const maxConcurrent = args.maxConcurrent || Number.MAX_SAFE_INTEGER;
        if (maxConcurrent < 1) {
            throw new Error('require at least 1 maxConcurrent');
        }
        // resolve() below is only reached after a key has been processed, so
        // an empty key list would otherwise leave the promise pending forever.
        if (keys.length === 0) {
            return;
        }
        return new Promise((resolve, reject) => {
            // Set on the first error so no further loads or processing start.
            let failed = false;
            let nextLoadIndex = 0;
            let nextProcessIndex = 0;
            // Data loaded ahead of its turn, keyed by its index in `keys`.
            const loadedDataBuffer = new Map();
            const fail = (e) => {
                failed = true;
                loadedDataBuffer.clear();
                reject(e);
            };
            const onLoad = (datum, key) => {
                let processed;
                try {
                    processed = processor(datum, key);
                }
                catch (e) {
                    // This may run inside the completion callback of the
                    // previous item, where a throw would not reach `fail`.
                    fail(e);
                    return;
                }
                result_1.then(processed, () => {
                    // Finished processing the item at nextProcessIndex.
                    if (failed) {
                        return;
                    }
                    nextProcessIndex++;
                    if (nextProcessIndex >= keys.length) {
                        resolve();
                        return;
                    }
                    const record = loadedDataBuffer.get(nextProcessIndex);
                    if (record) {
                        loadedDataBuffer.delete(nextProcessIndex);
                        onLoad(record.datum, record.key);
                    }
                    // Otherwise the next datum is still loading; its load
                    // callback will see its index is up and process it.
                }, fail);
            };
            const load = () => {
                if (failed || nextLoadIndex >= keys.length) {
                    return;
                }
                const loadingIndex = nextLoadIndex;
                nextLoadIndex++;
                const key = keys[loadingIndex];
                loader(key)
                    .then(datum => {
                    if (failed) {
                        return;
                    }
                    if (loadingIndex === nextProcessIndex) {
                        // Its turn has come: process immediately.
                        onLoad(datum, key);
                    }
                    else {
                        // Cannot process yet; keep it until earlier keys are done.
                        loadedDataBuffer.set(loadingIndex, { key, datum });
                    }
                    // Keep this slot busy with the next unloaded key.
                    load();
                })
                    .catch(fail);
            };
            // Never start more slots than there are keys. With the default
            // MAX_SAFE_INTEGER limit, counting up to maxConcurrent would spin
            // practically forever.
            const slotCount = Math.min(maxConcurrent, keys.length);
            for (let i = 0; i < slotCount; i++) {
                load();
            }
        });
    });
}
|
80 | exports.batchProcess = batchProcess;
|
81 | //# sourceMappingURL=data-processor.js.map |
\ | No newline at end of file |