const { StringDecoder } = require('string_decoder');
const util = require('util');

const Streamz = require('streamz');
|
3 |
|
/**
 * Streamz-based stream that cuts incoming text into fixed-width records and
 * emits one object per record.
 *
 * @param {Object|Array<Object>} layout - Field definitions: either an object
 *   keyed by field name, or an array of definitions that each carry a `field`
 *   name. A definition is a number-like value (shorthand for `{ length }`) or
 *   an object with optional `start`, `end`, `length` and `transform`. Missing
 *   `start`/`end` values are filled in here, directly on the definition
 *   objects that were passed in.
 * @param {Object} [options] - Stored as `this.options` (defaults to `{}`).
 * @returns {Fixed} The new stream, also when called without `new`.
 */
function Fixed(layout,options) {
  if (!(this instanceof Fixed))
    return new Fixed(layout,options);

  Streamz.call(this);

  this.options = options || {};

  // An array layout is re-keyed by each entry's `field` name. Array.isArray is
  // used instead of a truthy `.length` so that an object layout may define a
  // field that is itself called "length".
  if (Array.isArray(layout))
    layout = layout.reduce((byField,def) => {
      byField[def.field] = def;
      return byField;
    },{});

  // Where the next field without an explicit `start` begins: the end of the
  // field processed just before it.
  let offset = 0;

  this.recordLength = Object.keys(layout).reduce((longest,key) => {
    // Number-like shorthand: `name: 10` means `name: { length: 10 }`. The
    // value is converted so that a numeric string cannot turn the offset
    // arithmetic below into string concatenation.
    if (!isNaN(layout[key]))
      layout[key] = { length: Number(layout[key]) };

    const item = layout[key];

    // Only a missing start is defaulted; an explicit `start: 0` is kept.
    if (item.start === undefined || item.start === null)
      item.start = offset;
    if (!item.end)
      item.end = item.start + item.length;

    offset = item.end || 0;

    // The record is as long as the furthest field end seen so far.
    return Math.max(longest, offset);
  },0);

  this.layout = layout;
}
|
37 |
|
// Fixed extends Streamz: util.inherits links Fixed.prototype to
// Streamz.prototype and records the parent as Fixed.super_.
util.inherits(Fixed, Streamz);

// Record counter default. It lives on the prototype, so every instance starts
// at 0 and gets its own copy the first time the counter is written.
Fixed.prototype.__line = 0;
|
41 |
|
/**
 * Emits every complete record currently held in `this.buffer.text`.
 *
 * Each record is an object whose prototype is `this.buffer`. For every layout
 * field the matching slice of the record text is trimmed; blank fields are
 * skipped, and the optional `transform(value, line)` of the field may replace
 * the value (returning `undefined` drops the field). Unless `options.clean`
 * is set the record also carries its raw `text` and its 1-based `__line`.
 * Text that does not yet fill a whole record stays in the buffer.
 *
 * @throws {RangeError} If data is buffered but the record length is not a
 *   positive number, because no progress could ever be made.
 */
Fixed.prototype._push = function() {
  if (!this.buffer)
    return;

  const recordLength = this.recordLength;

  // A zero (or otherwise non-positive) record length can never consume the
  // buffer; fail with a clear message instead of spinning forever.
  if (!(recordLength > 0))
    throw new RangeError('Fixed: record length must be greater than zero');

  const layout = this.layout;
  const keys = Object.keys(layout);

  // A loop rather than one recursive call per record, so that a large chunk
  // holding many short records cannot exhaust the call stack.
  while (this.buffer.text.length >= recordLength) {
    const obj = Object.create(this.buffer);
    obj.text = this.buffer.text.slice(0,recordLength);

    // 1-based number of the record being built. It is counted in every mode,
    // so transforms see the real record number even when `options.clean`
    // suppresses the `__line` property on the emitted object.
    const line = this.__line + 1;

    keys.forEach(key => {
      const e = layout[key];
      let val = obj.text.slice(e.start,e.end || e.start + e.length).trim();
      if (!val.length)
        return;
      if (e.transform)
        val = e.transform(val, line);
      if (val !== undefined)
        obj[key] = val;
    });

    this.__line = line;

    if (this.options.clean)
      // Removes the record's own `text` property only.
      delete obj.text;
    else
      obj.__line = line;

    this.push(obj);

    // Drop the consumed record; any remainder waits for more data.
    this.buffer.text = this.buffer.text.slice(recordLength);
  }
};
|
74 |
|
/**
 * Accepts one incoming chunk, appends its text to the internal buffer and
 * emits whatever complete records are now available.
 *
 * @param {Buffer|string|Object} d - A Buffer or other non-object value is
 *   converted to text; an object is expected to carry the text in `d.text`.
 */
Fixed.prototype._fn = function(d) {

  if (d instanceof Buffer || typeof d !== 'object') {
    let text;

    if (d instanceof Buffer) {
      // A stateful decoder keeps the trailing bytes of a multi-byte UTF-8
      // character that was split across two chunks, instead of decoding each
      // half on its own into replacement characters.
      if (!this.__decoder)
        this.__decoder = new StringDecoder('utf8');
      text = this.__decoder.write(d);
    } else {
      text = d.toString('utf8');
    }

    // Wrap the text in an object; `text` is deliberately non-enumerable.
    d = Object.create({},{
      text: {
        value: text,
        writable: true,
        configurable: true
      }
    });
  }

  if (!this.buffer) {
    // Unless `options.clean` is set, the buffer inherits from the first chunk,
    // so that chunk's other properties are visible on every emitted record.
    this.buffer = Object.create(!this.options.clean ? d : {});
    this.buffer.text = '';
  }

  this.buffer.text += d.text;
  this._push();
};
|
96 |
|
/**
 * End-of-input hook: emits any complete records still buffered, then defers
 * to the Streamz flush implementation on a later tick.
 *
 * @param {Function} cb - Passed through unchanged to Streamz's `_flush`.
 */
Fixed.prototype._flush = function(cb) {
  // Drain whatever complete records are still sitting in the buffer.
  this._push();

  // NOTE(review): the parent implementation is invoked as a method of
  // Streamz.prototype, so its `this` is the prototype rather than this
  // stream - confirm that is intended.
  const finish = function() {
    Streamz.prototype._flush(cb);
  };

  setImmediate(finish);
};
|
101 |
|
102 | module.exports = Fixed; |
\ | No newline at end of file |