UNPKG

15.9 kBPlain TextView Raw
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
18import {
19 LoadBalancer,
20 ChannelControlHelper,
21 LoadBalancingConfig,
22 registerDefaultLoadBalancerType,
23 registerLoadBalancerType,
24} from './load-balancer';
25import { ConnectivityState } from './connectivity-state';
26import {
27 QueuePicker,
28 Picker,
29 PickArgs,
30 CompletePickResult,
31 PickResultType,
32 UnavailablePicker,
33} from './picker';
34import {
35 SubchannelAddress,
36 subchannelAddressToString,
37} from './subchannel-address';
38import * as logging from './logging';
39import { LogVerbosity } from './constants';
40import { SubchannelInterface, ConnectivityStateListener } from './subchannel-interface';
41
/** Tracer name passed to `logging.trace` for every message emitted by this file. */
const TRACER_NAME = 'pick_first';

/**
 * Emit a DEBUG-verbosity trace message under the `pick_first` tracer name.
 * @param text The message to log.
 */
function trace(text: string): void {
  logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
47
/** Name under which this load balancing policy is registered and reported. */
const TYPE_NAME = 'pick_first';

/**
 * Delay after starting a connection on a subchannel before starting a
 * connection on the next subchannel in the list, for Happy Eyeballs algorithm.
 */
const CONNECTION_DELAY_INTERVAL_MS = 250;
55
/**
 * Load balancing config for the `pick_first` policy. The policy has no
 * options, so every instance is equivalent.
 */
export class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
  /**
   * @returns The name of the load balancer type this config selects.
   */
  getLoadBalancerName(): string {
    return TYPE_NAME;
  }

  constructor() {}

  /**
   * @returns A JSON-compatible object with the policy name as its only key,
   * mapped to an empty options object.
   */
  toJsonObject(): object {
    return {
      [TYPE_NAME]: {},
    };
  }

  /**
   * Build a config from a parsed JSON value.
   * @param obj The parsed JSON value; ignored, because this policy has no
   * configurable options.
   * @returns A new config instance.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  static createFromJson(obj: any): PickFirstLoadBalancingConfig {
    return new PickFirstLoadBalancingConfig();
  }
}
74
/**
 * Picker for a `PickFirstLoadBalancer` in the READY state. Always returns the
 * picked subchannel.
 */
class PickFirstPicker implements Picker {
  /**
   * @param subchannel The subchannel that every pick resolves to.
   */
  constructor(private readonly subchannel: SubchannelInterface) {}

  /**
   * @param pickArgs Ignored; the result does not depend on the call.
   * @returns A COMPLETE pick result for the wrapped subchannel, with no
   * status, no extra filter factories and no call-start callback.
   */
  pick(pickArgs: PickArgs): CompletePickResult {
    return {
      pickResultType: PickResultType.COMPLETE,
      subchannel: this.subchannel,
      status: null,
      extraFilterFactories: [],
      onCallStarted: null,
    };
  }
}
92
/**
 * Tally of how many subchannels are in each connectivity state, keyed by
 * `ConnectivityState` value.
 */
interface ConnectivityStateCounts {
  [ConnectivityState.CONNECTING]: number;
  [ConnectivityState.IDLE]: number;
  [ConnectivityState.READY]: number;
  [ConnectivityState.SHUTDOWN]: number;
  [ConnectivityState.TRANSIENT_FAILURE]: number;
}
100
/**
 * Load balancer that attempts to connect to each backend in the address list
 * in order, and picks the first one that connects, using it for every
 * request.
 */
export class PickFirstLoadBalancer implements LoadBalancer {
  /**
   * The list of backend addresses most recently passed to `updateAddressList`.
   */
  private latestAddressList: SubchannelAddress[] = [];
  /**
   * The list of subchannels this load balancer is currently attempting to
   * connect to.
   */
  private subchannels: SubchannelInterface[] = [];
  /**
   * The current connectivity state of the load balancer.
   */
  private currentState: ConnectivityState = ConnectivityState.IDLE;
  /**
   * The index within the `subchannels` array of the subchannel with the most
   * recently started connection attempt.
   */
  private currentSubchannelIndex = 0;
  /**
   * Number of subchannels in the `subchannels` list in each connectivity
   * state. Maintained by `connectToAddressList` and `subchannelStateListener`.
   */
  private subchannelStateCounts: ConnectivityStateCounts;
  /**
   * The currently picked subchannel used for making calls. Populated if
   * and only if the load balancer's current state is READY. In that case,
   * the subchannel's current state is also READY.
   */
  private currentPick: SubchannelInterface | null = null;
  /**
   * Listener callback attached to each subchannel in the `subchannels` list
   * while establishing a connection.
   */
  private subchannelStateListener: ConnectivityStateListener;
  /**
   * Listener callback attached to the current picked subchannel.
   */
  private pickedSubchannelStateListener: ConnectivityStateListener;
  /**
   * Timer reference for the timer tracking when to start connecting to the
   * next subchannel in the list.
   */
  private connectionDelayTimeout: NodeJS.Timeout;
  /**
   * Set once `startNextSubchannelConnecting` has run out of subchannels to
   * start. While set, the reported state is derived from the per-state
   * subchannel counts, and `exitIdle` requests re-resolution.
   */
  private triedAllSubchannels = false;

  /**
   * Load balancer that attempts to connect to each backend in the address list
   * in order, and picks the first one that connects, using it for every
   * request.
   * @param channelControlHelper `ChannelControlHelper` instance provided by
   *     this load balancer's owner.
   */
  constructor(private readonly channelControlHelper: ChannelControlHelper) {
    this.subchannelStateCounts = {
      [ConnectivityState.CONNECTING]: 0,
      [ConnectivityState.IDLE]: 0,
      [ConnectivityState.READY]: 0,
      [ConnectivityState.SHUTDOWN]: 0,
      [ConnectivityState.TRANSIENT_FAILURE]: 0,
    };
    this.subchannelStateListener = (
      subchannel: SubchannelInterface,
      previousState: ConnectivityState,
      newState: ConnectivityState
    ) => {
      this.subchannelStateCounts[previousState] -= 1;
      this.subchannelStateCounts[newState] += 1;
      /* If the subchannel we most recently attempted to start connecting
       * to goes into TRANSIENT_FAILURE, immediately try to start
       * connecting to the next one instead of waiting for the connection
       * delay timer. */
      if (
        subchannel === this.subchannels[this.currentSubchannelIndex] &&
        newState === ConnectivityState.TRANSIENT_FAILURE
      ) {
        this.startNextSubchannelConnecting();
      }
      if (newState === ConnectivityState.READY) {
        this.pickSubchannel(subchannel);
        return;
      }
      if (
        this.triedAllSubchannels &&
        this.subchannelStateCounts[ConnectivityState.IDLE] ===
          this.subchannels.length
      ) {
        /* If all of the subchannels are IDLE we should go back to a
         * basic IDLE state where there is no subchannel list to avoid
         * holding unused resources. We do not reset triedAllSubchannels
         * because that is a reminder to request reresolution the next time
         * this LB policy needs to connect. */
        this.resetSubchannelList(false);
        this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
        return;
      }
      if (this.currentPick !== null) {
        // A picked subchannel is serving calls; keep reporting READY.
        return;
      }
      if (!this.triedAllSubchannels) {
        this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
        return;
      }
      const newLBState = this.calculateAggregateState();
      // Only report actual transitions while cycling through the list.
      if (newLBState !== this.currentState) {
        this.updateState(newLBState, this.createPickerForState(newLBState));
      }
    };
    this.pickedSubchannelStateListener = (
      subchannel: SubchannelInterface,
      previousState: ConnectivityState,
      newState: ConnectivityState
    ) => {
      if (newState === ConnectivityState.READY) {
        return;
      }
      // The picked subchannel is no longer usable: drop it.
      this.currentPick = null;
      subchannel.unref();
      subchannel.removeConnectivityStateListener(
        this.pickedSubchannelStateListener
      );
      this.channelControlHelper.removeChannelzChild(
        subchannel.getChannelzRef()
      );
      if (this.subchannels.length === 0) {
        /* We don't need to backoff here because this only happens if a
         * subchannel successfully connects then disconnects, so it will not
         * create a loop of attempting to connect to an unreachable backend
         */
        this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
        return;
      }
      if (!this.triedAllSubchannels) {
        this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
        return;
      }
      const newLBState = this.calculateAggregateState();
      this.updateState(newLBState, this.createPickerForState(newLBState));
    };
    /* Create and immediately cancel a timer so that the field always holds a
     * valid handle that is safe to pass to clearTimeout. */
    this.connectionDelayTimeout = setTimeout(() => {}, 0);
    clearTimeout(this.connectionDelayTimeout);
  }

  /**
   * Derive the load balancer state from the per-state subchannel counts:
   * CONNECTING if any subchannel is connecting, otherwise TRANSIENT_FAILURE
   * if any subchannel has failed, otherwise IDLE.
   */
  private calculateAggregateState(): ConnectivityState {
    if (this.subchannelStateCounts[ConnectivityState.CONNECTING] > 0) {
      return ConnectivityState.CONNECTING;
    }
    if (this.subchannelStateCounts[ConnectivityState.TRANSIENT_FAILURE] > 0) {
      return ConnectivityState.TRANSIENT_FAILURE;
    }
    return ConnectivityState.IDLE;
  }

  /**
   * Create the picker reported alongside a non-READY state: an
   * `UnavailablePicker` for TRANSIENT_FAILURE, a `QueuePicker` otherwise.
   */
  private createPickerForState(state: ConnectivityState): Picker {
    if (state === ConnectivityState.TRANSIENT_FAILURE) {
      return new UnavailablePicker();
    }
    return new QueuePicker(this);
  }

  /**
   * Start connecting to the first subchannel after `currentSubchannelIndex`
   * that is IDLE or CONNECTING. If there is none, record that every
   * subchannel has been tried.
   */
  private startNextSubchannelConnecting() {
    if (this.triedAllSubchannels) {
      return;
    }
    for (const [index, subchannel] of this.subchannels.entries()) {
      if (index > this.currentSubchannelIndex) {
        const subchannelState = subchannel.getConnectivityState();
        if (
          subchannelState === ConnectivityState.IDLE ||
          subchannelState === ConnectivityState.CONNECTING
        ) {
          this.startConnecting(index);
          return;
        }
      }
    }
    this.triedAllSubchannels = true;
  }

  /**
   * Have a single subchannel in the `subchannels` list start connecting.
   * @param subchannelIndex The index into the `subchannels` list.
   */
  private startConnecting(subchannelIndex: number) {
    clearTimeout(this.connectionDelayTimeout);
    this.currentSubchannelIndex = subchannelIndex;
    const subchannel = this.subchannels[subchannelIndex];
    if (subchannel.getConnectivityState() === ConnectivityState.IDLE) {
      trace(
        'Start connecting to subchannel with address ' +
          subchannel.getAddress()
      );
      process.nextTick(() => {
        /* The subchannel list can be reset or replaced before this callback
         * runs (for example by destroy() or a new address list). Only start
         * the connection if this subchannel is still the one at this index;
         * indexing the list blindly here could dereference undefined. */
        if (this.subchannels[subchannelIndex] === subchannel) {
          subchannel.startConnecting();
        }
      });
    }
    this.connectionDelayTimeout = setTimeout(() => {
      this.startNextSubchannelConnecting();
    }, CONNECTION_DELAY_INTERVAL_MS);
  }

  /**
   * Make `subchannel` the current pick, report READY with a picker that
   * always returns it, and discard the list of connection candidates.
   * @param subchannel The subchannel that became READY.
   */
  private pickSubchannel(subchannel: SubchannelInterface) {
    trace('Pick subchannel with address ' + subchannel.getAddress());
    if (this.currentPick !== null) {
      this.currentPick.unref();
      this.currentPick.removeConnectivityStateListener(
        this.pickedSubchannelStateListener
      );
    }
    this.currentPick = subchannel;
    this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
    subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener);
    subchannel.ref();
    this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
    this.resetSubchannelList();
    clearTimeout(this.connectionDelayTimeout);
  }

  /**
   * Record the new state and forward it, with its picker, to the channel.
   * @param newState The state to report.
   * @param picker The picker to use while in that state.
   */
  private updateState(newState: ConnectivityState, picker: Picker) {
    trace(
      ConnectivityState[this.currentState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    this.currentState = newState;
    this.channelControlHelper.updateState(newState, picker);
  }

  /**
   * Detach from every subchannel in the `subchannels` list and empty it,
   * resetting the connection index and the per-state counts.
   * @param resetTriedAllSubchannels Whether to also clear the
   *     `triedAllSubchannels` flag. Defaults to true.
   */
  private resetSubchannelList(resetTriedAllSubchannels = true) {
    for (const subchannel of this.subchannels) {
      subchannel.removeConnectivityStateListener(this.subchannelStateListener);
      subchannel.unref();
      this.channelControlHelper.removeChannelzChild(
        subchannel.getChannelzRef()
      );
    }
    this.currentSubchannelIndex = 0;
    this.subchannelStateCounts = {
      [ConnectivityState.CONNECTING]: 0,
      [ConnectivityState.IDLE]: 0,
      [ConnectivityState.READY]: 0,
      [ConnectivityState.SHUTDOWN]: 0,
      [ConnectivityState.TRANSIENT_FAILURE]: 0,
    };
    this.subchannels = [];
    if (resetTriedAllSubchannels) {
      this.triedAllSubchannels = false;
    }
  }

  /**
   * Start connecting to the address list most recently passed to
   * `updateAddressList`.
   */
  private connectToAddressList(): void {
    this.resetSubchannelList();
    trace(
      'Connect to address list ' +
        this.latestAddressList.map((address) =>
          subchannelAddressToString(address)
        )
    );
    this.subchannels = this.latestAddressList.map((address) =>
      this.channelControlHelper.createSubchannel(address, {})
    );
    for (const subchannel of this.subchannels) {
      subchannel.ref();
      this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
    }
    for (const subchannel of this.subchannels) {
      subchannel.addConnectivityStateListener(this.subchannelStateListener);
      this.subchannelStateCounts[subchannel.getConnectivityState()] += 1;
      if (subchannel.getConnectivityState() === ConnectivityState.READY) {
        /* pickSubchannel also discards the candidate list, so nothing else
         * needs to be cleaned up here. */
        this.pickSubchannel(subchannel);
        return;
      }
    }
    for (const [index, subchannel] of this.subchannels.entries()) {
      const subchannelState = subchannel.getConnectivityState();
      if (
        subchannelState === ConnectivityState.IDLE ||
        subchannelState === ConnectivityState.CONNECTING
      ) {
        this.startConnecting(index);
        if (this.currentPick === null) {
          this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
        }
        return;
      }
    }
    /* If the code reaches this point, no subchannel is IDLE, CONNECTING or
     * READY: either every subchannel is in TRANSIENT_FAILURE or the list is
     * empty. */
    if (this.currentPick === null) {
      this.updateState(
        ConnectivityState.TRANSIENT_FAILURE,
        new UnavailablePicker()
      );
    }
  }

  /**
   * Give the load balancer a new list of backend addresses.
   * @param addressList The new list of addresses, in connection order.
   * @param lbConfig Unused; pick first has no configuration options.
   */
  updateAddressList(
    addressList: SubchannelAddress[],
    lbConfig: LoadBalancingConfig
  ): void {
    // lbConfig has no useful information for pick first load balancing
    /* To avoid unnecessary churn, we only do something with this address list
     * if we're not currently trying to establish a connection, or if the new
     * address list is different from the existing one. The lengths must be
     * compared explicitly: checking only the entries of the old list would
     * treat a new list that merely appends addresses as unchanged. Entries
     * are compared by object identity. */
    const addressListChanged =
      this.latestAddressList.length !== addressList.length ||
      !this.latestAddressList.every(
        (value, index) => addressList[index] === value
      );
    if (this.subchannels.length === 0 || addressListChanged) {
      this.latestAddressList = addressList;
      this.connectToAddressList();
    }
  }

  /**
   * Leave the IDLE state: request re-resolution if idle or if every
   * subchannel has already been tried, ask all candidate subchannels to
   * connect, and when idle restart the connection sequence for the latest
   * address list.
   */
  exitIdle() {
    if (
      this.currentState === ConnectivityState.IDLE ||
      this.triedAllSubchannels
    ) {
      this.channelControlHelper.requestReresolution();
    }
    for (const subchannel of this.subchannels) {
      subchannel.startConnecting();
    }
    if (this.currentState === ConnectivityState.IDLE) {
      if (this.latestAddressList.length > 0) {
        this.connectToAddressList();
      }
    }
  }

  resetBackoff() {
    /* The pick first load balancer does not have a connection backoff, so this
     * does nothing */
  }

  /**
   * Release every subchannel held by this load balancer and cancel the
   * pending connection delay timer.
   */
  destroy() {
    /* Without this, a pending timer would still fire after destruction and
     * run startNextSubchannelConnecting on a discarded load balancer. */
    clearTimeout(this.connectionDelayTimeout);
    this.resetSubchannelList();
    if (this.currentPick !== null) {
      /* Unref can cause a state change, which can cause a change in the value
       * of this.currentPick, so we hold a local reference to make sure that
       * does not impact this function. */
      const currentPick = this.currentPick;
      currentPick.unref();
      currentPick.removeConnectivityStateListener(
        this.pickedSubchannelStateListener
      );
      this.channelControlHelper.removeChannelzChild(
        currentPick.getChannelzRef()
      );
    }
  }

  /**
   * @returns The registered name of this load balancer type.
   */
  getTypeName(): string {
    return TYPE_NAME;
  }
}
472
/**
 * Register `PickFirstLoadBalancer` and its config class under the
 * `pick_first` type name, then mark that type name as the default load
 * balancer type.
 */
export function setup(): void {
  registerLoadBalancerType(
    TYPE_NAME,
    PickFirstLoadBalancer,
    PickFirstLoadBalancingConfig
  );
  registerDefaultLoadBalancerType(TYPE_NAME);
}