UNPKG

8.39 kBJavaScriptView Raw
1'use strict';
2
3// Utility that collects real time function call performance and reliability
4// metrics. The stats are computed using rolling windows of call successes,
5// failures, timeouts, circuit breaker rejections and latencies.
6
7const _ = require('underscore');
8const events = require('abacus-events');
9
10const keys = _.keys;
11const filter = _.filter;
12const last = _.last;
13const extend = _.extend;
14const reduce = _.reduce;
15const map = _.map;
16const find = _.find;
17
18// Setup debug log
19const debug = require('abacus-debug')('abacus-perf');
20
// Event emitter on which the accumulated call stats are published, so that
// other modules can listen to them
const emitter = events.emitter('call-metrics/emitter');

// Register the given listener for the given event on the stats emitter
const on = (event, listener) => {
  emitter.on(event, listener);
};
27
// Event emitter on which the metrics of individual function calls are
// reported
const calls = events.emitter('abacus-perf/calls');

// Report the metrics of one function call by emitting them in a 'message'
// event on the calls emitter.
// - time defaults to the current time when falsy
// - latency defaults to the time elapsed since the call time when undefined
// - err, timeout, reject and circuit default to undefined, 0, false and
//   'closed' when falsy
const report = (name, time, latency, err, timeout, reject, circuit) => {
  const start = time || Date.now();
  const elapsed = latency === undefined ? Date.now() - start : latency;

  calls.emit('message', {
    metrics: {
      call: {
        name: name,
        time: start,
        latency: elapsed,
        error: err || undefined,
        timeout: timeout || 0,
        reject: reject || false,
        circuit: circuit || 'closed'
      }
    }
  });
};
49
// Roll a list of buckets forward to the given time. The window spans win
// msecs split in max buckets, each bucket being identified by its index i.
// Buckets that fell out of the window are dropped, and a new bucket built
// from proto() is appended when the newest bucket does not cover the given
// time. At most max buckets are returned.
const roll = (buckets, time, win, max, proto) => {
  // Index of the bucket the given time falls into
  const index = Math.ceil(time / (win / max));

  // Only keep the buckets that are still inside the window
  const live = filter(buckets, (b) => b.i > index - max);

  // The newest bucket already covers the given time, nothing to add
  if(live.length !== 0 && last(live).i === index) {
    return last(live, max);
  }

  // Append a fresh bucket for the given time
  const fresh = extend({}, proto(), {
    i: index
  });
  return last(live.concat([fresh]), max);
};
62
// Roll the given call count buckets to the given time, using a window of
// 10 secs split in 10 buckets
const rollCounts = (buckets, time) => {
  // Create an empty bucket of counts
  const emptyCounts = () => ({
    i: 0,
    ok: 0,
    errors: 0,
    timeouts: 0,
    rejects: 0
  });

  return roll(buckets, time, 10000, 10, emptyCounts);
};
73
// Roll the given latency buckets to the given time, using a window of
// 60 secs split in 6 buckets
const rollLatencies = (buckets, time) => {
  // Create an empty bucket of latencies
  const emptyLatencies = () => ({
    i: 0,
    latencies: []
  });

  return roll(buckets, time, 60000, 6, emptyLatencies);
};
81
// Roll the given health buckets to the given time, using a window of 2 secs
// split in 4 buckets, and record in the newest bucket the health computed
// from the given counts
const rollHealth = (buckets, time, counts) => {
  // Sum up the counts, errors, timeouts and rejects all count as errors
  const addCounts = (sum, c) => ({
    ok: sum.ok + c.ok,
    errors: sum.errors + c.errors + c.timeouts + c.rejects
  });
  const totals = reduce(counts, addCounts, {
    ok: 0,
    errors: 0
  });

  // Create an empty health bucket
  const emptyHealth = () => ({
    i: 0,
    ok: 0,
    errors: 0
  });

  // Roll the window, then overwrite the newest bucket with the totals
  const rolled = roll(buckets, time, 2000, 4, emptyHealth);
  extend(last(rolled), totals);
  return rolled;
};
102
// Accumulated function call stats, keyed by function name
// Warning: accumulatedStats is mutated as call metrics get processed
let accumulatedStats = {};

// Return the accumulated stats of the given function, or empty stats when
// nothing has been accumulated for it yet. The time defaults to the current
// time when falsy. Unless roll is false, the counts, latencies and health
// windows are rolled to that time.
const stats = (name, time, roll) => {
  const now = time || Date.now();
  const stored = accumulatedStats[name] || {
    name: name,
    time: now,
    counts: [],
    latencies: [],
    health: [],
    circuit: 'closed'
  };

  // Return the stored windows as is when asked not to roll them
  if(roll === false) {
    return {
      name: name,
      time: now,
      counts: stored.counts,
      latencies: stored.latencies,
      health: stored.health,
      circuit: stored.circuit
    };
  }

  // The rolled health is computed from the rolled counts
  const counts = rollCounts(stored.counts, now);
  const latencies = rollLatencies(stored.latencies, now);
  const health = rollHealth(stored.health, now, counts);
  return {
    name: name,
    time: now,
    counts: counts,
    latencies: latencies,
    health: health,
    circuit: stored.circuit
  };
};
138
// Reset the accumulated reliability stats of the given function: its counts
// and health are emptied and its circuit is set back to 'closed', but the
// latencies accumulated so far are kept. The new stats are stored, emitted
// to the listeners and returned. The time defaults to the current time when
// falsy.
const reset = (name, time) => {
  const now = time || Date.now();
  const previous = accumulatedStats[name];

  const astats = {
    name: name,
    time: now,
    counts: [],
    latencies: previous ? previous.latencies : [],
    health: [],
    circuit: 'closed'
  };

  // Warning: mutating variable accumulatedStats
  accumulatedStats[name] = astats;

  // Propagate new stats to all the listeners
  debug('Emitting stats for function %s', name);
  emitter.emit('message', {
    metrics: {
      stats: astats
    }
  });
  return astats;
};
175
// Return the accumulated stats of all the functions, all computed at the
// same time, which defaults to the current time when falsy. The roll flag
// is passed through to stats.
const all = (time, roll) => {
  const now = time || Date.now();
  const names = keys(accumulatedStats);
  return map(names, (fname) => stats(fname, now, roll));
};
181
// Process the metrics of one function call and update the accumulated stats
// of that function: the call is counted in the newest counts bucket, its
// latency is recorded in the newest latencies bucket when the call was
// successful, and the health window is recomputed from the counts. The new
// stats are stored, emitted to the listeners and returned. The time
// defaults to the current time when falsy.
const accumulateStats = (name, time, latency, err, timeout,
  reject, circuit) => {
  debug('Accumulating stats for function %s', name);
  debug('latency %d, err %s, timeout %d, reject %s, circuit %s',
    latency, err, timeout, reject, circuit);

  // Default the call time once and use it everywhere below, a missing time
  // would otherwise produce NaN bucket indices and drop all the buckets
  const t = time || Date.now();

  // Retrieve the current call stats for the given function, stats always
  // returns an object so there is no need to guard against a missing one
  const s = stats(name, t, false);

  // A call is ok when it did not fail, time out or get rejected
  const ok = !err && !timeout && !reject;

  // Compute up to date counts window and increment counts in the last bucket
  const counts = rollCounts(s.counts, t);
  const updateCount = (c) => {
    c.ok = c.ok + (ok ? 1 : 0);
    c.errors = c.errors + (err ? 1 : 0);
    c.timeouts = c.timeouts + (timeout ? 1 : 0);
    c.rejects = c.rejects + (reject ? 1 : 0);
    debug('%d ok, %d errors, %d timeouts, %d rejects, %d count buckets',
      c.ok, c.errors, c.timeouts, c.rejects, counts.length);
  };
  updateCount(last(counts));

  // Compute up to date latencies window and add latency to the last bucket,
  // up to the max bucket size of 100 latencies
  const latencies = rollLatencies(s.latencies, t);
  if(ok) {
    const updateLatency = (l) => {
      l.latencies = l.latencies.length < 100 ?
        l.latencies.concat([latency]) : l.latencies;
      debug('%d latencies, %d latencies buckets',
        l.latencies.length, latencies.length);
    };
    updateLatency(last(latencies));
  }

  // Compute up to date health report window
  const health = rollHealth(s.health, t, counts);
  const h = last(health);
  debug('%d ok, %d errors, %d health buckets', h.ok, h.errors, health.length);

  // Store and return the new accumulated function call stats
  // Warning: mutating variable accumulatedStats
  const astats = {
    name: name,
    time: t,
    counts: counts,
    latencies: latencies,
    health: health,
    circuit: circuit
  };
  accumulatedStats[name] = astats;

  // Propagate new stats to all the listeners
  debug('Emitting stats for function %s', name);
  emitter.emit('message', {
    metrics: {
      stats: astats
    }
  });
  return astats;
};
243
// Handle a message carrying function call metrics and/or accumulated
// function call stats. Messages without metrics are ignored.
const onMessage = (msg) => {
  const metrics = msg.metrics;
  if(!metrics) {
    return;
  }
  debug('Received message %o', keys(msg).concat(keys(metrics)));

  // Process call metrics and emit updated accumulated call stats
  const call = metrics.call;
  if(call) {
    accumulateStats(call.name, call.time, call.latency, call.error,
      call.timeout, call.reject, call.circuit);
  }

  // Store latest accumulated stats, replacing the ones held for the function
  const received = metrics.stats;
  if(received) {
    debug('Storing stats for function %s', received.name);
    accumulatedStats[received.name] = received;
  }
};
261
// Determine the health of the app based on the accumulated metrics. Returns
// true when no function has a failure rate above the given threshold, a
// percentage which defaults to 5 when falsy, and false as soon as one
// function is above it. A function without any recorded request has a 0%
// failure rate.
const healthy = (threshold) => {

  // Look for a function whose failure rate is above the threshold
  const unhealthy = find(all(Date.now(), false), (stat) => {

    // Go through its health status and calculate total requests & errors,
    // each bucket contributes its ok and errors exactly once
    const total = reduce(stat.health, (a, c) => ({
      requests: a.requests + c.ok + c.errors,
      errors: a.errors + c.errors
    }), {
      requests: 0,
      errors: 0
    });

    // Guard the divisor, not the ratio, against zero requests: a ratio of
    // 0 errors must stay 0 instead of being turned into a 100% failure rate
    const percent = 100 * (total.errors / (total.requests || 1));
    debug('%s has %d% failure rate', stat.name, percent);

    return percent > (threshold || 5);
  });

  // If one function call is not healthy, conclude that the app is
  // not healthy.
  return unhealthy ? false : true;
};
286
// Accumulate stats out of the call metrics reported on the calls emitter
calls.on('message', onMessage);

// Public functions of the module, exports still refers to module.exports
// here as module.exports is never reassigned in this file
exports.report = report;
exports.stats = stats;
exports.reset = reset;
exports.healthy = healthy;
exports.all = all;
exports.onMessage = onMessage;
exports.on = on;