UNPKG

31.8 kBJavaScriptView Raw
1'use strict';
2
3// Utility that collects real time function call performance and reliability
4// metrics. The stats are computed using rolling windows of call successes,
5// failures, timeouts, circuit breaker rejections and latencies.
6
7var _ = require('underscore');
8var events = require('abacus-events');
9
10var keys = _.keys;
11var filter = _.filter;
12var last = _.last;
13var extend = _.extend;
14var reduce = _.reduce;
15var map = _.map;
16var find = _.find;
17
18// Setup debug log
19var debug = require('abacus-debug')('abacus-perf');
20
// Event emitter used to broadcast the accumulated call stats to other
// modules; they subscribe to it through on()
var emitter = events.emitter('call-metrics/emitter');

// Subscribe listener l to event e of the accumulated call stats emitter.
// The emitter's own return value is not passed back to the caller.
var on = function on(e, l) {
  emitter.on(e, l);
  return undefined;
};

// Event emitter carrying the function call metrics emitted by report()
var calls = events.emitter('abacus-perf/calls');
30
// Report the metrics of one function call as a 'message' event on the calls
// emitter.
// - name: name of the called function
// - time: call time in ms, defaults to now
// - latency: call latency in ms, defaults to the time elapsed since `time`
// - err: call error if any (stored as undefined when falsy)
// - timeout: timeout indicator, 0 when not given
// - reject: true when the call was rejected, false when not given
// - circuit: circuit state, 'closed' when not given
var report = function report(name, time, latency, err, timeout, reject, circuit) {
  var start = time || Date.now();
  var elapsed = latency === undefined ? Date.now() - start : latency;

  calls.emit('message', {
    metrics: {
      call: {
        name: name,
        time: start,
        latency: elapsed,
        error: err || undefined,
        timeout: timeout || 0,
        reject: reject || false,
        circuit: circuit || 'closed'
      }
    }
  });
};
51
// Roll a list of buckets forward to a window of `max` buckets covering `win`
// ms at the given time. Each bucket spans win / max ms and carries its bucket
// index in `i`. Buckets whose index has fallen out of the window are dropped.
// When the most recent remaining bucket is not the one for `time`, a fresh
// bucket built from proto() is appended. At most the last `max` buckets are
// returned, in a new array; kept bucket objects are reused as they are.
var roll = function roll(buckets, time, win, max, proto) {
  var width = win / max;
  var index = Math.ceil(time / width);
  var oldest = index - max;

  var kept = filter(buckets, function (bucket) {
    return bucket.i > oldest;
  });

  var newest = last(kept);
  if (kept.length === 0 || newest.i !== index) {
    kept = kept.concat([extend({}, proto(), {
      i: index
    })]);
  }
  return last(kept, max);
};
65
// Return a rolling window of 10 buckets of 1 sec each (10 secs in total) of
// call counts: ok calls, errors, timeouts and rejects
var rollCounts = function rollCounts(buckets, time) {
  var emptyCounts = function () {
    return { i: 0, ok: 0, errors: 0, timeouts: 0, rejects: 0 };
  };
  return roll(buckets, time, 10000, 10, emptyCounts);
};
78
// Return a rolling window of 6 buckets of 10 secs each (60 secs in total) of
// call latencies, each bucket holding a list of latencies
var rollLatencies = function rollLatencies(buckets, time) {
  var emptyLatencies = function () {
    return { i: 0, latencies: [] };
  };
  return roll(buckets, time, 60000, 6, emptyLatencies);
};
88
// Return a rolling window of 4 buckets of 500 ms each (2 secs in total) of
// health reports. The totals computed over the given counts buckets are
// merged in place into the most recent health bucket: `ok` is the sum of ok
// calls, and `errors` is the sum of errors, timeouts and rejects.
var rollHealth = function rollHealth(buckets, time, counts) {
  // Sum up the ok calls and all kinds of failures over the counts window
  var totals = reduce(counts, function (acc, bucket) {
    return {
      ok: acc.ok + bucket.ok,
      errors: acc.errors + bucket.errors + bucket.timeouts + bucket.rejects
    };
  }, { ok: 0, errors: 0 });

  // Roll the health window, then record the totals in its latest bucket
  var reports = roll(buckets, time, 2000, 4, function () {
    return { i: 0, ok: 0, errors: 0 };
  });
  var latest = last(reports);
  extend(latest, totals);
  return reports;
};
113
// Store accumulated function call stats per function name
// Warning: accumulatedStats is a mutable variable
// The map is created without a prototype. Any function name is then looked
// up and stored as a plain own key. With a plain {} map, names like
// 'toString' or 'constructor' would resolve to Object.prototype members, and
// a name of '__proto__' would replace the map's prototype.
var accumulatedStats = Object.create(null);
117
// Return the accumulated call stats of function `name` as of `time`, which
// defaults to now.
// Unless `roll` is false, the counts, latencies and health windows are rolled
// forward to `time`. With `roll` false, the stored windows are returned as
// they are.
// A function without accumulated stats starts from empty windows and a
// 'closed' circuit; nothing is stored for it here.
// Note: inside this function the `roll` parameter shadows the module-level
// roll() helper.
var stats = function stats(name, time, roll) {
  var t = time || Date.now();
  var stored = accumulatedStats[name];
  var s = stored || {
    name: name,
    time: t,
    counts: [],
    latencies: [],
    health: [],
    circuit: 'closed'
  };

  // Return the stored windows untouched when asked not to roll them
  if (roll === false) {
    return {
      name: name,
      time: t,
      counts: s.counts,
      latencies: s.latencies,
      health: s.health,
      circuit: s.circuit
    };
  }

  // Roll the windows forward; health is computed from the rolled counts
  var counts = rollCounts(s.counts, t);
  var latencies = rollLatencies(s.latencies, t);
  var health = rollHealth(s.health, t, counts);
  return {
    name: name,
    time: t,
    counts: counts,
    latencies: latencies,
    health: health,
    circuit: s.circuit
  };
};
149
// Reset the accumulated reliability stats of function `name` as of `time`
// (defaults to now). The counts and health windows are emptied and the
// circuit is closed, while any accumulated latencies are kept. The new stats
// are stored, emitted as a 'message' event on the stats emitter, and
// returned.
var reset = function reset(name, time) {
  var t = time || Date.now();
  var previous = accumulatedStats[name];

  // Warning: mutating variable accumulatedStats
  var astats = {
    name: name,
    time: t,
    counts: [],
    latencies: previous ? previous.latencies : [],
    health: [],
    circuit: 'closed'
  };
  accumulatedStats[name] = astats;

  // Propagate new stats to all the listeners
  debug('Emitting stats for function %s', name);
  emitter.emit('message', {
    metrics: {
      stats: astats
    }
  });
  return astats;
};
185
// Return the accumulated stats of every known function as of `time`
// (defaults to now), passing `roll` through to stats() for each of them
var all = function all(time, roll) {
  var t = time || Date.now();
  var statsOf = function (name) {
    return stats(name, t, roll);
  };
  return map(keys(accumulatedStats), statsOf);
};
193
// Process the metrics of one function call and update the accumulated call
// stats of that function:
// - the call is counted in the current counts bucket as ok, error, timeout
//   and/or reject;
// - for a successful call, its latency is added to the current latencies
//   bucket, up to 100 latencies per bucket;
// - the health window is recomputed from the counts window.
// The new stats replace the stored ones, are emitted as a 'message' event on
// the stats emitter, and are returned.
// Note: the last counts and latencies buckets are updated in place, and
// roll() may hand back bucket objects that are shared with the previously
// stored stats.
var accumulateStats = function accumulateStats(name, time, latency, err, timeout, reject, circuit) {
  debug('Accumulating stats for function %s', name);
  debug('latency %d, err %s, timeout %d, reject %s, circuit %s', latency, err, timeout, reject, circuit);

  // Default the call time to now, like report(), stats() and reset() do.
  // A missing time would otherwise produce NaN bucket indices, so counts
  // would never accumulate and the stored stats would have no time.
  var t = time || Date.now();

  // A call is successful when it did not fail, time out or get rejected
  var succeeded = !err && !timeout && !reject;

  // Retrieve the current (not rolled) call stats for the given function.
  // stats() always returns an object, with empty windows for a new function.
  var s = stats(name, t, false);

  // Compute up to date counts window and increment counts in the last bucket
  var counts = rollCounts(s.counts, t);
  var c = last(counts);
  c.ok = c.ok + (succeeded ? 1 : 0);
  c.errors = c.errors + (err ? 1 : 0);
  c.timeouts = c.timeouts + (timeout ? 1 : 0);
  c.rejects = c.rejects + (reject ? 1 : 0);
  debug('%d ok, %d errors, %d timeouts, %d rejects, %d count buckets', c.ok, c.errors, c.timeouts, c.rejects, counts.length);

  // Compute up to date latencies window and add the latency of a successful
  // call to the last bucket, up to the max bucket size
  var latencies = rollLatencies(s.latencies, t);
  if (succeeded) {
    var l = last(latencies);
    l.latencies = l.latencies.length < 100 ? l.latencies.concat([latency]) : l.latencies;
    debug('%d latencies, %d latencies buckets', l.latencies.length, latencies.length);
  }

  // Compute up to date health report window from the counts window
  var health = rollHealth(s.health, t, counts);
  var h = last(health);
  debug('%d ok, %d errors, %d health buckets', h.ok, h.errors, health.length);

  // Store and return the new accumulated function call stats
  // Warning: mutating variable accumulatedStats
  var astats = {
    name: name,
    time: t,
    counts: counts,
    latencies: latencies,
    health: health,
    circuit: circuit
  };
  accumulatedStats[name] = astats;

  // Propagate new stats to all the listeners
  debug('Emitting stats for function %s', name);
  emitter.emit('message', {
    metrics: {
      stats: astats
    }
  });
  return astats;
};
250
// Process a message received on the calls emitter (the function is also
// exported and can be called directly).
// - Call metrics (msg.metrics.call) are accumulated into the call stats of
//   the reported function, and the updated stats are emitted.
// - Accumulated stats (msg.metrics.stats) replace the stored stats of the
//   function they name.
// Messages that are null/undefined or carry no metrics are ignored.
var onMessage = function onMessage(msg) {
  // Ignore empty messages instead of throwing a TypeError on msg.metrics
  if (!msg || !msg.metrics) {
    return;
  }

  var metrics = msg.metrics;
  debug('Received message %o', keys(msg).concat(keys(metrics)));
  if (metrics.call) {
    // Process call metrics and emit updated accumulated call stats
    var c = metrics.call;
    accumulateStats(c.name, c.time, c.latency, c.error, c.timeout, c.reject, c.circuit);
  }
  if (metrics.stats) {
    // Store latest accumulated stats
    debug('Storing stats for function %s', metrics.stats.name);
    accumulatedStats[metrics.stats.name] = metrics.stats;
  }
};
267
// Determine whether the app is healthy, based on the accumulated call stats
// as stored (the windows are not rolled forward).
// The app is unhealthy as soon as one function has a failure rate above
// `threshold` percent (default 5). The rate is errors / (ok + errors) summed
// over that function's health window. A function with no recorded requests
// has a 0% failure rate. An explicit threshold of 0 is honored, so any
// failure makes the app unhealthy.
var healthy = function healthy(threshold) {
  var limit = threshold === undefined || threshold === null ? 5 : threshold;

  // Go through each function call metrics, looking for an unhealthy one
  var unhealthy = find(all(Date.now(), false), function (stat) {

    // Go through its health status and calculate total requests & errors
    var total = reduce(stat.health, function (a, c) {
      return {
        requests: a.requests + c.ok + c.errors,
        errors: a.errors + c.errors
      };
    }, {
      requests: 0,
      errors: 0
    });

    // Divide by at least 1 so that no requests means a 0% failure rate
    var percent = 100 * (total.errors / (total.requests || 1));
    debug('%s has %d% failure rate', stat.name, percent);
    return percent > limit;
  });

  // If one function call is not healthy, conclude that the app is
  // not healthy.
  return !unhealthy;
};
293
// Accumulate the function call metrics emitted on the calls emitter
calls.on('message', onMessage);

// Export our public functions
extend(module.exports, {
  report: report,
  stats: stats,
  reset: reset,
  healthy: healthy,
  all: all,
  onMessage: onMessage,
  on: on
});
304//# sourceMappingURL=data:application/json;base64,
\No newline at end of file