UNPKG

1.3 kBJavaScriptView Raw
1const Streamz = require('streamz');
2const util = require('util');
3
4function Collect(maxSize,maxDuration,maxTextLength) {
5 if (!(this instanceof Streamz))
6 return new Collect(maxSize,maxDuration,maxTextLength);
7 Streamz.call(this);
8 // Allow a custom collection function as first argument
9 if (typeof maxSize === 'function')
10 this._fn = maxSize;
11 this.maxSize = maxSize;
12 this.textLength = 0;
13 this.maxTextLength = maxTextLength;
14 this.maxDuration = maxDuration;
15 this.buffer = [];
16}
17
18util.inherits(Collect,Streamz);
19
20Collect.prototype.buffer = undefined;
21
22Collect.prototype._push = function() {
23 this.textLength = 0;
24 if (this.buffer.length)
25 this.push(this.buffer);
26
27 if (this.timeout) {
28 clearTimeout(this.timeout);
29 this.timeout = undefined;
30 }
31
32 this.buffer = [];
33};
34
35Collect.prototype._fn = function(d) {
36 this.buffer.push(d);
37
38 if (this.maxDuration && !this.timeout)
39 this.timeout = setTimeout(this._push.bind(this),this.maxDuration);
40
41 if (this.maxTextLength && (this.textLength += JSON.stringify(d).length) > this.maxTextLength)
42 this._push();
43
44 if(this.buffer.length >= this.maxSize)
45 this._push();
46};
47
48Collect.prototype._flush = function(cb) {
49 this._push();
50 setImmediate( () => Streamz.prototype._flush(cb));
51};
52
53module.exports = Collect;
\No newline at end of file