UNPKG

4.64 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.transformLogProgress = void 0;
4const stream_1 = require("stream");
5const util_1 = require("util");
6const js_lib_1 = require("@naturalcycles/js-lib");
7const colors_1 = require("../../colors");
8const colors_2 = require("../../colors/colors");
9const sizeStack_1 = require("../sizeStack");
// Options handed to util.inspect when the stats object is rendered for logging:
// `colors` follows colors_2.hasColors, and the wide `breakLength` gives inspect
// a generous line width before it wraps the object onto multiple lines.
const inspectOpt = {
  breakLength: 300,
  colors: colors_2.hasColors,
};
/**
 * Pass-through transform that optionally logs progress.
 *
 * Every chunk is forwarded unchanged. Every `logEvery` chunks (and once at
 * creation and once when the stream is finalized) a stats object is rendered
 * with `util.inspect` and written via `logger.log`.
 *
 * @param {object} [opt] Options; the whole object is also spread into the
 *   `stream.Transform` constructor options (after `objectMode: true`).
 * @param {string} [opt.metric='progress'] Key used for the progress counter
 *   (`${metric}_final` on the final log) and name shown in summary lines.
 * @param {boolean} [opt.logProgress] Set to `false` to disable all logging.
 * @param {number} [opt.logEvery=1000] Log every N chunks; `0` disables logging.
 *   Every 10th of those logs additionally prints a "so far" summary line.
 * @param {number} [opt.batchSize=1] Multiplier applied to the chunk count when
 *   reporting rows and rates.
 * @param {boolean} [opt.heapUsed=false] Include `heapUsed` in the stats.
 * @param {boolean} [opt.heapTotal=false] Include `heapTotal` in the stats.
 * @param {boolean} [opt.rss=true] Include `rss` in the stats.
 * @param {boolean} [opt.peakRSS=true] Include the highest `rss` seen so far.
 * @param {boolean} [opt.rssMinusHeap] Include `rss - heapTotal`.
 * @param {boolean} [opt.external] Include `external` memory.
 * @param {boolean} [opt.arrayBuffers] Include `arrayBuffers` memory.
 * @param {boolean} [opt.logRPS=true] Include `rps10` and `rpsTotal`.
 * @param {boolean} [opt.logSizes=false] Track chunk sizes with a SizeStack.
 * @param {number} [opt.logSizesBuffer=100000] Second argument passed to the
 *   SizeStack constructors.
 * @param {boolean} [opt.logZippedSizes=false] Also create a 'json.gz' SizeStack
 *   (it is only fed and reported while `logSizes` is enabled too).
 * @param {Function} [opt.extra] Called as `extra(chunk, progress)`; the returned
 *   object is merged into the stats. `chunk` is `undefined` for the initial and
 *   final logs.
 * @param {{log: Function}} [opt.logger=console] Log sink.
 * @returns {import('stream').Transform} Object-mode pass-through transform.
 */
function transformLogProgress(opt = {}) {
  // Rates are computed from Date.now() deltas. Two logs can land in the same
  // millisecond (or the clock can step backwards), which used to divide by zero
  // and push Infinity into the moving average. Clamp every interval to >= 1 ms.
  const MIN_ELAPSED_MS = 1;
  const { SimpleMovingAverage, _mb, _since, localTime } = js_lib_1;
  const { boldWhite, dimGrey, white, yellow } = colors_1;
  const {
    metric = 'progress',
    heapTotal: logHeapTotal = false,
    heapUsed: logHeapUsed = false,
    rss: logRss = true,
    peakRSS: logPeakRSS = true,
    logRPS = true,
    logEvery = 1000,
    logSizes = false,
    logSizesBuffer = 100000,
    logZippedSizes = false,
    batchSize = 1,
    extra,
    logger = console,
  } = opt;
  const logProgress = opt.logProgress !== false && logEvery !== 0; // true by default
  const logEvery10 = logEvery * 10;
  const started = Date.now();
  let lastSecondStarted = Date.now();
  const sma = new SimpleMovingAverage(10); // over the last 10 logged samples
  let processedLastSecond = 0;
  let progress = 0;
  let peakRSS = 0;
  const sizes = logSizes ? new sizeStack_1.SizeStack('json', logSizesBuffer) : undefined;
  const sizesZipped = logZippedSizes
    ? new sizeStack_1.SizeStack('json.gz', logSizesBuffer)
    : undefined;

  logStats(); // initial

  return new stream_1.Transform({
    objectMode: true,
    ...opt,
    transform(chunk, _, cb) {
      progress++;
      processedLastSecond++;
      if (sizes) {
        // Deliberately not awaited: gzipping might be delayed here,
        // and it must not hold back the stream.
        void sizeStack_1.SizeStack.countItem(chunk, logger, sizes, sizesZipped);
      }
      if (logProgress && progress % logEvery === 0) {
        logStats(chunk, false, progress % logEvery10 === 0);
      }
      cb(null, chunk); // pass-through
    },
    final(cb) {
      logStats(undefined, true);
      cb();
    },
  });

  /**
   * Builds and logs one stats snapshot.
   *
   * @param {*} [chunk] Chunk that triggered the log (forwarded to `extra`).
   * @param {boolean} [final=false] True for the end-of-stream summary.
   * @param {boolean} [tenx=false] True on every 10th periodic log; adds the
   *   "so far" line with the per-hour estimate.
   */
  function logStats(chunk, final = false, tenx = false) {
    if (!logProgress) {
      return;
    }
    const mem = process.memoryUsage();
    const now = Date.now();
    const sinceLastLogMs = Math.max(now - lastSecondStarted, MIN_ELAPSED_MS);
    const sinceStartMs = Math.max(now - started, MIN_ELAPSED_MS);
    const batchedProgress = progress * batchSize;
    // `|| 0` still guards against NaN (e.g. a non-numeric batchSize).
    const lastRPS = (processedLastSecond * batchSize) / (sinceLastLogMs / 1000) || 0;
    const rpsTotal = Math.round(batchedProgress / (sinceStartMs / 1000)) || 0;
    lastSecondStarted = now;
    processedLastSecond = 0;
    const rps10 = Math.round(sma.push(lastRPS));
    if (mem.rss > peakRSS) {
      peakRSS = mem.rss;
    }

    const o = {
      [final ? `${metric}_final` : metric]: batchedProgress,
    };
    if (extra) {
      Object.assign(o, extra(chunk, progress));
    }
    if (logHeapUsed) {
      o.heapUsed = _mb(mem.heapUsed);
    }
    if (logHeapTotal) {
      o.heapTotal = _mb(mem.heapTotal);
    }
    if (logRss) {
      o.rss = _mb(mem.rss);
    }
    if (logPeakRSS) {
      o.peakRSS = _mb(peakRSS);
    }
    if (opt.rssMinusHeap) {
      o.rssMinusHeap = _mb(mem.rss - mem.heapTotal);
    }
    if (opt.external) {
      o.external = _mb(mem.external);
    }
    if (opt.arrayBuffers) {
      o.arrayBuffers = _mb(mem.arrayBuffers || 0);
    }
    if (logRPS) {
      Object.assign(o, { rps10, rpsTotal });
    }
    logger.log(util_1.inspect(o, inspectOpt));

    if (sizes?.items.length) {
      logger.log(sizes.getStats());
      if (sizesZipped?.items.length) {
        logger.log(sizesZipped.getStats());
      }
    }

    if (tenx) {
      let perHour = Math.round((batchedProgress * 1000 * 60 * 60) / sinceStartMs) || 0;
      if (perHour > 900) {
        // Abbreviate large rates to thousands.
        perHour = Math.round(perHour / 1000) + 'K';
      }
      logger.log(`${dimGrey(localTime().toPretty())} ${white(metric)} took ${yellow(_since(started))} so far to process ${yellow(batchedProgress)} rows, ~${yellow(perHour)}/hour`);
    } else if (final) {
      logger.log(`${boldWhite(metric)} took ${yellow(_since(started))} to process ${yellow(batchedProgress)} rows with total RPS of ${yellow(rpsTotal)}`);
    }
  }
}
103exports.transformLogProgress = transformLogProgress;