UNPKG

628 BJavaScriptView Raw
1const Streamz = require('streamz');
2const util = require('util');
3
/**
 * Stream stage that reports stalls: once a first chunk has been seen, it emits
 * an `'error'` event with the value `'ETL_TIMEOUT'` if more than `ms`
 * milliseconds elapse without `this.last` being refreshed.
 *
 * Can be called with or without `new`.
 *
 * @param {number} ms - Maximum allowed gap between chunks, in milliseconds.
 *   Also used as the polling period of the watchdog timer.
 * @returns {Timeout} The new instance (when called without `new`).
 */
function Timeout(ms) {
  if (!(this instanceof Streamz))
    return new Timeout(ms);
  Streamz.call(this);

  // Poll every `ms` milliseconds. `this.last` is written by `_fn`, so the
  // check is inert until the first chunk has passed through.
  this.interval = setInterval(() => {
    if (this.last && (new Date()) - this.last > ms) {
      // Stop the watchdog BEFORE emitting. If `emit` throws (as Node's
      // EventEmitter does for an 'error' event with no listener), clearing
      // afterwards would be skipped and the timer would keep firing and
      // re-throwing on every tick.
      clearInterval(this.interval);
      this.emit('error','ETL_TIMEOUT');
    }
  },ms);
}
16
// Wire up prototype inheritance so Timeout instances are also Streamz
// instances (the constructor's `instanceof Streamz` guard relies on this).
17util.inherits(Timeout,Streamz);
18
/**
 * Per-chunk handler: stamps the arrival time on the instance and hands the
 * chunk back unchanged.
 *
 * @param {*} d - The incoming chunk.
 * @returns {*} The same chunk, untouched.
 */
Timeout.prototype._fn = function(d) {
  // The watchdog set up in the constructor compares against this timestamp.
  const arrivedAt = new Date();
  this.last = arrivedAt;
  return d;
};
23
/**
 * Flush hook: stops the watchdog timer, then delegates to the `_flush`
 * implementation on `Streamz.prototype`.
 * NOTE(review): presumably invoked by the stream machinery once input has
 * ended — confirm against Streamz.
 *
 * @param {Function} cb - Callback forwarded to `Streamz.prototype._flush`.
 * @returns {*} Whatever `Streamz.prototype._flush` returns.
 */
Timeout.prototype._flush = function(cb) {
  const timer = this.interval;
  clearInterval(timer);

  const parentFlush = Streamz.prototype._flush;
  return parentFlush.call(this, cb);
};
28
// The Timeout constructor is this module's sole export.
29module.exports = Timeout;
\No newline at end of file