UNPKG

1.52 kBPlain TextView Raw
1import { createUnzip, ZlibOptions } from 'zlib'
2import { _hb } from '@naturalcycles/js-lib'
3import * as fs from 'fs-extra'
4import { transformTap, _pipeline, transformSplit } from '../..'
5import { dimWhite, grey } from '../../colors'
6import { NDJsonStats } from './ndjson.model'
7import { transformJsonParse, TransformJsonParseOptions } from './transformJsonParse'
8
/**
 * Options accepted by `pipelineFromNDJsonFile`.
 *
 * Inherits everything from `TransformJsonParseOptions`, since the same options object
 * is also handed to `transformJsonParse`.
 */
export interface PipelineFromNDJsonFileOptions extends TransformJsonParseOptions {
  /**
   * Path of the NDJSON file that the pipeline reads from.
   */
  filePath: string

  /**
   * Separator passed to `transformSplit` to split the file content into rows.
   *
   * @default `\n`
   */
  separator?: string

  /**
   * When enabled, the file content goes through an unzip stream before it is split.
   *
   * @default false
   */
  gzip?: boolean

  /**
   * Options for the unzip stream.
   * Only applicable if `gzip` is enabled.
   */
  zlibOptions?: ZlibOptions
}
27
/**
 * Convenience pipeline that starts from reading an NDJSON file.
 *
 * Builds and awaits a pipeline consisting of, in order:
 * 1. a read stream of `opt.filePath`
 * 2. an unzip stream (only when `opt.gzip` is truthy, configured by `opt.zlibOptions`)
 * 3. `transformSplit(opt.separator)`
 * 4. `transformJsonParse(opt)`
 * 5. a tap that counts every item passing through
 * 6. the provided `streams`
 *
 * Logs a "started" line before the pipeline runs and the pretty-printed stats after it completes.
 *
 * @param streams streams appended to the end of the pipeline, in the given order
 * @param opt file path and parsing options
 * @returns stats containing the elapsed time, the number of counted rows and the size
 * of the file as reported by `stat` (i.e. the on-disk size, before any unzipping)
 */
export async function pipelineFromNDJsonFile(
  streams: NodeJS.WritableStream[],
  opt: PipelineFromNDJsonFileOptions,
): Promise<NDJsonStats> {
  const { filePath, gzip, separator, zlibOptions } = opt

  const started = Date.now()
  let rows = 0

  // Awaited (non-blocking) stat: this function is async already, so there is no reason
  // to block the event loop with `statSync`. A missing file still rejects the returned promise.
  const { size: sizeBytes } = await fs.stat(filePath)

  console.log(`<< ${grey(filePath)} ${dimWhite(_hb(sizeBytes))} started...`)

  await _pipeline([
    fs.createReadStream(filePath),
    ...(gzip ? [createUnzip(zlibOptions)] : []),
    transformSplit(separator), // splits by separator
    transformJsonParse(opt),
    transformTap(() => rows++), // counts items that made it past parsing
    ...streams,
  ])

  const stats = NDJsonStats.create({
    tookMillis: Date.now() - started,
    rows,
    sizeBytes,
  })

  console.log(`<< ${grey(filePath)}\n` + stats.toPretty())

  return stats
}