1 | const Streamz = require('streamz');
|
2 | const util = require('util');
|
3 |
|
/**
 * Pass-through stream that watches for stalls.
 *
 * A timer fires every `ms` milliseconds. Once `this.last` has been set
 * (it is assigned in `_fn` each time a chunk passes through) and more than
 * `ms` milliseconds have elapsed since then, the timer is stopped and an
 * `'error'` event carrying the string `'ETL_TIMEOUT'` is emitted.
 * Until `this.last` is set the check never triggers.
 *
 * Can be called with or without `new`.
 *
 * @param {number} ms - Polling period and maximum allowed gap, in milliseconds.
 * @returns {Timeout} The new stream (when invoked without `new`).
 */
function Timeout(ms) {
  // A `Timeout` instance is also a `Streamz` instance (see util.inherits
  // below), so this guard only trips when the function is called without `new`.
  if (!(this instanceof Streamz))
    return new Timeout(ms);
  Streamz.call(this);

  this.interval = setInterval(() => {
    if (this.last && (new Date()) - this.last > ms) {
      // Stop the timer BEFORE emitting. Assuming Streamz follows Node's
      // EventEmitter semantics, emit('error') throws when no 'error'
      // listener is attached; clearing afterwards would then be skipped and
      // the timer would keep firing (and throwing) every `ms`.
      clearInterval(this.interval);
      this.emit('error','ETL_TIMEOUT');
    }
  },ms);
}
|
16 |
|
17 | util.inherits(Timeout,Streamz);
|
18 |
|
/**
 * Per-chunk handler: stamps the time of the latest chunk on `this.last`
 * (read by the timer set up in the constructor) and hands the chunk back
 * untouched.
 *
 * @param {*} d - Incoming chunk.
 * @returns {*} The same chunk, unmodified.
 */
Timeout.prototype._fn = function(d) {
  const seenAt = new Date();
  this.last = seenAt;
  return d;
};
|
23 |
|
/**
 * Flush hook: stops the watchdog timer, then delegates to the Streamz
 * implementation of `_flush`.
 *
 * @param {Function} cb - Callback forwarded to `Streamz.prototype._flush`.
 * @returns {*} Whatever `Streamz.prototype._flush` returns.
 */
Timeout.prototype._flush = function(cb) {
  const timer = this.interval;
  clearInterval(timer);

  const parentFlush = Streamz.prototype._flush;
  return parentFlush.call(this, cb);
};
|
28 |
|
// Export the constructor itself; it can be invoked with or without `new`.
module.exports = Timeout;
\ | No newline at end of file |