UNPKG

19.9 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import {
19 LoadBalancer,
20 ChannelControlHelper,
21 TypedLoadBalancingConfig,
22 registerDefaultLoadBalancerType,
23 registerLoadBalancerType,
24 createChildChannelControlHelper,
25} from './load-balancer';
26import { ConnectivityState } from './connectivity-state';
27import {
28 QueuePicker,
29 Picker,
30 PickArgs,
31 CompletePickResult,
32 PickResultType,
33 UnavailablePicker,
34} from './picker';
35import { Endpoint, SubchannelAddress } from './subchannel-address';
36import * as logging from './logging';
37import { LogVerbosity } from './constants';
38import {
39 SubchannelInterface,
40 ConnectivityStateListener,
41 HealthListener,
42} from './subchannel-interface';
43import { isTcpSubchannelAddress } from './subchannel-address';
44import { isIPv6 } from 'net';
45import { ChannelOptions } from './channel-options';
46
/** Tracer name under which this file's debug messages are logged. */
const TRACER_NAME = 'pick_first';

/**
 * Log a message at DEBUG verbosity under this file's tracer name.
 * @param text The message to log.
 */
function trace(text: string): void {
  logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
52
/**
 * Name of this load balancing policy. It is returned by the config's
 * `getLoadBalancerName` and the balancer's `getTypeName`, used as the key in
 * the config's JSON form, and used when registering the policy in `setup`.
 */
const TYPE_NAME = 'pick_first';

/**
 * Delay, in milliseconds, after starting a connection on a subchannel before
 * starting a connection on the next subchannel in the list, for the Happy
 * Eyeballs algorithm.
 */
const CONNECTION_DELAY_INTERVAL_MS = 250;
60
/**
 * Configuration object for the `pick_first` load balancing policy. Its single
 * setting controls whether the endpoint list is shuffled before connecting.
 */
export class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig {
  /**
   * @param shuffleAddressList Whether the balancer should randomly reorder
   * the endpoint list it receives before connecting.
   */
  constructor(private readonly shuffleAddressList: boolean) {}

  /** Returns the name of the policy this config applies to. */
  getLoadBalancerName(): string {
    return TYPE_NAME;
  }

  /**
   * Returns a plain object representation of this config, with the settings
   * nested under the policy name.
   */
  toJsonObject(): object {
    const settings = { shuffleAddressList: this.shuffleAddressList };
    return { [TYPE_NAME]: settings };
  }

  /** Returns whether address list shuffling is enabled. */
  getShuffleAddressList() {
    return this.shuffleAddressList;
  }

  /**
   * Build a config from an untyped object.
   * @param obj Object that may carry a `shuffleAddressList` field.
   * @returns A config with shuffling enabled only if the field is exactly
   * `true`.
   * @throws Error if `shuffleAddressList` is present but is not a boolean.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  static createFromJson(obj: any) {
    const fieldPresent = 'shuffleAddressList' in obj;
    if (fieldPresent && typeof obj.shuffleAddressList !== 'boolean') {
      throw new Error(
        'pick_first config field shuffleAddressList must be a boolean if provided'
      );
    }
    const shuffle = obj.shuffleAddressList === true;
    return new PickFirstLoadBalancingConfig(shuffle);
  }
}
93
/**
 * Picker used while a `PickFirstLoadBalancer` is in the READY state. Every
 * pick completes on the single subchannel the picker was constructed with.
 */
class PickFirstPicker implements Picker {
  /**
   * @param subchannel The subchannel that every pick will return.
   */
  constructor(private subchannel: SubchannelInterface) {}

  /**
   * @param pickArgs Not used; the result is the same for every call.
   * @returns A COMPLETE pick result carrying the picked subchannel, with no
   * status and no call start/end callbacks.
   */
  pick(pickArgs: PickArgs): CompletePickResult {
    const { subchannel } = this;
    const result: CompletePickResult = {
      pickResultType: PickResultType.COMPLETE,
      subchannel,
      status: null,
      onCallStarted: null,
      onCallEnded: null,
    };
    return result;
  }
}
111
/**
 * Bookkeeping entry for one subchannel in the list that the balancer is
 * attempting to connect to.
 */
interface SubchannelChild {
  /** The subchannel being tracked. */
  subchannel: SubchannelInterface;
  /**
   * True once this subchannel has been seen in TRANSIENT_FAILURE since the
   * entry was created.
   */
  hasReportedTransientFailure: boolean;
}
116
/**
 * Return a new array with the elements of the input array in a random order.
 * The input array is not modified.
 *
 * Uses a Fisher-Yates shuffle driven by `Math.random`: walking from the last
 * index down to index 1, each position is swapped with a uniformly chosen
 * position at or before it.
 * @param list The input array
 * @returns A shuffled copy of the elements of list
 */
export function shuffled<T>(list: T[]): T[] {
  const result = list.slice();
  /* The loop must include i === 1. Stopping before it would leave a
   * two-element list always unshuffled and would never exchange the final
   * occupants of slots 0 and 1 in longer lists. */
  for (let i = result.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    const temp = result[i];
    result[i] = result[j];
    result[j] = temp;
  }
  return result;
}
132
/**
 * Interleave the addresses in addressList by address family, in accordance
 * with RFC 8305 (Happy Eyeballs v2) section 4.
 *
 * Addresses are split into two groups: TCP addresses whose host is an IPv6
 * literal, and everything else. The group that the first input address
 * belongs to goes first; the output then alternates between the two groups,
 * preserving the relative order within each group, until both are exhausted.
 * @param addressList The addresses to reorder. The first element is inspected
 * unconditionally, so callers are expected to pass a non-empty list.
 * @returns A new array containing the same addresses in interleaved order
 */
function interleaveAddressFamilies(
  addressList: SubchannelAddress[]
): SubchannelAddress[] {
  const isIPv6Address = (address: SubchannelAddress): boolean =>
    isTcpSubchannelAddress(address) && isIPv6(address.host);

  // The family of the first address decides which group leads the output.
  const startsWithIPv6 = isIPv6Address(addressList[0]);

  const ipv6Group: SubchannelAddress[] = [];
  const otherGroup: SubchannelAddress[] = [];
  for (const address of addressList) {
    const target = isIPv6Address(address) ? ipv6Group : otherGroup;
    target.push(address);
  }

  const [leading, trailing] = startsWithIPv6
    ? [ipv6Group, otherGroup]
    : [otherGroup, ipv6Group];

  const interleaved: SubchannelAddress[] = [];
  const rounds = Math.max(leading.length, trailing.length);
  for (let round = 0; round < rounds; round++) {
    if (round < leading.length) {
      interleaved.push(leading[round]);
    }
    if (round < trailing.length) {
      interleaved.push(trailing[round]);
    }
  }
  return interleaved;
}
165
/**
 * Channel option key read by the `PickFirstLoadBalancer` constructor. When
 * the option is truthy, the balancer watches the picked subchannel's health
 * and reports TRANSIENT_FAILURE while it is unhealthy. `LeafLoadBalancer`
 * always sets this option to true for its inner balancer.
 */
const REPORT_HEALTH_STATUS_OPTION_NAME =
  'grpc-node.internal.pick-first.report_health_status';
168
/**
 * Load balancer that attempts to connect to each address in its address list
 * in order, staggering attempts by `CONNECTION_DELAY_INTERVAL_MS`, and uses
 * the first subchannel that becomes READY for every request.
 */
export class PickFirstLoadBalancer implements LoadBalancer {
  /**
   * The list of subchannels this load balancer is currently attempting to
   * connect to.
   */
  private children: SubchannelChild[] = [];
  /**
   * The current connectivity state of the load balancer.
   */
  private currentState: ConnectivityState = ConnectivityState.IDLE;
  /**
   * The index within the `children` array of the subchannel with the most
   * recently started connection attempt.
   */
  private currentSubchannelIndex = 0;
  /**
   * The currently picked subchannel used for making calls. Populated if
   * and only if the load balancer's current state is READY. In that case,
   * the subchannel's current state is also READY.
   */
  private currentPick: SubchannelInterface | null = null;
  /**
   * Listener callback attached to each subchannel in the `children` list
   * while establishing a connection. It forwards everything except the
   * keepalive time to `onSubchannelStateUpdate`.
   */
  private subchannelStateListener: ConnectivityStateListener = (
    subchannel,
    previousState,
    newState,
    keepaliveTime,
    errorMessage
  ) => {
    this.onSubchannelStateUpdate(
      subchannel,
      previousState,
      newState,
      errorMessage
    );
  };

  /**
   * Health listener attached to the picked subchannel when health status
   * reporting is enabled. Any health change triggers a state recalculation.
   */
  private pickedSubchannelHealthListener: HealthListener = () =>
    this.calculateAndReportNewState();
  /**
   * Timer reference for the timer tracking when to start connecting on the
   * next subchannel in the list.
   */
  private connectionDelayTimeout: NodeJS.Timeout;

  /**
   * Set once `startNextSubchannelConnecting` has run past the end of the
   * `children` list without finding another subchannel to start. Reset
   * whenever the subchannel list is reset.
   */
  private triedAllSubchannels = false;

  /**
   * The LB policy enters sticky TRANSIENT_FAILURE mode when all
   * subchannels have failed to connect at least once, and it stays in that
   * mode until a connection attempt is successful. While in sticky TF mode,
   * the LB policy continuously attempts to connect to all of its subchannels.
   */
  private stickyTransientFailureMode = false;

  /**
   * Whether the health of the picked subchannel is watched and reflected in
   * the reported state. Taken from the channel options at construction.
   */
  private reportHealthStatus: boolean;

  /**
   * Indicates whether we called channelControlHelper.requestReresolution since
   * the last call to updateAddressList
   */
  private requestedResolutionSinceLastUpdate = false;

  /**
   * The most recent error reported by any subchannel as it transitioned to
   * TRANSIENT_FAILURE.
   */
  private lastError: string | null = null;

  /**
   * The interleaved address list from the most recent `updateAddressList`
   * call, kept so that `exitIdle` can reconnect to it.
   */
  private latestAddressList: SubchannelAddress[] | null = null;

  /**
   * Load balancer that attempts to connect to each backend in the address list
   * in order, and picks the first one that connects, using it for every
   * request.
   * @param channelControlHelper `ChannelControlHelper` instance provided by
   * this load balancer's owner.
   * @param options Channel options; only the internal health status
   * reporting option is read here.
   */
  constructor(
    private readonly channelControlHelper: ChannelControlHelper,
    options: ChannelOptions
  ) {
    /* Create and immediately cancel a timer so that the field always holds a
     * Timeout object that is safe to pass to clearTimeout. */
    this.connectionDelayTimeout = setTimeout(() => {}, 0);
    clearTimeout(this.connectionDelayTimeout);
    this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
  }

  /**
   * @returns true if every entry in `children` has reported
   * TRANSIENT_FAILURE. Note that this is also true for an empty list.
   */
  private allChildrenHaveReportedTF(): boolean {
    return this.children.every(child => child.hasReportedTransientFailure);
  }

  /**
   * Derive the balancer's state from its current fields and report it:
   * - with a picked subchannel: READY, or TRANSIENT_FAILURE if health
   *   reporting is enabled and the pick is unhealthy;
   * - with no pick and no children: IDLE;
   * - otherwise: TRANSIENT_FAILURE in sticky TF mode, else CONNECTING.
   */
  private calculateAndReportNewState() {
    if (this.currentPick) {
      if (this.reportHealthStatus && !this.currentPick.isHealthy()) {
        this.updateState(
          ConnectivityState.TRANSIENT_FAILURE,
          new UnavailablePicker({
            details: `Picked subchannel ${this.currentPick.getAddress()} is unhealthy`,
          })
        );
      } else {
        this.updateState(
          ConnectivityState.READY,
          new PickFirstPicker(this.currentPick)
        );
      }
    } else if (this.children.length === 0) {
      this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
    } else {
      if (this.stickyTransientFailureMode) {
        this.updateState(
          ConnectivityState.TRANSIENT_FAILURE,
          new UnavailablePicker({
            details: `No connection established. Last error: ${this.lastError}`,
          })
        );
      } else {
        this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
      }
    }
  }

  /**
   * Ask the owner to re-resolve, and record that this has been done since
   * the last subchannel list reset.
   */
  private requestReresolution() {
    this.requestedResolutionSinceLastUpdate = true;
    this.channelControlHelper.requestReresolution();
  }

  /**
   * If every child has reported TRANSIENT_FAILURE, request re-resolution
   * (at most once per subchannel list) and, if not already in sticky TF
   * mode, enter it: start all children connecting and report the new state.
   */
  private maybeEnterStickyTransientFailureMode() {
    if (!this.allChildrenHaveReportedTF()) {
      return;
    }
    if (!this.requestedResolutionSinceLastUpdate) {
      /* Each time we get an update we reset each subchannel's
       * hasReportedTransientFailure flag, so the next time we get to this
       * point after that, each subchannel has reported TRANSIENT_FAILURE
       * at least once since then. That is the trigger for requesting
       * reresolution, whether or not the LB policy is already in sticky TF
       * mode. */
      this.requestReresolution();
    }
    if (this.stickyTransientFailureMode) {
      return;
    }
    this.stickyTransientFailureMode = true;
    for (const { subchannel } of this.children) {
      subchannel.startConnecting();
    }
    this.calculateAndReportNewState();
  }

  /**
   * Drop the current pick, if any: release its ref, detach the state
   * listener, remove its channelz child entry, and stop watching its health
   * when health reporting is enabled.
   */
  private removeCurrentPick() {
    if (this.currentPick !== null) {
      /* Unref can cause a state change, which can cause a change in the value
       * of this.currentPick, so we hold a local reference to make sure that
       * does not impact this function. */
      const currentPick = this.currentPick;
      this.currentPick = null;
      currentPick.unref();
      currentPick.removeConnectivityStateListener(this.subchannelStateListener);
      this.channelControlHelper.removeChannelzChild(
        currentPick.getChannelzRef()
      );
      if (this.reportHealthStatus) {
        currentPick.removeHealthStateWatcher(
          this.pickedSubchannelHealthListener
        );
      }
    }
  }

  /**
   * Handle a connectivity state change on a subchannel.
   *
   * If the subchannel is the current pick and it is no longer READY, the
   * pick is dropped, the new state is reported, and re-resolution is
   * requested. Otherwise the matching child (if any) is picked on READY, or
   * marked as failed on TRANSIENT_FAILURE, which may enter sticky TF mode
   * and advance to the next subchannel.
   * @param subchannel The subchannel whose state changed.
   * @param previousState The state it was in before the change (unused).
   * @param newState The state it is in now.
   * @param errorMessage Optional error description; recorded as the last
   * error when the new state is TRANSIENT_FAILURE.
   */
  private onSubchannelStateUpdate(
    subchannel: SubchannelInterface,
    previousState: ConnectivityState,
    newState: ConnectivityState,
    errorMessage?: string
  ) {
    if (this.currentPick?.realSubchannelEquals(subchannel)) {
      if (newState !== ConnectivityState.READY) {
        this.removeCurrentPick();
        this.calculateAndReportNewState();
        this.requestReresolution();
      }
      return;
    }
    for (const [index, child] of this.children.entries()) {
      if (subchannel.realSubchannelEquals(child.subchannel)) {
        if (newState === ConnectivityState.READY) {
          this.pickSubchannel(child.subchannel);
        }
        if (newState === ConnectivityState.TRANSIENT_FAILURE) {
          child.hasReportedTransientFailure = true;
          if (errorMessage) {
            this.lastError = errorMessage;
          }
          this.maybeEnterStickyTransientFailureMode();
          if (index === this.currentSubchannelIndex) {
            this.startNextSubchannelConnecting(index + 1);
          }
        }
        child.subchannel.startConnecting();
        return;
      }
    }
  }

  /**
   * Start a connection attempt on the first child at or after `startIndex`
   * that is IDLE or CONNECTING. If there is none, mark the list as fully
   * tried and check whether sticky TF mode should be entered.
   * @param startIndex The first index in `children` to consider.
   */
  private startNextSubchannelConnecting(startIndex: number) {
    clearTimeout(this.connectionDelayTimeout);
    if (this.triedAllSubchannels) {
      return;
    }
    for (const [index, child] of this.children.entries()) {
      if (index >= startIndex) {
        const subchannelState = child.subchannel.getConnectivityState();
        if (
          subchannelState === ConnectivityState.IDLE ||
          subchannelState === ConnectivityState.CONNECTING
        ) {
          this.startConnecting(index);
          return;
        }
      }
    }
    this.triedAllSubchannels = true;
    this.maybeEnterStickyTransientFailureMode();
  }

  /**
   * Have a single subchannel in the `children` list start connecting, and
   * arm the timer that moves on to the next subchannel after
   * `CONNECTION_DELAY_INTERVAL_MS`.
   * @param subchannelIndex The index into the `children` list.
   */
  private startConnecting(subchannelIndex: number) {
    clearTimeout(this.connectionDelayTimeout);
    this.currentSubchannelIndex = subchannelIndex;
    if (
      this.children[subchannelIndex].subchannel.getConnectivityState() ===
      ConnectivityState.IDLE
    ) {
      trace(
        'Start connecting to subchannel with address ' +
          this.children[subchannelIndex].subchannel.getAddress()
      );
      /* The connection is started on the next tick; the list may have been
       * replaced by then, hence the optional chaining. */
      process.nextTick(() => {
        this.children[subchannelIndex]?.subchannel.startConnecting();
      });
    }
    this.connectionDelayTimeout = setTimeout(() => {
      this.startNextSubchannelConnecting(subchannelIndex + 1);
    }, CONNECTION_DELAY_INTERVAL_MS);
    this.connectionDelayTimeout.unref?.();
  }

  /**
   * Make `subchannel` the current pick: leave sticky TF mode, replace any
   * previous pick, take a ref and channelz child entry for the new one,
   * discard the list of connection candidates, and report the new state.
   * Does nothing if `subchannel` is already the current pick.
   * @param subchannel The subchannel to use for calls.
   */
  private pickSubchannel(subchannel: SubchannelInterface) {
    if (this.currentPick && subchannel.realSubchannelEquals(this.currentPick)) {
      return;
    }
    trace('Pick subchannel with address ' + subchannel.getAddress());
    this.stickyTransientFailureMode = false;
    this.removeCurrentPick();
    this.currentPick = subchannel;
    subchannel.ref();
    if (this.reportHealthStatus) {
      subchannel.addHealthStateWatcher(this.pickedSubchannelHealthListener);
    }
    this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
    this.resetSubchannelList();
    clearTimeout(this.connectionDelayTimeout);
    this.calculateAndReportNewState();
  }

  /**
   * Record the new state, trace the transition, and report the state and
   * picker to the owner.
   * @param newState The connectivity state to report.
   * @param picker The picker to report alongside it.
   */
  private updateState(newState: ConnectivityState, picker: Picker) {
    trace(
      ConnectivityState[this.currentState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    this.currentState = newState;
    this.channelControlHelper.updateState(newState, picker);
  }

  /**
   * Release every subchannel in `children` and clear the per-list
   * bookkeeping (current index, tried-all flag, re-resolution flag).
   */
  private resetSubchannelList() {
    for (const child of this.children) {
      if (
        !(
          this.currentPick &&
          child.subchannel.realSubchannelEquals(this.currentPick)
        )
      ) {
        /* The connectivity state listener is the same whether the subchannel
         * is in the list of children or it is the currentPick, so if it is in
         * both, removing it here would cause problems. In particular, that
         * always happens immediately after the subchannel is picked. */
        child.subchannel.removeConnectivityStateListener(
          this.subchannelStateListener
        );
      }
      /* Refs are counted independently for the children list and the
       * currentPick, so we call unref whether or not the child is the
       * currentPick. Channelz child references are also refcounted, so
       * removeChannelzChild can be handled the same way. */
      child.subchannel.unref();
      this.channelControlHelper.removeChannelzChild(
        child.subchannel.getChannelzRef()
      );
    }
    this.currentSubchannelIndex = 0;
    this.children = [];
    this.triedAllSubchannels = false;
    this.requestedResolutionSinceLastUpdate = false;
  }

  /**
   * Replace the subchannel list with one subchannel per address. If any of
   * them is already READY it is picked immediately; otherwise connection
   * attempts start from the beginning of the list and the new state is
   * reported.
   * @param addressList The addresses to connect to, in priority order.
   */
  private connectToAddressList(addressList: SubchannelAddress[]) {
    const newChildrenList = addressList.map(address => ({
      subchannel: this.channelControlHelper.createSubchannel(address, {}),
      hasReportedTransientFailure: false,
    }));
    /* Ref each subchannel before resetting the list, to ensure that
     * subchannels shared between the list don't drop to 0 refs during the
     * transition. */
    for (const { subchannel } of newChildrenList) {
      subchannel.ref();
      this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
    }
    this.resetSubchannelList();
    this.children = newChildrenList;
    for (const { subchannel } of this.children) {
      subchannel.addConnectivityStateListener(this.subchannelStateListener);
      if (subchannel.getConnectivityState() === ConnectivityState.READY) {
        this.pickSubchannel(subchannel);
        return;
      }
    }
    /* Subchannels that are already in TRANSIENT_FAILURE count as having
     * reported it for this list. */
    for (const child of this.children) {
      if (
        child.subchannel.getConnectivityState() ===
        ConnectivityState.TRANSIENT_FAILURE
      ) {
        child.hasReportedTransientFailure = true;
      }
    }
    this.startNextSubchannelConnecting(0);
    this.calculateAndReportNewState();
  }

  /**
   * Accept a new endpoint list. The endpoints are optionally shuffled,
   * flattened into a single address list, interleaved by address family,
   * remembered for `exitIdle`, and then connected to.
   * @param endpointList The endpoints to connect to.
   * @param lbConfig The config for this policy; the update is silently
   * ignored if it is not a `PickFirstLoadBalancingConfig`.
   * @throws Error if the endpoints contain no addresses at all.
   */
  updateAddressList(
    endpointList: Endpoint[],
    lbConfig: TypedLoadBalancingConfig
  ): void {
    if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
      return;
    }
    /* Previously, an update would be discarded if it was identical to the
     * previous update, to minimize churn. Now the DNS resolver is
     * rate-limited, so that is less of a concern. */
    if (lbConfig.getShuffleAddressList()) {
      endpointList = shuffled(endpointList);
    }
    const rawAddressList = ([] as SubchannelAddress[]).concat(
      ...endpointList.map(endpoint => endpoint.addresses)
    );
    if (rawAddressList.length === 0) {
      throw new Error('No addresses in endpoint list passed to pick_first');
    }
    const addressList = interleaveAddressFamilies(rawAddressList);
    this.latestAddressList = addressList;
    this.connectToAddressList(addressList);
  }

  /**
   * If the balancer is IDLE and has previously received an address list,
   * start connecting to that list again.
   */
  exitIdle() {
    if (
      this.currentState === ConnectivityState.IDLE &&
      this.latestAddressList
    ) {
      this.connectToAddressList(this.latestAddressList);
    }
  }

  resetBackoff() {
    /* The pick first load balancer does not have a connection backoff, so this
     * does nothing */
  }

  /**
   * Release all subchannels held by this balancer, including the current
   * pick, and cancel the pending connection delay timer.
   */
  destroy() {
    /* Cancel the connection delay timer. If it fired after the children list
     * was emptied, startNextSubchannelConnecting would find no candidates,
     * treat the empty list as "all children failed", and call
     * requestReresolution and updateState on the owner after destruction. */
    clearTimeout(this.connectionDelayTimeout);
    this.resetSubchannelList();
    this.removeCurrentPick();
  }

  /** Returns the name of this load balancing policy. */
  getTypeName(): string {
    return TYPE_NAME;
  }
}
561
/**
 * Config that `LeafLoadBalancer` passes to its inner pick_first balancer;
 * address list shuffling is disabled.
 */
const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
563
/**
 * Leaf load balancer for a single endpoint. It wraps a PickFirstLoadBalancer
 * (created with health status reporting enabled) behind an API shaped for
 * use as a leaf policy, and caches the latest connectivity state and picker
 * that the inner balancer reported.
 */
export class LeafLoadBalancer {
  private pickFirstBalancer: PickFirstLoadBalancer;
  private latestState: ConnectivityState = ConnectivityState.IDLE;
  private latestPicker: Picker;

  /**
   * @param endpoint The endpoint this balancer connects to.
   * @param channelControlHelper The owner's helper. State updates from the
   * inner balancer are recorded here and then forwarded to it.
   * @param options Channel options passed through to the inner balancer.
   */
  constructor(
    private endpoint: Endpoint,
    channelControlHelper: ChannelControlHelper,
    options: ChannelOptions
  ) {
    const recordingHelper = createChildChannelControlHelper(
      channelControlHelper,
      {
        updateState: (state, newPicker) => {
          // Cache first, so the getters are current when the owner reacts.
          this.latestState = state;
          this.latestPicker = newPicker;
          channelControlHelper.updateState(state, newPicker);
        },
      }
    );
    const innerOptions = {
      ...options,
      [REPORT_HEALTH_STATUS_OPTION_NAME]: true,
    };
    this.pickFirstBalancer = new PickFirstLoadBalancer(
      recordingHelper,
      innerOptions
    );
    this.latestPicker = new QueuePicker(this.pickFirstBalancer);
  }

  /** Start connecting to the current endpoint. */
  startConnecting() {
    const endpoints = [this.endpoint];
    this.pickFirstBalancer.updateAddressList(endpoints, LEAF_CONFIG);
  }

  /**
   * Replace the endpoint associated with this LeafLoadBalancer. The inner
   * balancer is only given the new endpoint right away when the latest
   * reported state is something other than IDLE; an IDLE balancer stays
   * idle.
   * @param newEndpoint
   */
  updateEndpoint(newEndpoint: Endpoint) {
    this.endpoint = newEndpoint;
    if (this.latestState === ConnectivityState.IDLE) {
      return;
    }
    this.startConnecting();
  }

  /** Returns the most recent state reported by the inner balancer. */
  getConnectivityState() {
    return this.latestState;
  }

  /** Returns the most recent picker reported by the inner balancer. */
  getPicker() {
    return this.latestPicker;
  }

  /** Returns the endpoint currently associated with this balancer. */
  getEndpoint() {
    return this.endpoint;
  }

  /** Forwards to the inner balancer's `exitIdle`. */
  exitIdle() {
    this.pickFirstBalancer.exitIdle();
  }

  /** Destroys the inner balancer. */
  destroy() {
    this.pickFirstBalancer.destroy();
  }
}
632
/**
 * Register the `pick_first` policy (balancer class and config class) under
 * its type name, and register that name as the default load balancer type.
 */
export function setup(): void {
  registerLoadBalancerType(
    TYPE_NAME,
    PickFirstLoadBalancer,
    PickFirstLoadBalancingConfig
  );
  registerDefaultLoadBalancerType(TYPE_NAME);
}