1 | import * as fs from 'fs'
|
2 | import { createUnzip } from 'zlib'
|
3 | import { AsyncMapper, ErrorMode } from '@naturalcycles/js-lib'
|
4 | import {
|
5 | requireFileToExist,
|
6 | transformJsonParse,
|
7 | transformLogProgress,
|
8 | TransformLogProgressOptions,
|
9 | transformMap,
|
10 | TransformMapOptions,
|
11 | transformSplit,
|
12 | writableVoid,
|
13 | _pipeline,
|
14 | } from '../..'
|
15 |
|
/**
 * Options accepted by `ndjsonStreamForEach`.
 *
 * Combines the option sets of `transformMap` and `transformLogProgress`.
 * `ndjsonStreamForEach` forwards the whole object to both of those stages.
 * It also spreads the object into the `transformMap` options, after a default
 * `errorMode` and before a forced always-true `predicate`.
 *
 * @typeParam IN - type of a single parsed NDJSON row.
 */
export interface NDJSONStreamForEachOptions<IN = any>
  extends TransformMapOptions<IN, void>,
    TransformLogProgressOptions<IN> {
  /**
   * Path of the NDJSON file to read.
   * The file must exist; it is checked with `requireFileToExist` before streaming.
   * A path ending in `.gz` (case-sensitive) is decompressed via zlib's
   * `createUnzip()` before the file is split into lines.
   */
  inputFilePath: string
}
|
21 |
|
22 |
|
23 |
|
24 |
|
/**
 * Reads an NDJSON file row by row and awaits `mapper` for every parsed row.
 * The returned promise settles once the whole pipeline has finished.
 *
 * Stage order:
 *   file read stream
 *   → optional unzip (only when the path ends with `.gz`)
 *   → line splitter
 *   → JSON parser
 *   → `transformMap(mapper)`
 *   → progress logger
 *   → `writableVoid()` sink.
 *
 * Options passed to `transformMap`:
 * - `errorMode` defaults to `ErrorMode.THROW_AGGREGATED`, but a value in `opt` overrides it.
 * - `predicate` is always forced to accept every item, regardless of `opt`.
 *
 * @param mapper - async callback invoked once per parsed row; typed to return nothing.
 * @param opt - input file path plus options shared by `transformMap` and `transformLogProgress`.
 * @throws Whatever `requireFileToExist` throws when `opt.inputFilePath` is missing.
 */
export async function ndjsonStreamForEach<T>(
  mapper: AsyncMapper<T, void>,
  opt: NDJSONStreamForEachOptions<T>,
): Promise<void> {
  const { inputFilePath } = opt

  // Fail fast, before any stream is opened.
  requireFileToExist(inputFilePath)

  // Built before the read stream, matching the original construction order.
  const unzipStage = inputFilePath.endsWith('.gz') ? [createUnzip()] : []

  const source = fs.createReadStream(inputFilePath)
  const lineSplitter = transformSplit()
  const jsonParser = transformJsonParse()
  const mapStage = transformMap<T, any>(mapper, {
    // Listed first so that a caller-supplied `errorMode` in `opt` takes precedence.
    errorMode: ErrorMode.THROW_AGGREGATED,
    ...opt,
    // Listed last so that it cannot be overridden.
    // NOTE(review): presumably transformMap's default predicate drops
    // undefined results (the mapper returns void). Forcing `true` keeps every
    // row flowing to the progress logger. Confirm against transformMap.
    predicate: () => true,
  })
  const progressLogger = transformLogProgress(opt)
  const sink = writableVoid()

  await _pipeline([source, ...unzipStage, lineSplitter, jsonParser, mapStage, progressLogger, sink])
}
|