import { Processor, ProcessLineResult } from "./Processor"; import P from "bluebird"; import { prepareData } from "./dataClean"; import getEol from "./getEol"; import { stringToLines } from "./fileline"; import { bufFromString, filterArray,trimLeft } from "./util"; import { RowSplit } from "./rowSplit"; import lineToJson from "./lineToJson"; import { ParseRuntime } from "./ParseRuntime"; import CSVError from "./CSVError"; export class ProcessorLocal extends Processor { flush(): P { if (this.runtime.csvLineBuffer && this.runtime.csvLineBuffer.length > 0) { const buf = this.runtime.csvLineBuffer; this.runtime.csvLineBuffer = undefined; return this.process(buf, true) .then((res) => { if (this.runtime.csvLineBuffer && this.runtime.csvLineBuffer.length > 0) { return P.reject(CSVError.unclosed_quote(this.runtime.parsedLineNumber, this.runtime.csvLineBuffer.toString())) } else { return P.resolve(res); } }) } else { return P.resolve([]); } } destroy(): P { return P.resolve(); } private rowSplit: RowSplit = new RowSplit(this.converter); private eolEmitted = false; private _needEmitEol?: boolean = undefined; private get needEmitEol() { if (this._needEmitEol === undefined) { this._needEmitEol = this.converter.listeners("eol").length > 0; } return this._needEmitEol; } private headEmitted = false; private _needEmitHead?: boolean = undefined; private get needEmitHead() { if (this._needEmitHead === undefined) { this._needEmitHead = this.converter.listeners("header").length > 0; } return this._needEmitHead; } process(chunk: Buffer, finalChunk = false): P { let csvString: string; if (finalChunk) { csvString = chunk.toString(); } else { csvString = prepareData(chunk, this.converter.parseRuntime); } return P.resolve() .then(() => { if (this.runtime.preRawDataHook) { return this.runtime.preRawDataHook(csvString); } else { return csvString; } }) .then((csv) => { if (csv && csv.length > 0) { return this.processCSV(csv, finalChunk); } else { return P.resolve([]); } }) } private processCSV(csv: string, finalChunk: boolean): P { const params = this.params; const runtime = this.runtime; if (!runtime.eol) { getEol(csv, runtime); } if (this.needEmitEol && !this.eolEmitted && runtime.eol) { this.converter.emit("eol", runtime.eol); this.eolEmitted = true; } // trim csv file has initial blank lines. if (params.ignoreEmpty && !runtime.started) { csv = trimLeft(csv); } const stringToLineResult = stringToLines(csv, runtime); if (!finalChunk) { this.prependLeftBuf(bufFromString(stringToLineResult.partial)); } else { stringToLineResult.lines.push(stringToLineResult.partial); stringToLineResult.partial = ""; } if (stringToLineResult.lines.length > 0) { let prom: P; if (runtime.preFileLineHook) { prom = this.runPreLineHook(stringToLineResult.lines); } else { prom = P.resolve(stringToLineResult.lines); } return prom.then((lines) => { if (!runtime.started && !this.runtime.headers ) { return this.processDataWithHead(lines); } else { return this.processCSVBody(lines); } }) } else { return P.resolve([]); } } private processDataWithHead(lines: string[]): ProcessLineResult[] { if (this.params.noheader) { if (this.params.headers) { this.runtime.headers = this.params.headers; } else { this.runtime.headers = []; } } else { let left = ""; let headerRow: string[] = []; while (lines.length) { const line = left + lines.shift(); const row = this.rowSplit.parse(line); if (row.closed) { headerRow = row.cells; left = ""; break; } else { left = line + getEol(line, this.runtime); } } this.prependLeftBuf(bufFromString(left)); if (headerRow.length === 0) { return []; } if (this.params.headers) { this.runtime.headers = this.params.headers; } else { this.runtime.headers = headerRow; } } if (this.runtime.needProcessIgnoreColumn || this.runtime.needProcessIncludeColumn) { this.filterHeader(); } if (this.needEmitHead && !this.headEmitted) { this.converter.emit("header", this.runtime.headers); this.headEmitted = true; } return this.processCSVBody(lines); } private filterHeader() { this.runtime.selectedColumns = []; if (this.runtime.headers) { const headers = this.runtime.headers; for (let i = 0; i < headers.length; i++) { if (this.params.ignoreColumns) { if (this.params.ignoreColumns.test(headers[i])) { if (this.params.includeColumns && this.params.includeColumns.test(headers[i])) { this.runtime.selectedColumns.push(i); } else { continue; } } else { this.runtime.selectedColumns.push(i); } } else if (this.params.includeColumns) { if (this.params.includeColumns.test(headers[i])) { this.runtime.selectedColumns.push(i); } } else { this.runtime.selectedColumns.push(i); } // if (this.params.includeColumns && this.params.includeColumns.test(headers[i])){ // this.runtime.selectedColumns.push(i); // }else{ // if (this.params.ignoreColumns && this.params.ignoreColumns.test(headers[i])){ // continue; // }else{ // if (this.params.ignoreColumns && !this.params.includeColumns){ // this.runtime.selectedColumns.push(i); // } // } // } } this.runtime.headers = filterArray(this.runtime.headers, this.runtime.selectedColumns); } } private processCSVBody(lines: string[]): ProcessLineResult[] { if (this.params.output === "line") { return lines; } else { const result = this.rowSplit.parseMultiLines(lines); this.prependLeftBuf(bufFromString(result.partial)); if (this.params.output === "csv") { return result.rowsCells; } else { return lineToJson(result.rowsCells, this.converter); } } // var jsonArr = linesToJson(lines.lines, params, this.recordNum); // this.processResult(jsonArr); // this.lastIndex += jsonArr.length; // this.recordNum += jsonArr.length; } private prependLeftBuf(buf: Buffer) { if (buf) { if (this.runtime.csvLineBuffer) { this.runtime.csvLineBuffer = Buffer.concat([buf, this.runtime.csvLineBuffer]); } else { this.runtime.csvLineBuffer = buf; } } } private runPreLineHook(lines: string[]): P { return new P((resolve, reject) => { processLineHook(lines, this.runtime, 0, (err) => { if (err) { reject(err); } else { resolve(lines); } }) }); } } function processLineHook(lines: string[], runtime: ParseRuntime, offset: number, cb: (err?) => void ) { if (offset >= lines.length) { cb(); } else { if (runtime.preFileLineHook) { const line = lines[offset]; const res = runtime.preFileLineHook(line, runtime.parsedLineNumber + offset); offset++; if (res && (res as PromiseLike).then) { (res as PromiseLike).then((value) => { lines[offset - 1] = value; processLineHook(lines, runtime, offset, cb); }); } else { lines[offset - 1] = res as string; while (offset < lines.length) { lines[offset] = runtime.preFileLineHook(lines[offset], runtime.parsedLineNumber + offset) as string; offset++; } cb(); } } else { cb(); } } }