// Retrieved via the UNPKG file viewer (16.1 kB, plain text, raw view).
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
18import {
19 LoadBalancer,
20 ChannelControlHelper,
21 LoadBalancingConfig,
22 registerDefaultLoadBalancerType,
23 registerLoadBalancerType,
24} from './load-balancer';
25import { ConnectivityState } from './connectivity-state';
26import {
27 QueuePicker,
28 Picker,
29 PickArgs,
30 CompletePickResult,
31 PickResultType,
32 UnavailablePicker,
33} from './picker';
34import {
35 SubchannelAddress,
36 subchannelAddressEqual,
37 subchannelAddressToString,
38} from './subchannel-address';
39import * as logging from './logging';
40import { LogVerbosity } from './constants';
41import { SubchannelInterface, ConnectivityStateListener } from './subchannel-interface';
42
/** Tracer name attached to every message logged from this file. */
const TRACER_NAME = 'pick_first';

/**
 * Log `text` at DEBUG verbosity under the `pick_first` tracer.
 * @param text The message to record.
 */
function trace(text: string): void {
  logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}

/** Load balancing policy name reported by the classes in this file. */
const TYPE_NAME = 'pick_first';

/**
 * How long, in milliseconds, to wait after starting a connection attempt on
 * one subchannel before starting an attempt on the next subchannel in the
 * list (Happy Eyeballs algorithm).
 */
const CONNECTION_DELAY_INTERVAL_MS = 250;
56
/**
 * Configuration object for the `pick_first` policy. It carries no settings:
 * its JSON form is an empty object keyed by the policy name.
 */
export class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
  /**
   * Build a config from a parsed JSON value. The argument is not inspected;
   * a fresh, option-less config is always returned.
   * @param obj Parsed JSON for this policy (unused).
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  static createFromJson(obj: any): PickFirstLoadBalancingConfig {
    return new PickFirstLoadBalancingConfig();
  }

  /**
   * @returns The name of the load balancing policy this config belongs to.
   */
  getLoadBalancerName(): string {
    return TYPE_NAME;
  }

  /**
   * @returns The JSON representation: `{ pick_first: {} }`.
   */
  toJsonObject(): object {
    const serialized: { [policyName: string]: object } = {};
    serialized[TYPE_NAME] = {};
    return serialized;
  }
}
75
/**
 * Picker handed to the channel while a `PickFirstLoadBalancer` is READY.
 * Every pick completes with the one subchannel given at construction.
 */
class PickFirstPicker implements Picker {
  /** The subchannel returned by every pick. */
  private readonly subchannel: SubchannelInterface;

  constructor(subchannel: SubchannelInterface) {
    this.subchannel = subchannel;
  }

  /**
   * @param pickArgs Details of the call being picked for (not consulted).
   * @returns A COMPLETE pick result for the stored subchannel, with no status
   * and no call-start/call-end callbacks.
   */
  pick(pickArgs: PickArgs): CompletePickResult {
    const result: CompletePickResult = {
      pickResultType: PickResultType.COMPLETE,
      subchannel: this.subchannel,
      status: null,
      onCallStarted: null,
      onCallEnded: null,
    };
    return result;
  }
}
93
/**
 * A tally with one numeric counter per connectivity state, keyed by the
 * `ConnectivityState` enum value.
 */
interface ConnectivityStateCounts {
  [ConnectivityState.IDLE]: number;
  [ConnectivityState.CONNECTING]: number;
  [ConnectivityState.READY]: number;
  [ConnectivityState.TRANSIENT_FAILURE]: number;
  [ConnectivityState.SHUTDOWN]: number;
}
101
/**
 * Load balancer implementing the `pick_first` policy: it tries the addresses
 * it was given in order, starting the next attempt when the previous one
 * fails or after a fixed delay, and routes every call to the first subchannel
 * that becomes READY.
 */
export class PickFirstLoadBalancer implements LoadBalancer {
  /**
   * The list of backend addresses most recently passed to `updateAddressList`.
   */
  private latestAddressList: SubchannelAddress[] = [];
  /**
   * The list of subchannels this load balancer is currently attempting to
   * connect to.
   */
  private subchannels: SubchannelInterface[] = [];
  /**
   * The current connectivity state of the load balancer.
   */
  private currentState: ConnectivityState = ConnectivityState.IDLE;
  /**
   * The index within the `subchannels` array of the subchannel with the most
   * recently started connection attempt.
   */
  private currentSubchannelIndex = 0;
  /**
   * How many entries of `subchannels` are in each connectivity state. Seeded
   * in `connectToAddressList` and kept current by `subchannelStateListener`.
   */
  private subchannelStateCounts: ConnectivityStateCounts;
  /**
   * The currently picked subchannel used for making calls. Populated if
   * and only if the load balancer's current state is READY. In that case,
   * the subchannel's current state is also READY.
   */
  private currentPick: SubchannelInterface | null = null;
  /**
   * Listener callback attached to each subchannel in the `subchannels` list
   * while establishing a connection.
   */
  private subchannelStateListener: ConnectivityStateListener;
  /**
   * Listener callback attached to the current picked subchannel.
   */
  private pickedSubchannelStateListener: ConnectivityStateListener;
  /**
   * Timer that, when it fires, starts a connection attempt on the next
   * subchannel in the list (see `startConnecting`).
   */
  private connectionDelayTimeout: NodeJS.Timeout;
  /**
   * Set once `startNextSubchannelConnecting` finds no further subchannel to
   * start. While set, `exitIdle` requests re-resolution.
   */
  private triedAllSubchannels = false;

  /**
   * Load balancer that attempts to connect to each backend in the address list
   * in order, and picks the first one that connects, using it for every
   * request.
   * @param channelControlHelper `ChannelControlHelper` instance provided by
   *     this load balancer's owner.
   */
  constructor(private readonly channelControlHelper: ChannelControlHelper) {
    this.subchannelStateCounts = PickFirstLoadBalancer.createEmptyStateCounts();
    this.subchannelStateListener = (
      subchannel: SubchannelInterface,
      previousState: ConnectivityState,
      newState: ConnectivityState
    ) => {
      /* This listener is only ever attached to members of `subchannels`, so
       * a notification that arrives while the list is empty is stale (the
       * list was reset while the notification was in flight). Ignore it
       * instead of dereferencing a missing entry below. */
      if (this.subchannels.length === 0) {
        return;
      }
      this.subchannelStateCounts[previousState] -= 1;
      this.subchannelStateCounts[newState] += 1;
      /* If the subchannel we most recently attempted to start connecting
       * to goes into TRANSIENT_FAILURE, immediately try to start
       * connecting to the next one instead of waiting for the connection
       * delay timer. */
      if (
        newState === ConnectivityState.TRANSIENT_FAILURE &&
        subchannel.getRealSubchannel() ===
          this.subchannels[this.currentSubchannelIndex].getRealSubchannel()
      ) {
        this.startNextSubchannelConnecting();
      }
      if (newState === ConnectivityState.READY) {
        this.pickSubchannel(subchannel);
        return;
      }
      if (
        this.triedAllSubchannels &&
        this.subchannelStateCounts[ConnectivityState.IDLE] ===
          this.subchannels.length
      ) {
        /* If all of the subchannels are IDLE we should go back to a
         * basic IDLE state where there is no subchannel list to avoid
         * holding unused resources. We do not reset triedAllSubchannels
         * because that is a reminder to request reresolution the next time
         * this LB policy needs to connect. */
        this.resetSubchannelList(false);
        this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
        return;
      }
      if (this.currentPick !== null) {
        // A pick is already serving calls; its own listener owns the state.
        return;
      }
      if (!this.triedAllSubchannels) {
        this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
        return;
      }
      const newLBState = this.calculateAggregateState();
      if (newLBState !== this.currentState) {
        this.reportAggregateState(newLBState);
      }
    };
    this.pickedSubchannelStateListener = (
      subchannel: SubchannelInterface,
      previousState: ConnectivityState,
      newState: ConnectivityState
    ) => {
      if (newState === ConnectivityState.READY) {
        return;
      }
      // The pick is no longer usable: release everything held for it.
      this.currentPick = null;
      subchannel.unref();
      subchannel.removeConnectivityStateListener(
        this.pickedSubchannelStateListener
      );
      this.channelControlHelper.removeChannelzChild(
        subchannel.getChannelzRef()
      );
      if (this.subchannels.length === 0) {
        /* We don't need to backoff here because this only happens if a
         * subchannel successfully connects then disconnects, so it will not
         * create a loop of attempting to connect to an unreachable backend
         */
        this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
        return;
      }
      if (!this.triedAllSubchannels) {
        this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
        return;
      }
      this.reportAggregateState(this.calculateAggregateState());
    };
    /* Create and immediately cancel a timer so that the field always holds a
     * valid handle that is safe to pass to clearTimeout. */
    this.connectionDelayTimeout = setTimeout(() => {}, 0);
    clearTimeout(this.connectionDelayTimeout);
  }

  /**
   * @returns A tally with every connectivity state's count set to zero.
   */
  private static createEmptyStateCounts(): ConnectivityStateCounts {
    return {
      [ConnectivityState.CONNECTING]: 0,
      [ConnectivityState.IDLE]: 0,
      [ConnectivityState.READY]: 0,
      [ConnectivityState.SHUTDOWN]: 0,
      [ConnectivityState.TRANSIENT_FAILURE]: 0,
    };
  }

  /**
   * Derive the state to report from the per-state subchannel counts:
   * CONNECTING if any subchannel is connecting, otherwise TRANSIENT_FAILURE
   * if any has failed, otherwise IDLE.
   */
  private calculateAggregateState(): ConnectivityState {
    if (this.subchannelStateCounts[ConnectivityState.CONNECTING] > 0) {
      return ConnectivityState.CONNECTING;
    }
    if (this.subchannelStateCounts[ConnectivityState.TRANSIENT_FAILURE] > 0) {
      return ConnectivityState.TRANSIENT_FAILURE;
    }
    return ConnectivityState.IDLE;
  }

  /**
   * Report an aggregate state with the matching picker: an
   * `UnavailablePicker` for TRANSIENT_FAILURE, a `QueuePicker` otherwise.
   */
  private reportAggregateState(state: ConnectivityState) {
    if (state === ConnectivityState.TRANSIENT_FAILURE) {
      this.updateState(state, new UnavailablePicker());
    } else {
      this.updateState(state, new QueuePicker(this));
    }
  }

  /**
   * Start a connection attempt on the first subchannel after
   * `currentSubchannelIndex` that is IDLE or CONNECTING. If there is none,
   * record that every subchannel has been tried.
   */
  private startNextSubchannelConnecting() {
    if (this.triedAllSubchannels) {
      return;
    }
    for (
      let index = this.currentSubchannelIndex + 1;
      index < this.subchannels.length;
      index++
    ) {
      const subchannelState = this.subchannels[index].getConnectivityState();
      if (
        subchannelState === ConnectivityState.IDLE ||
        subchannelState === ConnectivityState.CONNECTING
      ) {
        this.startConnecting(index);
        return;
      }
    }
    this.triedAllSubchannels = true;
  }

  /**
   * Have a single subchannel in the `subchannels` list start connecting.
   * @param subchannelIndex The index into the `subchannels` list.
   */
  private startConnecting(subchannelIndex: number) {
    clearTimeout(this.connectionDelayTimeout);
    this.currentSubchannelIndex = subchannelIndex;
    const subchannel = this.subchannels[subchannelIndex];
    if (subchannel.getConnectivityState() === ConnectivityState.IDLE) {
      trace(
        'Start connecting to subchannel with address ' +
          subchannel.getAddress()
      );
      process.nextTick(() => {
        /* The subchannel list can be reset or replaced before this callback
         * runs (for example by destroy() or by a pick). Only start the
         * attempt if the same subchannel still occupies this slot, so a
         * missing entry is never dereferenced and a released subchannel is
         * never started. */
        if (this.subchannels[subchannelIndex] === subchannel) {
          subchannel.startConnecting();
        }
      });
    }
    this.connectionDelayTimeout = setTimeout(() => {
      this.startNextSubchannelConnecting();
    }, CONNECTION_DELAY_INTERVAL_MS);
  }

  /**
   * Make `subchannel` the current pick: release any previous pick, report
   * READY with a picker for the new subchannel, take a reference and a
   * channelz child entry for it, and drop the list of connection attempts.
   * @param subchannel The subchannel to use for every call from now on.
   */
  private pickSubchannel(subchannel: SubchannelInterface) {
    trace('Pick subchannel with address ' + subchannel.getAddress());
    /* Unref can cause a state change, which can cause a change in the value
     * of this.currentPick, so work on a local reference and detach our
     * listener before releasing the previous pick. */
    const previousPick = this.currentPick;
    if (previousPick !== null) {
      previousPick.removeConnectivityStateListener(
        this.pickedSubchannelStateListener
      );
      previousPick.unref();
      // Balance the addChannelzChild made when that subchannel was picked.
      this.channelControlHelper.removeChannelzChild(
        previousPick.getChannelzRef()
      );
    }
    this.currentPick = subchannel;
    this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
    subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener);
    subchannel.ref();
    this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
    this.resetSubchannelList();
    clearTimeout(this.connectionDelayTimeout);
  }

  /**
   * Record `newState` as the current state and forward it, together with
   * `picker`, to the channel control helper.
   */
  private updateState(newState: ConnectivityState, picker: Picker) {
    trace(
      ConnectivityState[this.currentState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    this.currentState = newState;
    this.channelControlHelper.updateState(newState, picker);
  }

  /**
   * Release every subchannel in the `subchannels` list (listener, reference
   * and channelz child entry) and return the list, the state counts and the
   * current index to their empty values.
   * @param resetTriedAllSubchannels Whether to also clear the
   *     `triedAllSubchannels` flag. Defaults to `true`.
   */
  private resetSubchannelList(resetTriedAllSubchannels = true) {
    for (const subchannel of this.subchannels) {
      subchannel.removeConnectivityStateListener(this.subchannelStateListener);
      subchannel.unref();
      this.channelControlHelper.removeChannelzChild(
        subchannel.getChannelzRef()
      );
    }
    this.currentSubchannelIndex = 0;
    this.subchannelStateCounts = PickFirstLoadBalancer.createEmptyStateCounts();
    this.subchannels = [];
    if (resetTriedAllSubchannels) {
      this.triedAllSubchannels = false;
    }
  }

  /**
   * Start connecting to the address list most recently passed to
   * `updateAddressList`.
   */
  private connectToAddressList(): void {
    this.resetSubchannelList();
    trace(
      'Connect to address list ' +
        this.latestAddressList.map((address) =>
          subchannelAddressToString(address)
        )
    );
    this.subchannels = this.latestAddressList.map((address) =>
      this.channelControlHelper.createSubchannel(address, {})
    );
    for (const subchannel of this.subchannels) {
      subchannel.ref();
      this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
    }
    for (const subchannel of this.subchannels) {
      subchannel.addConnectivityStateListener(this.subchannelStateListener);
      const subchannelState = subchannel.getConnectivityState();
      this.subchannelStateCounts[subchannelState] += 1;
      if (subchannelState === ConnectivityState.READY) {
        /* pickSubchannel also resets the subchannel list, so there is
         * nothing left to clean up here. */
        this.pickSubchannel(subchannel);
        return;
      }
    }
    for (const [index, subchannel] of this.subchannels.entries()) {
      const subchannelState = subchannel.getConnectivityState();
      if (
        subchannelState === ConnectivityState.IDLE ||
        subchannelState === ConnectivityState.CONNECTING
      ) {
        this.startConnecting(index);
        if (this.currentPick === null) {
          this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
        }
        return;
      }
    }
    // If the code reaches this point, every subchannel must be in TRANSIENT_FAILURE
    if (this.currentPick === null) {
      this.updateState(
        ConnectivityState.TRANSIENT_FAILURE,
        new UnavailablePicker()
      );
    }
  }

  /**
   * Accept a new address list and, when needed, start connecting to it.
   * @param addressList The backend addresses to connect to, in order.
   * @param lbConfig Unused: it has no useful information for pick first.
   */
  updateAddressList(
    addressList: SubchannelAddress[],
    lbConfig: LoadBalancingConfig
  ): void {
    // lbConfig has no useful information for pick first load balancing
    /* To avoid unnecessary churn, we only do something with this address list
     * if we're not currently trying to establish a connection, or if the new
     * address list is different from the existing one */
    if (
      this.subchannels.length === 0 ||
      this.latestAddressList.length !== addressList.length ||
      !this.latestAddressList.every(
        (value, index) =>
          addressList[index] &&
          subchannelAddressEqual(addressList[index], value)
      )
    ) {
      this.latestAddressList = addressList;
      this.connectToAddressList();
    }
  }

  /**
   * Leave the idle state. Requests re-resolution if the balancer is IDLE or
   * has already tried every subchannel, asks each subchannel in the list to
   * start connecting, and, when IDLE with a non-empty address list,
   * reconnects to that list.
   */
  exitIdle() {
    if (
      this.currentState === ConnectivityState.IDLE ||
      this.triedAllSubchannels
    ) {
      this.channelControlHelper.requestReresolution();
    }
    for (const subchannel of this.subchannels) {
      subchannel.startConnecting();
    }
    if (this.currentState === ConnectivityState.IDLE) {
      if (this.latestAddressList.length > 0) {
        this.connectToAddressList();
      }
    }
  }

  resetBackoff() {
    /* The pick first load balancer does not have a connection backoff, so this
     * does nothing */
  }

  /**
   * Release everything this load balancer holds: the pending connection
   * delay timer, the subchannel list, and the current pick (if any).
   */
  destroy() {
    // Without this, the timer would fire once more after destruction.
    clearTimeout(this.connectionDelayTimeout);
    this.resetSubchannelList();
    /* Unref can cause a state change, which can cause a change in the value
     * of this.currentPick, so we hold a local reference to make sure that
     * does not impact this function. The listener is detached before the
     * unref so that such a state change cannot release the pick a second
     * time or report a new state from a destroyed balancer. */
    const currentPick = this.currentPick;
    if (currentPick !== null) {
      this.currentPick = null;
      currentPick.removeConnectivityStateListener(
        this.pickedSubchannelStateListener
      );
      currentPick.unref();
      this.channelControlHelper.removeChannelzChild(
        currentPick.getChannelzRef()
      );
    }
  }

  /**
   * @returns The name of this load balancing policy.
   */
  getTypeName(): string {
    return TYPE_NAME;
  }
}
474
/**
 * Register the `pick_first` policy (balancer class plus config class) under
 * its type name, then register that name as the default load balancer type.
 */
export function setup(): void {
  registerLoadBalancerType(
    TYPE_NAME,
    PickFirstLoadBalancer,
    PickFirstLoadBalancingConfig
  );
  registerDefaultLoadBalancerType(TYPE_NAME);
}