1 | const Streamz = require('streamz');
|
2 | const util = require('util');
|
3 |
|
/**
 * Stream that gathers incoming records into arrays and emits each array
 * downstream as a single batch.
 *
 * May be invoked with or without `new`.
 *
 * @param {number|Function} maxSize - Buffer length at which the default
 *   handler emits a batch. When a function is supplied it is installed as
 *   this instance's `_fn`, replacing the default handler (the function is
 *   still stored on `maxSize` as well).
 * @param {number} [maxDuration] - Delay in milliseconds handed to the timer
 *   that emits whatever has been buffered.
 * @param {number} [maxTextLength] - Limit on the accumulated JSON text length
 *   of the buffered records.
 * @returns {Collect} The initialised stream.
 */
function Collect(maxSize, maxDuration, maxTextLength) {
  const hasStreamReceiver = this instanceof Streamz;
  if (!hasStreamReceiver) {
    // Plain call (no `new`): build a proper instance and hand it back.
    return new Collect(maxSize, maxDuration, maxTextLength);
  }

  Streamz.call(this);

  const customHandler = typeof maxSize === 'function' ? maxSize : null;
  if (customHandler !== null) {
    this._fn = customHandler;
  }

  // Each instance owns its own limits, running text length and buffer.
  Object.assign(this, {
    maxSize,
    textLength: 0,
    maxTextLength,
    maxDuration,
    buffer: [],
  });
}
|
17 |
|
// Collect is a Streamz subclass: link the prototype chain to Streamz.
util.inherits(Collect, Streamz);

// Placeholder on the prototype; the constructor gives every instance its
// own array, so this value is only seen before construction completes.
Collect.prototype.buffer = undefined;
|
21 |
|
/**
 * Emits the buffered records downstream as one array and resets the
 * batching state: text-length counter, pending timer and buffer.
 *
 * Nothing is emitted when the buffer is empty, but the state is still reset.
 */
Collect.prototype._push = function() {
  const batch = this.buffer;

  this.textLength = 0;

  if (batch.length > 0) {
    this.push(batch);
  }

  // Drop any pending duration timer so it cannot fire for this batch.
  const pendingTimer = this.timeout;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
    this.timeout = undefined;
  }

  // Start a fresh array; the emitted one is never reused.
  this.buffer = [];
};
|
34 |
|
/**
 * Default per-record handler: buffers the record and emits the batch once
 * any configured limit is reached.
 *
 * - `maxDuration`: arms a timer (if none is pending) that emits the buffer
 *   after that many milliseconds.
 * - `maxTextLength`: emits when the accumulated JSON text length of the
 *   buffered records exceeds the limit (the record that crosses the limit
 *   is part of the emitted batch).
 * - `maxSize`: emits when the buffer holds at least that many records.
 *
 * @param {*} d - Incoming record.
 */
Collect.prototype._fn = function(d) {
  this.buffer.push(d);

  if (this.maxDuration && !this.timeout) {
    this.timeout = setTimeout(this._push.bind(this), this.maxDuration);
  }

  if (this.maxTextLength) {
    // JSON.stringify yields undefined (not a string) for undefined,
    // functions and symbols; count those as zero length instead of
    // throwing on `.length`.
    const serialized = JSON.stringify(d);
    this.textLength += serialized === undefined ? 0 : serialized.length;

    if (this.textLength > this.maxTextLength) {
      this._push();
    }
  }

  if (this.buffer.length >= this.maxSize) {
    this._push();
  }
};
|
47 |
|
/**
 * End-of-stream hook: emits whatever is still buffered, then defers to the
 * base Streamz `_flush` on the next turn of the event loop.
 *
 * @param {Function} cb - Completion callback forwarded to the base `_flush`.
 */
Collect.prototype._flush = function(cb) {
  this._push();

  // Invoke the base implementation with this stream as the receiver; a bare
  // `Streamz.prototype._flush(cb)` would run it against the shared prototype
  // object instead of the instance.
  setImmediate(() => Streamz.prototype._flush.call(this, cb));
};
|
52 |
|
53 | module.exports = Collect; |
\ | No newline at end of file |