1 | import { createGzip, ZlibOptions } from 'zlib'
|
2 | import { AppError } from '@naturalcycles/js-lib'
|
3 | import * as fs from 'fs-extra'
|
4 | import { transformTap, _pipeline } from '../..'
|
5 | import { grey } from '../../colors'
|
6 | import { NDJsonStats } from './ndjson.model'
|
7 | import { transformToNDJson, TransformToNDJsonOptions } from './transformToNDJson'
|
8 |
|
/**
 * Options for `pipelineToNDJsonFile`. Also forwarded as-is to `transformToNDJson`.
 */
export interface PipelineToNDJsonFileOptions extends TransformToNDJsonOptions {
  /**
   * Path of the output file. It is created (together with missing parent
   * directories) before writing starts.
   */
  filePath: string

  /**
   * If true, fail with an `AppError` when a file already exists at `filePath`
   * instead of overwriting it.
   *
   * @default false
   */
  protectFromOverwrite?: boolean

  /**
   * If true, the NDJSON output is gzip-compressed before being written to `filePath`.
   */
  gzip?: boolean

  /**
   * Options passed to zlib's `createGzip`. Only used when `gzip` is true.
   */
  zlibOptions?: ZlibOptions
}
|
/**
 * Pipes `streams` into an NDJSON serializer (optionally gzip-compressed) and
 * writes the result to `opt.filePath`.
 *
 * Objects are counted by a tap placed just before serialization. When the
 * pipeline finishes, the stats (elapsed time, row count, final file size) are
 * logged to the console and returned.
 *
 * @param streams - Streams placed at the start of the pipeline (source plus any transforms).
 * @param opt - Output options; also forwarded to `transformToNDJson`.
 * @returns Stats for the written file. `sizeBytes` is the on-disk size, i.e. the
 *   compressed size when `gzip` is set.
 * @throws AppError (as a rejection) when `protectFromOverwrite` is true and
 *   `filePath` already exists.
 */
export async function pipelineToNDJsonFile(
  streams: (NodeJS.ReadableStream | NodeJS.WritableStream)[],
  opt: PipelineToNDJsonFileOptions,
): Promise<NDJsonStats> {
  const { filePath, gzip, zlibOptions, protectFromOverwrite = false } = opt

  // Async fs-extra variants are used throughout: the *Sync versions would
  // block the event loop inside this async function.
  if (protectFromOverwrite && (await fs.pathExists(filePath))) {
    throw new AppError(`pipelineToNDJsonFile: output file exists: ${filePath}`)
  }

  const started = Date.now()
  let rows = 0

  // Make sure the target file (and its parent directories) exist before streaming into it.
  await fs.ensureFile(filePath)

  console.log(`>> ${grey(filePath)} started...`)

  await _pipeline([
    ...streams,
    transformTap(() => rows++),
    transformToNDJson(opt),
    ...(gzip ? [createGzip(zlibOptions)] : []),
    fs.createWriteStream(filePath),
  ])

  // Size is read after the pipeline completes, so it reflects the final (possibly gzipped) file.
  const { size: sizeBytes } = await fs.stat(filePath)

  const stats = NDJsonStats.create({
    tookMillis: Date.now() - started,
    rows,
    sizeBytes,
  })

  console.log(`>> ${grey(filePath)}\n` + stats.toPretty())

  return stats
}
|