UNPKG

27.6 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.ChannelImplementation = void 0;
20const call_stream_1 = require("./call-stream");
21const channel_credentials_1 = require("./channel-credentials");
22const resolving_load_balancer_1 = require("./resolving-load-balancer");
23const subchannel_pool_1 = require("./subchannel-pool");
24const picker_1 = require("./picker");
25const constants_1 = require("./constants");
26const filter_stack_1 = require("./filter-stack");
27const call_credentials_filter_1 = require("./call-credentials-filter");
28const deadline_filter_1 = require("./deadline-filter");
29const compression_filter_1 = require("./compression-filter");
30const resolver_1 = require("./resolver");
31const logging_1 = require("./logging");
32const max_message_size_filter_1 = require("./max-message-size-filter");
33const http_proxy_1 = require("./http_proxy");
34const uri_parser_1 = require("./uri-parser");
35const connectivity_state_1 = require("./connectivity-state");
36const channelz_1 = require("./channelz");
/**
 * Delay, in milliseconds, used for the channel's `callRefTimer` interval.
 * 2147483647 is 2^31 - 1, the largest delay accepted by Node.js timers.
 * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
 */
const MAX_TIMEOUT_TIME = 2147483647;
// Module-level counter holding the number the next call will receive.
let nextCallNumber = 0;
/**
 * Returns the current call number and advances the module-level counter.
 * The counter restarts at 0 once the advanced value would reach
 * Number.MAX_SAFE_INTEGER.
 * @returns {number} The call number assigned to the caller.
 */
function getNewCallNumber() {
    const assigned = nextCallNumber;
    const following = assigned + 1;
    nextCallNumber = following >= Number.MAX_SAFE_INTEGER ? 0 : following;
    return assigned;
}
/**
 * Client-side channel. It resolves its target through a
 * ResolvingLoadBalancer, queues calls until a config selector and a usable
 * picker are available, hands calls to picked subchannels, and tracks
 * connectivity state and channelz data.
 */
class ChannelImplementation {
    /**
     * @param target Target name; must be a string that the URI parser and the
     *     resolver's default-scheme mapping both accept.
     * @param credentials Must be an instance of ChannelCredentials.
     * @param options Channel options object. May be omitted, in which case an
     *     empty options object is used.
     * @throws {TypeError} If target is not a string, credentials is not a
     *     ChannelCredentials, or options is a truthy non-object.
     * @throws {Error} If the target cannot be parsed or has no default scheme.
     */
    constructor(target, credentials, options) {
        var _a, _b;
        this.credentials = credentials;
        this.connectivityState = connectivity_state_1.ConnectivityState.IDLE;
        this.currentPicker = new picker_1.UnavailablePicker();
        /**
         * Calls queued up to get a call config. Should only be populated before the
         * first time the resolver returns a result, which includes the ConfigSelector.
         */
        this.configSelectionQueue = [];
        /* Calls that have a config but are waiting for a picker that can
         * handle them. Drained every time the load balancer reports a new
         * picker. */
        this.pickQueue = [];
        this.connectivityStateWatchers = [];
        this.configSelector = null;
        // Channelz info
        this.channelzEnabled = true;
        this.callTracker = new channelz_1.ChannelzCallTracker();
        this.childrenTracker = new channelz_1.ChannelzChildrenTracker();
        if (typeof target !== 'string') {
            throw new TypeError('Channel target must be a string');
        }
        if (!(credentials instanceof channel_credentials_1.ChannelCredentials)) {
            throw new TypeError('Channel credentials must be a ChannelCredentials object');
        }
        if (options) {
            if (typeof options !== 'object') {
                throw new TypeError('Channel options must be an object');
            }
        }
        /* The check above tolerates a missing options argument, so normalize
         * it here; every later option lookup would otherwise throw on
         * undefined. */
        const channelOptions = options === undefined || options === null ? {} : options;
        this.options = channelOptions;
        this.originalTarget = target;
        const originalTargetUri = uri_parser_1.parseUri(target);
        if (originalTargetUri === null) {
            throw new Error(`Could not parse target name "${target}"`);
        }
        /* This ensures that the target has a scheme that is registered with the
         * resolver */
        const defaultSchemeMapResult = resolver_1.mapUriDefaultScheme(originalTargetUri);
        if (defaultSchemeMapResult === null) {
            throw new Error(`Could not find a default scheme for target name "${target}"`);
        }
        /* No-op interval timer that is ref'd while calls sit in a queue and
         * unref'd when the queues drain (presumably so the process is not
         * allowed to exit while calls are still waiting). It starts out
         * unref'd. */
        this.callRefTimer = setInterval(() => { }, MAX_TIMEOUT_TIME);
        (_b = (_a = this.callRefTimer).unref) === null || _b === void 0 ? void 0 : _b.call(_a);
        if (this.options['grpc.enable_channelz'] === 0) {
            this.channelzEnabled = false;
        }
        this.channelzTrace = new channelz_1.ChannelzTrace();
        if (this.channelzEnabled) {
            this.channelzRef = channelz_1.registerChannelzChannel(target, () => this.getChannelzInfo());
            this.channelzTrace.addTrace('CT_INFO', 'Channel created');
        }
        else {
            // Dummy channelz ref that will never be used
            this.channelzRef = {
                kind: 'channel',
                id: -1,
                name: ''
            };
        }
        if (this.options['grpc.default_authority']) {
            this.defaultAuthority = this.options['grpc.default_authority'];
        }
        else {
            this.defaultAuthority = resolver_1.getDefaultAuthority(defaultSchemeMapResult);
        }
        const proxyMapResult = http_proxy_1.mapProxyName(defaultSchemeMapResult, channelOptions);
        this.target = proxyMapResult.target;
        this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);
        /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
         * the grpc.use_local_subchannel_pool channel option means. */
        const useLocalPoolOption = channelOptions['grpc.use_local_subchannel_pool'];
        const useGlobalPool = (useLocalPoolOption !== null && useLocalPoolOption !== undefined ? useLocalPoolOption : 0) === 0;
        this.subchannelPool = subchannel_pool_1.getSubchannelPool(useGlobalPool);
        const channelControlHelper = {
            createSubchannel: (subchannelAddress, subchannelArgs) => {
                const subchannel = this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials);
                if (this.channelzEnabled) {
                    this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
                }
                return subchannel;
            },
            updateState: (connectivityState, picker) => {
                this.currentPicker = picker;
                /* Swap the queue out before retrying so that picks re-queued
                 * by tryPick land in the fresh queue, not the one being
                 * iterated. */
                const queueCopy = this.pickQueue.slice();
                this.pickQueue = [];
                this.callRefTimerUnref();
                for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
                    this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                }
                this.updateState(connectivityState);
            },
            requestReresolution: () => {
                // This should never be called.
                throw new Error('Resolving load balancer should never call requestReresolution');
            },
            addChannelzChild: (child) => {
                if (this.channelzEnabled) {
                    this.childrenTracker.refChild(child);
                }
            },
            removeChannelzChild: (child) => {
                if (this.channelzEnabled) {
                    this.childrenTracker.unrefChild(child);
                }
            }
        };
        this.resolvingLoadBalancer = new resolving_load_balancer_1.ResolvingLoadBalancer(this.target, channelControlHelper, channelOptions, (configSelector) => {
            if (this.channelzEnabled) {
                this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
            }
            this.configSelector = configSelector;
            /* We process the queue asynchronously to ensure that the corresponding
             * load balancer update has completed. */
            process.nextTick(() => {
                const localQueue = this.configSelectionQueue;
                this.configSelectionQueue = [];
                this.callRefTimerUnref();
                for (const { callStream, callMetadata } of localQueue) {
                    this.tryGetConfig(callStream, callMetadata);
                }
            });
        }, (status) => {
            if (this.channelzEnabled) {
                this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
            }
            if (this.configSelectionQueue.length > 0) {
                this.trace('Name resolution failed with calls queued for config selection');
            }
            /* Calls that asked to wait for ready stay queued; all others are
             * ended with the resolution failure status. */
            const localQueue = this.configSelectionQueue;
            this.configSelectionQueue = [];
            this.callRefTimerUnref();
            for (const { callStream, callMetadata } of localQueue) {
                if (callMetadata.getOptions().waitForReady) {
                    this.callRefTimerRef();
                    this.configSelectionQueue.push({ callStream, callMetadata });
                }
                else {
                    callStream.cancelWithStatus(status.code, status.details);
                }
            }
        });
        this.filterStackFactory = new filter_stack_1.FilterStackFactory([
            new call_credentials_filter_1.CallCredentialsFilterFactory(this),
            new deadline_filter_1.DeadlineFilterFactory(this),
            new max_message_size_filter_1.MaxMessageSizeFilterFactory(this.options),
            new compression_filter_1.CompressionFilterFactory(this, this.options),
        ]);
        this.trace('Channel constructed with options ' + JSON.stringify(channelOptions, undefined, 2));
    }
    /**
     * Builds the snapshot object passed to channelz for this channel.
     * @returns An object with the original target string, the current
     *     connectivity state, the trace, the call tracker, and the child lists.
     */
    getChannelzInfo() {
        return {
            target: this.originalTarget,
            state: this.connectivityState,
            trace: this.channelzTrace,
            callTracker: this.callTracker,
            children: this.childrenTracker.getChildLists()
        };
    }
    /**
     * Emits a 'channel' trace line prefixed with the channelz id and target.
     * @param text Message to log.
     * @param verbosityOverride Optional verbosity; defaults to DEBUG.
     */
    trace(text, verbosityOverride) {
        logging_1.trace(verbosityOverride !== null && verbosityOverride !== void 0 ? verbosityOverride : constants_1.LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uri_parser_1.uriToString(this.target) + ' ' + text);
    }
    /**
     * Refs callRefTimer unless the timer reports that it is already ref'd.
     */
    callRefTimerRef() {
        var _a, _b, _c, _d;
        // If the hasRef function does not exist, always run the code
        if (!((_b = (_a = this.callRefTimer).hasRef) === null || _b === void 0 ? void 0 : _b.call(_a))) {
            this.trace('callRefTimer.ref | configSelectionQueue.length=' +
                this.configSelectionQueue.length +
                ' pickQueue.length=' +
                this.pickQueue.length);
            (_d = (_c = this.callRefTimer).ref) === null || _d === void 0 ? void 0 : _d.call(_c);
        }
    }
    /**
     * Unrefs callRefTimer unless the timer reports that it is already unref'd.
     */
    callRefTimerUnref() {
        var _a, _b;
        // If the hasRef function does not exist, always run the code
        if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
            this.trace('callRefTimer.unref | configSelectionQueue.length=' +
                this.configSelectionQueue.length +
                ' pickQueue.length=' +
                this.pickQueue.length);
            (_b = (_a = this.callRefTimer).unref) === null || _b === void 0 ? void 0 : _b.call(_a);
        }
    }
    /**
     * Queues a call to be picked again on the next picker update and refs
     * callRefTimer.
     */
    pushPick(callStream, callMetadata, callConfig, dynamicFilters) {
        this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
        this.callRefTimerRef();
    }
    /**
     * Check the picker output for the given call and corresponding metadata,
     * and take any relevant actions. Should not be called while iterating
     * over pickQueue.
     * @param callStream The call to start, queue, or cancel.
     * @param callMetadata Metadata for the call; a clone is sent through the
     *     call's filter stack so this object stays unmodified for retries.
     * @param callConfig Call config; its pickInformation is passed to the
     *     picker and its onCommitted hook, if any, runs once the call starts.
     * @param dynamicFilters Filters passed to the subchannel ahead of the
     *     pick result's extra filters.
     * @throws {Error} If the picker returns an unknown pickResultType.
     */
    tryPick(callStream, callMetadata, callConfig, dynamicFilters) {
        var _a, _b, _c;
        const pickResult = this.currentPicker.pick({
            metadata: callMetadata,
            extraPickInfo: callConfig.pickInformation,
        });
        this.trace('Pick result: ' +
            picker_1.PickResultType[pickResult.pickResultType] +
            ' subchannel: ' + ((_a = pickResult.subchannel) === null || _a === void 0 ? void 0 : _a.getAddress()) +
            ' status: ' + ((_b = pickResult.status) === null || _b === void 0 ? void 0 : _b.code) +
            ' ' + ((_c = pickResult.status) === null || _c === void 0 ? void 0 : _c.details));
        switch (pickResult.pickResultType) {
            case picker_1.PickResultType.COMPLETE:
                if (pickResult.subchannel === null) {
                    callStream.cancelWithStatus(constants_1.Status.UNAVAILABLE, 'Request dropped by load balancing policy');
                    // End the call with an error
                }
                else {
                    /* If the subchannel is not in the READY state, that indicates a bug
                     * somewhere in the load balancer or picker. So, we log an error and
                     * queue the pick to be tried again later. */
                    if (pickResult.subchannel.getConnectivityState() !==
                        connectivity_state_1.ConnectivityState.READY) {
                        logging_1.log(constants_1.LogVerbosity.ERROR, 'Error: COMPLETE pick result subchannel ' +
                            pickResult.subchannel.getAddress() +
                            ' has state ' +
                            connectivity_state_1.ConnectivityState[pickResult.subchannel.getConnectivityState()]);
                        this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
                        break;
                    }
                    /* We need to clone the callMetadata here because the transparent
                     * retry code in the promise resolution handler use the same
                     * callMetadata object, so it needs to stay unmodified */
                    callStream.filterStack
                        .sendMetadata(Promise.resolve(callMetadata.clone()))
                        .then((finalMetadata) => {
                        var _a, _b;
                        const subchannelState = pickResult.subchannel.getConnectivityState();
                        if (subchannelState === connectivity_state_1.ConnectivityState.READY) {
                            try {
                                const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
                                pickResult.subchannel.startCallStream(finalMetadata, callStream, [...dynamicFilters, ...pickExtraFilters]);
                                /* If we reach this point, the call stream has started
                                 * successfully */
                                (_a = callConfig.onCommitted) === null || _a === void 0 ? void 0 : _a.call(callConfig);
                                (_b = pickResult.onCallStarted) === null || _b === void 0 ? void 0 : _b.call(pickResult);
                            }
                            catch (error) {
                                if (error.code ===
                                    'ERR_HTTP2_GOAWAY_SESSION') {
                                    /* An error here indicates that something went wrong with
                                     * the picked subchannel's http2 stream right before we
                                     * tried to start the stream. We are handling a promise
                                     * result here, so this is asynchronous with respect to the
                                     * original tryPick call, so calling it again is not
                                     * recursive. We call tryPick immediately instead of
                                     * queueing this pick again because handling the queue is
                                     * triggered by state changes, and we want to immediately
                                     * check if the state has already changed since the
                                     * previous tryPick call. We do this instead of cancelling
                                     * the stream because the correct behavior may be
                                     * re-queueing instead, based on the logic in the rest of
                                     * tryPick */
                                    this.trace('Failed to start call on picked subchannel ' +
                                        pickResult.subchannel.getAddress() +
                                        ' with error ' +
                                        error.message +
                                        '. Retrying pick', constants_1.LogVerbosity.INFO);
                                    this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                                }
                                else {
                                    this.trace('Failed to start call on picked subchanel ' +
                                        pickResult.subchannel.getAddress() +
                                        ' with error ' +
                                        error.message +
                                        '. Ending call', constants_1.LogVerbosity.INFO);
                                    callStream.cancelWithStatus(constants_1.Status.INTERNAL, `Failed to start HTTP/2 stream with error: ${error.message}`);
                                }
                            }
                        }
                        else {
                            /* The logic for doing this here is the same as in the catch
                             * block above */
                            this.trace('Picked subchannel ' +
                                pickResult.subchannel.getAddress() +
                                ' has state ' +
                                connectivity_state_1.ConnectivityState[subchannelState] +
                                ' after metadata filters. Retrying pick', constants_1.LogVerbosity.INFO);
                            this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                        }
                    }, (error) => {
                        // We assume the error code isn't 0 (Status.OK)
                        callStream.cancelWithStatus(typeof error.code === 'number' ? error.code : constants_1.Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}`);
                    });
                }
                break;
            case picker_1.PickResultType.QUEUE:
                this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
                break;
            case picker_1.PickResultType.TRANSIENT_FAILURE:
                if (callMetadata.getOptions().waitForReady) {
                    this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
                }
                else {
                    callStream.cancelWithStatus(pickResult.status.code, pickResult.status.details);
                }
                break;
            case picker_1.PickResultType.DROP:
                callStream.cancelWithStatus(pickResult.status.code, pickResult.status.details);
                break;
            default:
                throw new Error(`Invalid state: unknown pickResultType ${pickResult.pickResultType}`);
        }
    }
    /**
     * Removes the given watcher object from the watcher list, if present.
     * @param watcherObject The exact object previously pushed by
     *     watchConnectivityState (compared by identity).
     */
    removeConnectivityStateWatcher(watcherObject) {
        const watcherIndex = this.connectivityStateWatchers.findIndex((value) => value === watcherObject);
        if (watcherIndex >= 0) {
            this.connectivityStateWatchers.splice(watcherIndex, 1);
        }
    }
    /**
     * Records a new connectivity state, logs the transition, and notifies
     * every watcher whose recorded currentState differs from the new state.
     * Notified watchers have their timer cleared and are removed before their
     * callback is invoked (with no arguments).
     * @param newState The new ConnectivityState value.
     */
    updateState(newState) {
        logging_1.trace(constants_1.LogVerbosity.DEBUG, 'connectivity_state', '(' + this.channelzRef.id + ') ' +
            uri_parser_1.uriToString(this.target) +
            ' ' +
            connectivity_state_1.ConnectivityState[this.connectivityState] +
            ' -> ' +
            connectivity_state_1.ConnectivityState[newState]);
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]);
        }
        this.connectivityState = newState;
        // Iterate over a copy because notified watchers are removed from the list.
        const watchersCopy = this.connectivityStateWatchers.slice();
        for (const watcherObject of watchersCopy) {
            if (newState !== watcherObject.currentState) {
                if (watcherObject.timer) {
                    clearTimeout(watcherObject.timer);
                }
                this.removeConnectivityStateWatcher(watcherObject);
                watcherObject.callback();
            }
        }
    }
    /**
     * Obtains the call config for a call and moves it on to picking, or
     * queues the call if no config selector is available yet.
     * @param stream The call stream; ignored if it already has a status.
     * @param metadata Metadata passed to the config selector, the dynamic
     *     filters, and the pick.
     */
    tryGetConfig(stream, metadata) {
        if (stream.getStatus() !== null) {
            /* If the stream has a status, it has already finished and we don't need
             * to take any more actions on it. */
            return;
        }
        if (this.configSelector === null) {
            /* This branch will only be taken at the beginning of the channel's life,
             * before the resolver ever returns a result. So, the
             * ResolvingLoadBalancer may be idle and if so it needs to be kicked
             * because it now has a pending request. */
            this.resolvingLoadBalancer.exitIdle();
            this.configSelectionQueue.push({
                callStream: stream,
                callMetadata: metadata,
            });
            this.callRefTimerRef();
        }
        else {
            const callConfig = this.configSelector(stream.getMethod(), metadata);
            if (callConfig.status === constants_1.Status.OK) {
                if (callConfig.methodConfig.timeout) {
                    const deadline = new Date();
                    deadline.setSeconds(deadline.getSeconds() + callConfig.methodConfig.timeout.seconds);
                    deadline.setMilliseconds(deadline.getMilliseconds() +
                        callConfig.methodConfig.timeout.nanos / 1000000);
                    stream.setConfigDeadline(deadline);
                    // Refreshing the filters makes the deadline filter pick up the new deadline
                    stream.filterStack.refresh();
                }
                if (callConfig.dynamicFilterFactories.length > 0) {
                    /* These dynamicFilters are the mechanism for implementing gRFC A39:
                     * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md
                     * We run them here instead of with the rest of the filters because
                     * that spec says "the xDS HTTP filters will run in between name
                     * resolution and load balancing".
                     *
                     * We use the filter stack here to simplify the multi-filter async
                     * waterfall logic, but we pass along the underlying list of filters
                     * to avoid having nested filter stacks when combining it with the
                     * original filter stack. We do not pass along the original filter
                     * factory list because these filters may need to persist data
                     * between sending headers and other operations. */
                    const dynamicFilterStackFactory = new filter_stack_1.FilterStackFactory(callConfig.dynamicFilterFactories);
                    const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
                    dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then((filteredMetadata) => {
                        this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
                    }, (error) => {
                        /* Without this handler a rejecting dynamic filter would
                         * leave the call pending forever and surface as an
                         * unhandled rejection. The status code convention
                         * mirrors the metadata failure path in tryPick. */
                        const hasNumericCode = error !== null && error !== undefined && typeof error.code === 'number';
                        const errorText = error !== null && error !== undefined && error.message !== undefined ? error.message : String(error);
                        stream.cancelWithStatus(hasNumericCode ? error.code : constants_1.Status.UNKNOWN, `Running dynamic filters failed with error: ${errorText}`);
                    });
                }
                else {
                    this.tryPick(stream, metadata, callConfig, []);
                }
            }
            else {
                stream.cancelWithStatus(callConfig.status, 'Failed to route call to method ' + stream.getMethod());
            }
        }
    }
    /**
     * Starts processing a call on this channel using a clone of the metadata.
     */
    _startCallStream(stream, metadata) {
        this.tryGetConfig(stream, metadata.clone());
    }
    /**
     * Shuts the channel down: destroys the resolving load balancer, moves to
     * SHUTDOWN (notifying watchers), stops callRefTimer, unregisters the
     * channelz ref when channelz is enabled, and asks the subchannel pool to
     * drop unused subchannels.
     */
    close() {
        this.resolvingLoadBalancer.destroy();
        this.updateState(connectivity_state_1.ConnectivityState.SHUTDOWN);
        clearInterval(this.callRefTimer);
        if (this.channelzEnabled) {
            channelz_1.unregisterChannelzRef(this.channelzRef);
        }
        this.subchannelPool.unrefUnusedSubchannels();
    }
    /**
     * @returns The channel's (post proxy mapping) target as a string.
     */
    getTarget() {
        return uri_parser_1.uriToString(this.target);
    }
    /**
     * @param tryToConnect If truthy, the resolving load balancer is asked to
     *     exit idle.
     * @returns The connectivity state as it was before any exit-idle request.
     */
    getConnectivityState(tryToConnect) {
        const connectivityState = this.connectivityState;
        if (tryToConnect) {
            this.resolvingLoadBalancer.exitIdle();
        }
        return connectivityState;
    }
    /**
     * Registers a one-shot watcher that fires when the channel moves to a
     * state different from currentState, or when the deadline passes.
     * @param currentState The state the caller last observed.
     * @param deadline Date or millisecond timestamp; Infinity means no
     *     deadline. A deadline that has already passed fails on the next tick.
     * @param callback Called with no arguments on a state change, or with an
     *     Error when the deadline passes first.
     * @throws {Error} If the channel has been shut down.
     */
    watchConnectivityState(currentState, deadline, callback) {
        if (this.connectivityState === connectivity_state_1.ConnectivityState.SHUTDOWN) {
            throw new Error('Channel has been shut down');
        }
        let timer = null;
        if (deadline !== Infinity) {
            const deadlineDate = deadline instanceof Date ? deadline : new Date(deadline);
            const now = new Date();
            if (deadline === -Infinity || deadlineDate <= now) {
                process.nextTick(callback, new Error('Deadline passed without connectivity state change'));
                return;
            }
            // watcherObject is declared below; it is initialized before this timer can fire.
            timer = setTimeout(() => {
                this.removeConnectivityStateWatcher(watcherObject);
                callback(new Error('Deadline passed without connectivity state change'));
            }, deadlineDate.getTime() - now.getTime());
        }
        const watcherObject = {
            currentState,
            callback,
            timer,
        };
        this.connectivityStateWatchers.push(watcherObject);
    }
    /**
     * Get the channelz reference object for this channel. The returned value is
     * garbage if channelz is disabled for this channel.
     * @returns The channelz ref assigned in the constructor.
     */
    getChannelzRef() {
        return this.channelzRef;
    }
    /**
     * Creates a new call stream on this channel. The call is counted in the
     * channelz call tracker when channelz is enabled.
     * @param method Method name; must be a string.
     * @param deadline Deadline as a number or Date.
     * @param host Optional authority; defaults to the channel's default authority.
     * @param parentCall Optional parent call stored in the call options.
     * @param propagateFlags Optional propagation flags; defaults to
     *     Propagate.DEFAULTS.
     * @returns The new Http2CallStream.
     * @throws {TypeError} If method or deadline has the wrong type.
     * @throws {Error} If the channel has been shut down.
     */
    createCall(method, deadline, host, parentCall, propagateFlags) {
        if (typeof method !== 'string') {
            throw new TypeError('Channel#createCall: method must be a string');
        }
        if (!(typeof deadline === 'number' || deadline instanceof Date)) {
            throw new TypeError('Channel#createCall: deadline must be a number or Date');
        }
        if (this.connectivityState === connectivity_state_1.ConnectivityState.SHUTDOWN) {
            throw new Error('Channel has been shut down');
        }
        const callNumber = getNewCallNumber();
        this.trace('createCall [' +
            callNumber +
            '] method="' +
            method +
            '", deadline=' +
            deadline);
        const finalOptions = {
            deadline: deadline,
            flags: propagateFlags !== null && propagateFlags !== void 0 ? propagateFlags : constants_1.Propagate.DEFAULTS,
            host: host !== null && host !== void 0 ? host : this.defaultAuthority,
            parentCall: parentCall,
        };
        const stream = new call_stream_1.Http2CallStream(method, this, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), callNumber);
        if (this.channelzEnabled) {
            this.callTracker.addCallStarted();
            stream.addStatusWatcher(status => {
                if (status.code === constants_1.Status.OK) {
                    this.callTracker.addCallSucceeded();
                }
                else {
                    this.callTracker.addCallFailed();
                }
            });
        }
        return stream;
    }
}
535exports.ChannelImplementation = ChannelImplementation;
536//# sourceMappingURL=channel.js.map
\No newline at end of file