UNPKG

1.86 kB | Plain Text | View Raw
1import { createGzip, ZlibOptions } from 'zlib'
2import { AppError } from '@naturalcycles/js-lib'
3import * as fs from 'fs-extra'
4import { transformTap, _pipeline } from '../..'
5import { grey } from '../../colors'
6import { NDJsonStats } from './ndjson.model'
7import { transformToNDJson, TransformToNDJsonOptions } from './transformToNDJson'
8
/**
 * Options for `pipelineToNDJsonFile`. The whole object is also handed to
 * `transformToNDJson`, so its serialization options apply as well.
 */
export interface PipelineToNDJsonFileOptions extends TransformToNDJsonOptions {
  /**
   * Destination path of the NDJSON output file.
   */
  filePath: string

  /**
   * Compress the output with gzip before it is written to `filePath`.
   *
   * @default false
   */
  gzip?: boolean

  /**
   * Settings for the gzip stream; ignored unless `gzip` is enabled.
   */
  zlibOptions?: ZlibOptions

  /**
   * Fail instead of overwriting when a file already exists at `filePath`.
   *
   * @default false
   */
  protectFromOverwrite?: boolean
}
28
/**
 * Convenience pipeline that writes a stream of objects to `filePath` in NDJSON format
 * (optionally gzip-compressed), then logs and returns stats about the written file.
 *
 * Before writing, fs-extra `ensureFile()` creates any missing parent directories, and an empty
 * file if none exists. It does NOT truncate an existing file; the output write stream does that,
 * because it is opened with Node's default `'w'` flag. Set `protectFromOverwrite` to fail
 * instead of overwriting.
 *
 * Filesystem calls use fs-extra's promise-returning APIs, so the event loop is not blocked.
 *
 * @param streams - stages placed before the NDJSON serializer, in pipeline order
 * @param opt - output options; also forwarded as-is to `transformToNDJson`
 * @returns stats with elapsed milliseconds, the number of objects that reached the serializer,
 *   and the on-disk size of the output file in bytes (the compressed size when `gzip` is on)
 * @throws AppError if `protectFromOverwrite` is true and `filePath` already exists.
 *   The existence check is not atomic with the later write.
 */
export async function pipelineToNDJsonFile(
  streams: (NodeJS.ReadableStream | NodeJS.WritableStream)[],
  opt: PipelineToNDJsonFileOptions,
): Promise<NDJsonStats> {
  const { filePath, gzip, protectFromOverwrite = false } = opt

  if (protectFromOverwrite && (await fs.pathExists(filePath))) {
    throw new AppError(`pipelineToNDJsonFile: output file exists: ${filePath}`)
  }

  const started = Date.now()
  let rows = 0

  // Creates missing parent directories; existing content is truncated by the write stream below.
  await fs.ensureFile(filePath)

  console.log(`>> ${grey(filePath)} started...`)

  await _pipeline([
    ...streams,
    // Count objects before they are serialized to NDJSON lines.
    transformTap(() => rows++),
    transformToNDJson(opt),
    ...(gzip ? [createGzip(opt.zlibOptions)] : []), // optional gzip
    fs.createWriteStream(filePath),
  ])

  // Size of what actually landed on disk (compressed bytes when gzip is enabled).
  const { size: sizeBytes } = await fs.stat(filePath)

  const stats = NDJsonStats.create({
    tookMillis: Date.now() - started,
    rows,
    sizeBytes,
  })

  console.log(`>> ${grey(filePath)}\n` + stats.toPretty())

  return stats
}