1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.transformLogProgress = void 0;
|
4 | const stream_1 = require("stream");
|
5 | const util_1 = require("util");
|
6 | const js_lib_1 = require("@naturalcycles/js-lib");
|
7 | const colors_1 = require("../../colors");
|
8 | const colors_2 = require("../../colors/colors");
|
9 | const sizeStack_1 = require("../sizeStack");
|
10 | const inspectOpt = {
|
11 | colors: colors_2.hasColors,
|
12 | breakLength: 300,
|
13 | };
|
14 |
|
15 |
|
16 |
|
17 | function transformLogProgress(opt = {}) {
|
18 | const { metric = 'progress', heapTotal: logHeapTotal = false, heapUsed: logHeapUsed = false, rss: logRss = true, peakRSS: logPeakRSS = true, logRPS = true, logEvery = 1000, logSizes = false, logSizesBuffer = 100000, logZippedSizes = false, batchSize = 1, extra, logger = console, } = opt;
|
19 | const logProgress = opt.logProgress !== false && logEvery !== 0;
|
20 | const logEvery10 = logEvery * 10;
|
21 | const started = Date.now();
|
22 | let lastSecondStarted = Date.now();
|
23 | const sma = new js_lib_1.SimpleMovingAverage(10);
|
24 | let processedLastSecond = 0;
|
25 | let progress = 0;
|
26 | let peakRSS = 0;
|
27 | const sizes = logSizes ? new sizeStack_1.SizeStack('json', logSizesBuffer) : undefined;
|
28 | const sizesZipped = logZippedSizes ? new sizeStack_1.SizeStack('json.gz', logSizesBuffer) : undefined;
|
29 | logStats();
|
30 | return new stream_1.Transform({
|
31 | objectMode: true,
|
32 | ...opt,
|
33 | transform(chunk, _, cb) {
|
34 | progress++;
|
35 | processedLastSecond++;
|
36 | if (sizes) {
|
37 |
|
38 | void sizeStack_1.SizeStack.countItem(chunk, logger, sizes, sizesZipped);
|
39 | }
|
40 | if (logProgress && progress % logEvery === 0) {
|
41 | logStats(chunk, false, progress % logEvery10 === 0);
|
42 | }
|
43 | cb(null, chunk);
|
44 | },
|
45 | final(cb) {
|
46 | logStats(undefined, true);
|
47 | cb();
|
48 | },
|
49 | });
|
50 | function logStats(chunk, final = false, tenx = false) {
|
51 | if (!logProgress)
|
52 | return;
|
53 | const mem = process.memoryUsage();
|
54 | const now = Date.now();
|
55 | const batchedProgress = progress * batchSize;
|
56 | const lastRPS = (processedLastSecond * batchSize) / ((now - lastSecondStarted) / 1000) || 0;
|
57 | const rpsTotal = Math.round(batchedProgress / ((now - started) / 1000)) || 0;
|
58 | lastSecondStarted = now;
|
59 | processedLastSecond = 0;
|
60 | const rps10 = Math.round(sma.push(lastRPS));
|
61 | if (mem.rss > peakRSS)
|
62 | peakRSS = mem.rss;
|
63 | const o = {
|
64 | [final ? `${metric}_final` : metric]: batchedProgress,
|
65 | };
|
66 | if (extra)
|
67 | Object.assign(o, extra(chunk, progress));
|
68 | if (logHeapUsed)
|
69 | o.heapUsed = (0, js_lib_1._mb)(mem.heapUsed);
|
70 | if (logHeapTotal)
|
71 | o.heapTotal = (0, js_lib_1._mb)(mem.heapTotal);
|
72 | if (logRss)
|
73 | o.rss = (0, js_lib_1._mb)(mem.rss);
|
74 | if (logPeakRSS)
|
75 | o.peakRSS = (0, js_lib_1._mb)(peakRSS);
|
76 | if (opt.rssMinusHeap)
|
77 | o.rssMinusHeap = (0, js_lib_1._mb)(mem.rss - mem.heapTotal);
|
78 | if (opt.external)
|
79 | o.external = (0, js_lib_1._mb)(mem.external);
|
80 | if (opt.arrayBuffers)
|
81 | o.arrayBuffers = (0, js_lib_1._mb)(mem.arrayBuffers || 0);
|
82 | if (logRPS)
|
83 | Object.assign(o, { rps10, rpsTotal });
|
84 | logger.log((0, util_1.inspect)(o, inspectOpt));
|
85 | if (sizes?.items.length) {
|
86 | logger.log(sizes.getStats());
|
87 | if (sizesZipped?.items.length) {
|
88 | logger.log(sizesZipped.getStats());
|
89 | }
|
90 | }
|
91 | if (tenx) {
|
92 | let perHour = Math.round((batchedProgress * 1000 * 60 * 60) / (now - started)) || 0;
|
93 | if (perHour > 900) {
|
94 | perHour = Math.round(perHour / 1000) + 'K';
|
95 | }
|
96 | logger.log(`${(0, colors_1.dimGrey)((0, js_lib_1.localTime)().toPretty())} ${(0, colors_1.white)(metric)} took ${(0, colors_1.yellow)((0, js_lib_1._since)(started))} so far to process ${(0, colors_1.yellow)(batchedProgress)} rows, ~${(0, colors_1.yellow)(perHour)}/hour`);
|
97 | }
|
98 | else if (final) {
|
99 | logger.log(`${(0, colors_1.boldWhite)(metric)} took ${(0, colors_1.yellow)((0, js_lib_1._since)(started))} to process ${(0, colors_1.yellow)(batchedProgress)} rows with total RPS of ${(0, colors_1.yellow)(rpsTotal)}`);
|
100 | }
|
101 | }
|
102 | }
|
103 | exports.transformLogProgress = transformLogProgress;
|