UNPKG

5.39 kBPlain TextView Raw
1import { Converter } from "./Converter";
2import { ProcessLineResult } from "./Processor";
3import P from "bluebird";
4import CSVError from "./CSVError";
5import { EOL } from "os";
/**
 * Routes converted lines from a Converter to its consumers: the per-line
 * `subscribe` hooks, the `then` handlers that receive the accumulated result,
 * and the readable side of the stream ("downstream").
 */
export class Result {
  /** True when lines must be visited one by one: an `onNext` hook exists or data is pushed downstream. */
  private get needEmitLine(): boolean {
    const subscription = this.converter.parseRuntime.subscribe;
    return (!!subscription && !!subscription.onNext) || this.needPushDownstream;
  }

  /** Cached answer of `needPushDownstream`; `undefined` until first queried. */
  private _needPushDownstream?: boolean;

  /**
   * True when the converter has "data" or "readable" listeners.
   * The listener lookup happens once, on first access, and is cached afterwards.
   */
  private get needPushDownstream(): boolean {
    if (this._needPushDownstream === undefined) {
      this._needPushDownstream =
        this.converter.listeners("data").length > 0 ||
        this.converter.listeners("readable").length > 0;
    }
    return this._needPushDownstream;
  }

  /** True when a `then` handler is registered and `parseParam.needEmitAll` is set. */
  private get needEmitAll(): boolean {
    return !!this.converter.parseRuntime.then && this.converter.parseParam.needEmitAll;
  }

  /** Lines accumulated for the `then` handler (only filled while `needEmitAll` holds). */
  private finalResult: any[] = [];

  /** Whether the opening "[" of the "array" downstream format has been pushed. */
  private arrayOpened = false;

  constructor(private converter: Converter) { }

  /**
   * Handles one batch of converted lines.
   *
   * @param resultLines lines produced by the processor for this batch.
   * @returns a promise that resolves once every line was handed to the
   *   `onNext` hook / pushed downstream and recorded, or rejects with the
   *   error reported by the line-by-line processing.
   */
  processResult(resultLines: ProcessLineResult[]): P<any> {
    if (this.needPushDownstream && this.converter.parseParam.downstreamFormat === "array") {
      this.openDownstreamArray();
    }
    return new P((resolve, reject) => {
      if (!this.needEmitLine) {
        this.appendFinalResult(resultLines);
        resolve();
        return;
      }
      processLineByLine(
        resultLines,
        this.converter,
        0,
        this.needPushDownstream,
        (err) => {
          if (err) {
            reject(err);
            return;
          }
          this.appendFinalResult(resultLines);
          resolve();
        },
      );
    });
  }

  /**
   * Records a finished batch: keeps the lines when the full result is wanted
   * and advances the converter's parsed line counter.
   *
   * @param lines the lines of the finished batch.
   */
  appendFinalResult(lines: any[]) {
    if (this.needEmitAll) {
      // Append in place: re-creating the array with concat() on every batch
      // copies everything accumulated so far each time.
      for (const line of lines) {
        this.finalResult.push(line);
      }
    }
    this.converter.parseRuntime.parsedLineNumber += lines.length;
  }

  /**
   * Reports an error to the `subscribe.onError` and `then.onrejected`
   * handlers, whichever are registered.
   *
   * @param err the error to report.
   */
  processError(err: CSVError) {
    const runtime = this.converter.parseRuntime;
    if (runtime.subscribe && runtime.subscribe.onError) {
      runtime.subscribe.onError(err);
    }
    if (runtime.then && runtime.then.onrejected) {
      runtime.then.onrejected(err);
    }
  }

  /**
   * Signals the end of processing: fulfils the `then` handler (with the
   * accumulated lines, or an empty array when they were not collected),
   * calls `subscribe.onCompleted`, and closes the downstream array.
   */
  endProcess() {
    const runtime = this.converter.parseRuntime;
    if (runtime.then && runtime.then.onfulfilled) {
      runtime.then.onfulfilled(this.needEmitAll ? this.finalResult : []);
    }
    if (runtime.subscribe && runtime.subscribe.onCompleted) {
      runtime.subscribe.onCompleted();
    }
    if (this.needPushDownstream && this.converter.parseParam.downstreamFormat === "array") {
      // When no batch was ever processed the "[" has not been written yet;
      // emit it so the closing bracket is never left unbalanced.
      this.openDownstreamArray();
      pushDownstream(this.converter, "]" + EOL);
    }
  }

  /** Pushes the opening "[" of the "array" downstream format exactly once. */
  private openDownstreamArray(): void {
    if (!this.arrayOpened) {
      this.arrayOpened = true;
      pushDownstream(this.converter, "[" + EOL);
    }
  }
}
85
/**
 * Hands the lines from `offset` onwards to the subscriber's `onNext` hook
 * (when one is registered) and, when `needPushDownstream` is set, pushes each
 * line downstream after its hook call has finished. Completion or failure is
 * reported through `cb`.
 *
 * Every hook result is inspected: when a call returns a thenable, processing
 * pauses until it settles, regardless of what earlier lines returned. An
 * exception thrown by the hook, or a rejection of its thenable, is passed to
 * `cb` and stops the batch.
 *
 * @param lines converted lines of the current batch.
 * @param conv converter owning the hooks and the downstream side.
 * @param offset index in `lines` of the first line still to be handled.
 * @param needPushDownstream whether lines are also pushed downstream.
 * @param cb called exactly once: without argument on success, with the error otherwise.
 */
function processLineByLine(
  lines: ProcessLineResult[],
  conv: Converter,
  offset: number,
  needPushDownstream: boolean,
  cb: (err?: unknown) => void,
) {
  const subscription = conv.parseRuntime.subscribe;
  const hook = subscription && subscription.onNext;

  if (!hook) {
    // No per-line hook: the only remaining work is forwarding the lines.
    if (needPushDownstream) {
      for (let i = offset; i < lines.length; i++) {
        pushDownstream(conv, lines[i]);
      }
    }
    cb();
    return;
  }

  let cursor = offset;
  while (cursor < lines.length) {
    const line = lines[cursor];
    let outcome: PromiseLike<void> | undefined;
    try {
      // The hook receives the line together with its absolute line index.
      outcome = hook(line, conv.parseRuntime.parsedLineNumber + cursor) as PromiseLike<void> | undefined;
    } catch (err) {
      cb(err);
      return;
    }
    cursor++;

    if (outcome && typeof outcome.then === "function") {
      // Asynchronous hook: resume with the following line once it settles.
      const resumeAt = cursor;
      outcome.then(() => {
        processRecursive(lines, hook, conv, resumeAt, needPushDownstream, cb, line);
      }, cb);
      return;
    }

    if (needPushDownstream) {
      pushDownstream(conv, line);
    }
  }
  cb();
}

/**
 * Continuation used after an asynchronous hook call has settled: pushes the
 * line that was just handled downstream (when requested) and carries on with
 * the remaining lines.
 *
 * @param lines converted lines of the current batch.
 * @param hook the `onNext` hook; not used here, kept for signature compatibility.
 * @param conv converter owning the hooks and the downstream side.
 * @param offset index in `lines` of the next line to handle.
 * @param needPushDownstream whether lines are also pushed downstream.
 * @param cb completion callback forwarded to `processLineByLine`.
 * @param res the line whose hook call has just completed.
 */
function processRecursive(
  lines: ProcessLineResult[],
  hook: (data: any, lineNumber: number) => void | PromiseLike<void>,
  conv: Converter,
  offset: number,
  needPushDownstream: boolean,
  cb: (err?: unknown) => void,
  res: ProcessLineResult,
) {
  if (needPushDownstream) {
    pushDownstream(conv, res);
  }
  processLineByLine(lines, conv, offset, needPushDownstream, cb);
}
/**
 * Pushes one item to the readable side of the converter.
 *
 * Objects are serialized with JSON.stringify unless the converter runs in
 * object mode; the serialized text is followed by "," + EOL in the "array"
 * downstream format and by EOL alone otherwise. Everything else (non-object
 * values, or any value in object mode) is pushed unchanged.
 *
 * @param conv converter whose `push` receives the data.
 * @param res the item to forward.
 */
function pushDownstream(conv: Converter, res: ProcessLineResult) {
  // Pass-through case first: nothing to serialize.
  if (typeof res !== "object" || conv.options.objectMode) {
    conv.push(res);
    return;
  }

  const serialized = JSON.stringify(res);
  const terminator = conv.parseParam.downstreamFormat === "array" ? "," + EOL : EOL;
  conv.push(serialized + terminator, "utf8");
}
\No newline at end of file