UNPKG

1.17 kBPlain TextView Raw
1import * as fs from 'fs'
2import { createUnzip } from 'zlib'
3import { AsyncMapper, ErrorMode } from '@naturalcycles/js-lib'
4import {
5 requireFileToExist,
6 transformJsonParse,
7 transformLogProgress,
8 TransformLogProgressOptions,
9 transformMap,
10 TransformMapOptions,
11 transformSplit,
12 writableVoid,
13 _pipeline,
14} from '../..'
15
/**
 * Options accepted by `ndjsonStreamForEach`.
 *
 * Combines the options of `transformMap` and `transformLogProgress`
 * with the location of the file to read.
 */
export interface NDJSONStreamForEachOptions<IN = any>
  extends TransformMapOptions<IN, void>,
    TransformLogProgressOptions<IN> {
  /**
   * Path of the ndjson file to read.
   * `ndjsonStreamForEach` unzips the content while reading when the path ends with `.gz`.
   */
  inputFilePath: string
}
21
/**
 * Convenience function to `forEach` through an ndjson file.
 *
 * Steps, in order:
 * 1. `requireFileToExist` is called on `opt.inputFilePath`.
 * 2. The file is read as a stream; if the path ends with `.gz`, it is piped through `createUnzip()`.
 * 3. The stream is passed through `transformSplit()` and `transformJsonParse()`.
 * 4. `mapper` is applied via `transformMap`.
 * 5. Progress is reported via `transformLogProgress(opt)` and the output is discarded into `writableVoid()`.
 *
 * @param mapper function invoked through `transformMap`; its return type is `void`.
 * @param opt file path plus options forwarded to `transformMap` and `transformLogProgress`.
 *   `errorMode` falls back to `ErrorMode.THROW_AGGREGATED` when `opt` does not carry that key.
 *   Any `predicate` in `opt` is replaced by one that always returns `true`.
 * @returns a promise that settles when the awaited `_pipeline` call settles.
 */
export async function ndjsonStreamForEach<T>(
  mapper: AsyncMapper<T, void>,
  opt: NDJSONStreamForEachOptions<T>,
): Promise<void> {
  const { inputFilePath } = opt
  requireFileToExist(inputFilePath)

  // Zero or one extra stage, depending on the file extension
  const isGzipped = inputFilePath.endsWith('.gz')
  const unzipStages = isGzipped ? [createUnzip()] : []

  const mapOptions: TransformMapOptions<T, any> = {
    // Default only: a key present in `opt` takes precedence, since it is spread afterwards
    errorMode: ErrorMode.THROW_AGGREGATED,
    ...opt,
    // Always wins over `opt.predicate` - to log progress properly
    predicate: () => true,
  }

  await _pipeline([
    fs.createReadStream(inputFilePath),
    ...unzipStages,
    transformSplit(),
    transformJsonParse(),
    transformMap<T, any>(mapper, mapOptions),
    transformLogProgress(opt),
    writableVoid(),
  ])
}