UNPKG

26.7 kBPlain TextView Raw
1/*
2 * Copyright 2022 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { ChannelOptions } from './channel-options';
19import { ConnectivityState } from './connectivity-state';
20import { LogVerbosity, Status } from './constants';
21import { durationToMs, isDuration, msToDuration } from './duration';
22import {
23 ChannelControlHelper,
24 createChildChannelControlHelper,
25 registerLoadBalancerType,
26} from './experimental';
27import {
28 getFirstUsableConfig,
29 LoadBalancer,
30 LoadBalancingConfig,
31 validateLoadBalancingConfig,
32} from './load-balancer';
33import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
34import { PickArgs, Picker, PickResult, PickResultType } from './picker';
35import {
36 SubchannelAddress,
37 subchannelAddressToString,
38} from './subchannel-address';
39import {
40 BaseSubchannelWrapper,
41 ConnectivityStateListener,
42 SubchannelInterface,
43} from './subchannel-interface';
44import * as logging from './logging';
45
46const TRACER_NAME = 'outlier_detection';
47
48function trace(text: string): void {
49 logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
50}
51
52const TYPE_NAME = 'outlier_detection';
53
54const OUTLIER_DETECTION_ENABLED =
55 (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
56
57export interface SuccessRateEjectionConfig {
58 readonly stdev_factor: number;
59 readonly enforcement_percentage: number;
60 readonly minimum_hosts: number;
61 readonly request_volume: number;
62}
63
64export interface FailurePercentageEjectionConfig {
65 readonly threshold: number;
66 readonly enforcement_percentage: number;
67 readonly minimum_hosts: number;
68 readonly request_volume: number;
69}
70
71const defaultSuccessRateEjectionConfig: SuccessRateEjectionConfig = {
72 stdev_factor: 1900,
73 enforcement_percentage: 100,
74 minimum_hosts: 5,
75 request_volume: 100,
76};
77
78const defaultFailurePercentageEjectionConfig: FailurePercentageEjectionConfig =
79 {
80 threshold: 85,
81 enforcement_percentage: 100,
82 minimum_hosts: 5,
83 request_volume: 50,
84 };
85
86type TypeofValues =
87 | 'object'
88 | 'boolean'
89 | 'function'
90 | 'number'
91 | 'string'
92 | 'undefined';
93
94function validateFieldType(
95 obj: any,
96 fieldName: string,
97 expectedType: TypeofValues,
98 objectName?: string
99) {
100 if (fieldName in obj && typeof obj[fieldName] !== expectedType) {
101 const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
102 throw new Error(
103 `outlier detection config ${fullFieldName} parse error: expected ${expectedType}, got ${typeof obj[
104 fieldName
105 ]}`
106 );
107 }
108}
109
110function validatePositiveDuration(
111 obj: any,
112 fieldName: string,
113 objectName?: string
114) {
115 const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
116 if (fieldName in obj) {
117 if (!isDuration(obj[fieldName])) {
118 throw new Error(
119 `outlier detection config ${fullFieldName} parse error: expected Duration, got ${typeof obj[
120 fieldName
121 ]}`
122 );
123 }
124 if (
125 !(
126 obj[fieldName].seconds >= 0 &&
127 obj[fieldName].seconds <= 315_576_000_000 &&
128 obj[fieldName].nanos >= 0 &&
129 obj[fieldName].nanos <= 999_999_999
130 )
131 ) {
132 throw new Error(
133 `outlier detection config ${fullFieldName} parse error: values out of range for non-negative Duaration`
134 );
135 }
136 }
137}
138
139function validatePercentage(obj: any, fieldName: string, objectName?: string) {
140 const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
141 validateFieldType(obj, fieldName, 'number', objectName);
142 if (fieldName in obj && !(obj[fieldName] >= 0 && obj[fieldName] <= 100)) {
143 throw new Error(
144 `outlier detection config ${fullFieldName} parse error: value out of range for percentage (0-100)`
145 );
146 }
147}
148
149export class OutlierDetectionLoadBalancingConfig
150 implements LoadBalancingConfig
151{
152 private readonly intervalMs: number;
153 private readonly baseEjectionTimeMs: number;
154 private readonly maxEjectionTimeMs: number;
155 private readonly maxEjectionPercent: number;
156 private readonly successRateEjection: SuccessRateEjectionConfig | null;
157 private readonly failurePercentageEjection: FailurePercentageEjectionConfig | null;
158
159 constructor(
160 intervalMs: number | null,
161 baseEjectionTimeMs: number | null,
162 maxEjectionTimeMs: number | null,
163 maxEjectionPercent: number | null,
164 successRateEjection: Partial<SuccessRateEjectionConfig> | null,
165 failurePercentageEjection: Partial<FailurePercentageEjectionConfig> | null,
166 private readonly childPolicy: LoadBalancingConfig[]
167 ) {
168 if (
169 childPolicy.length > 0 &&
170 childPolicy[0].getLoadBalancerName() === 'pick_first'
171 ) {
172 throw new Error(
173 'outlier_detection LB policy cannot have a pick_first child policy'
174 );
175 }
176 this.intervalMs = intervalMs ?? 10_000;
177 this.baseEjectionTimeMs = baseEjectionTimeMs ?? 30_000;
178 this.maxEjectionTimeMs = maxEjectionTimeMs ?? 300_000;
179 this.maxEjectionPercent = maxEjectionPercent ?? 10;
180 this.successRateEjection = successRateEjection
181 ? { ...defaultSuccessRateEjectionConfig, ...successRateEjection }
182 : null;
183 this.failurePercentageEjection = failurePercentageEjection
184 ? {
185 ...defaultFailurePercentageEjectionConfig,
186 ...failurePercentageEjection,
187 }
188 : null;
189 }
190 getLoadBalancerName(): string {
191 return TYPE_NAME;
192 }
193 toJsonObject(): object {
194 return {
195 interval: msToDuration(this.intervalMs),
196 base_ejection_time: msToDuration(this.baseEjectionTimeMs),
197 max_ejection_time: msToDuration(this.maxEjectionTimeMs),
198 max_ejection_percent: this.maxEjectionPercent,
199 success_rate_ejection: this.successRateEjection,
200 failure_percentage_ejection: this.failurePercentageEjection,
201 child_policy: this.childPolicy.map(policy => policy.toJsonObject()),
202 };
203 }
204
205 getIntervalMs(): number {
206 return this.intervalMs;
207 }
208 getBaseEjectionTimeMs(): number {
209 return this.baseEjectionTimeMs;
210 }
211 getMaxEjectionTimeMs(): number {
212 return this.maxEjectionTimeMs;
213 }
214 getMaxEjectionPercent(): number {
215 return this.maxEjectionPercent;
216 }
217 getSuccessRateEjectionConfig(): SuccessRateEjectionConfig | null {
218 return this.successRateEjection;
219 }
220 getFailurePercentageEjectionConfig(): FailurePercentageEjectionConfig | null {
221 return this.failurePercentageEjection;
222 }
223 getChildPolicy(): LoadBalancingConfig[] {
224 return this.childPolicy;
225 }
226
227 copyWithChildPolicy(
228 childPolicy: LoadBalancingConfig[]
229 ): OutlierDetectionLoadBalancingConfig {
230 return new OutlierDetectionLoadBalancingConfig(
231 this.intervalMs,
232 this.baseEjectionTimeMs,
233 this.maxEjectionTimeMs,
234 this.maxEjectionPercent,
235 this.successRateEjection,
236 this.failurePercentageEjection,
237 childPolicy
238 );
239 }
240
241 static createFromJson(obj: any): OutlierDetectionLoadBalancingConfig {
242 validatePositiveDuration(obj, 'interval');
243 validatePositiveDuration(obj, 'base_ejection_time');
244 validatePositiveDuration(obj, 'max_ejection_time');
245 validatePercentage(obj, 'max_ejection_percent');
246 if ('success_rate_ejection' in obj) {
247 if (typeof obj.success_rate_ejection !== 'object') {
248 throw new Error(
249 'outlier detection config success_rate_ejection must be an object'
250 );
251 }
252 validateFieldType(
253 obj.success_rate_ejection,
254 'stdev_factor',
255 'number',
256 'success_rate_ejection'
257 );
258 validatePercentage(
259 obj.success_rate_ejection,
260 'enforcement_percentage',
261 'success_rate_ejection'
262 );
263 validateFieldType(
264 obj.success_rate_ejection,
265 'minimum_hosts',
266 'number',
267 'success_rate_ejection'
268 );
269 validateFieldType(
270 obj.success_rate_ejection,
271 'request_volume',
272 'number',
273 'success_rate_ejection'
274 );
275 }
276 if ('failure_percentage_ejection' in obj) {
277 if (typeof obj.failure_percentage_ejection !== 'object') {
278 throw new Error(
279 'outlier detection config failure_percentage_ejection must be an object'
280 );
281 }
282 validatePercentage(
283 obj.failure_percentage_ejection,
284 'threshold',
285 'failure_percentage_ejection'
286 );
287 validatePercentage(
288 obj.failure_percentage_ejection,
289 'enforcement_percentage',
290 'failure_percentage_ejection'
291 );
292 validateFieldType(
293 obj.failure_percentage_ejection,
294 'minimum_hosts',
295 'number',
296 'failure_percentage_ejection'
297 );
298 validateFieldType(
299 obj.failure_percentage_ejection,
300 'request_volume',
301 'number',
302 'failure_percentage_ejection'
303 );
304 }
305
306 return new OutlierDetectionLoadBalancingConfig(
307 obj.interval ? durationToMs(obj.interval) : null,
308 obj.base_ejection_time ? durationToMs(obj.base_ejection_time) : null,
309 obj.max_ejection_time ? durationToMs(obj.max_ejection_time) : null,
310 obj.max_ejection_percent ?? null,
311 obj.success_rate_ejection,
312 obj.failure_percentage_ejection,
313 obj.child_policy.map(validateLoadBalancingConfig)
314 );
315 }
316}
317
318class OutlierDetectionSubchannelWrapper
319 extends BaseSubchannelWrapper
320 implements SubchannelInterface
321{
322 private childSubchannelState: ConnectivityState;
323 private stateListeners: ConnectivityStateListener[] = [];
324 private ejected = false;
325 private refCount = 0;
326 constructor(
327 childSubchannel: SubchannelInterface,
328 private mapEntry?: MapEntry
329 ) {
330 super(childSubchannel);
331 this.childSubchannelState = childSubchannel.getConnectivityState();
332 childSubchannel.addConnectivityStateListener(
333 (subchannel, previousState, newState, keepaliveTime) => {
334 this.childSubchannelState = newState;
335 if (!this.ejected) {
336 for (const listener of this.stateListeners) {
337 listener(this, previousState, newState, keepaliveTime);
338 }
339 }
340 }
341 );
342 }
343
344 getConnectivityState(): ConnectivityState {
345 if (this.ejected) {
346 return ConnectivityState.TRANSIENT_FAILURE;
347 } else {
348 return this.childSubchannelState;
349 }
350 }
351
352 /**
353 * Add a listener function to be called whenever the wrapper's
354 * connectivity state changes.
355 * @param listener
356 */
357 addConnectivityStateListener(listener: ConnectivityStateListener) {
358 this.stateListeners.push(listener);
359 }
360
361 /**
362 * Remove a listener previously added with `addConnectivityStateListener`
363 * @param listener A reference to a function previously passed to
364 * `addConnectivityStateListener`
365 */
366 removeConnectivityStateListener(listener: ConnectivityStateListener) {
367 const listenerIndex = this.stateListeners.indexOf(listener);
368 if (listenerIndex > -1) {
369 this.stateListeners.splice(listenerIndex, 1);
370 }
371 }
372
373 ref() {
374 this.child.ref();
375 this.refCount += 1;
376 }
377
378 unref() {
379 this.child.unref();
380 this.refCount -= 1;
381 if (this.refCount <= 0) {
382 if (this.mapEntry) {
383 const index = this.mapEntry.subchannelWrappers.indexOf(this);
384 if (index >= 0) {
385 this.mapEntry.subchannelWrappers.splice(index, 1);
386 }
387 }
388 }
389 }
390
391 eject() {
392 this.ejected = true;
393 for (const listener of this.stateListeners) {
394 listener(
395 this,
396 this.childSubchannelState,
397 ConnectivityState.TRANSIENT_FAILURE,
398 -1
399 );
400 }
401 }
402
403 uneject() {
404 this.ejected = false;
405 for (const listener of this.stateListeners) {
406 listener(
407 this,
408 ConnectivityState.TRANSIENT_FAILURE,
409 this.childSubchannelState,
410 -1
411 );
412 }
413 }
414
415 getMapEntry(): MapEntry | undefined {
416 return this.mapEntry;
417 }
418
419 getWrappedSubchannel(): SubchannelInterface {
420 return this.child;
421 }
422}
423
424interface CallCountBucket {
425 success: number;
426 failure: number;
427}
428
429function createEmptyBucket(): CallCountBucket {
430 return {
431 success: 0,
432 failure: 0,
433 };
434}
435
436class CallCounter {
437 private activeBucket: CallCountBucket = createEmptyBucket();
438 private inactiveBucket: CallCountBucket = createEmptyBucket();
439 addSuccess() {
440 this.activeBucket.success += 1;
441 }
442 addFailure() {
443 this.activeBucket.failure += 1;
444 }
445 switchBuckets() {
446 this.inactiveBucket = this.activeBucket;
447 this.activeBucket = createEmptyBucket();
448 }
449 getLastSuccesses() {
450 return this.inactiveBucket.success;
451 }
452 getLastFailures() {
453 return this.inactiveBucket.failure;
454 }
455}
456
457interface MapEntry {
458 counter: CallCounter;
459 currentEjectionTimestamp: Date | null;
460 ejectionTimeMultiplier: number;
461 subchannelWrappers: OutlierDetectionSubchannelWrapper[];
462}
463
464class OutlierDetectionPicker implements Picker {
465 constructor(private wrappedPicker: Picker, private countCalls: boolean) {}
466 pick(pickArgs: PickArgs): PickResult {
467 const wrappedPick = this.wrappedPicker.pick(pickArgs);
468 if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
469 const subchannelWrapper =
470 wrappedPick.subchannel as OutlierDetectionSubchannelWrapper;
471 const mapEntry = subchannelWrapper.getMapEntry();
472 if (mapEntry) {
473 let onCallEnded = wrappedPick.onCallEnded;
474 if (this.countCalls) {
475 onCallEnded = statusCode => {
476 if (statusCode === Status.OK) {
477 mapEntry.counter.addSuccess();
478 } else {
479 mapEntry.counter.addFailure();
480 }
481 wrappedPick.onCallEnded?.(statusCode);
482 };
483 }
484 return {
485 ...wrappedPick,
486 subchannel: subchannelWrapper.getWrappedSubchannel(),
487 onCallEnded: onCallEnded,
488 };
489 } else {
490 return {
491 ...wrappedPick,
492 subchannel: subchannelWrapper.getWrappedSubchannel(),
493 };
494 }
495 } else {
496 return wrappedPick;
497 }
498 }
499}
500
501export class OutlierDetectionLoadBalancer implements LoadBalancer {
502 private childBalancer: ChildLoadBalancerHandler;
503 private addressMap: Map<string, MapEntry> = new Map<string, MapEntry>();
504 private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;
505 private ejectionTimer: NodeJS.Timeout;
506 private timerStartTime: Date | null = null;
507
508 constructor(channelControlHelper: ChannelControlHelper) {
509 this.childBalancer = new ChildLoadBalancerHandler(
510 createChildChannelControlHelper(channelControlHelper, {
511 createSubchannel: (
512 subchannelAddress: SubchannelAddress,
513 subchannelArgs: ChannelOptions
514 ) => {
515 const originalSubchannel = channelControlHelper.createSubchannel(
516 subchannelAddress,
517 subchannelArgs
518 );
519 const mapEntry = this.addressMap.get(
520 subchannelAddressToString(subchannelAddress)
521 );
522 const subchannelWrapper = new OutlierDetectionSubchannelWrapper(
523 originalSubchannel,
524 mapEntry
525 );
526 if (mapEntry?.currentEjectionTimestamp !== null) {
527 // If the address is ejected, propagate that to the new subchannel wrapper
528 subchannelWrapper.eject();
529 }
530 mapEntry?.subchannelWrappers.push(subchannelWrapper);
531 return subchannelWrapper;
532 },
533 updateState: (connectivityState: ConnectivityState, picker: Picker) => {
534 if (connectivityState === ConnectivityState.READY) {
535 channelControlHelper.updateState(
536 connectivityState,
537 new OutlierDetectionPicker(picker, this.isCountingEnabled())
538 );
539 } else {
540 channelControlHelper.updateState(connectivityState, picker);
541 }
542 },
543 })
544 );
545 this.ejectionTimer = setInterval(() => {}, 0);
546 clearInterval(this.ejectionTimer);
547 }
548
549 private isCountingEnabled(): boolean {
550 return (
551 this.latestConfig !== null &&
552 (this.latestConfig.getSuccessRateEjectionConfig() !== null ||
553 this.latestConfig.getFailurePercentageEjectionConfig() !== null)
554 );
555 }
556
557 private getCurrentEjectionPercent() {
558 let ejectionCount = 0;
559 for (const mapEntry of this.addressMap.values()) {
560 if (mapEntry.currentEjectionTimestamp !== null) {
561 ejectionCount += 1;
562 }
563 }
564 return (ejectionCount * 100) / this.addressMap.size;
565 }
566
567 private runSuccessRateCheck(ejectionTimestamp: Date) {
568 if (!this.latestConfig) {
569 return;
570 }
571 const successRateConfig = this.latestConfig.getSuccessRateEjectionConfig();
572 if (!successRateConfig) {
573 return;
574 }
575 trace('Running success rate check');
576 // Step 1
577 const targetRequestVolume = successRateConfig.request_volume;
578 let addresesWithTargetVolume = 0;
579 const successRates: number[] = [];
580 for (const [address, mapEntry] of this.addressMap) {
581 const successes = mapEntry.counter.getLastSuccesses();
582 const failures = mapEntry.counter.getLastFailures();
583 trace(
584 'Stats for ' +
585 address +
586 ': successes=' +
587 successes +
588 ' failures=' +
589 failures +
590 ' targetRequestVolume=' +
591 targetRequestVolume
592 );
593 if (successes + failures >= targetRequestVolume) {
594 addresesWithTargetVolume += 1;
595 successRates.push(successes / (successes + failures));
596 }
597 }
598 trace(
599 'Found ' +
600 addresesWithTargetVolume +
601 ' success rate candidates; currentEjectionPercent=' +
602 this.getCurrentEjectionPercent() +
603 ' successRates=[' +
604 successRates +
605 ']'
606 );
607 if (addresesWithTargetVolume < successRateConfig.minimum_hosts) {
608 return;
609 }
610
611 // Step 2
612 const successRateMean =
613 successRates.reduce((a, b) => a + b) / successRates.length;
614 let successRateDeviationSum = 0;
615 for (const rate of successRates) {
616 const deviation = rate - successRateMean;
617 successRateDeviationSum += deviation * deviation;
618 }
619 const successRateVariance = successRateDeviationSum / successRates.length;
620 const successRateStdev = Math.sqrt(successRateVariance);
621 const ejectionThreshold =
622 successRateMean -
623 successRateStdev * (successRateConfig.stdev_factor / 1000);
624 trace(
625 'stdev=' + successRateStdev + ' ejectionThreshold=' + ejectionThreshold
626 );
627
628 // Step 3
629 for (const [address, mapEntry] of this.addressMap.entries()) {
630 // Step 3.i
631 if (
632 this.getCurrentEjectionPercent() >=
633 this.latestConfig.getMaxEjectionPercent()
634 ) {
635 break;
636 }
637 // Step 3.ii
638 const successes = mapEntry.counter.getLastSuccesses();
639 const failures = mapEntry.counter.getLastFailures();
640 if (successes + failures < targetRequestVolume) {
641 continue;
642 }
643 // Step 3.iii
644 const successRate = successes / (successes + failures);
645 trace('Checking candidate ' + address + ' successRate=' + successRate);
646 if (successRate < ejectionThreshold) {
647 const randomNumber = Math.random() * 100;
648 trace(
649 'Candidate ' +
650 address +
651 ' randomNumber=' +
652 randomNumber +
653 ' enforcement_percentage=' +
654 successRateConfig.enforcement_percentage
655 );
656 if (randomNumber < successRateConfig.enforcement_percentage) {
657 trace('Ejecting candidate ' + address);
658 this.eject(mapEntry, ejectionTimestamp);
659 }
660 }
661 }
662 }
663
664 private runFailurePercentageCheck(ejectionTimestamp: Date) {
665 if (!this.latestConfig) {
666 return;
667 }
668 const failurePercentageConfig =
669 this.latestConfig.getFailurePercentageEjectionConfig();
670 if (!failurePercentageConfig) {
671 return;
672 }
673 trace(
674 'Running failure percentage check. threshold=' +
675 failurePercentageConfig.threshold +
676 ' request volume threshold=' +
677 failurePercentageConfig.request_volume
678 );
679 // Step 1
680 let addressesWithTargetVolume = 0;
681 for (const mapEntry of this.addressMap.values()) {
682 const successes = mapEntry.counter.getLastSuccesses();
683 const failures = mapEntry.counter.getLastFailures();
684 if (successes + failures >= failurePercentageConfig.request_volume) {
685 addressesWithTargetVolume += 1;
686 }
687 }
688 if (addressesWithTargetVolume < failurePercentageConfig.minimum_hosts) {
689 return;
690 }
691
692 // Step 2
693 for (const [address, mapEntry] of this.addressMap.entries()) {
694 // Step 2.i
695 if (
696 this.getCurrentEjectionPercent() >=
697 this.latestConfig.getMaxEjectionPercent()
698 ) {
699 break;
700 }
701 // Step 2.ii
702 const successes = mapEntry.counter.getLastSuccesses();
703 const failures = mapEntry.counter.getLastFailures();
704 trace('Candidate successes=' + successes + ' failures=' + failures);
705 if (successes + failures < failurePercentageConfig.request_volume) {
706 continue;
707 }
708 // Step 2.iii
709 const failurePercentage = (failures * 100) / (failures + successes);
710 if (failurePercentage > failurePercentageConfig.threshold) {
711 const randomNumber = Math.random() * 100;
712 trace(
713 'Candidate ' +
714 address +
715 ' randomNumber=' +
716 randomNumber +
717 ' enforcement_percentage=' +
718 failurePercentageConfig.enforcement_percentage
719 );
720 if (randomNumber < failurePercentageConfig.enforcement_percentage) {
721 trace('Ejecting candidate ' + address);
722 this.eject(mapEntry, ejectionTimestamp);
723 }
724 }
725 }
726 }
727
728 private eject(mapEntry: MapEntry, ejectionTimestamp: Date) {
729 mapEntry.currentEjectionTimestamp = new Date();
730 mapEntry.ejectionTimeMultiplier += 1;
731 for (const subchannelWrapper of mapEntry.subchannelWrappers) {
732 subchannelWrapper.eject();
733 }
734 }
735
736 private uneject(mapEntry: MapEntry) {
737 mapEntry.currentEjectionTimestamp = null;
738 for (const subchannelWrapper of mapEntry.subchannelWrappers) {
739 subchannelWrapper.uneject();
740 }
741 }
742
743 private switchAllBuckets() {
744 for (const mapEntry of this.addressMap.values()) {
745 mapEntry.counter.switchBuckets();
746 }
747 }
748
749 private startTimer(delayMs: number) {
750 this.ejectionTimer = setTimeout(() => this.runChecks(), delayMs);
751 this.ejectionTimer.unref?.();
752 }
753
754 private runChecks() {
755 const ejectionTimestamp = new Date();
756 trace('Ejection timer running');
757
758 this.switchAllBuckets();
759
760 if (!this.latestConfig) {
761 return;
762 }
763 this.timerStartTime = ejectionTimestamp;
764 this.startTimer(this.latestConfig.getIntervalMs());
765
766 this.runSuccessRateCheck(ejectionTimestamp);
767 this.runFailurePercentageCheck(ejectionTimestamp);
768
769 for (const [address, mapEntry] of this.addressMap.entries()) {
770 if (mapEntry.currentEjectionTimestamp === null) {
771 if (mapEntry.ejectionTimeMultiplier > 0) {
772 mapEntry.ejectionTimeMultiplier -= 1;
773 }
774 } else {
775 const baseEjectionTimeMs = this.latestConfig.getBaseEjectionTimeMs();
776 const maxEjectionTimeMs = this.latestConfig.getMaxEjectionTimeMs();
777 const returnTime = new Date(
778 mapEntry.currentEjectionTimestamp.getTime()
779 );
780 returnTime.setMilliseconds(
781 returnTime.getMilliseconds() +
782 Math.min(
783 baseEjectionTimeMs * mapEntry.ejectionTimeMultiplier,
784 Math.max(baseEjectionTimeMs, maxEjectionTimeMs)
785 )
786 );
787 if (returnTime < new Date()) {
788 trace('Unejecting ' + address);
789 this.uneject(mapEntry);
790 }
791 }
792 }
793 }
794
795 updateAddressList(
796 addressList: SubchannelAddress[],
797 lbConfig: LoadBalancingConfig,
798 attributes: { [key: string]: unknown }
799 ): void {
800 if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) {
801 return;
802 }
803 const subchannelAddresses = new Set<string>();
804 for (const address of addressList) {
805 subchannelAddresses.add(subchannelAddressToString(address));
806 }
807 for (const address of subchannelAddresses) {
808 if (!this.addressMap.has(address)) {
809 trace('Adding map entry for ' + address);
810 this.addressMap.set(address, {
811 counter: new CallCounter(),
812 currentEjectionTimestamp: null,
813 ejectionTimeMultiplier: 0,
814 subchannelWrappers: [],
815 });
816 }
817 }
818 for (const key of this.addressMap.keys()) {
819 if (!subchannelAddresses.has(key)) {
820 trace('Removing map entry for ' + key);
821 this.addressMap.delete(key);
822 }
823 }
824 const childPolicy: LoadBalancingConfig = getFirstUsableConfig(
825 lbConfig.getChildPolicy(),
826 true
827 );
828 this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
829
830 if (
831 lbConfig.getSuccessRateEjectionConfig() ||
832 lbConfig.getFailurePercentageEjectionConfig()
833 ) {
834 if (this.timerStartTime) {
835 trace('Previous timer existed. Replacing timer');
836 clearTimeout(this.ejectionTimer);
837 const remainingDelay =
838 lbConfig.getIntervalMs() -
839 (new Date().getTime() - this.timerStartTime.getTime());
840 this.startTimer(remainingDelay);
841 } else {
842 trace('Starting new timer');
843 this.timerStartTime = new Date();
844 this.startTimer(lbConfig.getIntervalMs());
845 this.switchAllBuckets();
846 }
847 } else {
848 trace('Counting disabled. Cancelling timer.');
849 this.timerStartTime = null;
850 clearTimeout(this.ejectionTimer);
851 for (const mapEntry of this.addressMap.values()) {
852 this.uneject(mapEntry);
853 mapEntry.ejectionTimeMultiplier = 0;
854 }
855 }
856
857 this.latestConfig = lbConfig;
858 }
859 exitIdle(): void {
860 this.childBalancer.exitIdle();
861 }
862 resetBackoff(): void {
863 this.childBalancer.resetBackoff();
864 }
865 destroy(): void {
866 clearTimeout(this.ejectionTimer);
867 this.childBalancer.destroy();
868 }
869 getTypeName(): string {
870 return TYPE_NAME;
871 }
872}
873
874export function setup() {
875 if (OUTLIER_DETECTION_ENABLED) {
876 registerLoadBalancerType(
877 TYPE_NAME,
878 OutlierDetectionLoadBalancer,
879 OutlierDetectionLoadBalancingConfig
880 );
881 }
882}