1 | import { Converter } from "./Converter";
|
2 | import { ProcessLineResult } from "./Processor";
|
3 | import P from "bluebird";
|
4 | import CSVError from "./CSVError";
|
5 | import { EOL } from "os";
|
6 | export class Result {
|
7 | private get needEmitLine(): boolean {
|
8 | return !!this.converter.parseRuntime.subscribe && !!this.converter.parseRuntime.subscribe.onNext || this.needPushDownstream
|
9 | }
|
10 | private _needPushDownstream?: boolean;
|
11 | private get needPushDownstream(): boolean {
|
12 | if (this._needPushDownstream === undefined) {
|
13 | this._needPushDownstream = this.converter.listeners("data").length > 0 || this.converter.listeners("readable").length > 0;
|
14 | }
|
15 | return this._needPushDownstream;
|
16 | }
|
17 | private get needEmitAll(): boolean {
|
18 | return !!this.converter.parseRuntime.then && this.converter.parseParam.needEmitAll;
|
19 |
|
20 | }
|
21 | private finalResult: any[] = [];
|
22 | constructor(private converter: Converter) { }
|
23 | processResult(resultLines: ProcessLineResult[]): P<any> {
|
24 | const startPos = this.converter.parseRuntime.parsedLineNumber;
|
25 | if (this.needPushDownstream && this.converter.parseParam.downstreamFormat === "array") {
|
26 | if (startPos === 0) {
|
27 | pushDownstream(this.converter, "[" + EOL);
|
28 | }
|
29 | }
|
30 |
|
31 | return new P((resolve, reject) => {
|
32 | if (this.needEmitLine) {
|
33 | processLineByLine(
|
34 | resultLines,
|
35 | this.converter,
|
36 | 0,
|
37 | this.needPushDownstream,
|
38 | (err) => {
|
39 | if (err) {
|
40 | reject(err);
|
41 | } else {
|
42 | this.appendFinalResult(resultLines);
|
43 | resolve();
|
44 | }
|
45 | },
|
46 | )
|
47 |
|
48 | } else {
|
49 | this.appendFinalResult(resultLines);
|
50 | resolve();
|
51 | }
|
52 | })
|
53 | }
|
54 | appendFinalResult(lines: any[]) {
|
55 | if (this.needEmitAll) {
|
56 | this.finalResult = this.finalResult.concat(lines);
|
57 | }
|
58 | this.converter.parseRuntime.parsedLineNumber += lines.length;
|
59 | }
|
60 | processError(err: CSVError) {
|
61 | if (this.converter.parseRuntime.subscribe && this.converter.parseRuntime.subscribe.onError) {
|
62 | this.converter.parseRuntime.subscribe.onError(err);
|
63 | }
|
64 | if (this.converter.parseRuntime.then && this.converter.parseRuntime.then.onrejected) {
|
65 | this.converter.parseRuntime.then.onrejected(err);
|
66 | }
|
67 | }
|
68 | endProcess() {
|
69 |
|
70 | if (this.converter.parseRuntime.then && this.converter.parseRuntime.then.onfulfilled) {
|
71 | if (this.needEmitAll) {
|
72 | this.converter.parseRuntime.then.onfulfilled(this.finalResult);
|
73 | }else{
|
74 | this.converter.parseRuntime.then.onfulfilled([]);
|
75 | }
|
76 | }
|
77 | if (this.converter.parseRuntime.subscribe && this.converter.parseRuntime.subscribe.onCompleted) {
|
78 | this.converter.parseRuntime.subscribe.onCompleted();
|
79 | }
|
80 | if (this.needPushDownstream && this.converter.parseParam.downstreamFormat === "array") {
|
81 | pushDownstream(this.converter, "]" + EOL);
|
82 | }
|
83 | }
|
84 | }
|
85 |
|
86 | function processLineByLine(
|
87 | lines: ProcessLineResult[],
|
88 |
|
89 | conv: Converter,
|
90 | offset: number,
|
91 | needPushDownstream: boolean,
|
92 | cb: (err?) => void,
|
93 | ) {
|
94 | if (offset >= lines.length) {
|
95 | cb();
|
96 | } else {
|
97 | if (conv.parseRuntime.subscribe && conv.parseRuntime.subscribe.onNext) {
|
98 | const hook = conv.parseRuntime.subscribe.onNext;
|
99 | const nextLine = lines[offset];
|
100 | const res = hook(nextLine, conv.parseRuntime.parsedLineNumber + offset);
|
101 | offset++;
|
102 |
|
103 | if (res && res.then) {
|
104 | res.then(function () {
|
105 | processRecursive(lines, hook, conv, offset, needPushDownstream, cb, nextLine);
|
106 | }, cb);
|
107 | } else {
|
108 |
|
109 | if (needPushDownstream) {
|
110 | pushDownstream(conv, nextLine);
|
111 | }
|
112 | while (offset < lines.length) {
|
113 | const line = lines[offset];
|
114 | hook(line, conv.parseRuntime.parsedLineNumber + offset);
|
115 | offset++;
|
116 | if (needPushDownstream) {
|
117 | pushDownstream(conv, line);
|
118 | }
|
119 | }
|
120 | cb();
|
121 | }
|
122 |
|
123 |
|
124 |
|
125 |
|
126 |
|
127 |
|
128 |
|
129 | } else {
|
130 | if (needPushDownstream) {
|
131 | while (offset < lines.length) {
|
132 | const line = lines[offset++];
|
133 | pushDownstream(conv, line);
|
134 | }
|
135 |
|
136 | }
|
137 | cb();
|
138 | }
|
139 |
|
140 | }
|
141 | }
|
142 |
|
143 | function processRecursive(
|
144 | lines: ProcessLineResult[],
|
145 | hook: (data: any, lineNumber: number) => void | PromiseLike<void>,
|
146 | conv: Converter,
|
147 | offset: number,
|
148 | needPushDownstream: boolean,
|
149 | cb: (err?) => void,
|
150 | res: ProcessLineResult,
|
151 | ) {
|
152 | if (needPushDownstream) {
|
153 | pushDownstream(conv, res);
|
154 | }
|
155 | processLineByLine(lines, conv, offset, needPushDownstream, cb);
|
156 | }
|
157 | function pushDownstream(conv: Converter, res: ProcessLineResult) {
|
158 | if (typeof res === "object" && !conv.options.objectMode) {
|
159 | const data = JSON.stringify(res);
|
160 | conv.push(data + (conv.parseParam.downstreamFormat === "array" ? "," + EOL : EOL), "utf8");
|
161 | } else {
|
162 | conv.push(res);
|
163 | }
|
164 | } |
\ | No newline at end of file |