1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import {
|
19 | LoadBalancer,
|
20 | ChannelControlHelper,
|
21 | LoadBalancingConfig,
|
22 | registerLoadBalancerType,
|
23 | } from './load-balancer';
|
24 | import { ConnectivityState } from './connectivity-state';
|
25 | import {
|
26 | QueuePicker,
|
27 | Picker,
|
28 | PickArgs,
|
29 | CompletePickResult,
|
30 | PickResultType,
|
31 | UnavailablePicker,
|
32 | } from './picker';
|
33 | import {
|
34 | SubchannelAddress,
|
35 | subchannelAddressToString,
|
36 | } from './subchannel-address';
|
37 | import * as logging from './logging';
|
38 | import { LogVerbosity } from './constants';
|
39 | import { ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';
|
40 |
|
41 | const TRACER_NAME = 'round_robin';
|
42 |
|
43 | function trace(text: string): void {
|
44 | logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
45 | }
|
46 |
|
47 | const TYPE_NAME = 'round_robin';
|
48 |
|
49 | class RoundRobinLoadBalancingConfig implements LoadBalancingConfig {
|
50 | getLoadBalancerName(): string {
|
51 | return TYPE_NAME;
|
52 | }
|
53 |
|
54 | constructor() {}
|
55 |
|
56 | toJsonObject(): object {
|
57 | return {
|
58 | [TYPE_NAME]: {},
|
59 | };
|
60 | }
|
61 |
|
62 |
|
63 | static createFromJson(obj: any) {
|
64 | return new RoundRobinLoadBalancingConfig();
|
65 | }
|
66 | }
|
67 |
|
68 | class RoundRobinPicker implements Picker {
|
69 | constructor(
|
70 | private readonly subchannelList: SubchannelInterface[],
|
71 | private nextIndex = 0
|
72 | ) {}
|
73 |
|
74 | pick(pickArgs: PickArgs): CompletePickResult {
|
75 | const pickedSubchannel = this.subchannelList[this.nextIndex];
|
76 | this.nextIndex = (this.nextIndex + 1) % this.subchannelList.length;
|
77 | return {
|
78 | pickResultType: PickResultType.COMPLETE,
|
79 | subchannel: pickedSubchannel,
|
80 | status: null,
|
81 | extraFilterFactories: [],
|
82 | onCallStarted: null,
|
83 | };
|
84 | }
|
85 |
|
86 | |
87 |
|
88 |
|
89 |
|
90 |
|
91 | peekNextSubchannel(): SubchannelInterface {
|
92 | return this.subchannelList[this.nextIndex];
|
93 | }
|
94 | }
|
95 |
|
96 | interface ConnectivityStateCounts {
|
97 | [ConnectivityState.CONNECTING]: number;
|
98 | [ConnectivityState.IDLE]: number;
|
99 | [ConnectivityState.READY]: number;
|
100 | [ConnectivityState.SHUTDOWN]: number;
|
101 | [ConnectivityState.TRANSIENT_FAILURE]: number;
|
102 | }
|
103 |
|
104 | export class RoundRobinLoadBalancer implements LoadBalancer {
|
105 | private subchannels: SubchannelInterface[] = [];
|
106 |
|
107 | private currentState: ConnectivityState = ConnectivityState.IDLE;
|
108 |
|
109 | private subchannelStateListener: ConnectivityStateListener;
|
110 |
|
111 | private subchannelStateCounts: ConnectivityStateCounts;
|
112 |
|
113 | private currentReadyPicker: RoundRobinPicker | null = null;
|
114 |
|
115 | constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
116 | this.subchannelStateCounts = {
|
117 | [ConnectivityState.CONNECTING]: 0,
|
118 | [ConnectivityState.IDLE]: 0,
|
119 | [ConnectivityState.READY]: 0,
|
120 | [ConnectivityState.SHUTDOWN]: 0,
|
121 | [ConnectivityState.TRANSIENT_FAILURE]: 0,
|
122 | };
|
123 | this.subchannelStateListener = (
|
124 | subchannel: SubchannelInterface,
|
125 | previousState: ConnectivityState,
|
126 | newState: ConnectivityState
|
127 | ) => {
|
128 | this.subchannelStateCounts[previousState] -= 1;
|
129 | this.subchannelStateCounts[newState] += 1;
|
130 | this.calculateAndUpdateState();
|
131 |
|
132 | if (
|
133 | newState === ConnectivityState.TRANSIENT_FAILURE ||
|
134 | newState === ConnectivityState.IDLE
|
135 | ) {
|
136 | this.channelControlHelper.requestReresolution();
|
137 | subchannel.startConnecting();
|
138 | }
|
139 | };
|
140 | }
|
141 |
|
142 | private calculateAndUpdateState() {
|
143 | if (this.subchannelStateCounts[ConnectivityState.READY] > 0) {
|
144 | const readySubchannels = this.subchannels.filter(
|
145 | (subchannel) =>
|
146 | subchannel.getConnectivityState() === ConnectivityState.READY
|
147 | );
|
148 | let index = 0;
|
149 | if (this.currentReadyPicker !== null) {
|
150 | index = readySubchannels.indexOf(
|
151 | this.currentReadyPicker.peekNextSubchannel()
|
152 | );
|
153 | if (index < 0) {
|
154 | index = 0;
|
155 | }
|
156 | }
|
157 | this.updateState(
|
158 | ConnectivityState.READY,
|
159 | new RoundRobinPicker(readySubchannels, index)
|
160 | );
|
161 | } else if (this.subchannelStateCounts[ConnectivityState.CONNECTING] > 0) {
|
162 | this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
|
163 | } else if (
|
164 | this.subchannelStateCounts[ConnectivityState.TRANSIENT_FAILURE] > 0
|
165 | ) {
|
166 | this.updateState(
|
167 | ConnectivityState.TRANSIENT_FAILURE,
|
168 | new UnavailablePicker()
|
169 | );
|
170 | } else {
|
171 | this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
172 | }
|
173 | }
|
174 |
|
175 | private updateState(newState: ConnectivityState, picker: Picker) {
|
176 | trace(
|
177 | ConnectivityState[this.currentState] +
|
178 | ' -> ' +
|
179 | ConnectivityState[newState]
|
180 | );
|
181 | if (newState === ConnectivityState.READY) {
|
182 | this.currentReadyPicker = picker as RoundRobinPicker;
|
183 | } else {
|
184 | this.currentReadyPicker = null;
|
185 | }
|
186 | this.currentState = newState;
|
187 | this.channelControlHelper.updateState(newState, picker);
|
188 | }
|
189 |
|
190 | private resetSubchannelList() {
|
191 | for (const subchannel of this.subchannels) {
|
192 | subchannel.removeConnectivityStateListener(this.subchannelStateListener);
|
193 | subchannel.unref();
|
194 | this.channelControlHelper.removeChannelzChild(subchannel.getChannelzRef());
|
195 | }
|
196 | this.subchannelStateCounts = {
|
197 | [ConnectivityState.CONNECTING]: 0,
|
198 | [ConnectivityState.IDLE]: 0,
|
199 | [ConnectivityState.READY]: 0,
|
200 | [ConnectivityState.SHUTDOWN]: 0,
|
201 | [ConnectivityState.TRANSIENT_FAILURE]: 0,
|
202 | };
|
203 | this.subchannels = [];
|
204 | }
|
205 |
|
206 | updateAddressList(
|
207 | addressList: SubchannelAddress[],
|
208 | lbConfig: LoadBalancingConfig
|
209 | ): void {
|
210 | this.resetSubchannelList();
|
211 | trace(
|
212 | 'Connect to address list ' +
|
213 | addressList.map((address) => subchannelAddressToString(address))
|
214 | );
|
215 | this.subchannels = addressList.map((address) =>
|
216 | this.channelControlHelper.createSubchannel(address, {})
|
217 | );
|
218 | for (const subchannel of this.subchannels) {
|
219 | subchannel.ref();
|
220 | subchannel.addConnectivityStateListener(this.subchannelStateListener);
|
221 | this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
|
222 | const subchannelState = subchannel.getConnectivityState();
|
223 | this.subchannelStateCounts[subchannelState] += 1;
|
224 | if (
|
225 | subchannelState === ConnectivityState.IDLE ||
|
226 | subchannelState === ConnectivityState.TRANSIENT_FAILURE
|
227 | ) {
|
228 | subchannel.startConnecting();
|
229 | }
|
230 | }
|
231 | this.calculateAndUpdateState();
|
232 | }
|
233 |
|
234 | exitIdle(): void {
|
235 | for (const subchannel of this.subchannels) {
|
236 | subchannel.startConnecting();
|
237 | }
|
238 | }
|
239 | resetBackoff(): void {
|
240 | |
241 |
|
242 | }
|
243 | destroy(): void {
|
244 | this.resetSubchannelList();
|
245 | }
|
246 | getTypeName(): string {
|
247 | return TYPE_NAME;
|
248 | }
|
249 | }
|
250 |
|
251 | export function setup() {
|
252 | registerLoadBalancerType(
|
253 | TYPE_NAME,
|
254 | RoundRobinLoadBalancer,
|
255 | RoundRobinLoadBalancingConfig
|
256 | );
|
257 | }
|