1 | const Streamz = require('streamz');
|
2 | const util = require('util');
|
3 |
|
4 | function Split(symbol) {
|
5 | if (!(this instanceof Streamz))
|
6 | return new Split(symbol);
|
7 |
|
8 | Streamz.call(this);
|
9 | this.symbol = symbol || '\n';
|
10 | }
|
11 |
|
12 | util.inherits(Split,Streamz);
|
13 |
|
14 | Split.prototype.buffer = '';
|
15 |
|
16 | Split.prototype.__line = 0;
|
17 |
|
18 | Split.prototype._push = function() {
|
19 | if (this.buffer) {
|
20 | this.buffer.__line = this.__line++;
|
21 | this.push(this.buffer);
|
22 | }
|
23 | delete this.buffer;
|
24 | };
|
25 |
|
26 | Split.prototype._fn = function(d) {
|
27 | if (d instanceof Buffer || typeof d !== 'object')
|
28 | d = { text: d.toString('utf8') };
|
29 |
|
30 | if (!this.buffer) {
|
31 | this.buffer = Object.create(d);
|
32 | this.buffer.text = '';
|
33 | }
|
34 |
|
35 | const buffer = (this.buffer.text += d.text).split(this.symbol);
|
36 |
|
37 | buffer.slice(0,buffer.length-1)
|
38 | .forEach(d => {
|
39 | const obj = Object.create(this.buffer);
|
40 | obj.text = d;
|
41 | obj.__line = this.__line++;
|
42 | this.push(obj);
|
43 | },this);
|
44 |
|
45 | this.buffer.text = buffer[buffer.length-1];
|
46 | };
|
47 |
|
48 | Split.prototype._flush = function(cb) {
|
49 | this._push();
|
50 | setImmediate( () => Streamz.prototype._flush(cb));
|
51 | };
|
52 |
|
53 | module.exports = Split; |
\ | No newline at end of file |