1 | import { createUnzip, ZlibOptions } from 'zlib'
|
2 | import { _hb } from '@naturalcycles/js-lib'
|
3 | import * as fs from 'fs-extra'
|
4 | import { transformTap, _pipeline, transformSplit } from '../..'
|
5 | import { dimWhite, grey } from '../../colors'
|
6 | import { NDJsonStats } from './ndjson.model'
|
7 | import { transformJsonParse, TransformJsonParseOptions } from './transformJsonParse'
|
8 |
|
/**
 * Options accepted by `pipelineFromNDJsonFile`.
 *
 * Extends `TransformJsonParseOptions`: the whole options object is also
 * handed to `transformJsonParse`, so its parsing options can be set here.
 */
export interface PipelineFromNDJsonFileOptions extends TransformJsonParseOptions {
  /**
   * Path of the input file. It is stat'ed for its size and then opened
   * with a read stream.
   */
  filePath: string

  /**
   * Record separator forwarded to `transformSplit`.
   * When omitted, `transformSplit` receives `undefined` and applies
   * whatever default it defines.
   */
  separator?: string

  /**
   * When truthy, the file contents are piped through zlib's `createUnzip`
   * before being split into records. When falsy/omitted, no decompression
   * stage is added.
   */
  gzip?: boolean

  /**
   * Options passed to `createUnzip`. Only read when `gzip` is truthy.
   */
  zlibOptions?: ZlibOptions
}
|
27 |
|
28 |
|
29 |
|
30 |
|
/**
 * Reads the file at `opt.filePath` and pushes it through a stream pipeline:
 * file read stream -> (optional) unzip -> `transformSplit` ->
 * `transformJsonParse` -> row-counting tap -> the caller-provided `streams`.
 *
 * Logs a "started" line before the pipeline runs and a summary line
 * (from `stats.toPretty()`) after it completes.
 *
 * @param streams Streams appended, in order, after the built-in stages.
 * @param opt File location, separator, gzip settings and JSON-parse options.
 * @returns Stats holding the elapsed milliseconds, the number of times the
 *   counting tap fired, and the file size in bytes.
 */
export async function pipelineFromNDJsonFile(
  streams: NodeJS.WritableStream[],
  opt: PipelineFromNDJsonFileOptions,
): Promise<NDJsonStats> {
  const { filePath, gzip, separator } = opt

  const startedAt = Date.now()
  let rowCount = 0

  // Size of the file as stored on disk; when `gzip` is set this is the size
  // of the compressed file, not of the decompressed content.
  const sizeBytes = fs.statSync(filePath).size

  console.log(`<< ${grey(filePath)} ${dimWhite(_hb(sizeBytes))} started...`)

  const source = fs.createReadStream(filePath)
  // Zero or one decompression stage, spread into the pipeline below.
  const decompressors = gzip ? [createUnzip(opt.zlibOptions)] : []

  await _pipeline([
    source,
    ...decompressors,
    transformSplit(separator),
    transformJsonParse(opt),
    // Placed after parsing, so it counts items emitted by transformJsonParse.
    transformTap(() => rowCount++),
    ...streams,
  ])

  const tookMillis = Date.now() - startedAt
  const stats = NDJsonStats.create({
    tookMillis,
    rows: rowCount,
    sizeBytes,
  })

  console.log(`<< ${grey(filePath)}\n` + stats.toPretty())

  return stats
}
|