UNPKG

2.24 kBJavaScriptView Raw
1const Streamz = require('streamz');
2const util = require('util');
3
4function Fixed(layout,options) {
5 if (!(this instanceof Fixed))
6 return new Fixed(layout,options);
7
8 Streamz.call(this);
9
10 this.options = options || {};
11
12 let n = 0;
13
14 // If the layout is an array, we reduce to an object
15 if(layout.length)
16 layout = layout.reduce((p,d) => {
17 p[d.field] = d;
18 return p;
19 },{});
20
21 // Take note of the record length by looking for last `end`
22 this.recordLength = Object.keys(layout).reduce((p,key) => {
23 if (!isNaN(layout[key]))
24 layout[key] = { length: layout[key] };
25
26 const item = layout[key];
27 if (!item.start)
28 item.start = n;
29 if (!item.end)
30 item.end = item.start + item.length;
31 n = item.end || 0;
32 return Math.max(p, n);
33 },0);
34
35 this.layout = layout;
36}
37
38util.inherits(Fixed,Streamz);
39
40Fixed.prototype.__line = 0;
41
42Fixed.prototype._push = function() {
43
44 if (!this.buffer || this.buffer.text.length < this.recordLength)
45 return;
46
47 const layout = this.layout;
48
49 const obj = Object.create(this.buffer);
50 obj.text = obj.text.slice(0,this.recordLength);
51
52 Object.keys(layout)
53 .forEach(key => {
54 const e = layout[key];
55 let val = obj.text.slice(e.start,e.end || e.start + e.length).trim();
56 if (!val.length)
57 return;
58 if (e.transform)
59 val = e.transform(val, (this.__line + 1));
60 if (val !== undefined)
61 obj[key] = val;
62 });
63
64 if (this.options.clean)
65 delete obj.text;
66 else
67 obj.__line = ++this.__line;
68
69 this.push(obj);
70
71 this.buffer.text = this.buffer.text.slice(this.recordLength);
72 return this._push();
73};
74
75Fixed.prototype._fn = function(d) {
76
77 if (d instanceof Buffer || typeof d !== 'object') {
78 d = Object.create({},{
79 // text should be non-enumerable
80 text: {
81 value: d.toString('utf8'),
82 writable: true,
83 configurable: true
84 }
85 });
86 }
87
88 if (!this.buffer) {
89 this.buffer = Object.create(!this.options.clean ? d : {});
90 this.buffer.text = '';
91 }
92
93 this.buffer.text += d.text;
94 this._push();
95};
96
97Fixed.prototype._flush = function(cb) {
98 this._push();
99 setImmediate( () => Streamz.prototype._flush(cb));
100};
101
102module.exports = Fixed;
\No newline at end of file