1 | const Streamz = require('streamz');
|
2 | const util = require('util');
|
3 | const Csv = require('csv-parser');
|
4 |
|
/**
 * Streamz stage that feeds incoming text into a `csv-parser` stream and
 * re-emits every parsed row through `_push`.
 *
 * Can be called with or without `new`.
 *
 * @param {Object} [options] - Forwarded (as a shallow copy) to `csv-parser`.
 * @param {boolean} [options.sanitize] - When truthy, header names are
 *   normalized before use: trimmed, lower-cased, `.` and `'` removed,
 *   whitespace runs and `/` replaced by `_`, en/em dashes replaced by `-`.
 * @param {Object} [options.transform] - Per-column map read by `_push`;
 *   defaults to an empty object.
 * @fires Csv_parser#headers - Emitted with the (possibly sanitized) header
 *   array each time the inner parser compiles its headers.
 */
function Csv_parser(options) {
  if (!(this instanceof Streamz)) {
    return new Csv_parser(options);
  }

  Streamz.call(this);

  // Work on a shallow copy so the default `transform` map is never written
  // back onto the object owned by the caller.
  const settings = Object.assign({}, options || {});
  settings.transform = settings.transform || {};

  this.options = settings;
  this.csv = Csv(settings);

  const compile = this.csv._compile;
  const self = this;

  // Wrap the parser's `_compile` hook so headers can be sanitized and
  // announced before the parser builds its row representation.
  // Inside this function `this` is the csv-parser instance, `self` is the stage.
  this.csv._compile = function() {
    if (settings.sanitize) {
      this.headers = this.headers.map(header => String(header)
        .trim()
        .toLowerCase()
        .replace(/\./g,'')
        .replace(/\'/g,'')
        .replace(/\s+/g,'_')
        .replace(/\u2013|\u2014/g, '-')
        .replace(/\//g,'_')
      );
    }

    self.emit('headers', this.headers);
    return compile.call(this);
  };

  this.csv.on('data', data => this._push(data));
}
|
35 |
|
// Csv_parser extends Streamz through prototype-based inheritance.
util.inherits(Csv_parser, Streamz);

// Prototype-level defaults, used until an instance assigns its own value:
//   base - prototype object for emitted rows (`_fn` replaces it per chunk)
//   line - counter that `_push` increments and stores on each row as `__line`
Object.assign(Csv_parser.prototype, {
  base: {},
  line: 1
});
|
41 |
|
/**
 * Handles one incoming chunk: records it as the base for subsequent rows and
 * writes its text to the inner csv parser.
 *
 * Buffers and non-object values are wrapped in a fresh object exposing the
 * decoded value as a non-enumerable `text` property. Objects are used as-is;
 * their `text` property (when truthy) is what gets parsed, otherwise the
 * object itself is written to the parser.
 *
 * @param {Buffer|string|Object} d - Incoming chunk.
 * @returns {undefined} Nothing is returned; rows surface later via `_push`.
 */
Csv_parser.prototype._fn = function(d) {
  if (d instanceof Buffer || typeof d !== 'object') {
    d = Object.create({}, {
      text: {
        value: d.toString('utf8'),
        writable: true,
        configurable: true
      }
    });
  }

  if (typeof d === 'object') {
    this.base = d;
  }

  const text = d.text;

  // Empty text carries nothing to parse. Falling through to `text || d` would
  // hand the wrapper object itself to the parser instead of a string.
  if (text === '') {
    return;
  }

  this.csv.write(text || d);
};
|
56 |
|
/**
 * Converts one parsed csv row into the object emitted downstream.
 *
 * The emitted object inherits from `this.base` and receives each column of
 * `d` according to these rules:
 *   - with `options.sanitize`, a whitespace-only string value is blanked on
 *     the input row (set to `undefined`) and not copied;
 *   - a function in `options.transform[column]` is applied to the value;
 *   - a `null` entry in `options.transform[column]` drops the column;
 *   - anything else copies the value unchanged.
 * The running line counter is incremented and stored as `__line`.
 *
 * @param {Object} d - Row object produced by the inner csv parser.
 */
Csv_parser.prototype._push = function(d) {
  const settings = this.options;
  const row = Object.create(this.base);

  for (const column in d) {
    const value = d[column];
    const isBlank = typeof value === 'string' && value.trim() === '';

    if (settings.sanitize && isBlank) {
      d[column] = undefined;
      continue;
    }

    const converter = settings.transform[column];
    if (typeof converter === 'function') {
      row[column] = converter(value);
      continue;
    }
    if (converter === null) {
      continue;
    }
    row[column] = value;
  }

  const lineNumber = ++this.line;
  row.__line = lineNumber;
  this.push(row);
};
|
73 |
|
/**
 * Ends the inner csv parser, then hands over to the Streamz flush logic on a
 * later `setImmediate` tick.
 *
 * The parent `_flush` is invoked with this instance as its receiver, so any
 * state it reads or writes belongs to this stream and not to the shared
 * `Streamz.prototype`.
 *
 * @param {Function} cb - Flush callback, passed through to `Streamz.prototype._flush`.
 */
Csv_parser.prototype._flush = function(cb) {
  this.csv.end();
  // NOTE(review): the deferral presumably gives the inner parser a chance to
  // emit its remaining rows before the stream finishes - confirm.
  setImmediate(() => Streamz.prototype._flush.call(this, cb));
};
|
78 |
|
79 | module.exports = Csv_parser; |
\ | No newline at end of file |