UNPKG

14.2 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import {
19 LoadBalancer,
20 ChannelControlHelper,
21 LoadBalancingConfig,
22 registerDefaultLoadBalancerType,
23 registerLoadBalancerType,
24} from './load-balancer';
25import { ConnectivityState } from './connectivity-state';
26import {
27 QueuePicker,
28 Picker,
29 PickArgs,
30 CompletePickResult,
31 PickResultType,
32 UnavailablePicker,
33} from './picker';
34import { SubchannelAddress } from './subchannel-address';
35import * as logging from './logging';
36import { LogVerbosity } from './constants';
37import {
38 SubchannelInterface,
39 ConnectivityStateListener,
40} from './subchannel-interface';
41
/** Tracer name under which this file's debug output is logged. */
const TRACER_NAME = 'pick_first';

/**
 * Emit a DEBUG-verbosity trace line tagged with this file's tracer name.
 * @param text The message to log
 */
function trace(text: string): void {
  const verbosity = LogVerbosity.DEBUG;
  logging.trace(verbosity, TRACER_NAME, text);
}
47
/** Name under which this load balancing policy is registered. */
const TYPE_NAME = 'pick_first';

/**
 * How long, in milliseconds, to wait after starting a connection attempt on
 * one subchannel before starting an attempt on the next subchannel in the
 * list. This staggering is used for the Happy Eyeballs algorithm.
 */
const CONNECTION_DELAY_INTERVAL_MS = 250;
55
/**
 * Configuration object for the pick_first load balancing policy.
 */
export class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
  /**
   * @param shuffleAddressList Whether the address list should be put in a
   * random order before it is used.
   */
  constructor(private readonly shuffleAddressList: boolean) {}

  /** @returns The name of the load balancing policy this config is for. */
  getLoadBalancerName(): string {
    return TYPE_NAME;
  }

  /**
   * @returns A plain object with a single key, the policy name, whose value
   * holds this config's settings.
   */
  toJsonObject(): object {
    const settings = { shuffleAddressList: this.shuffleAddressList };
    return { [TYPE_NAME]: settings };
  }

  /** @returns The configured shuffleAddressList flag. */
  getShuffleAddressList() {
    return this.shuffleAddressList;
  }

  /**
   * Build a config from a parsed JSON object. A missing shuffleAddressList
   * field is treated as false.
   * @param obj The parsed JSON value for this policy's config
   * @throws Error if shuffleAddressList is present but is not a boolean
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  static createFromJson(obj: any) {
    const fieldProvided = 'shuffleAddressList' in obj;
    if (fieldProvided && typeof obj.shuffleAddressList !== 'boolean') {
      throw new Error(
        'pick_first config field shuffleAddressList must be a boolean if provided'
      );
    }
    const shuffle = obj.shuffleAddressList === true;
    return new PickFirstLoadBalancingConfig(shuffle);
  }
}
88
/**
 * Picker reported by a `PickFirstLoadBalancer` while it is in the READY
 * state. Every pick completes with the one subchannel the picker was
 * constructed with.
 */
class PickFirstPicker implements Picker {
  /**
   * @param subchannel The subchannel to return from every pick.
   */
  constructor(private subchannel: SubchannelInterface) {}

  /**
   * @param pickArgs Ignored; the result does not depend on the call.
   * @returns A COMPLETE pick result for the stored subchannel, with no status
   * and no call lifecycle callbacks.
   */
  pick(pickArgs: PickArgs): CompletePickResult {
    const result: CompletePickResult = {
      pickResultType: PickResultType.COMPLETE,
      subchannel: this.subchannel,
      status: null,
      onCallStarted: null,
      onCallEnded: null,
    };
    return result;
  }
}
106
/**
 * One entry in the load balancer's list of subchannels it is trying to
 * connect to.
 */
interface SubchannelChild {
  /** The subchannel for this entry. */
  subchannel: SubchannelInterface;
  /**
   * Set to true once this subchannel has been observed in the
   * TRANSIENT_FAILURE state.
   */
  hasReportedTransientFailure: boolean;
}
111
/**
 * Return a new array with the elements of the input array in a random order.
 * The input array itself is not modified.
 * @param list The input array
 * @returns A shuffled array of the elements of list
 */
export function shuffled<T>(list: T[]): T[] {
  const result = list.slice();
  /* Fisher-Yates shuffle. The loop has to run down to and including i === 1:
   * stopping at i === 2 would skip the final swap between indices 0 and 1,
   * so a two-element list would never be reordered. */
  for (let i = result.length - 1; i > 0; i--) {
    // Pick a uniformly random index in [0, i] to swap into position i.
    const j = Math.floor(Math.random() * (i + 1));
    const temp = result[i];
    result[i] = result[j];
    result[j] = temp;
  }
  return result;
}
127
/**
 * Load balancer that attempts to connect to each backend in the address list
 * in order, and picks the first one that connects, using it for every
 * request.
 */
export class PickFirstLoadBalancer implements LoadBalancer {
  /**
   * The list of subchannels this load balancer is currently attempting to
   * connect to.
   */
  private children: SubchannelChild[] = [];
  /**
   * The current connectivity state of the load balancer.
   */
  private currentState: ConnectivityState = ConnectivityState.IDLE;
  /**
   * The index within the `children` array of the subchannel with the most
   * recently started connection attempt.
   */
  private currentSubchannelIndex = 0;
  /**
   * The currently picked subchannel used for making calls. Populated if
   * and only if the load balancer's current state is READY. In that case,
   * the subchannel's current state is also READY.
   */
  private currentPick: SubchannelInterface | null = null;
  /**
   * Listener callback attached to each subchannel in the `children` list
   * while establishing a connection. It stays attached to the picked
   * subchannel after the list is reset.
   */
  private subchannelStateListener: ConnectivityStateListener = (
    subchannel,
    previousState,
    newState
  ) => {
    this.onSubchannelStateUpdate(subchannel, previousState, newState);
  };
  /**
   * Timer that fires when it is time to start connecting to the next
   * subchannel in the `children` list.
   */
  private connectionDelayTimeout: NodeJS.Timeout;

  /**
   * Set once `startNextSubchannelConnecting` has run out of subchannels to
   * start for the current `children` list. Cleared when the list is reset.
   */
  private triedAllSubchannels = false;

  /**
   * The LB policy enters sticky TRANSIENT_FAILURE mode when all
   * subchannels have failed to connect at least once, and it stays in that
   * mode until a connection attempt is successful. While in sticky TF mode,
   * the LB policy continuously attempts to connect to all of its subchannels.
   */
  private stickyTransientFailureMode = false;

  /**
   * Load balancer that attempts to connect to each backend in the address list
   * in order, and picks the first one that connects, using it for every
   * request.
   * @param channelControlHelper `ChannelControlHelper` instance provided by
   * this load balancer's owner.
   */
  constructor(private readonly channelControlHelper: ChannelControlHelper) {
    /* Create and immediately cancel a timer so that the field always holds a
     * valid handle that is safe to pass to clearTimeout. */
    this.connectionDelayTimeout = setTimeout(() => {}, 0);
    clearTimeout(this.connectionDelayTimeout);
  }

  /**
   * @returns true if every entry in `children` has reported
   * TRANSIENT_FAILURE. Note that this is also true when `children` is empty.
   */
  private allChildrenHaveReportedTF(): boolean {
    return this.children.every(child => child.hasReportedTransientFailure);
  }

  /**
   * Derive the load balancer's state from its fields and report it: READY
   * when there is a picked subchannel, IDLE when there are no children,
   * TRANSIENT_FAILURE in sticky TF mode, and CONNECTING otherwise.
   */
  private calculateAndReportNewState() {
    if (this.currentPick) {
      this.updateState(
        ConnectivityState.READY,
        new PickFirstPicker(this.currentPick)
      );
    } else if (this.children.length === 0) {
      this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
    } else {
      if (this.stickyTransientFailureMode) {
        this.updateState(
          ConnectivityState.TRANSIENT_FAILURE,
          new UnavailablePicker()
        );
      } else {
        this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
      }
    }
  }

  /**
   * Enter sticky TRANSIENT_FAILURE mode if it is not already active and all
   * children have reported TRANSIENT_FAILURE. On entry, re-resolution is
   * requested, every child is told to start connecting, and the new state
   * is reported.
   */
  private maybeEnterStickyTransientFailureMode() {
    if (this.stickyTransientFailureMode) {
      return;
    }
    if (!this.allChildrenHaveReportedTF()) {
      return;
    }
    this.stickyTransientFailureMode = true;
    this.channelControlHelper.requestReresolution();
    for (const { subchannel } of this.children) {
      subchannel.startConnecting();
    }
    this.calculateAndReportNewState();
  }

  /**
   * Drop the currently picked subchannel, if any: release its ref, detach
   * the state listener, and remove its channelz child entry.
   */
  private removeCurrentPick() {
    if (this.currentPick !== null) {
      /* Unref can cause a state change, which can cause a change in the value
       * of this.currentPick, so we hold a local reference to make sure that
       * does not impact this function. */
      const currentPick = this.currentPick;
      this.currentPick = null;
      currentPick.unref();
      currentPick.removeConnectivityStateListener(this.subchannelStateListener);
      this.channelControlHelper.removeChannelzChild(
        currentPick.getChannelzRef()
      );
    }
  }

  /**
   * Handle a connectivity state change reported by a subchannel.
   * @param subchannel The subchannel whose state changed
   * @param previousState The state the subchannel was in before
   * @param newState The state the subchannel is in now
   */
  private onSubchannelStateUpdate(
    subchannel: SubchannelInterface,
    previousState: ConnectivityState,
    newState: ConnectivityState
  ) {
    if (this.currentPick?.realSubchannelEquals(subchannel)) {
      /* The picked subchannel is no longer usable: drop it, report the
       * resulting state, and ask for fresh addresses. */
      if (newState !== ConnectivityState.READY) {
        this.removeCurrentPick();
        this.calculateAndReportNewState();
        this.channelControlHelper.requestReresolution();
      }
      return;
    }
    for (const [index, child] of this.children.entries()) {
      if (subchannel.realSubchannelEquals(child.subchannel)) {
        if (newState === ConnectivityState.READY) {
          this.pickSubchannel(child.subchannel);
        }
        if (newState === ConnectivityState.TRANSIENT_FAILURE) {
          child.hasReportedTransientFailure = true;
          this.maybeEnterStickyTransientFailureMode();
          /* If the most recently started attempt is the one that failed,
           * move on to the next subchannel without waiting for the timer. */
          if (index === this.currentSubchannelIndex) {
            this.startNextSubchannelConnecting(index + 1);
          }
        }
        child.subchannel.startConnecting();
        return;
      }
    }
  }

  /**
   * Start a connection attempt on the first subchannel at or after
   * `startIndex` that is IDLE or CONNECTING. If there is none, record that
   * all subchannels have been tried. Does nothing beyond cancelling the
   * delay timer if all subchannels were already tried or sticky TF mode is
   * active.
   * @param startIndex The first index in the `children` list to consider.
   */
  private startNextSubchannelConnecting(startIndex: number) {
    clearTimeout(this.connectionDelayTimeout);
    if (this.triedAllSubchannels || this.stickyTransientFailureMode) {
      return;
    }
    for (const [index, child] of this.children.entries()) {
      if (index >= startIndex) {
        const subchannelState = child.subchannel.getConnectivityState();
        if (
          subchannelState === ConnectivityState.IDLE ||
          subchannelState === ConnectivityState.CONNECTING
        ) {
          this.startConnecting(index);
          return;
        }
      }
    }
    this.triedAllSubchannels = true;
    this.maybeEnterStickyTransientFailureMode();
  }

  /**
   * Have a single subchannel in the `children` list start connecting, and
   * arm the delay timer that moves on to the next subchannel.
   * @param subchannelIndex The index into the `children` list.
   */
  private startConnecting(subchannelIndex: number) {
    clearTimeout(this.connectionDelayTimeout);
    this.currentSubchannelIndex = subchannelIndex;
    if (
      this.children[subchannelIndex].subchannel.getConnectivityState() ===
      ConnectivityState.IDLE
    ) {
      trace(
        'Start connecting to subchannel with address ' +
          this.children[subchannelIndex].subchannel.getAddress()
      );
      /* The list may have been replaced or emptied by the time this runs,
       * hence the optional chaining on the lookup. */
      process.nextTick(() => {
        this.children[subchannelIndex]?.subchannel.startConnecting();
      });
    }
    const delayTimer = setTimeout(() => {
      this.startNextSubchannelConnecting(subchannelIndex + 1);
    }, CONNECTION_DELAY_INTERVAL_MS);
    /* Call unref as a separate statement so that the timer handle is still
     * stored, and can still be cancelled, where unref is not available. */
    delayTimer.unref?.();
    this.connectionDelayTimeout = delayTimer;
  }

  /**
   * Make the given subchannel the current pick, replacing any previous pick,
   * then discard the `children` list and report the new state. Picking also
   * leaves sticky TRANSIENT_FAILURE mode.
   * @param subchannel The subchannel to use for calls from now on.
   */
  private pickSubchannel(subchannel: SubchannelInterface) {
    if (this.currentPick && subchannel.realSubchannelEquals(this.currentPick)) {
      return;
    }
    trace('Pick subchannel with address ' + subchannel.getAddress());
    this.stickyTransientFailureMode = false;
    if (this.currentPick !== null) {
      this.currentPick.unref();
      this.channelControlHelper.removeChannelzChild(
        this.currentPick.getChannelzRef()
      );
      this.currentPick.removeConnectivityStateListener(
        this.subchannelStateListener
      );
    }
    this.currentPick = subchannel;
    subchannel.ref();
    this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
    this.resetSubchannelList();
    clearTimeout(this.connectionDelayTimeout);
    this.calculateAndReportNewState();
  }

  /**
   * Record the new state and pass it, with its picker, to the channel
   * control helper.
   * @param newState The connectivity state to report
   * @param picker The picker to use while in that state
   */
  private updateState(newState: ConnectivityState, picker: Picker) {
    trace(
      ConnectivityState[this.currentState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    this.currentState = newState;
    this.channelControlHelper.updateState(newState, picker);
  }

  /**
   * Release every subchannel in the `children` list and empty the list,
   * resetting the connection attempt bookkeeping that goes with it.
   */
  private resetSubchannelList() {
    for (const child of this.children) {
      if (child.subchannel !== this.currentPick) {
        /* The connectivity state listener is the same whether the subchannel
         * is in the list of children or it is the currentPick, so if it is in
         * both, removing it here would cause problems. In particular, that
         * always happens immediately after the subchannel is picked. */
        child.subchannel.removeConnectivityStateListener(
          this.subchannelStateListener
        );
      }
      /* Refs are counted independently for the children list and the
       * currentPick, so we call unref whether or not the child is the
       * currentPick. Channelz child references are also refcounted, so
       * removeChannelzChild can be handled the same way. */
      child.subchannel.unref();
      this.channelControlHelper.removeChannelzChild(
        child.subchannel.getChannelzRef()
      );
    }
    this.currentSubchannelIndex = 0;
    this.children = [];
    this.triedAllSubchannels = false;
  }

  /**
   * Replace the list of addresses to connect to. A subchannel that is
   * already READY is picked immediately; otherwise connection attempts start
   * from the beginning of the new list.
   * @param addressList The new list of backend addresses
   * @param lbConfig The config for this policy. The update is ignored if this
   * is not a `PickFirstLoadBalancingConfig`.
   */
  updateAddressList(
    addressList: SubchannelAddress[],
    lbConfig: LoadBalancingConfig
  ): void {
    if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
      return;
    }
    /* Previously, an update would be discarded if it was identical to the
     * previous update, to minimize churn. Now the DNS resolver is
     * rate-limited, so that is less of a concern. */
    if (lbConfig.getShuffleAddressList()) {
      addressList = shuffled(addressList);
    }
    const newChildrenList = addressList.map(address => ({
      subchannel: this.channelControlHelper.createSubchannel(address, {}),
      hasReportedTransientFailure: false,
    }));
    /* Ref each subchannel before resetting the list, to ensure that
     * subchannels shared between the list don't drop to 0 refs during the
     * transition. */
    for (const { subchannel } of newChildrenList) {
      subchannel.ref();
      this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
    }
    this.resetSubchannelList();
    this.children = newChildrenList;
    for (const { subchannel } of this.children) {
      subchannel.addConnectivityStateListener(this.subchannelStateListener);
      if (subchannel.getConnectivityState() === ConnectivityState.READY) {
        this.pickSubchannel(subchannel);
        return;
      }
    }
    /* Subchannels that are already failing count as having reported
     * TRANSIENT_FAILURE for the purposes of sticky TF mode. */
    for (const child of this.children) {
      if (
        child.subchannel.getConnectivityState() ===
        ConnectivityState.TRANSIENT_FAILURE
      ) {
        child.hasReportedTransientFailure = true;
      }
    }
    this.startNextSubchannelConnecting(0);
    this.calculateAndReportNewState();
  }

  exitIdle() {
    /* The pick_first LB policy is only in the IDLE state if it has no
     * addresses to try to connect to and it has no picked subchannel.
     * In that case, there is no meaningful action that can be taken here. */
  }

  resetBackoff() {
    /* The pick first load balancer does not have a connection backoff, so this
     * does nothing */
  }

  /**
   * Release every subchannel this load balancer holds and cancel its pending
   * connection delay timer.
   */
  destroy() {
    this.resetSubchannelList();
    this.removeCurrentPick();
    /* Cancel the connection delay timer. Otherwise it could fire after
     * destruction and, finding an empty children list, request re-resolution
     * and report a state update through the helper. */
    clearTimeout(this.connectionDelayTimeout);
  }

  /** @returns The name of this load balancing policy. */
  getTypeName(): string {
    return TYPE_NAME;
  }
}
440
/**
 * Register the pick_first policy and its config class with the load balancer
 * registry, then mark it as the default load balancer type.
 */
export function setup(): void {
  const policyName = TYPE_NAME;
  registerLoadBalancerType(
    policyName,
    PickFirstLoadBalancer,
    PickFirstLoadBalancingConfig
  );
  registerDefaultLoadBalancerType(policyName);
}