1 | //
|
2 | //
|
3 | //
|
4 |
|
5 | // Heartbeats. In AMQP both clients and servers may expect a heartbeat
|
6 | // frame if there is no activity on the connection for a negotiated
|
7 | // period of time. If there's no activity for two such intervals, the
|
8 | // server or client is allowed to close the connection on the
|
9 | // presumption that the other party is dead.
|
10 | //
|
11 | // The client has two jobs here: the first is to send a heartbeat
|
12 | // frame if it's not sent any frames for a while, so that the server
|
13 | // doesn't think it's dead; the second is to check periodically that
|
14 | // it's seen activity from the server, and to advise if there doesn't
|
15 | // appear to have been any for over two intervals.
|
16 | //
|
17 | // Node.JS timers are a bit unreliable, in that they endeavour only to
|
18 | // fire at some indeterminate point *after* the given time (rather
|
19 | // gives the lie to 'realtime', dunnit). Because the scheduler is just
|
20 | // an event loop, it's quite easy to delay timers indefinitely by
|
21 | // reacting to some I/O with a lot of computation.
|
22 | //
|
23 | // To mitigate this I need a bit of creative interpretation:
|
24 | //
|
25 | // - I'll schedule a server activity check for every `interval`, and
|
26 | // check just how much time has passed. It will overshoot by at
|
27 | // least a small margin; modulo missing timer deadlines, it'll
|
28 | // notice between two and three intervals after activity actually
|
29 | // stops (otherwise, at some point after two intervals).
|
30 | //
|
31 | // - Every `interval / 2` I'll check that we've sent something since
|
32 | // the last check, and if not, send a heartbeat frame. If we're
|
33 | // really too busy to even run the check for two whole heartbeat
|
34 | // intervals, there must be a lot of I (but not O, at least not on
|
35 | // the connection), or computation, in which case perhaps it's best
|
36 | // the server cuts us off anyway. Why `interval / 2`? Because the
|
37 | // edge case is that the client sent a frame just after a
|
38 | // heartbeat, which would mean I only send one after almost two
|
39 | // intervals. (NB a heartbeat counts as a send, so it'll be checked
|
40 | // at least twice before sending another)
|
41 | //
|
42 | // This design is based largely on RabbitMQ's heartbeating:
|
43 | // https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_heartbeat.erl
|
44 |
|
// %% Yes, I could apply the same 'actual passage of time' thing to
// %% send as well as to recv.
|
47 |
|
48 | var inherits = require('util').inherits;
|
49 | var EventEmitter = require('events').EventEmitter;
|
50 |
|
// Multiplier applied to a heartbeat `interval` to turn it into
// milliseconds. `Heart` reads it from `module.exports` each time one is
// constructed, so changing it affects hearts created afterwards.
// Exported so that we can mess with it in tests
module.exports.UNITS_TO_MS = 1000;
|
53 |
|
/**
 * Heartbeat monitor. Emits `'beat'` when it is time to send a heartbeat
 * frame, and `'timeout'` when nothing appears to have been received for
 * two consecutive intervals.
 *
 * @param {number} interval - Heartbeat interval; it is multiplied by
 *   `module.exports.UNITS_TO_MS` to get milliseconds. A value that does
 *   not give a positive number of milliseconds (zero, negative, NaN,
 *   undefined) means "no heartbeats": no timers are started and no
 *   events are ever emitted.
 * @param {function(): boolean} checkSend - Called every half interval;
 *   returns truthy if something has been sent since the last check.
 *   A falsy result causes `'beat'` to be emitted.
 * @param {function(): boolean} checkRecv - Called every interval;
 *   returns truthy if something has been received since the last check.
 */
function Heart(interval, checkSend, checkRecv) {
  EventEmitter.call(this);
  this.interval = interval;

  // Always defined, so `clear()` is safe whether or not timers were started.
  this.sendTimer = null;
  this.recvTimer = null;

  var intervalMs = interval * module.exports.UNITS_TO_MS;

  // Node coerces a delay that is below 1 or NaN to 1ms, which would turn
  // a disabled (zero) or bogus interval into a storm of beats followed
  // by an almost immediate timeout. Treat those as "heartbeats off".
  if (!(intervalMs > 0)) return;

  // Function#bind is my new best friend
  var beat = this.emit.bind(this, 'beat');
  var timeout = this.emit.bind(this, 'timeout');

  // Check twice per interval so that a heartbeat goes out before a whole
  // interval of silence has elapsed on our side.
  this.sendTimer = setInterval(
    this.runHeartbeat.bind(this, checkSend, beat), intervalMs / 2);

  // A timeout occurs if I see nothing for *two consecutive* intervals
  var recvMissed = 0;
  function missedTwo() {
    if (checkRecv()) {
      // Any activity resets the count of consecutive misses.
      recvMissed = 0;
      return true;
    }
    recvMissed += 1;
    // Only the second (and any later) consecutive miss counts as failure.
    return recvMissed < 2;
  }
  this.recvTimer = setInterval(
    this.runHeartbeat.bind(this, missedTwo, timeout), intervalMs);
}
inherits(Heart, EventEmitter);
|
76 |
|
// Expose the heartbeat monitor constructor to users of this module.
module.exports.Heart = Heart;
|
78 |
|
/**
 * Stop this heart: cancel the send-side timer and then the receive-side
 * timer, so neither check runs again.
 */
Heart.prototype.clear = function() {
  [this.sendTimer, this.recvTimer].forEach(function(timer) {
    clearInterval(timer);
  });
};
|
83 |
|
/**
 * Run one periodic check.
 *
 * @param {function(): boolean} check - Returns truthy if activity has
 *   been seen.
 * @param {function(): void} fail - Called, with no arguments, when
 *   `check` reports no activity.
 */
Heart.prototype.runHeartbeat = function(check, fail) {
  var sawActivity = check();
  if (sawActivity) return;
  // Nothing seen since the last check: let the caller know.
  fail();
};
|